C语言基于paho实现MQTT客户端实战案例

C语言基于paho实现MQTT客户端实战案例

  • 目标
  • 说明
  • 项目代码
    • make文件 mqttClient.pro
    • 项目入口 main.c
    • 链表 list.h
    • 消息队列 queue.h
    • 消息队列 queue.c
    • MQTT客户端模块 mqttClientMgr.h
    • MQTT客户端模块 mqttClientMgr.c
  • 项目验证
  • 附录

目标

1.实现MQTT客户端,可以订阅主题进行处理,也可以进行主题消息发布。
2.建立缓存队列进行MQTT消息的处理。

说明

本案例在centos虚拟机中用QT CREATOR运行,所以没有单独的makefile,利用了QT CREATOR提供的qmake。
CPU类型为X86。
项目文件列表如下,日志模块可以参考博文 嵌入式系统简易日志模块搭建
在这里插入图片描述
log.c和log.h的代码就不再贴了。需要的话可以去下整个工程,工程下载链接放在文章最后。
不算日志模块的话整个工程的代码量也就千行左右。
验证的时候需要装个mosquitto—mqtt的服务端。MQTT的原理可以去百度下。默认端口一般是1883.

项目代码

make文件 mqttClient.pro

TEMPLATE = app
CONFIG += console
CONFIG -= app_bundle
CONFIG -= qtSOURCES += main.c log/log.c mqtt/mqttClientMgr.c queue/queue.c HEADERS += log/log.h mqtt/mqttClientMgr.h include/list.h queue/queue.h INCLUDEPATH += $$PWD/log $$PWD/mqtt $$PWD/include $$PWD/queue LIBS += -pthread
LIBS += -ldlLIBS += -lpaho-mqtt3c
LIBS += -ltinyxml2
LIBS += /usr/lib64/libsqlite3.so.0.8.6#LIBS += /home/ymj/mqttClient/bin/lib_arm/libpaho-mqtt3c.so.1.0
#LIBS += -L/home/ymj/mqttClient/bin/lib_arm -ltinyxml -lpaho-mqtt3c
#LIBS += /home/ymj/mqttClient/bin/lib_arm/libsqlite3.so.0.8.6LIBS += -lrtDESTDIR=$$PWD/binQMAKE_LFLAGS += -Wl,-Bsymbolic
QMAKE_LFLAGS += -Wl,-rpath=./lib

链的库有点多,除了paho还有数据库sqlite3和tinyxml2,之后也许会抽空写下这俩东东的使用方法。
自己运行的时候电脑里没有这俩库的话可以把这两行删掉,只要有paho库就可以。
文章最后会提供整个工程的下载,里面有这三个第三方库的arm和x86版本。

项目入口 main.c

#include <stdio.h>
#include <string.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>#include "log.h"
#include "mqttClientMgr.h"#define  WORK_DIR "../"int changeWorkDir(void)
{char dir[128] = {0};if(chdir(WORK_DIR) == -1){LS_LOG(LOG_ERROR, "Could not chdir to "%s": abortingn", WORK_DIR);return LS_ERR;}getcwd(dir, sizeof(dir));LS_LOG(LOG_INFO, "---workdir:%s---n", dir);return LS_OK;
}/*C语言调用paho实现MQTT客户端
*/
int main()
{//日志模块logDataInit();logManage();//修改工作目录changeWorkDir();LS_LOG(LOG_INFO, "---mqttClient start---n");void *thread_reval = NULL;//MQTT客户端mqttClientInit();threadWaitDone(&thread_reval);if(thread_reval != NULL){LS_LOG(LOG_DEBUG, "thread_reval : %s is not nulln", (char *)thread_reval);}//不会运行至此LS_LOG(LOG_INFO, "---robot end---n");return 0;}

没啥说的,mqttClientInit()接口为本文的重点内容。
threadWaitDone(&thread_reval)这玩意保证了编出来的进程会一直运行,不会一下就执行完return了。
调用的头文件可能有点多,继续往后走。
由于需要用到链表和队列,下面先把这两个小工具的代码贴出来。

链表 list.h

/*LIST*/
/****************************************************************************                                    服务接口定义*****************************************************************************/struct list_node
{struct list_node *next;                          /**< point to next node. */struct list_node *prev;                          /**< point to prev node. */
};
typedef struct list_node list_t;                  /**< Type for lists. */struct slist_node
{struct slist_node *next;                         /**< point to next node. */
};
typedef struct slist_node slist_t;/******************************************************************************  * 根据结构体成员地址返回结构体指针*****************************************************************************/
#define container_of(ptr, type, member) ((type *)((char *)(ptr) - (unsigned long)(&((type *)0)->member)))/****************************************************************************  * 初始化一个链表对象*****************************************************************************/
#define LIST_HEAD_INIT(name) { &(name), &(name) }#define LIST_HEAD(name) struct list_node name = LIST_HEAD_INIT(name)
/******************************************************************************  * 初始化链表*****************************************************************************/
static inline void list_init(list_t *l)
{l->next = l->prev = l;
}/******************************************************************************  * 在链表尾处插入节点*****************************************************************************/
static inline void list_insert_after(list_t *l, list_t *n)
{l->next->prev = n;n->next = l->next;l->next = n;n->prev = l;
}/******************************************************************************  * 在链表头处插入节点*****************************************************************************/
static inline void list_insert_before(list_t *l, list_t *n)
{l->prev->next = n;n->prev = l->prev;l->prev = n;n->next = l;
}/******************************************************************************  * 移除节点*****************************************************************************/
static inline void list_remove(list_t *n)
{n->next->prev = n->prev;n->prev->next = n->next;n->next = n->prev = n;
}/******************************************************************************  * 判断链表是否为空*****************************************************************************/
static inline int list_isempty(const list_t *l)
{return l->next == l;
}/******************************************************************************  * 获取联表节点个数*****************************************************************************/
static inline unsigned int list_len(const list_t *l)
{unsigned int len = 0;const list_t *p = l;while (p->next != l){p = p->next;len ++;}return len;
}/******************************************************************************  * 获取节点所属结构体指针*****************************************************************************/
#define list_entry(node, type, member) container_of(node, type, member)/******************************************************************************  * 遍历链表节点*****************************************************************************/
#define list_for_each(pos, head) for (pos = (head)->next; pos != (head); pos = pos->next)/******************************************************************************  * 安全遍历链表节点*****************************************************************************/
#define list_for_each_safe(pos, n, head) for (pos = (head)->next, n = pos->next; pos != (head); pos = n, n = pos->next)/******************************************************************************  * 遍历链表节点所属结构体*****************************************************************************/
#define list_for_each_entry(pos, head, member) for (pos = list_entry((head)->next, typeof(*pos), member); &pos->member != (head); pos = list_entry(pos->member.next, typeof(*pos), member))/******************************************************************************  * 安全遍历链表节点所属结构体*****************************************************************************/
#define list_for_each_entry_safe(pos, n, head, member) for (pos = list_entry((head)->next, typeof(*pos), member), n = list_entry(pos->member.next, typeof(*pos), member); &pos->member != (head); pos = n, n = list_entry(n->member.next, typeof(*n), member))/*****************************************************************************  * rt_list_first_entry - 获取链表中的第一个节点*  * @ptr:    the list head to take the element from.*  * @type:   the type of the struct this is embedded in.*  * @member: the name of the list_struct within the struct.*  **  * 注意:that list is expected to be not empty.****************************************************************************/
#define list_first_entry(ptr, type, member) list_entry((ptr)->next, type, member)/****************************************************************************  * @brief 单链表初始化*  **  * @param l the single list to be initialized***************************************************************************/
static inline void slist_init(slist_t *l)
{l->next = NULL;
}static inline void slist_append(slist_t *l, slist_t *n)
{struct slist_node *node;node = l;while (node->next) node = node->next;/* append the node to the tail */node->next = n;n->next = NULL;
}static inline void slist_insert(slist_t *l, slist_t *n)
{n->next = l->next;l->next = n;
}static inline unsigned int slist_len(const slist_t *l)
{unsigned int len = 0;const slist_t *list = l->next;while (list != NULL){list = list->next;len ++;}return len;
}static inline slist_t *slist_remove(slist_t *l, slist_t *n)
{/* remove slist head */struct slist_node *node = l;while (node->next && node->next != n) node = node->next;/* remove node */if (node->next != (slist_t *)0) node->next = node->next->next;return l;
}static inline slist_t *slist_first(slist_t *l)
{return l->next;
}static inline slist_t *slist_tail(slist_t *l)
{while (l->next) l = l->next;return l;
}static inline slist_t *slist_next(slist_t *n)
{return n->next;
}

消息队列 queue.h


typedef struct QUEUE_ST
{char* data;int ElemNum;int ElemSize;int front;int rear;
}QueueSt;/***************************************************************************
function: QueueCreate
input:
output:
Description:提供统一队列创建函数
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum);/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue);/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData,int size);/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData,int size);

消息队列 queue.c

#include <string.h>#include "queue.h"
#include "log.h"/***************************************************************************
function: MqttQueueCreate
input:
output:
Description:创建队列
****************************************************************************/
QueueSt *QueueCreate(int ElementSize,int ElementNum)
{QueueSt *queue = (QueueSt *)malloc(sizeof(QueueSt));if(NULL == queue){LS_LOG(LOG_ERROR, "malloc fail!n");return NULL;}memset((char *)queue, 0, sizeof(QueueSt));queue->data =(char*)malloc((unsigned long)(ElementSize * ElementNum));if(NULL == queue->data){LS_LOG(LOG_ERROR, "malloc fail !n");return NULL;}queue->ElemNum  = ElementNum;queue->ElemSize = ElementSize;queue->rear  = 0;queue->front = 0;return queue;
}/***************************************************************************
function: IsQueueEmpty
input:
output:
Description:判断队列是否为空
****************************************************************************/
int IsQueueEmpty(QueueSt *queue)
{if(NULL == queue){LS_LOG(LOG_ERROR, "queue is NULL!n");return LS_ERR;}return (queue->front == queue->rear ? TRUE : FALSE); //空为TRUE,否则为FALSE
}
/***************************************************************************
function: ExQueue
input:
output:
Description:出队操作
****************************************************************************/
int ExQueue(QueueSt *queue, void *OutData, int size)
{if(NULL == queue || NULL == OutData){LS_LOG(LOG_ERROR, "input para is NULL!n");return LS_ERR;}if(TRUE == IsQueueEmpty(queue)){LS_LOG(LOG_INFO, "queue empty!n");return LS_ERR;}if(size > queue->ElemSize){LS_LOG(LOG_ERROR, "error:size > ElemSize!n");return LS_ERR;}if (OutData != NULL){memcpy(OutData,queue->data + queue->front * queue->ElemSize, (unsigned long)size);}else{LS_LOG(LOG_ERROR, "OutData is NULL!n");return LS_ERR;}queue->front = (queue->front + 1) % queue->ElemNum; //++队首 % Nreturn LS_OK;}/***************************************************************************
function: IsQueueFull
input:
output:
Description:判断队列是否为满
****************************************************************************/
int IsQueueFull(QueueSt *queue)
{if(NULL == queue){LS_LOG(LOG_ERROR,"queue is NULL!n");return LS_ERR;}return ((queue->rear + 1) % queue->ElemNum == queue->front ? TRUE : FALSE); //空为TRUE,否则为FALSE
}/***************************************************************************
function: EnQueue
input:
output:
Description:入队操作
****************************************************************************/
int EnQueue(QueueSt *queue, void* InData, int size)
{if(NULL == queue || NULL == InData){LS_LOG(LOG_ERROR, "input para is NULL!n");return LS_ERR;}if(TRUE == IsQueueFull(queue)){LS_LOG(LOG_ERROR, "queue is full!n");return LS_ERR;}if(size > queue->ElemSize){LS_LOG(LOG_ERROR, "error:size > ElemSize!n");return LS_ERR;}memcpy(queue->data + queue->rear * queue->ElemSize, (char *)InData, (unsigned long)size);queue->rear = (queue->rear + 1) % queue->ElemNum; //++队尾 % Nreturn LS_OK;
}

接下来就是本文的重头戏了。

MQTT客户端模块 mqttClientMgr.h

#include <pthread.h>
#include <semaphore.h>#include "MQTTClient.h"
#include "list.h"
#include "queue.h"#define CLIENTID    "ExampleClientSub"
#define TOPIC       "MQTT Examples"
#define PAYLOAD     "Hello World!"
#define QOS         0
#define TIMEOUT     10000L
#define DISCONNECT	"out"/**********订阅主题**********/
#define SUB_TOPIC_NUM   1#define  TOPIC_SUB_IOT_REG               "v1/127001/response/iotregister"/********发布主题*********/
#define PUB_TOPIC_NUM   1#define  TOPIC_PUB_REG_DEVICE_ID         0
#define  TOPIC_PUB_REG_DEVICE            "v1/127001/sensor/add"typedef struct SEND_RECORD_ST
{long long mid;char type[64];list_t list;
}SendRecordSt;typedef struct MQTT_PUB_DATA_ST
{int TopicId;int len;char* data;
}MqttPubDataSt;typedef struct MQTT_SUB_DATA_ST
{int clientId;//用于区分不同的mqtt客户端int len;char *topic;char *data;
}MqttSubDataSt;typedef struct MQTT_PUB_MGR_ST
{MqttPubDataSt stMqttPubData;QueueSt *pMqttPubQueue;list_t   PubDataList;pthread_mutex_t  lock;//访问互斥锁sem_t QueueWaitsem;sem_t DeliverOksem;
}MqttPubMgrSt;typedef struct MQTT_SUB_MGR_ST
{MqttSubDataSt stMqttSubData;QueueSt *pMqttSubQueue;pthread_mutex_t  lock;//访问互斥锁sem_t QueueWaitsem;
}MqttSubMgrSt;typedef struct MQTT_MGR_ST
{MQTTClient MqttClient;int  MqttConnectFlag;int  StartBusinessFlag;long long  MessageId;pthread_mutex_t  lock;//访问互斥锁MqttPubMgrSt stPubMgr;MqttSubMgrSt stSubMgr;list_t AddFailList;int ReAddCnt;sem_t ReAddSem;pthread_mutex_t  Addlock;//访问互斥锁list_t ActivateFailList;int ReActivateCnt;sem_t ReActivateSem;pthread_mutex_t  Activatelock;//访问互斥锁list_t UpdateFailList;int ReUpdateCnt;sem_t ReUpdateSem;pthread_mutex_t  Updatelock;//访问互斥锁
}MqttMgrSt;void threadWaitDone(void **reval);
int mqttClientInit(void);

新增主题或者删除主题的订阅或者发布需要修改SUB_TOPIC_NUM和PUB_TOPIC_NUM ,不然会数组越界挂掉,看到mqttClientMgr.c就知道原因了。
加了一些锁和信号量。

MQTT客户端模块 mqttClientMgr.c

#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>#include "mqttClientMgr.h"
#include "log.h"static MqttMgrSt stMqttMgr;
static volatile MQTTClient_deliveryToken deliveredtoken;
static pthread_t MqttClientThreadId;
static pthread_t ProcSubQueueThreadId;#define  PUB_QUEUE_ELEM_NUM      200
#define  SUB_QUEUE_ELEM_NUM      500
#define  MAX_BROKER_ADDR_LEN     32
#define MQTT_PLATFORM_ENABLE     (1)#define BROKER_IP     "127.0.0.1"
#define BROKER_PORT   "1883"
#define CLIENT_ID     "anyhow"
#define USER_NAME     "mosquitto"
#define PWD           "ls@123"static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};void threadWaitDone(void **reval)
{pthread_join(MqttClientThreadId, reval);
}void SetThreadAttr(pthread_attr_t *thread_attr, unsigned int priority, int policy, size_t stack_size)
{size_t StackSize = 0;pthread_attr_init(thread_attr);//首先需要对属性变量进行初始化pthread_attr_setscope(thread_attr,PTHREAD_SCOPE_PROCESS);pthread_attr_setdetachstate(thread_attr,PTHREAD_CREATE_JOINABLE);pthread_attr_setschedpolicy(thread_attr,policy);struct sched_param param;pthread_attr_getschedparam(thread_attr, &param);param.sched_priority = (int)priority;pthread_attr_setschedparam(thread_attr, &param);if(stack_size < 0x4000) //以字节为单位  最小16k{StackSize = 0x4000;}pthread_attr_setstacksize(thread_attr,StackSize);
}int CreateThread(pthread_t *thread_id, const pthread_attr_t *attr,void *(*thread_fun) (void *), void *thread_arg)
{int ret = -1;ret = pthread_create(thread_id,attr,thread_fun,thread_arg);return(ret);
}/***************************************************************************
function: GetBrokerAddrinput:
output:
Description:通过配置构建MQTT broker地址
****************************************************************************/
static int GetBrokerAddr(char *BrokerAddr,int len)
{unsigned int size = 0;char BrokerPortStr[6]; //格式:protocol://host:portif(NULL == BrokerAddr){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr is NULL.n");return LS_ERR;}memset(BrokerPortStr, 0, sizeof(BrokerPortStr));memcpy(BrokerAddr, "tcp://", strlen("tcp://"));size = (unsigned int)len - strlen("tcp://");if(strlen((char*)BROKER_IP) > size){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!n");return LS_ERR;}strcat(BrokerAddr, (char*)BROKER_IP);size = size - strlen((char*)BROKER_IP);if(size < 1){LS_LOG(LOG_ERROR, "GetBrokerAddr: BrokerAddr buf is not enough!n");return LS_ERR;}strcat(BrokerAddr, ":");strcat(BrokerAddr, BROKER_PORT);/*size = size - 1;TransIntToStr(BROKER_PORT,BrokerPortStr);if(size < (strlen(BROKER_PORT) + 1)){LS_LOG(LOG_ERROR,"GetBrokerAddr: BrokerAddr buf is not enough !n");return LS_ERR;}strcat(BrokerAddr,BrokerPortStr);*/return LS_OK;
}/***************************************************************************
function: MqttSubscribeAllTopics
input:
output:
Description:客户端初始化的时候完成所有主题的订阅
****************************************************************************/
void MqttSubscribeAllTopics(void)
{int i = 0;for(i = 0;i < SUB_TOPIC_NUM; i++){LS_LOG(LOG_INFO, "Subscribing topic %snfor client %s using QoS %dnn",SubTopics[i], CLIENTID, QOS);MQTTClient_subscribe(stMqttMgr.MqttClient, SubTopics[i], QOS);}
}/***************************************************************************
function: MqttClientRelease
input:
output:
Description:MQTT客户端释放
****************************************************************************/
void MqttClientRelease(MQTTClient client)
{MQTTClient_unsubscribe(client, TOPIC);MQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);
}/***************************************************************************
function: connlost
input:
output:
Description:客户端注册的连接失败回调函数
****************************************************************************/
void connlost(void *context, char *cause)
{LS_LOG(LOG_ERROR, "nConnection lostn");LS_LOG(LOG_ERROR, " cause: %sn", cause);stMqttMgr.MqttConnectFlag = 0;sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);
}/***************************************************************************
function: MqttTransData
input:
output:
Description:MqttTransData
****************************************************************************/
int MqttTransData(MqttSubDataSt *tdata)
{int ret = LS_ERR;pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);ret= EnQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (char *)tdata, sizeof(MqttSubDataSt));pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);if(LS_OK == ret){sem_post(&stMqttMgr.stSubMgr.QueueWaitsem);}return ret;
}/***************************************************************************
function: msgarrvd
input:
output:
Description:客户端注册的收到订阅消息回调函数
****************************************************************************/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{char* payloadptr = NULL;MqttSubDataSt stMqttSubData;char* pData = NULL;LS_LOG(LOG_DEBUG, "Message arrived on topic: %sn", topicName);memset(&stMqttSubData, 0, sizeof(MqttSubDataSt));stMqttSubData.clientId = 0;payloadptr = message->payload;if(strcmp(payloadptr, DISCONNECT) == 0){LS_LOG(LOG_WARN, " DISCONNECTn out!!n");stMqttMgr.MqttConnectFlag = FALSE;}LS_LOG(LOG_DEBUG, "message payload: %sn", (char *)message->payload);pData = malloc((unsigned long)message->payloadlen);  //由队列处理线程free pDataif(NULL == pData){LS_LOG(LOG_ERROR, "malloc failed!!n");goto fail;}memset(pData, 0, (unsigned long)message->payloadlen);memcpy(pData, payloadptr, (unsigned long)message->payloadlen);stMqttSubData.data = pData;stMqttSubData.len = message->payloadlen;stMqttSubData.topic = topicName;if(LS_OK != MqttTransData(&stMqttSubData))//成功,由队列处理线程free pData{LS_LOG(LOG_WARN, "MqttTransData failed!!n");free(stMqttSubData.data);}
fail:MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}/***************************************************************************
function: delivered
input:
output:
Description:客户端注册的消息delivered回调函数
****************************************************************************/
void delivered(void *context, MQTTClient_deliveryToken dt)
{LS_LOG(LOG_INFO, "Message with token value %d delivery confirmedn", dt);deliveredtoken = dt;
}/***************************************************************************
function: MqttClientThread()
input:
output:
Description:MQTT客户端处理线程
****************************************************************************/
void *MqttClientThread()
{int rc = 0;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token = 0;char BrokerAddr[MAX_BROKER_ADDR_LEN];memset(BrokerAddr, 0, sizeof(BrokerAddr));if(LS_OK != GetBrokerAddr(BrokerAddr,MAX_BROKER_ADDR_LEN)){LS_LOG(LOG_ERROR, "GetBrokerAddr Err.n");return NULL;}LS_LOG(LOG_INFO, "BrokerAddr: %sn", BrokerAddr);MQTTClient_create(&stMqttMgr.MqttClient, BrokerAddr, (char*)CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);if(NULL == stMqttMgr.MqttClient){LS_LOG(LOG_ERROR,"MQTTClient_create g_MottClient fail!n");return NULL;}conn_opts.keepAliveInterval = 20;conn_opts.cleansession      = 1;conn_opts.username          = (char*)USER_NAME;conn_opts.password          = (char*)PWD;MQTTClient_setCallbacks(stMqttMgr.MqttClient, NULL, connlost, msgarrvd, delivered);ReLink:do{LS_LOG(LOG_INFO, "start to connect MQTT server...n");if ((rc = MQTTClient_connect(stMqttMgr.MqttClient, &conn_opts)) == MQTTCLIENT_SUCCESS){LS_LOG(LOG_INFO, "connect success!n");stMqttMgr.MqttConnectFlag = TRUE;break;}else{LS_LOG(LOG_ERROR, "Failed to connect, return code %dn", rc);sleep(20); //时间可配置}}while(1);//完成所有主题的订阅初始化if(MQTT_PLATFORM_ENABLE){MqttSubscribeAllTopics();}//客户端连接正常情况下进行信息发布while(TRUE == stMqttMgr.MqttConnectFlag) //发布消息{sem_wait(&stMqttMgr.stPubMgr.QueueWaitsem);//LS_LOG(LOG_INFO,"rev QueueWaitsem !!n");pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);do{if(LS_OK == ExQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (void*)&stMqttMgr.stPubMgr.stMqttPubData, sizeof(MqttPubDataSt))){pubmsg.payload    = stMqttMgr.stPubMgr.stMqttPubData.data; //添加一个成员用以决定发布TOPIC 记得释放freepubmsg.payloadlen = stMqttMgr.stPubMgr.stMqttPubData.len;pubmsg.qos        = QOS;pubmsg.retained   = 0;deliveredtoken    = 0;if(MQTT_PLATFORM_ENABLE){rc = MQTTClient_publishMessage(stMqttMgr.MqttClient, PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId], &pubmsg, &token);if(strlen(pubmsg.payload) < 1024){LS_LOG(LOG_INFO, "MQTT Publish message %sn""on topic %s token %d n",(char*)pubmsg.payload,PubTopics[stMqttMgr.stPubMgr.stMqttPubData.TopicId],token);}}if(rc != 0){LS_LOG(LOG_ERROR, "MQTT Publish message Failed, rc=%d !!n", rc);}free(stMqttMgr.stPubMgr.stMqttPubData.data); //重要,异地malloc,此地释放}}while(FALSE == IsQueueEmpty(stMqttMgr.stPubMgr.pMqttPubQueue));pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);}//客户端连接断开后进行重连处理goto ReLink;//本应用不会运行至此MqttClientRelease(stMqttMgr.MqttClient);
}/***************************************************************************
function: MqttProcessIotResponse()
input:
output:
Description:APP处理Iot平台回应
****************************************************************************/
void MqttProcessIotResponse(MqttSubDataSt *tSubData)
{//此处进行数据处理}/***************************************************************************
function: MqttProcSubInfoThread
input:
output:
Description:MQTT处理订阅消息队列线程
****************************************************************************/
void *MqttProcSubInfoThread(void *param)
{LS_LOG(LOG_INFO, "MqttProcSubInfoThread start !n");while(1){sem_wait(&stMqttMgr.stSubMgr.QueueWaitsem);pthread_mutex_lock(&stMqttMgr.stSubMgr.lock);do{if(LS_OK == ExQueue(stMqttMgr.stSubMgr.pMqttSubQueue, (void*)&stMqttMgr.stSubMgr.stMqttSubData, sizeof(MqttSubDataSt))){MqttProcessIotResponse(&stMqttMgr.stSubMgr.stMqttSubData);free(stMqttMgr.stSubMgr.stMqttSubData.data);}}while(FALSE == IsQueueEmpty(stMqttMgr.stSubMgr.pMqttSubQueue));pthread_mutex_unlock(&stMqttMgr.stSubMgr.lock);}
}int mqttClientInit(void)
{int ret = -1;pthread_attr_t Thread_Attr;memset((char *)&stMqttMgr,0,sizeof(MqttMgrSt));stMqttMgr.MqttConnectFlag = FALSE;stMqttMgr.StartBusinessFlag = FALSE;stMqttMgr.MessageId = 1000000000000000;pthread_mutex_init(&stMqttMgr.lock,NULL);sem_init(&stMqttMgr.stPubMgr.QueueWaitsem,0,0);pthread_mutex_init(&stMqttMgr.stPubMgr.lock,NULL);list_init(&stMqttMgr.stPubMgr.PubDataList);sem_init(&stMqttMgr.stSubMgr.QueueWaitsem,0,0);pthread_mutex_init(&stMqttMgr.stSubMgr.lock,NULL);list_init(&stMqttMgr.AddFailList);stMqttMgr.ReAddCnt = 0;sem_init(&stMqttMgr.ReAddSem,0,0);pthread_mutex_init(&stMqttMgr.Addlock,NULL);list_init(&stMqttMgr.ActivateFailList);stMqttMgr.ReActivateCnt = 0;sem_init(&stMqttMgr.ReActivateSem,0,0);pthread_mutex_init(&stMqttMgr.Activatelock,NULL);list_init(&stMqttMgr.UpdateFailList);stMqttMgr.ReUpdateCnt = 0;sem_init(&stMqttMgr.ReUpdateSem,0,0);pthread_mutex_init(&stMqttMgr.Updatelock,NULL);stMqttMgr.stPubMgr.pMqttPubQueue = QueueCreate(sizeof(MqttPubDataSt), PUB_QUEUE_ELEM_NUM);if(NULL == stMqttMgr.stPubMgr.pMqttPubQueue){LS_LOG(LOG_ERROR, "Create MQTT Pub Queue failed!n");return LS_ERR;}stMqttMgr.stSubMgr.pMqttSubQueue = QueueCreate(sizeof(MqttSubDataSt), SUB_QUEUE_ELEM_NUM);if(NULL == stMqttMgr.stSubMgr.pMqttSubQueue){LS_LOG(LOG_ERROR, "Create MQTT Sub Queue failed!n");return LS_ERR;}SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x100000);ret = CreateThread(&MqttClientThreadId, &Thread_Attr, MqttClientThread, NULL);if(ret != 0){LS_LOG(LOG_ERROR, "Create MQTT client thread failed!n");return LS_ERR;}SetThreadAttr(&Thread_Attr, 20, SCHED_OTHER, 0x8000);ret = CreateThread(&ProcSubQueueThreadId, &Thread_Attr, MqttProcSubInfoThread, NULL);if(ret != 0){LS_LOG(LOG_ERROR, "Create MQTT Process Sub Info thread failed!n");return LS_ERR;}return LS_OK;
}/***************************************************************************
function: MqttSendData
input:  MqttPubDataSt类型指针的数据
output:
Description:MQTT发送数据
return: LS_OK-成功  LS_ERR-失败(需要重新发送)
****************************************************************************/
int MqttSendData(char *data, int len, int TopicId, SendRecordSt *pSendRecord)
{int ret      = LS_ERR;MqttPubDataSt StPubData;char *pdata  = NULL;memset(&StPubData, 0, sizeof(MqttPubDataSt));if(NULL != data && len > 0){pdata = (char *)malloc((unsigned long)len);if(NULL == pdata){LS_LOG(LOG_INFO, "malloc failed!!n");return LS_ERR;}memset(pdata, 0, (unsigned long)len);memcpy(pdata, data, (unsigned long)len);}StPubData.TopicId = TopicId;StPubData.data    = pdata;StPubData.len     = len;pthread_mutex_lock(&stMqttMgr.stPubMgr.lock);ret= EnQueue(stMqttMgr.stPubMgr.pMqttPubQueue, (char *)&StPubData, sizeof(MqttPubDataSt));if(NULL != pSendRecord)  //加入发送管理链表{list_insert_before(&stMqttMgr.stPubMgr.PubDataList, &pSendRecord->list);}pthread_mutex_unlock(&stMqttMgr.stPubMgr.lock);if(LS_OK != ret){LS_LOG(LOG_INFO, "EnQueue error!!n");free(pdata);return LS_ERR;}sem_post(&stMqttMgr.stPubMgr.QueueWaitsem);return LS_OK;
}

新增或删除主题的订阅和发布需要修改这俩全局数组
static char * SubTopics[SUB_TOPIC_NUM]={TOPIC_SUB_IOT_REG};
static char * PubTopics[PUB_TOPIC_NUM]={TOPIC_PUB_REG_DEVICE};

MqttSendData接口提供了MQTT消息发布的接口,就是先把MQTT消息进队列,然后在MqttProcSubInfoThread线程出队列,进行发布。
mqttClientInit开始,做了锁,信号俩个,消息队列的初始化,消息队列有俩,一个用来处理发布,一个用来处理订阅;建立俩线程,一个是MQTT客户端的建立线程,一个是订阅的MQTT消息处理线程。
msgarrvd接口里就可以对收到的消息进行处理,本文的处理还是把MQTT消息进队列,MqttProcSubInfoThread线程出队列,各位可以在MqttProcessIotResponse接口里对订阅到的MQTT消息进行处理。

到此为止,整个工程算是贴完了,下面就是见证奇迹的时刻。

项目验证

ssh端用tail -f /home/ymj/mqttClient/log/mqttlog/mqttlog.log命令动态查看日志。
使用软件自带的工具运行代码。

在这里插入图片描述

结果显示
[20220908-16:45:46][INFO][main.c][changeWorkDir][22]—workdir:/home/ymj/mqttClient—
[20220908-16:45:46][INFO][main.c][main][39]—mqttClient start—
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][255]BrokerAddr: tcp://127.0.0.1:1883
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:45:46][INFO][mqttClientMgr.c][MqttProcSubInfoThread][363]MqttProcSubInfoThread start !
[20220908-16:45:46][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1
[20220908-16:46:06][INFO][mqttClientMgr.c][MqttClientThread][275]start to connect MQTT server…
[20220908-16:46:07][ERROR][mqttClientMgr.c][MqttClientThread][284]Failed to connect, return code -1

连接MQTT服务端失败,为啥呢?因为没有起服务端的broker,哈哈。
运行mosquitto,默认端口为1883,也可以自己指定端口,方法自己百度去。

在这里插入图片描述
日志显示连接broker成功,并打印出订阅的主题
在这里插入图片描述
宿主机打开MQTT客户端工具mqtt.fx,这个软件也可以从网上直接搜到
我的虚拟机ip是192.168.8.235,所以配置连接如下
在这里插入图片描述
在这里插入图片描述
配置完成后点击OK,退回来,connect。
填入代码里订阅的主题和要发布的内容
在这里插入图片描述
点击发布
在这里插入图片描述
程序成功接收到,验证完成,我擦,后面的乱码是啥,不管了,你们自己优化去。
程序的MQTT发布也自己写代码验证去。

附录

项目工程如下
https://download.csdn.net/download/qqq1112345/86512208

Published by

风君子

独自遨游何稽首 揭天掀地慰生平