C语言基于paho实现MQTT客户端实战案例
- 目标
- 说明
- 项目代码
-
- make文件 mqttClient.pro
- 项目入口 main.c
- 链表 list.h
- 消息队列 queue.h
- 消息队列 queue.c
- MQTT客户端模块 mqttClientMgr.h
- MQTT客户端模块 mqttClientMgr.c
- 项目验证
- 附录
目标
1.实现MQTT客户端,可以订阅主题进行处理,也可以进行主题消息发布。
2.建立缓存队列进行MQTT消息的处理。
说明
本案例在centos虚拟机中用QT CREATOR运行,所以没有单独的makefile,利用了QT CREATOR提供的qmake。
CPU类型为X86。
项目文件列表如下,日志模块可以参考博文 嵌入式系统简易日志模块搭建
log.c和log.h的代码就不再贴了。需要的话可以去下整个工程,工程下载链接放在文章最后。
不算日志模块的话整个工程的代码量也就千行左右。
验证的时候需要装个mosquitto—mqtt的服务端。MQTT的原理可以去百度下。默认端口一般是1883.
项目代码
make文件 mqttClient.pro
TEMPLATE = app
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qtSOURCES += main.c log/log.c mqtt/mqttClientMgr.c queue/queue.c HEADERS += log/log.h mqtt/mqttClientMgr.h include/list.h queue/queue.h INCLUDEPATH += $$PWD/log $$PWD/mqtt $$PWD/include $$PWD/queue LIBS += -pthread
LIBS += -ldlLIBS += -lpaho-mqtt3c
LIBS += -ltinyxml2
LIBS += /usr/lib64/libsqlite3.so.0.8.6#LIBS += /home/ymj/mqttClient/bin/lib_arm/libpaho-mqtt3c.so.1.0
#LIBS += -L/home/ymj/mqttClient/bin/lib_arm -ltinyxml -lpaho-mqtt3c
#LIBS += /home/ymj/mqttClient/bin/lib_arm/libsqlite3.so.0.8.6LIBS += -lrtDESTDIR=$$PWD/binQMAKE_LFLAGS += -Wl,-Bsymbolic
QMAKE_LFLAGS += -Wl,-rpath=./lib
链的库有点多,除了paho还有数据库sqlite3和tinyxml2,之后也许会抽空写下这俩东东的使用方法。
自己运行的时候电脑里没有这俩库的话可以把这两行删掉,只要有paho库就可以。
文章最后会提供整个工程的下载,里面有这三个第三方库的arm和x86版本。
项目入口 main.c
#include <stdio.h>
#include <string.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>#include "log.h"
#include "mqttClientMgr.h"#define WORK_DIR "../"int changeWorkDir(void)
{char dir[128] = {0};if(chdir(WORK_DIR) == -1){LS_LOG(LOG_ERROR, "Could not chdir to "%s": abortingn", WORK_DIR);return LS_ERR;}getcwd(dir, sizeof(dir));LS_LOG(LOG_INFO, "---workdir:%s---n", dir);return LS_OK;
}/*C语言调用paho实现MQTT客户端
*/
int main()
{//日志模块logDataInit();logManage();//修改工作目录changeWorkDir();LS_LOG(LOG_INFO, "---mqttClient start---n");void *thread_reval = NULL;//MQTT客户端mqttClientInit();threadWaitDone(&thread_reval);if(thread_reval != NULL){LS_LOG(LOG_DEBUG, "thread_reval : %s is not nulln", (char *)thread_reval);}//不会运行至此LS_LOG(LOG_INFO, "---robot end---n");return 0;}
没啥说的,mqttClientInit()接口为本文的重点内容。
threadWaitDone(&thread_reval)这玩意保证了编出来的进程会一直运行,不会一下就执行完return了。
调用的头文件可能有点多,继续往后走。
由于需要用到链表和队列,下面先把这两个小工具的代码贴出来。
链表 list.h
/*LIST*/
/**************************************************************************** 服务接口定义*****************************************************************************/struct list_node
{struct list_node *next; /**< point to next node. */struct list_node *prev; /**< point to prev node. */
};
typedef struct list_node list_t; /**< Type for lists. */struct slist_node
{struct slist_node *next; /**< point to next node. */
};
typedef struct slist_node slist_t;/****************************************************************************** * 根据结构体成员地址返回结构体指针*****************************************************************************/
#define container_of(ptr, type, member) ((type *)((char *)(ptr) - (unsigned long)(&((type *)0)->member)))/**************************************************************************** * 初始化一个链表对象*****************************************************************************/
#define LIST_HEAD_INIT(name) { &(name), &(name) }#define LIST_HEAD(name) struct list_node name = LIST_HEAD_INIT(name)
/****************************************************************************** * 初始化链表*****************************************************************************/
static inline void list_init(list_t *l)
{l->next = l->prev = l;
}/****************************************************************************** * 在链表尾处插入节点*****************************************************************************/
static inline void list_insert_after(list_t *l, list_t *n)
{l->next->prev = n;n->next = l->next;l->next = n;n->prev = l;
}/****************************************************************************** * 在链表头处插入节点*****************************************************************************/
static inline void list_insert_before(list_t *l, list_t *n)
{l->prev->next = n;n->prev = l->prev;l->prev = n;n->next = l;
}/****************************************************************************** * 移除节点*****************************************************************************/
static inline void list_remove(list_t *n)
{n->next->prev = n->prev;n->prev->next = n->next;n->next = n->prev = n;
}/****************************************************************************** * 判断链表是否为空*****************************************************************************/
static inline int list_isempty(const list_t *l)
{return l->next == l;
}/****************************************************************************** * 获取联表节点个数*****************************************************************************/
static inline unsigned int list_len(const list_t *l)
{unsigned int len = 0;const list_t *p = l;while (p->next != l){p = p->next;len ++;}return len;
}/****************************************************************************** * 获取节点所属结构体指针*****************************************************************************/
#define list_entry(node, type, member) container_of(node, type, member)/****************************************************************************** * 遍历链表节点*****************************************************************************/
#define list_for_each(pos, head) for (pos = (head)->next; pos != (head); pos = pos->next)/****************************************************************************** * 安全遍历链表节点*****************************************************************************/
#define list_for_each_safe(pos, n, head) for (pos = (head)->next, n = pos->next; pos != (head); pos = n, n = pos->next)/****************************************************************************** * 遍历链表节点所属结构体*****************************************************************************/
#define list_for_each_entry(pos, head, member) for (pos = list_entry((head)->next, typeof(*pos), member); &pos->member != (head); pos = list_entry(pos->member.next, typeof(*pos), member))/****************************************************************************** * 安全遍历链表节点所属结构体*****************************************************************************/
#define list_for_each_entry_safe(pos, n, head, member) for (pos = list_entry((head)->next, typeof(*pos), member), n = list_entry(pos->member.next, typeof(*pos), member); &pos->member != (head); pos = n, n = list_entry(n->member.next, typeof(*n), member))/***************************************************************************** * rt_list_first_entry - 获取链表中的第一个节点* * @ptr: the list head to take the element from.* * @type: the type of the struct this is embedded in.* * @member: the name of the list_struct within the struct.* ** * 注意:that list is expected to be not empty.****************************************************************************/
#define list_first_entry(ptr, type, member) list_entry((ptr)->next, type, member)/**************************************************************************** * @brief 单链表初始化* ** * @param l the single list to be initialized***************************************************************************/
static inline void slist_init(slist_t *l)
{l->next = NULL;
}static inline void slist_append(slist_t *l, slist_t *n)
{struct slist_node *node;node = l;while (node->next) node = node->next;/* append the node to the tail */node->next = n;n->next = NULL;
}static inline void slist_insert(slist_t *l, slist_t *n)
{n->next = l->next;l->next = n;
}static inline unsigned int slist_len(const slist_t *l)
{unsigned int len = 0;const slist_t *list = l->next;while (list != NULL){list = list->next;len ++;}return len;
}static inline slist_t *slist_remove(slist_t *l, slist_t *n)
{/* remove slist head */struct slist_node *node = l;while (node->next && node->next != n) node = node->next;/* remove node */if (node->next != (slist_t *)0) node->next = node->next->next;return l;
}static inline slist_t *slist_first(slist_t *l)
{return l->next;
}static inline slist_t *slist_tail(slist_t *l)
{while (l->next) l = l->next;return l;
}static inline slist_t *slist_next(slist_t *n)
{return n->next;
}
消息队列 queue.h
typedef struct QUEUE_ST
{char* data;int ElemNum;int ElemSize;int front;int rear;
}QueueSt;/***************************************************************************
function: QueueCreate
input:
output:
Description:提供统一队列创建函数
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum);/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue);/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData,int size);/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData,int size);
消息队列 queue.c
#include <string.h>#include "queue.h"
#include "log.h"/***************************************************************************
function: MqttQueueCreate
input:
output:
Description:创建队列
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum)
{QueueSt *queue = (QueueSt *)malloc(sizeof(QueueSt));if(NULL == queue){LS_LOG(LOG_ERROR, "malloc fail!n");return NULL;}memset((char *)queue, 0, sizeof(QueueSt));queue->data =(char*)malloc((unsigned long)(ElementSize * ElementNum));if(NULL == queue->data){LS_LOG(LOG_ERROR, "malloc fail !n");return NULL;}queue->ElemNum = ElementNum;queue->ElemSize = ElementSize;queue->rear = 0;queue->front = 0;return queue;
}/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue)
{if(NULL == queue){LS_LOG(LOG_ERROR, "queue is NULL!n");return LS_ERR;}return (queue->front == queue->rear ? TRUE : FALSE); //空为TRUE,否则为FALSE
}
/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData, int size)
{if(NULL == queue || NULL == OutData){LS_LOG(LOG_ERROR, "input para is NULL!n");return LS_ERR;}if(TRUE == IsQueueEmpty(queue)){LS_LOG(LOG_INFO, "queue empty!n");return LS_ERR;}if(size > queue->ElemSize){LS_LOG(LOG_ERROR, "error:size > ElemSize!n");return LS_ERR;}if (OutData != NULL){memcpy(OutData,queue->data + queue->front * queue->ElemSize, (unsigned long)size);}else{LS_LOG(LOG_ERROR, "OutData is NULL!n");return LS_ERR;}queue->front = (queue->front + 1) % queue->ElemNum; //++队首 % Nreturn LS_OK;}/***************************************************************************
function: IsQueueFull
input:
output:
Description:判断队列是否为满
****************************************************************************/
int IsQueueFull(QueueSt *queue)
{if(NULL == queue){LS_LOG(LOG_ERROR,"queue is NULL!n");return LS_ERR;}return ((queue->rear + 1) % queue->ElemNum == queue->front ? TRUE : FALSE); //空为TRUE,否则为FALSE
}/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData, int size)
{if(NULL == queue || NULL == InData){LS_LOG(LOG_ERROR, "input para is NULL!n");return LS_ERR;}if(TRUE == IsQueueFull(queue)){LS_LOG(LOG_ERROR, "queue is full!n");return LS_ERR;}if(size > queue->ElemSize){LS_LOG(LOG_ERROR, "error:size > ElemSize!n");return LS_ERR;}memcpy(queue->data + queue->rear * queue->ElemSize, (char *)InData, (unsigned long)size);queue->rear = (queue->rear + 1) % queue->ElemNum; //++队尾 % Nreturn LS_OK;
}
接下来就是本文的重头戏了。
MQTT客户端模块 mqttClientMgr.h
#include <pthread.h>
#include <semaphore.h>#include "MQTTClient.h"
#include "list.h"
#include "queue.h"#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 0
#define TIMEOUT 10000L
#define DISCONNECT "out"/**********订阅主题**********/
#define SUB_TOPIC_NUM 1#define TOPIC_SUB_IOT_REG "v1/127001/response/iotregister"/********发布主题*********/
#define PUB_TOPIC_NUM 1#define TOPIC_PUB_REG_DEVICE_ID 0
#define TOPIC_PUB_REG_DEVICE "v1/127001/sensor/add"typedef struct SEND_RECORD_ST
{long long mid;char type[64];list_t list;
}SendRecordSt;typedef struct MQTT_PUB_DATA_ST
{int TopicId;int len;char* data;
}MqttPubDataSt;typedef struct MQTT_SUB_DATA_ST
{int clientId;//用于区分不同的mqtt客户端int len;char *topic;char *data;
}MqttSubDataSt;typedef struct MQTT_PUB_MGR_ST
{MqttPubDataSt stMqttPubData;QueueSt *pMqttPubQueue;list_t PubDataList;pthread_mutex_t lock;//访问互斥锁sem_t QueueWaitsem;sem_t DeliverOksem;
}MqttPubMgrSt;typedef struct MQTT_SUB_MGR_ST
{MqttSubDataSt stMqttSubData;QueueSt *pMqttSubQueue;pthread_mutex_t lock;//访问互斥锁sem_t QueueWaitsem;
}MqttSubMgrSt;typedef struct MQTT_MGR_ST
{MQTTClient MqttClient;int MqttConnectFlag;int StartBusinessFlag;long long MessageId;pthread_mutex_t lock;//访问互斥锁MqttPubMgrSt stPubMgr;MqttSubMgrSt stSubMgr;list_t AddFailList;int ReAddCnt;sem_t ReAddSem;pthread_mutex_t Addlock;//访问互斥锁list_t ActivateFailList;int ReActivateCnt;sem_t ReActivateSem;pthread_mutex_t Activatelock;//访问互斥锁list_t UpdateFailList;int ReUpdateCnt;sem_t ReUpdateSem;pthread_mutex_t Updatelock;//访问互斥锁
}MqttMgrSt;void threadWaitDone(void **reval);
int mqttClientInit(void);
新增主题或者删除主题的订阅或者发布需要修改SUB_TOPIC_NUM和PUB_TOPIC_NUM ,不然会数组越界挂掉,看到mqttClientMgr.c就知道原因了。
加了一些锁和信号量。
MQTT客户端模块 mqttClientMgr.c
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>#include "mqttClientMgr.h"
#include "log.h"static MqttMgrSt stMqttMgr;
static volatile MQTTClient_deliveryToken deliveredtoken;
static pthread_t MqttClientThreadId;
static pthread_t ProcSubQueueThreadId;#define PUB_QUEUE_ELEM_NUM 200
#define SUB_QUEUE_ELEM_NUM 500
#define MAX_BROKER_ADDR_LEN 32
#define MQTT_PLATFORM_ENABLE (1)#define BROKER_IP "127.0.0.1"
#define BROKER_PORT "1883"
#define CLIENT_ID "anyhow"
#define USER_NAME "mosquitto"
#define PWD "ls@123"static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};void threadWaitDone(void **reval)
{pthread_join(MqttClientThreadId, reval);
}void SetThreadAttr(pthread_attr_t *thread_attr, unsigned int priority, int policy, size_t stack_size)
{size_t StackSize = 0;pthread_attr_init(thread_attr);//首先需要对属性变量进行初始化pthread_attr_setscope(thread_attr,PTHREAD_SCOPE_PROCESS);pthread_attr_setdetachstate(thread_attr,PTHREAD_CREATE_JOINABLE);pthread_attr_setschedpolicy(thread_attr,policy);struct sched_param param;pthread_attr_getschedparam(thread_attr, ¶m);param.sched_priority = (int)priority;pthread_attr_setschedparam(thread_attr, ¶m);if(stack_size < 0x4000) //以字节为单位 最小16k{StackSize = 0x4000;}pthread_attr_setstacksize(thread_attr,StackSize);
}int CreateThread(pthread_t *thread_id, const pthread_attr_t *attr,void *(*thread_fun) (void *), void *thread_arg)
{int ret = -1;ret = pthread_create(thread_id,attr,thread_fun,thread_arg);return(ret);
}/***************************************************************************
function: GetBrokerAddrinput:
output:
Description:通过配置构建MQTT broker地址
****************************************************************************/
static int GetBrokerAddr(char *BrokerAddr,int len)
{unsigned int size = 0;char BrokerPortStr[6]; //格式:protocol://host:portif(NULL == BrokerAddr){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr is NULL.n");return LS_ERR;}memset(BrokerPortStr, 0, sizeof(BrokerPortStr));memcpy(BrokerAddr, "tcp://", strlen("tcp://"));size = (unsigned int)len - strlen("tcp://");if(strlen((char*)BROKER_IP) > size){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!n");return LS_ERR;}strcat(BrokerAddr, (char*)BROKER_IP);size = size - strlen((char*)BROKER_IP);if(size < 1){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!n");return LS_ERR;}strcat(BrokerAddr, ":");strcat(BrokerAddr, BROKER_PORT);/*size = size - 1;TransIntToStr(BROKER_PORT,BrokerPortStr);if(size < (strlen(BROKER_PORT) + 1)){LS_LOG(LOG_ERROR,"GetBrokerAddr: BrokerAddr buf is not enough !n");return LS_ERR;}strcat(BrokerAddr,BrokerPortStr);*/return LS_OK;
}/***************************************************************************
function: MqttSubscribeAllTopics
input:
output:
Description:客户端初始化的时候完成所有主题的订阅
****************************************************************************/
void MqttSubscribeAllTopics(void)
{int i = 0;for(i = 0;i < SUB_TOPIC_NUM; i++){LS_LOG(LOG_INFO, "Subscribing topic %snfor client %s using QoS %dnn",SubTopics[i], CLIENTID, QOS);MQTTClient_subscribe(stMqttMgr.MqttClient, SubTopics[i], QOS);}
}/***************************************************************************
function: MqttClientRelease
input:
output:
Description:MQTT客户端释放
****************************************************************************/
void MqttClientRelease(MQTTClient client)
{MQTTClient_unsubscribe(client, TOPIC);MQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);
}/***************************************************************************
function: connlost
input:
output:
Description:客户端注册的连接失败回调函数
****************************************************************************/
void connlost(void *context, char *cause)
{LS_LOG(LOG_ERROR, "nConnection lostn");LS_LOG(LOG_ERROR, " cause: %sn", cause);stMqttMgr.MqttConnectFlag = 0;sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
}/***************************************************************************
function: MqttTransData
input:
output:
Description:MqttTransData
****************************************************************************/
int MqttTransData(MqttSubDataSt *tdata)
{int ret = LS_ERR;pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);ret= EnQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (char *)tdata, sizeof(MqttSubDataSt));pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);if(LS_OK == ret){sem_post(&stMqttMgr.stSubMgr.QueueWaitsem);}return ret;
}/***************************************************************************
function: msgarrvd
input:
output:
Description:客户端注册的收到订阅消息回调函数
****************************************************************************/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{char* payloadptr = NULL;MqttSubDataSt stMqttSubData;char* pData = NULL;LS_LOG(LOG_DEBUG, "Message arrived on topic: %sn", topicName);memset(&stMqttSubData, 0, sizeof(MqttSubDataSt));stMqttSubData.clientId = 0;payloadptr = message->payload;if(strcmp(payloadptr, DISCONNECT) == 0){LS_LOG(LOG_WARN, " DISCONNECTn out!!n");stMqttMgr.MqttConnectFlag = FALSE;}LS_LOG(LOG_DEBUG, "message payload: %sn", (char *)message->payload);pData = malloc((unsigned long)message->payloadlen); //由队列处理线程free pDataif(NULL == pData){LS_LOG(LOG_ERROR, "malloc failed!!n");goto fail;}memset(pData, 0, (unsigned long)message->payloadlen);memcpy(pData, payloadptr, (unsigned long)message->payloadlen);stMqttSubData.data = pData;stMqttSubData.len = message->payloadlen;stMqttSubData.topic = topicName;if(LS_OK != MqttTransData(&stMqttSubData))//成功,由队列处理线程free pData{LS_LOG(LOG_WARN, "MqttTransData failed!!n");free(stMqttSubData.data);}
fail:MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}/***************************************************************************
function: delivered
input:
output:
Description:客户端注册的消息delivered回调函数
****************************************************************************/
void delivered(void *context, MQTTClient_deliveryToken dt)
{LS_LOG(LOG_INFO, "Message with token value %d delivery confirmedn", dt);deliveredtoken = dt;
}/***************************************************************************
function: MqttClientThread()
input:
output:
Description:MQTT客户端处理线程
****************************************************************************/
void *MqttClientThread()
{int rc = 0;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token = 0;char BrokerAddr[MAX_BROKER_ADDR_LEN];memset(BrokerAddr, 0, sizeof(BrokerAddr));if(LS_OK != GetBrokerAddr(BrokerAddr,MAX_BROKER_ADDR_LEN)){LS_LOG(LOG_ERROR, "GetBrokerAddr Err.n");return NULL;}LS_LOG(LOG_INFO, "BrokerAddr: %sn", BrokerAddr);MQTTClient_create(&stMqttMgr.MqttClient, BrokerAddr, (char*)CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);if(NULL == stMqttMgr.MqttClient){LS_LOG(LOG_ERROR,"MQTTClient_create g_MottClient fail!n");return NULL;}conn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;conn_opts.username = (char*)USER_NAME;conn_opts.password = (char*)PWD;MQTTClient_setCallbacks(stMqttMgr.MqttClient, NULL, connlost, msgarrvd, delivered);ReLink:do{LS_LOG(LOG_INFO, "start to connect MQTT server...n");if ((rc = MQTTClient_connect(stMqttMgr.MqttClient, &conn_opts)) == MQTTCLIENT_SUCCESS){LS_LOG(LOG_INFO, "connect success!n");stMqttMgr.MqttConnectFlag = TRUE;break;}else{LS_LOG(LOG_ERROR, "Failed to connect, return code %dn", rc);sleep(20); //时间可配置}}while(1);//完成所有主题的订阅初始化if(MQTT_PLATFORM_ENABLE){MqttSubscribeAllTopics();}//客户端连接正常情况下进行信息发布while(TRUE == stMqttMgr.MqttConnectFlag) //发布消息{sem_wait(&stMqttMgr.stPubMgr.QueueWaitsem);//LS_LOG(LOG_INFO,"rev QueueWaitsem !!n");pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);do{if(LS_OK == ExQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (void*)&stMqttMgr.stPubMgr.stMqttPubData, sizeof(MqttPubDataSt))){pubmsg.payload = stMqttMgr.stPubMgr.stMqttPubData.data; //添加一个成员用以决定发布TOPIC 记得释放freepubmsg.payloadlen = stMqttMgr.stPubMgr.stMqttPubData.len;pubmsg.qos = QOS;pubmsg.retained = 0;deliveredtoken = 0;if(MQTT_PLATFORM_ENABLE){rc = MQTTClient_publishMessage(stMqttMgr.MqttClient, PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId], &pubmsg, &token);if(strlen(pubmsg.payload) < 1024){LS_LOG(LOG_INFO, "MQTT Publish message %sn""on topic %s token %d n",(char*)pubmsg.payload,PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId],token);}}if(rc != 0){LS_LOG(LOG_ERROR, "MQTT Publish message Failed, rc=%d !!n", rc);}free(stMqttMgr.stPubMgr.stMqttPubData.data); //重要,异地malloc,此地释放}}while(FALSE == IsQueueEmpty(stMqttMgr.stPubMgr.pMqttPubQueue));pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);}//客户端连接断开后进行重连处理goto ReLink;//本应用不会运行至此MqttClientRelease(stMqttMgr.MqttClient);
}/***************************************************************************
function: MqttProcessIotResponse()
input:
output:
Description:APP处理Iot平台回应
****************************************************************************/
void MqttProcessIotResponse(MqttSubDataSt *tSubData)
{//此处进行数据处理}/***************************************************************************
function: MqttProcSubInfoThread
input:
output:
Description:MQTT处理订阅消息队列线程
****************************************************************************/
void *MqttProcSubInfoThread(void *param)
{LS_LOG(LOG_INFO, "MqttProcSubInfoThread start !n");while(1){sem_wait(&stMqttMgr.stSubMgr.QueueWaitsem);pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);do{if(LS_OK == ExQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (void*)&stMqttMgr.stSubMgr.stMqttSubData, sizeof(MqttSubDataSt))){MqttProcessIotResponse(&stMqttMgr.stSubMgr.stMqttSubData);free(stMqttMgr.stSubMgr.stMqttSubData.data);}}while(FALSE == IsQueueEmpty(stMqttMgr.stSubMgr.pMqttSubQueue));pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);}
}int mqttClientInit(void)
{int ret = -1;pthread_attr_t Thread_Attr;memset((char *)&stMqttMgr,0,sizeof(MqttMgrSt));stMqttMgr.MqttConnectFlag = FALSE;stMqttMgr.StartBusinessFlag = FALSE;stMqttMgr.MessageId = 1000000000000000;pthread_mutex_init(&stMqttMgr.lock,NULL);sem_init(&stMqttMgr.stPubMgr.QueueWaitsem,0,0);pthread_mutex_init(&stMqttMgr.stPubMgr.lock,NULL);list_init(&stMqttMgr.stPubMgr.PubDataList);sem_init(&stMqttMgr.stSubMgr.QueueWaitsem,0,0);pthread_mutex_init(&stMqttMgr.stSubMgr.lock,NULL);list_init(&stMqttMgr.AddFailList);stMqttMgr.ReAddCnt = 0;sem_init(&stMqttMgr.ReAddSem,0,0);pthread_mutex_init(&stMqttMgr.Addlock,NULL);list_init(&stMqttMgr.ActivateFailList);stMqttMgr.ReActivateCnt = 0;sem_init(&stMqttMgr.ReActivateSem,0,0);pthread_mutex_init(&stMqttMgr.Activatelock,NULL);list_init(&stMqttMgr.UpdateFailList);stMqttMgr.ReUpdateCnt = 0;sem_init(&stMqttMgr.ReUpdateSem,0,0);pthread_mutex_init(&stMqttMgr.Updatelock,NULL);stMqttMgr.stPubMgr.pMqttPubQueue = QueueCreate(sizeof(MqttPubDataSt), PUB_QUEUE_ELEM_NUM);if(NULL == stMqttMgr.stPubMgr.pMqttPubQueue){LS_LOG(LOG_ERROR, "Create MQTT Pub Queue failed!n");return LS_ERR;}stMqttMgr.stSubMgr.pMqttSubQueue = QueueCreate(sizeof(MqttSubDataSt), SUB_QUEUE_ELEM_NUM);if(NULL == stMqttMgr.stSubMgr.pMqttSubQueue){LS_LOG(LOG_ERROR, "Create MQTT Sub Queue failed!n");return LS_ERR;}SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x100000);ret = CreateThread(&MqttClientThreadId, &Thread_Attr, MqttClientThread, NULL);if(ret != 0){LS_LOG(LOG_ERROR, "Create MQTT client thread failed!n");return LS_ERR;}SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x8000);ret = CreateThread(&ProcSubQueueThreadId, &Thread_Attr, MqttProcSubInfoThread, NULL);if(ret != 0){LS_LOG(LOG_ERROR, "Create MQTT Process Sub Info thread failed!n");return LS_ERR;}return LS_OK;
}/***************************************************************************
function: MqttSendData
input: MqttPubDataSt类型指针的数据
output:
Description:MQTT发送数据
return: LS_OK-成功 LS_ERR-失败(需要重新发送)
****************************************************************************/
int MqttSendData(char *data, int len, int TopicId, SendRecordSt *pSendRecord)
{int ret = LS_ERR;MqttPubDataSt StPubData;char *pdata = NULL;memset(&StPubData, 0, sizeof(MqttPubDataSt));if(NULL != data && len > 0){pdata = (char *)malloc((unsigned long)len);if(NULL == pdata){LS_LOG(LOG_INFO, "malloc failed!!n");return LS_ERR;}memset(pdata, 0, (unsigned long)len);memcpy(pdata, data, (unsigned long)len);}StPubData.TopicId = TopicId;StPubData.data = pdata;StPubData.len = len;pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);ret= EnQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (char *)&StPubData, sizeof(MqttPubDataSt));if(NULL != pSendRecord) //加入发送管理链表{list_insert_before(&stMqttMgr.stPubMgr.PubDataList, &pSendRecord->list);}pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);if(LS_OK != ret){LS_LOG(LOG_INFO, "EnQueue error!!n");free(pdata);return LS_ERR;}sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);return LS_OK;
}
新增或删除主题的订阅和发布需要修改这俩全局数组
static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};
MqttSendData接口提供了MQTT消息发布的接口,就是先把MQTT消息进队列,然后在MqttProcSubInfoThread线程出队列,进行发布。
从mqttClientInit开始,做了锁,信号俩个,消息队列的初始化,消息队列有俩,一个用来处理发布,一个用来处理订阅;建立俩线程,一个是MQTT客户端的建立线程,一个是订阅的MQTT消息处理线程。
msgarrvd接口里就可以对收到的消息进行处理,本文的处理还是把MQTT消息进队列,MqttProcSubInfoThread线程出队列,各位可以在MqttProcessIotResponse接口里对订阅到的MQTT消息进行处理。
到此为止,整个工程算是贴完了,下面就是见证奇迹的时刻。
项目验证
ssh端用tail -f /home/ymj/mqttClient/log/mqttlog/mqttlog.log命令动态查看日志。
使用软件自带的工具运行代码。
结果显示
[20220908-16:45:46][INFO][main.c][changeWorkDir][22]—workdir:/home/ymj/mqttClient—
[20220908-16:45:46][INFO][main.c][main][39]—mqttClient start—
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][255]BrokerAddr: tcp://127.0.0.1:1883
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttProcSubInfoThread][363]MqttProcSubInfoThread start !
[20220908-16:45:46][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
[20220908-16:46:06][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:46:07][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
连接MQTT服务端失败,为啥呢?因为没有起服务端的broker,哈哈。
运行mosquitto,默认端口为1883,也可以自己指定端口,方法自己百度去。
日志显示连接broker成功,并打印出订阅的主题
宿主机打开MQTT客户端工具mqtt.fx,这个软件也可以从网上直接搜到
我的虚拟机ip是192.168.8.235,所以配置连接如下
配置完成后点击OK,退回来,connect。
填入代码里订阅的主题和要发布的内容
点击发布
程序成功接收到,验证完成,我擦,后面的乱码是啥,不管了,你们自己优化去。
程序的MQTT发布也自己写代码验证去。
附录
项目工程如下
https://download.csdn.net/download/qqq1112345/86512208