You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
incubator_embeded/MQTT/mqtt_api.c

229 lines
6.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "mqtt_api.h"
#include "MQTTPacket.h"
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
/*
 * File-scope MQTT session settings and the shared packet buffer.
 * NOTE(review): the client id, username and password are hard-coded in the
 * source; consider moving them to provisioning/configuration storage.
 */

/* Client identifier string; connectMqtt() places it in the CONNECT packet. */
char *mqttClientId="Device1|securemode=3,signmethod=hmacsha1|";
/* Value assigned to keepAliveInterval in the CONNECT packet (presumably seconds, per MQTT). */
int keepalive=180;
/* Value assigned to the cleansession field of the CONNECT packet. */
int cleansession=1;
/* User name placed in the CONNECT packet. */
char *mqttUsername="Device1&hp8oQhMZJ67";
/* Password placed in the CONNECT packet. */
char *password="EB0DC81620DA35DA9C31BDE6B5339418FCE54E15";
/* Single scratch buffer used by every function in this file for both outgoing and incoming packets. */
uint8 buf[1024]={0};
/* Size of buf in bytes, passed to the serializers and to memset(). */
int buflen = sizeof(buf);
/**
 * Extract the MQTT control packet type from the first byte of a packet.
 *
 * The packet type occupies the upper four bits of byte 0 of the fixed
 * header. It is read with a shift and mask rather than through a bit-field
 * overlay, because the ordering of bit-fields inside a byte is
 * implementation-defined and therefore compiler dependent.
 *
 * @param buf  Start of a received MQTT packet; only buf[0] is read.
 * @return     The packet type (0..15), or -1 when buf is NULL.
 */
int mqtt_decode_msg(unsigned char*buf)
{
    if (buf == NULL) {
        return -1;
    }
    return (buf[0] >> 4) & 0x0F;
}
/**
 * Send an MQTT CONNECT packet on an established socket and wait for CONNACK.
 *
 * The CONNECT packet is built from the file-scope settings (client id,
 * keep-alive, clean-session flag, user name, password) in the shared buffer
 * `buf`, which is then reused to hold the reply.
 *
 * @param Socket  Socket number of an already established TCP connection.
 * @return 0 when the broker accepted the connection;
 *         1 when the packet could not be built or fully sent;
 *         2 when no reply arrived within MAX_OVERTIME polls of 10 ms;
 *         3 when the reply is not a CONNACK;
 *         4 when the CONNACK is too short or carries a non-zero return code.
 */
uint8 connectMqtt(SOCKET Socket){
    int len,rc,wait_ack_time=0;
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer; /* start from the library defaults */

    data.clientID.cstring = mqttClientId;   /* client identifier, distinguishes each client */
    data.keepAliveInterval = keepalive;     /* keep-alive timer: longest gap the server allows between client packets */
    data.cleansession = cleansession;       /* when 1 the server must drop any state kept for this client */
    data.username.cstring = mqttUsername;
    data.password.cstring = password;

    memset(buf,0,buflen);
    len = MQTTSerialize_connect(buf, buflen, &data); /* build the CONNECT packet */
    if(len <= 0)
    {
        /* A non-positive length must never reach send(), whose length is unsigned. */
        printf("Serialize Connect Error: len=%d\n\r",len);
        return 1;
    }
    rc = send(Socket,buf,len); /* send the connection request */
    if(rc != len)
    {
        printf("Send Connect Error: rc=%d len=%d\n\r",rc,len);
        return 1;
    }

    /* Poll the socket until some reply bytes are available. */
    do{
        delay_ms(10);
        len=getSn_RX_RSR(Socket);
        wait_ack_time++;
        if(wait_ack_time>MAX_OVERTIME)
        {
            printf("Wait CONNACK Overtime\n\r");
            return 2;
        }
    }while(len<=0);

    /* The pending byte count may exceed the buffer; never read more than fits,
       and keep one byte for the terminator needed by the %s prints below. */
    if(len > buflen-1)
    {
        len = buflen-1;
    }
    recv(Socket,buf,len);
    buf[len] = 0;

    if(mqtt_decode_msg(buf)!=CONNACK){
        printf("Error CONNACK:%s\n\r",buf);
        return 3;
    }
    /* CONNACK layout: type, remaining length, ack flags, return code.
       Only return code 0 means the broker accepted the connection. */
    if(len < 4 || buf[3] != 0){
        printf("CONNACK Refused: rc=%d\n\r",(len < 4) ? -1 : (int)buf[3]);
        return 4;
    }
    printf("Connect Is Ok:%s\r\n",buf);
    return 0;
}
/**
 * Send an MQTT PINGREQ to keep the session alive and wait for PINGRESP.
 *
 * Uses the shared buffer `buf` for both the request and the reply.
 *
 * NOTE(review): a PUBLISH from the broker may arrive ahead of, or directly
 * behind, the PINGRESP in the same read; such a packet is not dispatched
 * here (an earlier, disabled attempt handled data following the PINGRESP).
 *
 * @param Socket  Socket number of the established connection.
 * @return 0 on success;
 *         1 when the packet could not be built or fully sent;
 *         2 when no reply arrived within MAX_OVERTIME polls of 10 ms;
 *         3 when the reply is not a PINGRESP.
 */
uint8 pingMqtt(SOCKET Socket){
    int len,rc,wait_ack_time=0;

    memset(buf,0,buflen);
    len=MQTTSerialize_pingreq(buf,buflen);
    if(len <= 0)
    {
        /* A non-positive length must never reach send(), whose length is unsigned. */
        printf("Serialize Ping Error: len=%d\n\r",len);
        return 1;
    }
    rc=send(Socket,buf, len);
    if(len!=rc)
    {
        printf("Send Ping Error: rc=%d len=%d\n\r",rc,len);
        return 1;
    }

    /* Poll the socket until some reply bytes are available. */
    do{
        delay_ms(10);
        len=getSn_RX_RSR(Socket);
        wait_ack_time++;
        if(wait_ack_time>MAX_OVERTIME)
        {
            printf("Wait PINGRESP Overtime\n\r");
            return 2;
        }
    }while(len<=0);

    /* Never read more than the buffer holds; keep one byte for the
       terminator needed by the %s prints below. */
    if(len > buflen-1)
    {
        len = buflen-1;
    }
    recv(Socket,buf,len);
    buf[len] = 0;

    if(mqtt_decode_msg(buf) != PINGRESP){
        printf("Error PINGRESP:%s\n\r",buf);
        return 3;
    }
    printf("PING Successfully:%s\n\r",buf);
    return 0;
}
/**
 * Publish a NUL-terminated message to a topic at QoS 1 and wait for PUBACK.
 *
 * Each call uses a fresh non-zero packet identifier, because MQTT requires a
 * non-zero identifier on every PUBLISH sent with QoS greater than 0.
 * The shared buffer `buf` holds the outgoing packet and then the reply.
 *
 * NOTE(review): the identifier echoed in the PUBACK is not compared with the
 * one sent; only the packet type of the reply is checked.
 *
 * @param Socket    Socket number of the established connection.
 * @param pTopic    Topic name as a C string.
 * @param pMessage  Payload as a C string; its strlen() is the payload length.
 * @return 0 on success;
 *         1 on a NULL argument, or when the packet could not be built or fully sent;
 *         2 when no reply arrived within MAX_OVERTIME polls of 10 ms;
 *         3 when the reply is not a PUBACK.
 */
uint8 publishMqtt(SOCKET Socket,char *pTopic,char *pMessage)
{
    static unsigned short next_packet_id = 0; /* last identifier handed out */
    int len,rc,wait_ack_time=0;
    int msglen;
    MQTTString topicString = MQTTString_initializer;

    if(pTopic == NULL || pMessage == NULL)
    {
        printf("Publish Error: NULL topic or message\n\r");
        return 1;
    }
    msglen = strlen(pMessage); /* length of the message to publish */

    /* Advance the identifier, skipping 0 when the counter wraps. */
    next_packet_id++;
    if(next_packet_id == 0)
    {
        next_packet_id = 1;
    }

    memset(buf,0,buflen);
    topicString.cstring = pTopic;
    /* dup=0, qos=1, retained=0 */
    len = MQTTSerialize_publish(buf, buflen, 0, 1, 0, next_packet_id, topicString, (unsigned char*)pMessage, msglen);
    if(len <= 0)
    {
        /* Typically the topic plus payload does not fit in buf. */
        printf("Serialize Publish Error: len=%d\n\r",len);
        return 1;
    }
    rc = send(Socket,buf,len); /* send the message */
    if (rc != len)
    {
        printf("Send Publish Error: rc=%d len=%d\n\r",rc,len);
        return 1;
    }

    /* Poll the socket until some reply bytes are available. */
    do{
        delay_ms(10);
        len=getSn_RX_RSR(Socket);
        wait_ack_time++;
        if(wait_ack_time>MAX_OVERTIME)
        {
            printf("Wait PUBACK Overtime\n\r");
            return 2;
        }
    }while(len<=0);

    /* Never read more than the buffer holds; keep one byte for the
       terminator needed by the %s prints below. */
    if(len > buflen-1)
    {
        len = buflen-1;
    }
    recv(Socket,buf,len);
    buf[len] = 0;

    if(mqtt_decode_msg(buf) != PUBACK){
        printf("error PUBACK:%s\n\r",buf);
        return 3;
    }
    printf("Publish Successfully:%s\n\r",buf);
    return 0;
}
/**
 * Subscribe to a single topic at requested QoS 1 and wait for SUBACK.
 *
 * The shared buffer `buf` holds the outgoing packet and then the reply.
 *
 * @param Socket  Socket number of the established connection.
 * @param pTopic  Topic filter as a C string.
 * @return 0 when the broker granted the subscription;
 *         1 on a NULL topic, or when the packet could not be built or fully sent;
 *         2 when no reply arrived within MAX_OVERTIME polls of 10 ms;
 *         3 when the reply is not a SUBACK;
 *         4 when the SUBACK is too short or reports failure (0x80).
 */
uint8 subscribMqtt(SOCKET Socket,char *pTopic)
{
    int len,rc,wait_ack_time=0;
    MQTTString topicString = MQTTString_initializer;
    int msgid = 1;   /* packet identifier of the SUBSCRIBE */
    int req_qos = 1; /* requested maximum QoS */

    if(pTopic == NULL)
    {
        printf("Subscrib Error: NULL topic\n\r");
        return 1;
    }

    memset(buf,0,buflen);
    topicString.cstring = pTopic;
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); /* build the SUBSCRIBE packet */
    if(len <= 0)
    {
        /* A non-positive length must never reach send(), whose length is unsigned. */
        printf("Serialize Subscrib Error: len=%d\n\r",len);
        return 1;
    }
    rc = send(Socket,buf,len); /* send the request */
    if (rc != len)
    {
        printf("Send Subscrib Error: rc=%d len=%d\n\r",rc,len);
        return 1;
    }

    /* Poll the socket until some reply bytes are available. */
    do{
        delay_ms(10);
        len=getSn_RX_RSR(Socket);
        wait_ack_time++;
        if(wait_ack_time>MAX_OVERTIME)
        {
            printf("Wait SUBACK Overtime\n\r");
            return 2;
        }
    }while(len<=0);

    /* Never read more than the buffer holds; keep one byte for the
       terminator needed by the %s prints below. */
    if(len > buflen-1)
    {
        len = buflen-1;
    }
    recv(Socket,buf,len);
    buf[len] = 0;

    if(mqtt_decode_msg(buf) != SUBACK){
        printf("Error SUBACK:%s\n\r",buf);
        return 3;
    }
    /* SUBACK layout for one topic: type, remaining length, packet id (2 bytes),
       return code. 0x80 means the broker rejected the subscription. */
    if(len < 5 || buf[4] == 0x80){
        printf("SUBACK Refused: rc=%d\n\r",(len < 5) ? -1 : (int)buf[4]);
        return 4;
    }
    printf("Subscrib Successfully:%s\n\r",buf);
    return 0;
}
/**
 * Handle a PUBLISH packet received from the broker.
 *
 * Decodes the packet, prints its identifier, QoS, topic and payload, and,
 * when the QoS is above 0, answers with a PUBACK built in the shared buffer
 * `buf`.
 *
 * NOTE(review): a QoS 2 PUBLISH would normally be answered with PUBREC, not
 * PUBACK; this file only subscribes with requested QoS 1.
 *
 * @param Socket  Socket number of the established connection.
 * @param msgbuf  Buffer holding the received packet (may be `buf` itself; all
 *                fields are printed before `buf` is overwritten by the PUBACK).
 * @param msglen  Number of valid bytes in msgbuf.
 */
void dealPublish(SOCKET Socket,uint8 *msgbuf,uint16 msglen){
    /* Fields of the received PUBLISH. All start from a known value because
       the packet identifier is absent (and left untouched) for QoS 0. */
    unsigned char dup = 0;
    int qos = 0;
    unsigned char retained = 0;
    unsigned short mssageid = 0;
    int payloadlen_in = 0;
    unsigned char* payload_in = NULL;
    MQTTString receivedTopic = MQTTString_initializer;
    int len,rc;

    if(msgbuf == NULL ||
       MQTTDeserialize_publish(&dup, &qos, &retained, &mssageid, &receivedTopic,
                               &payload_in, &payloadlen_in, msgbuf, msglen) != 1){
        /* Malformed or truncated packet: none of the outputs can be trusted. */
        printf("Deserialize PUBLISH Error\n\r");
        return;
    }

    printf("message id: %d\n\r",mssageid);
    printf("message qos: %d\n\r",qos);
    /* Topic and payload point into msgbuf and are not NUL-terminated, so the
       prints are bounded by their decoded lengths. */
    printf("message receivedTopic: %.*s\n\r",receivedTopic.lenstring.len,receivedTopic.lenstring.data);
    printf("message arrived[%d]: %.*s\n\r", payloadlen_in, payloadlen_in, (char*)payload_in);

    if(qos>0){ /* the broker expects an acknowledgement */
        len = MQTTSerialize_puback(buf, buflen, mssageid); /* build the PUBACK packet */
        if(len <= 0){
            printf("Serialize PUBACK Error: len=%d\n\r",len);
            return;
        }
        rc = send(Socket,buf,len); /* send the acknowledgement */
        if(len!=rc)printf("Send PUBACK Error: rc=%d len=%d\n\r",rc,len);
    }
}
/**
 * Run one step of the MQTT client state machine; call it repeatedly.
 *
 * Depending on the socket status it opens the socket, starts the TCP
 * connection, performs the MQTT CONNECT handshake once, and then reads and
 * dispatches one incoming packet per call.
 *
 * NOTE(review): only the first MQTT packet in each read is examined; further
 * packets delivered in the same read are not dispatched.
 *
 * @param Socket  Socket number to operate on.
 * @param sip     Peer IP address passed to connect().
 * @param sport   Peer port passed to connect().
 * @param lport   Local port passed to socket().
 * @return 1 once the MQTT session is connected, 0 otherwise.
 */
uint8 do_mqtt(SOCKET Socket,uint8 *sip,uint16 sport,uint16 lport){
    static uint8 CONNECT_FLAG = 0; /* 1 after a successful MQTT CONNECT */
    uint16 rlen;

    switch(getSn_SR(Socket)){ /* current socket status */
        case SOCK_INIT: /* socket opened, not yet connected */
            connect(Socket,sip,sport); /* start the TCP connection as a client */
            break;

        case SOCK_ESTABLISHED: /* TCP connection is up */
            /* Clear the "connection established" interrupt flag if it is set. */
            if(getSn_IR(Socket)&Sn_IR_CON){
                setSn_IR(Socket,Sn_IR_CON);
            }
            if(!CONNECT_FLAG){
                if(connectMqtt(Socket) != 0){
                    break; /* handshake failed; it is retried on the next call */
                }
                CONNECT_FLAG=1;
                /* Reported once, and only after the broker accepted the session. */
                printf("success connect aliyun!!!\r\n");
            }
            rlen=getSn_RX_RSR(Socket); /* number of received bytes waiting */
            if(rlen>0){
                /* Never read more than the buffer holds; keep one byte for
                   the terminator needed by the %s print below. */
                if(rlen > (uint16)(buflen-1)){
                    rlen = (uint16)(buflen-1);
                }
                recv(Socket,buf,rlen);
                buf[rlen] = 0;
                switch(mqtt_decode_msg(buf)){ /* packet type of the received data */
                    case PUBLISH:
                        dealPublish(Socket,buf,rlen);
                        break;
                    default:
                        printf("recv:%s",buf);
                        break;
                }
            }
            break;

        case SOCK_CLOSE_WAIT: /* peer closed the connection */
            CONNECT_FLAG=0; /* the MQTT session ended with the TCP connection */
            disconnect(Socket);
            close(Socket);
            break;

        case SOCK_CLOSED: /* socket not open: (re)create it as TCP */
            CONNECT_FLAG=0;
            socket(Socket,Sn_MR_TCP,lport,Sn_MR_ND);
            break;

        default: /* transitional states: nothing to do this round */
            break;
    }
    return CONNECT_FLAG;
}