ems/ems_c/bsp/bsp_mqttAsync.c

153 lines
4.4 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "bsp_mqttAsync.h"
volatile int finished = 0;
Queue g_qMsg;
Queue g_qHistory;
// 当连接成功时的回调函数
void onConnect(void *context, MQTTAsync_successData *response)
{
printf("Connected successfully\n");
MQTTAsync client = (MQTTAsync)context;
int rc;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
// 设置响应选项
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = client;
// 订阅指定的主题
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
}
printf("Subscribed to topic %s with QOS %d\n", TOPIC, QOS);
}
// 当连接失败时的回调函数
void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
// 创建一个新的断点记录
// 尝试重新连接
while (1)
{
// 尝试重新连接
// 将存储的历史数据入数据库
}
}
// 当成功断开连接时的回调函数
void onDisconnect(void *context, MQTTAsync_successData *response)
{
printf("Disconnected successfully\n");
finished = 1; // 设置完成标志以终止任务
}
// 当订阅失败时的回调函数
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1; // 设置完成标志以终止任务
}
// 当消息到达时的回调函数
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
printf("Message arrived on topic %s\n", topicName);
printf("Message: %.*s\n", message->payloadlen, (char *)message->payload);
// 释放消息和主题名内存
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1; // 消息处理成功
}
// 发布消息的函数
void publishMessage(MQTTAsync client)
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
Queue *pNode = (Queue *)&g_qMsg;
while (!isQueueEmpty(pNode))
{
// 设置消息参数
pubmsg.payload = pNode->head->data;
pubmsg.payloadlen = (int)strlen(pNode->head->data);
pubmsg.qos = QOS;
pubmsg.retained = 0;
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = client;
int rc = 0;
// 发送消息
if ((rc = MQTTAsync_sendMessage(client, pNode->head->topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
// 发送成功,则将消息出队
dequeue(pNode);
printf("Failed to publish message, return code %d\n", rc);
}
else
{
// 修改主题为历史数据,入历史数据队列,并将元素出队
pNode->head->topic;
enqueue(&g_qHistory, pNode->head);
dequeue(pNode);
}
}
}
// 主任务函数管理MQTT连接、订阅和发布
int MQTTAsyncTask(int argc, char *argv[])
{
initQueue(&g_qMsg);
initQueue(&g_qHistory);
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
// 创建MQTT客户端
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
// 设置回调函数
MQTTAsync_setCallbacks(client, NULL, NULL, messageArrived, NULL);
// 设置连接选项
conn_opts.keepAliveInterval = 20; // 保持连接的心跳间隔
conn_opts.cleansession = 1; // 创建一个会话
conn_opts.onSuccess = onConnect; // 设置连接成功的回调
conn_opts.onFailure = onConnectFailure; // 设置连接失败的回调
conn_opts.context = client;
// 发起连接
if (MQTTAsync_connect(client, &conn_opts) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect\n");
return EXIT_FAILURE;
}
// 等待任务完成
while (!finished)
{
usleep(10000L); // 微小延迟用以减少CPU占用
}
//void *msg = NULL;
// 发布消息到主题
publishMessage(client);
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = onDisconnect; // 设置断开成功的回调
// 断开连接
MQTTAsync_disconnect(client, &disc_opts);
// 销毁MQTT客户端
MQTTAsync_destroy(&client);
return EXIT_SUCCESS;
}