#include "mqtt_api.h" #include "MQTTPacket.h" #include #include #include char *mqttClientId="Device1|securemode=3,signmethod=hmacsha1|"; int keepalive=180; int cleansession=1; char *mqttUsername="Device1&hp8oQhMZJ67"; char *password="EB0DC81620DA35DA9C31BDE6B5339418FCE54E15"; uint8 buf[1024]={0}; int buflen = sizeof(buf); /******解析收到的ACK报文*********/ int mqtt_decode_msg(unsigned char*buf) { int rc = -1; MQTTHeader header = {0}; header.byte = buf[0]; rc = header.bits.type; return rc; } //连接MQTT服务器函数 uint8 connectMqtt(SOCKET Socket){ int len,rc,wait_ack_time=0; MQTTPacket_connectData data = MQTTPacket_connectData_initializer;//配置部分可变头部的值 data.clientID.cstring = mqttClientId; //客户端标识,用于区分每个客户端 data.keepAliveInterval = keepalive; //保活计时器,定义了服务器收到客户端消息的最大时间间隔 data.cleansession = cleansession; //该标志置1服务器必须丢弃之前保持的客户端的信息,将该连接视为“不存在” data.username.cstring = mqttUsername; data.password.cstring = password; memset(buf,0,buflen); len = MQTTSerialize_connect(buf, buflen, &data); /*1 构造连接报文*/ rc = send(Socket,buf,len); //发送连接请求 if(rc != len) { printf("Send Connect Error: rc=%d len=%d\n\r",rc,len); return 1; } //循环获取Connect Ack do{ delay_ms(10); len=getSn_RX_RSR(Socket); wait_ack_time++; if(wait_ack_time>MAX_OVERTIME) { printf("Wait CONNACK Overtime\n\r"); return 2; } }while(len<=0); recv(Socket,buf,len);//接收数据 判断是否为Connect Ack if(mqtt_decode_msg(buf)!=CONNACK){ printf("Error CONNACK:%s\n\r",buf); return 3; } printf("Connect Is Ok:%s\r\n",buf); return 0; } //PING服务器 保持存活 uint8 pingMqtt(SOCKET Socket){ int len,rc,wait_ack_time=0; memset(buf,0,buflen); len=MQTTSerialize_pingreq(buf,buflen); rc=send(Socket,buf, len); if(len!=rc) { printf("Send Ping Error: rc=%d len=%d\n\r",rc,len); return 1; } do{ delay_ms(10); len=getSn_RX_RSR(Socket); wait_ack_time++; if(wait_ack_time>MAX_OVERTIME) { printf("Wait PINGRESP Overtime\n\r"); return 2; } }while(len<=0); recv(Socket,buf,len); if(mqtt_decode_msg(buf) != PINGRESP){ printf("Error PINGRESP:%s\n\r",buf); return 3; } printf("PING Successfully:%s\n\r",buf); // if(len>2){ // if(mqtt_decode_msg(buf+2) == PUBLISH){ // dealMqtt(Socket,buf+2,buflen-2); // } // } return 0; } //MQTT发布消息函数 uint8 publishMqtt(SOCKET Socket,char *pTopic,char *pMessage) { int len,rc,wait_ack_time=0; MQTTString topicString = MQTTString_initializer; int msglen = strlen(pMessage);//计算发布消息的长度 memset(buf,0,buflen); topicString.cstring = pTopic; len = MQTTSerialize_publish(buf, buflen, 0, 1, 0, 0, topicString, (unsigned char*)pMessage, msglen); /*2 构造发布消息的报文*/ rc = send(Socket,buf,len); //发送消息 if (rc != len) { printf("Send Publish Error: rc=%d len=%d\n\r",rc,len); return 1; } do{ delay_ms(10); len=getSn_RX_RSR(Socket); wait_ack_time++; if(wait_ack_time>MAX_OVERTIME) { printf("Wait PUBACK Overtime\n\r"); return 2; } }while(len<=0); recv(Socket,buf,len); if(mqtt_decode_msg(buf) != PUBACK){ printf("error PUBACK:%s\n\r",buf); return 3; } printf("Publish Successfully:%s\n\r",buf); return 0; } //MQTT订阅函数 uint8 subscribMqtt(SOCKET Socket,char *pTopic) { int len,rc,wait_ack_time=0; MQTTString topicString = MQTTString_initializer; int msgid = 1; //该值为消息标识符 int req_qos = 1; memset(buf,0,buflen); topicString.cstring = pTopic; len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);//构造订阅主题报文 rc = send(Socket,buf,len); //发送消息 if (rc != len) { printf("Send Subscrib Error: rc=%d len=%d\n\r",rc,len); return 1; } do{ delay_ms(10); len=getSn_RX_RSR(Socket); wait_ack_time++; if(wait_ack_time>MAX_OVERTIME) { printf("Wait SUBACK Overtime\n\r"); return 2; } }while(len<=0); recv(Socket,buf,len); if(mqtt_decode_msg(buf) != SUBACK){ printf("Error SUBACK:%s\n\r",buf); return 3; } printf("Subscrib Successfully:%s\n\r",buf); return 0; } //MQTT处理订阅消息函数 void dealPublish(SOCKET Socket,uint8 *msgbuf,uint16 msglen){ //////////获取订阅消息参数////////////// unsigned char dup; int qos; unsigned char retained; unsigned short mssageid; int payloadlen_in; unsigned char* payload_in; MQTTString receivedTopic; ///////////////////////////////////////// int len,rc; MQTTDeserialize_publish(&dup, &qos, &retained, &mssageid, &receivedTopic, &payload_in, &payloadlen_in, msgbuf, msglen); printf("message id: %d\n\r",mssageid); printf("message qos: %d\n\r",qos); printf("message receivedTopic: %s\n\r",receivedTopic.lenstring.data); printf("message arrived[%d]: %s\n\r", payloadlen_in, payload_in); if(qos>0){//需要回复 len = MQTTSerialize_puback(buf, buflen, mssageid);//构造ack报文 rc = send(Socket,buf,len); //发送消息 if(len!=rc)printf("Send PUBACK Error: rc=%d len=%d\n\r",rc,len); } } uint8 do_mqtt(SOCKET Socket,uint8 *sip,uint16 sport,uint16 lport){ //sport源端口 lport本地端口 static uint8 CONNECT_FLAG = 0; uint16 rlen; switch(getSn_SR(Socket)){ // get socket status case SOCK_INIT://init state connect(Socket,sip,sport);//此函数以活动(客户端)模式为通道建立连接。这个函数会一直等待,直到连接建立。 break; case SOCK_ESTABLISHED: //success to connect if(getSn_IR(Socket)&Sn_IR_CON)setSn_IR(Socket,Sn_IR_CON);//getSn_IR,获取Socket中断状态;Sn_IR_CON,已建立连接 if(!CONNECT_FLAG)if(!connectMqtt(Socket))CONNECT_FLAG=1; rlen=getSn_RX_RSR(Socket);//获取socketRX接收大小 if(rlen>0){ recv(Socket,buf,rlen); switch(mqtt_decode_msg(buf)){ case PUBLISH: dealPublish(Socket,buf,rlen); break; default: printf("recv:%s",buf); break; } } break; case SOCK_CLOSE_WAIT: disconnect(Socket); close(Socket); break; case SOCK_CLOSED: CONNECT_FLAG=0; socket(Socket,Sn_MR_TCP,lport,Sn_MR_ND); break; } return CONNECT_FLAG; }