#include "bsp_mqttAsync.h" volatile int finished = 0; Queue g_qMsg; Queue g_qHistory; // 当连接成功时的回调函数 void onConnect(void *context, MQTTAsync_successData *response) { printf("Connected successfully\n"); MQTTAsync client = (MQTTAsync)context; int rc; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; // 设置响应选项 opts.onSuccess = NULL; opts.onFailure = NULL; opts.context = client; // 订阅指定的主题 if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start subscribe, return code %d\n", rc); } printf("Subscribed to topic %s with QOS %d\n", TOPIC, QOS); } // 当连接失败时的回调函数 void onConnectFailure(void *context, MQTTAsync_failureData *response) { // 创建一个新的断点记录 // 尝试重新连接 while (1) { // 尝试重新连接 // 将存储的历史数据入数据库 } } // 当成功断开连接时的回调函数 void onDisconnect(void *context, MQTTAsync_successData *response) { printf("Disconnected successfully\n"); finished = 1; // 设置完成标志以终止任务 } // 当订阅失败时的回调函数 void onSubscribeFailure(void *context, MQTTAsync_failureData *response) { printf("Subscribe failed, rc %d\n", response ? response->code : 0); finished = 1; // 设置完成标志以终止任务 } // 当消息到达时的回调函数 int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { printf("Message arrived on topic %s\n", topicName); printf("Message: %.*s\n", message->payloadlen, (char *)message->payload); // 释放消息和主题名内存 MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; // 消息处理成功 } // 发布消息的函数 void publishMessage(MQTTAsync client) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; Queue *pNode = (Queue *)&g_qMsg; while (!isQueueEmpty(pNode)) { // 设置消息参数 pubmsg.payload = pNode->head->data; pubmsg.payloadlen = (int)strlen(pNode->head->data); pubmsg.qos = QOS; pubmsg.retained = 0; opts.onSuccess = NULL; opts.onFailure = NULL; opts.context = client; int rc = 0; // 发送消息 if ((rc = MQTTAsync_sendMessage(client, pNode->head->topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { // 发送成功,则将消息出队 dequeue(pNode); printf("Failed to publish message, return code %d\n", rc); } else { // 修改主题为历史数据,入历史数据队列,并将元素出队 pNode->head->topic; enqueue(&g_qHistory, pNode->head); dequeue(pNode); } } } // 主任务函数,管理MQTT连接、订阅和发布 int MQTTAsyncTask(int argc, char *argv[]) { initQueue(&g_qMsg); initQueue(&g_qHistory); MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; // 创建MQTT客户端 MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); // 设置回调函数 MQTTAsync_setCallbacks(client, NULL, NULL, messageArrived, NULL); // 设置连接选项 conn_opts.keepAliveInterval = 20; // 保持连接的心跳间隔 conn_opts.cleansession = 1; // 创建一个会话 conn_opts.onSuccess = onConnect; // 设置连接成功的回调 conn_opts.onFailure = onConnectFailure; // 设置连接失败的回调 conn_opts.context = client; // 发起连接 if (MQTTAsync_connect(client, &conn_opts) != MQTTASYNC_SUCCESS) { printf("Failed to start connect\n"); return EXIT_FAILURE; } // 等待任务完成 while (!finished) { usleep(10000L); // 微小延迟,用以减少CPU占用 } //void *msg = NULL; // 发布消息到主题 publishMessage(client); MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; disc_opts.onSuccess = onDisconnect; // 设置断开成功的回调 // 断开连接 MQTTAsync_disconnect(client, &disc_opts); // 销毁MQTT客户端 MQTTAsync_destroy(&client); return EXIT_SUCCESS; }