找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 1834|回复: 1
打印 上一主题 下一主题
收起左侧

MQTT底层封包协议

[复制链接]
跳转到指定楼层
楼主
ID:471375 发表于 2020-6-26 15:31 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
在基于C的封包底层上有许多比较精妙的地方,可以参考看一看
  1. /**
  2.   ******************************************************************************
  3.   * @version V1.0.0
  4.   * @date    2019/12/15
  5.   * @brief   
  6.   ******************************************************************************
  7.        
  8.   ******************************************************************************
  9.   */

  10. #define MQTTCLIENT_C_//如果没有定义

  11. #include "mqtt_msg.h"
  12. #include "string.h"
  13. #include "stm32f10x.h"


  14. #define MQTT_MAX_FIXED_HEADER_SIZE 3

  15. uint16_t mqtt_message_id = 0;

  16. enum mqtt_connect_flag
  17. {
  18.   MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
  19.   MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
  20.   MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
  21.   MQTT_CONNECT_FLAG_WILL = 1 << 2,
  22.   MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
  23. };
  24. //__attribute((__packed__))
  25. struct  mqtt_connect_variable_header
  26. {
  27.   uint8_t lengthMsb;
  28.   uint8_t lengthLsb;
  29.   uint8_t magic[4];
  30.   uint8_t version;
  31.   uint8_t flags;
  32.   uint8_t keepaliveMsb;
  33.   uint8_t keepaliveLsb;
  34. };


  35. int mqtt_get_type(char* buffer) { return (buffer[0] & 0xf0) >> 4; }
  36. int mqtt_get_connect_ret_code(char* buffer) { return (buffer[3]); }
  37. int mqtt_get_qos(char* buffer) { return (buffer[0] & 0x06) >> 1; }


  38. int append_string(int *length, char* buffer,int buffer_length, char* string, int len)
  39. {
  40.   if((*length) + len + 2 > buffer_length)//加上 ClientID 和 记录 ClientID个数(两位) 以后超出了数组
  41.     return -1;

  42.   buffer[(*length)++] = len >> 8;
  43.   buffer[(*length)++] = len & 0xff;
  44.   c_memcpy(buffer + (*length), string, len);
  45.   (*length) += len;
  46.   return len + 2;
  47. }



  48. uint16_t append_message_id(int *length, char* buffer,int buffer_length, uint16_t message_id)
  49. {
  50.   // If message_id is zero then we should assign one, otherwise
  51.   // we'll use the one supplied by the caller
  52.   while(message_id == 0)
  53.     message_id = ++mqtt_message_id;

  54.   if((*length) + 2 > buffer_length)
  55.     return 0;

  56.   buffer[(*length)++] = message_id >> 8;
  57.   buffer[(*length)++] = message_id & 0xff;
  58.        
  59.   return message_id;
  60. }


  61. int fini_message(char **data_ptr,int        length,char* buffer, int type, int dup, int qos, int retain)
  62. {
  63.   int remaining_length = length - MQTT_MAX_FIXED_HEADER_SIZE;
  64.        
  65.   if(remaining_length > 127)
  66.   {
  67.     buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
  68.     buffer[1] = 0x80 | (remaining_length % 128);
  69.     buffer[2] = remaining_length / 128;
  70.     length = remaining_length + 3;
  71.     *data_ptr = buffer;
  72.   }
  73.   else
  74.   {
  75.     buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
  76.     buffer[2] = remaining_length;
  77.     length = remaining_length + 2;
  78.     *data_ptr = buffer + 1;
  79.   }

  80.   return length;
  81. }



  82. uint16_t mqtt_get_id(char* buffer, uint16_t length)
  83. {
  84.   if(length < 1)
  85.     return 0;
  86.        
  87.   switch(mqtt_get_type(buffer))
  88.   {
  89.     case MQTT_MSG_TYPE_PUBLISH:
  90.     {
  91.       int i;
  92.       int topiclen;

  93.       for(i = 1; i < length; ++i)
  94.       {
  95.         if((buffer[i] & 0x80) == 0)
  96.         {
  97.           ++i;
  98.           break;
  99.         }
  100.       }

  101.       if(i + 2 >= length)
  102.         return 0;
  103.       topiclen = buffer[i++] << 8;
  104.       topiclen |= buffer[i++];

  105.       if(i + topiclen >= length)
  106.         return 0;
  107.       i += topiclen;

  108.       if(mqtt_get_qos(buffer) > 0)
  109.       {
  110.         if(i + 2 >= length)
  111.           return 0;
  112.         //i += 2;
  113.       } else {
  114.               return 0;
  115.       }
  116.       return (buffer[i] << 8) | buffer[i + 1];
  117.     }
  118.     case MQTT_MSG_TYPE_PUBACK:
  119.     case MQTT_MSG_TYPE_PUBREC:
  120.     case MQTT_MSG_TYPE_PUBREL:
  121.     case MQTT_MSG_TYPE_PUBCOMP:
  122.     case MQTT_MSG_TYPE_SUBACK:
  123.     case MQTT_MSG_TYPE_UNSUBACK:
  124.     case MQTT_MSG_TYPE_SUBSCRIBE:
  125.     {
  126.       // This requires the remaining length to be encoded in 1 byte,
  127.       // which it should be.
  128.       if(length >= 4 && (buffer[1] & 0x80) == 0)
  129.         return (buffer[2] << 8) | buffer[3];
  130.       else
  131.         return 0;
  132.     }

  133.     default:
  134.       return 0;
  135.   }
  136. }






  137. /**
  138. * @brief   打包连接MQTT指令
  139. * @param   info     MQTT信息
  140. * @param   data_ptr 打包的数据首地址
  141. * @param   buffer   打包进的数组
  142. * @param   buffer_length 数组长度
  143. * @retval  数据长度
  144. * @warning None
  145. * @example
  146. **/
  147. int mqtt_msg_connect(mqtt_connect_info_t* info,char **data_ptr,char* buffer,int buffer_length)
  148. {
  149.         int length;
  150.   struct mqtt_connect_variable_header* variable_header;
  151.        
  152.         mqtt_message_id = 0;
  153.        
  154.         length = MQTT_MAX_FIXED_HEADER_SIZE;//头.连接类型1位,数据个数2位(如果大于127就需要两位)
  155.        
  156.   if(length + sizeof(*variable_header) > buffer_length)//数组不够存储的
  157.     return 0;
  158.        
  159.   variable_header = (void*)(buffer + length);//把数组分给这个结构体里面的变量
  160.   length += sizeof(*variable_header);//存储完 连接类型,整个数据个数,版本号个数,版本号,等
  161.        
  162.   variable_header->lengthMsb = 0;//版本名称个数高位
  163.   variable_header->lengthLsb = 4;//版本名称个数低位
  164.   c_memcpy(variable_header->magic, "MQTT", 4);//版本名称MQTT
  165.   variable_header->version = 4;//版本号
  166.   variable_header->flags = 0;//先清零
  167.   variable_header->keepaliveMsb = info->keepalive >> 8;//心跳包时间
  168.   variable_header->keepaliveLsb = info->keepalive & 0xff;//心跳包时间

  169.   if(info->clean_session)//清除连接信息
  170.     variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;

  171.   if(info->client_id != NULL && info->client_id[0] != '\0')//client_id
  172.   {
  173.     if(append_string(&length,buffer,buffer_length, info->client_id, c_strlen(info->client_id)) < 0)//拷贝
  174.       return -1;//数组不够用呀...
  175.   }
  176.   else
  177.     return -2;//没有设置client_id

  178.   if(info->will_topic != NULL && info->will_topic[0] != '\0')//遗嘱
  179.   {
  180.     if(append_string(&length,buffer,buffer_length , info->will_topic, c_strlen(info->will_topic)) < 0)//遗嘱的主题
  181.       return -3;

  182.     if(append_string(&length,buffer,buffer_length , info->will_message, c_strlen(info->will_message)) < 0)//遗嘱的消息
  183.       return -4;

  184.     variable_header->flags |= MQTT_CONNECT_FLAG_WILL;//需要遗嘱
  185.     if(info->will_retain)//遗嘱是够需要服务器保留
  186.       variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;//保留遗嘱
  187.     variable_header->flags |= (info->will_qos & 3) << 3;//遗嘱消息等级
  188.   }

  189.   if(info->username != NULL && info->username[0] != '\0')//username
  190.   {
  191.     if(append_string(&length,buffer,buffer_length, info->username, c_strlen(info->username)) < 0)//拷贝用户名
  192.       return -5;

  193.     variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;//有用户名
  194.   }
  195.        
  196.   if(info->password != NULL && info->password[0] != '\0')//password
  197.   {
  198.     if(append_string(&length,buffer,buffer_length, info->password, c_strlen(info->password)) < 0)
  199.       return -6;

  200.     variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;//有密码
  201.   }

  202.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);//最终组合连接MQTT的指令
  203. }


  204. /**
  205. * @brief  判断是否连接上MQTT
  206. * @param  服务器返回的数据
  207. * @param  
  208. * @retval 0 连接成功
  209. * @example
  210. **/
  211. int  mqtt_msg_connect_ack(char *buff)
  212. {
  213.         if(mqtt_get_type(buff) == MQTT_MSG_TYPE_CONNACK)
  214.         {
  215.                 return mqtt_get_connect_ret_code(buff);
  216.         }
  217.         return -1;
  218. }


  219. /**
  220. * @brief   断开连接
  221. * @param   data_ptr 打包的数据首地址
  222. * @param   buffer   打包进的数组
  223. * @param   buffer_length 数组长度
  224. * @retval  数据长度
  225. * @warning None
  226. * @example
  227. **/
  228. int mqtt_msg_disconnect(char **data_ptr,char* buffer,int buffer_length)
  229. {
  230.         int length;
  231.         length = MQTT_MAX_FIXED_HEADER_SIZE;
  232.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
  233. }



  234. /**
  235. * @brief   订阅主题
  236. * @param   topic   订阅的主题
  237. * @param   qos     消息等级
  238. * @param   data_ptr 打包的数据首地址
  239. * @param   buffer   打包进的数组
  240. * @param   buffer_length 数组长度
  241. * @retval  数据长度
  242. * @warning None
  243. * @example
  244. **/
  245. int mqtt_msg_subscribe_topic(char* topic, int qos,char **data_ptr,char* buffer,int buffer_length)
  246. {
  247.         int length;
  248.         length = MQTT_MAX_FIXED_HEADER_SIZE;
  249.        
  250.         if(topic == NULL || topic[0] == '\0')
  251.                 return -1;
  252.        
  253.         if((mqtt_message_id = append_message_id(&length, buffer, buffer_length, 0)) == 0)
  254.                 return -2;
  255.        
  256.         if(append_string(&length, buffer, buffer_length, topic, c_strlen(topic)) < 0)
  257.                 return -3;
  258.        
  259.         if(length + 1 > buffer_length)
  260.     return -4;
  261.   buffer[length++] = qos;
  262.        
  263.         return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
  264. }



  265. /**
  266. * @brief  判断是否成功订阅
  267. * @param  buffer  服务器返回的数据
  268. * @param  length  服务器返回的数据长度
  269. * @retval 0:成功  1:失败
  270. * @example
  271. **/
  272. int mqtt_msg_subscribe_ack(char* buffer, uint16_t length)
  273. {
  274.         if(mqtt_get_type(buffer) == MQTT_MSG_TYPE_SUBACK)
  275.         {
  276.                 if(mqtt_get_id(buffer,length) == mqtt_message_id)
  277.                 {
  278.                         return 0;
  279.                 }
  280.                 else
  281.                 {
  282.                         return 1;
  283.                 }
  284.         }
  285.         else
  286.         {
  287.                 return 1;
  288.         }
  289. }


  290. /**
  291. * @brief   发布消息
  292. * @param   topic    主题
  293. * @param   data     消息
  294. * @param   data_length 消息长度
  295. * @param   qos      消息等级
  296. * @param   retain   是否需要保留消息
  297. * @param   data_ptr 打包的数据首地址
  298. * @param   buffer   打包进的数组
  299. * @param   buffer_length 数组长度
  300. * @retval  数据长度
  301. * @warning None
  302. * @example
  303. **/
  304. int mqtt_msg_publish(char* topic, char* date, int data_length, int qos, int retain, char **data_ptr,char* buffer,int buffer_length)
  305. {
  306.         int length;
  307.         length = MQTT_MAX_FIXED_HEADER_SIZE;

  308.   if(topic == NULL || topic[0] == '\0')
  309.     return -1;

  310.   if(append_string(&length, buffer, buffer_length, topic, strlen(topic)) < 0)
  311.     return -2;

  312.   if(qos > 0)
  313.   {
  314.     if((mqtt_message_id = append_message_id(&length, buffer, buffer_length,  0)) == 0)
  315.       return -3;
  316.   }
  317.   else
  318.     mqtt_message_id = 0;

  319.   if(length + data_length > buffer_length)
  320.     return -4;
  321.   memcpy(buffer + length, date, data_length);
  322.   length += data_length;

  323.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
  324. }



  325. int mqtt_msg_puback(uint16_t message_id,char **data_ptr,char* buffer,int buffer_length)
  326. {
  327.         int length;
  328.   length = MQTT_MAX_FIXED_HEADER_SIZE;
  329.   if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
  330.     return -1;
  331.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
  332. }


  333. int mqtt_msg_pubrec(uint16_t message_id,char **data_ptr,char* buffer,int buffer_length)
  334. {
  335.         int length;
  336.   length = MQTT_MAX_FIXED_HEADER_SIZE;
  337.   if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
  338.     return -1;
  339.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
  340. }


  341. int mqtt_msg_pubrel(uint16_t message_id,char **data_ptr,char* buffer,int buffer_length)
  342. {
  343.         int length;
  344.   length = MQTT_MAX_FIXED_HEADER_SIZE;
  345.        
  346.   if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
  347.     return -1;
  348.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
  349. }


  350. int mqtt_msg_pubcomp(uint16_t message_id,char **data_ptr,char* buffer,int buffer_length)
  351. {
  352.         int length;
  353.   length = MQTT_MAX_FIXED_HEADER_SIZE;
  354.   if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
  355.     return -1;
  356.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
  357. }


  358. const char* mqtt_get_publish_topic(char* buffer, uint16_t* length)
  359. {
  360.   int i;
  361.   int totlen = 0;
  362.   int topiclen;

  363.   for(i = 1; i < *length; ++i)
  364.   {
  365.     totlen += (buffer[i] & 0x7f) << (7 * (i -1));
  366.     if((buffer[i] & 0x80) == 0)
  367.     {
  368.       ++i;
  369.       break;
  370.     }
  371.   }
  372.   totlen += i;

  373.   if(i + 2 >= *length)
  374.     return NULL;
  375.   topiclen = buffer[i++] << 8;
  376.   topiclen |= buffer[i++];

  377.   if(i + topiclen > *length)
  378.     return NULL;

  379.   *length = topiclen;
  380.   return (const char*)(buffer + i);
  381. }


  382. const char* mqtt_get_publish_data(char* buffer, uint16_t* length)
  383. {
  384.   int i;
  385.   int totlen = 0;
  386.   int topiclen;
  387.   int blength = *length;
  388.   *length = 0;

  389.   for(i = 1; i < blength; ++i)
  390.   {
  391.     totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
  392.     if((buffer[i] & 0x80) == 0)
  393.     {
  394.       ++i;
  395.       break;
  396.     }
  397.   }
  398.   totlen += i;

  399.   if(i + 2 >= blength)
  400.     return NULL;
  401.   topiclen = buffer[i++] << 8;
  402.   topiclen |= buffer[i++];

  403.   if(i + topiclen >= blength)
  404.     return NULL;

  405.   i += topiclen;

  406.   if(mqtt_get_qos(buffer) > 0)
  407.   {
  408.     if(i + 2 >= blength)
  409.       return NULL;
  410.     i += 2;
  411.   }

  412.   if(totlen < i)
  413.     return NULL;

  414.   if(totlen <= blength)
  415.     *length = totlen - i;
  416.   else
  417.     *length = blength - i;
  418.   return (const char*)(buffer + i);
  419. }

  420. /**
  421. * @brief   打包服务器返回的心跳包数据(用不到)
  422. * @param   data_ptr 打包的数据首地址
  423. * @param   buffer   打包进的数组
  424. * @param   buffer_length 数组长度
  425. * @retval  数据长度
  426. * @warning None
  427. * @example
  428. **/
  429. int mqtt_msg_pingresp(char **data_ptr,char* buffer,int buffer_length)
  430. {
  431.         int length;
  432.         length = MQTT_MAX_FIXED_HEADER_SIZE;       
  433.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
  434. }

  435. /**
  436. * @brief   获取发送给服务器的心跳包数据
  437. * @param   data_ptr 打包的数据首地址
  438. * @param   buffer   打包进的数组
  439. * @param   buffer_length 数组长度
  440. * @retval  数据长度
  441. * @warning None
  442. * @example
  443. **/
  444. int mqtt_msg_pingreq(char **data_ptr,char* buffer,int buffer_length)
  445. {
  446.         int length;
  447.         length = MQTT_MAX_FIXED_HEADER_SIZE;       
  448.   return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
  449. }


复制代码



MQTT封包解包底层(C语言).zip

4.14 KB, 下载次数: 27, 下载积分: 黑币 -5

分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享淘帖 顶 踩
回复

使用道具 举报

沙发
ID:767423 发表于 2020-7-7 13:00 | 只看该作者
资料不错,下来看看。如果来个教程就完美了。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|小黑屋|51黑电子论坛 |51黑电子论坛6群 QQ 管理员QQ:125739409;技术交流QQ群281945664

Powered by 单片机教程网

快速回复 返回顶部 返回列表