forked from gary/ems
2
0
Fork 0
sun_ems/ems_c/bsp/bsp_mqttAsync.c

153 lines
4.4 KiB
C
Raw Normal View History

2025-05-13 17:49:49 +08:00
#include "bsp_mqttAsync.h"
volatile int finished = 0;
Queue g_qMsg;
Queue g_qHistory;
// 当连接成功时的回调函数
void onConnect(void *context, MQTTAsync_successData *response)
{
printf("Connected successfully\n");
MQTTAsync client = (MQTTAsync)context;
int rc;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
// 设置响应选项
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = client;
// 订阅指定的主题
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
}
printf("Subscribed to topic %s with QOS %d\n", TOPIC, QOS);
}
// 当连接失败时的回调函数
void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
// 创建一个新的断点记录
// 尝试重新连接
while (1)
{
// 尝试重新连接
// 将存储的历史数据入数据库
}
}
// 当成功断开连接时的回调函数
void onDisconnect(void *context, MQTTAsync_successData *response)
{
printf("Disconnected successfully\n");
finished = 1; // 设置完成标志以终止任务
}
// 当订阅失败时的回调函数
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1; // 设置完成标志以终止任务
}
// 当消息到达时的回调函数
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
printf("Message arrived on topic %s\n", topicName);
printf("Message: %.*s\n", message->payloadlen, (char *)message->payload);
// 释放消息和主题名内存
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1; // 消息处理成功
}
// 发布消息的函数
void publishMessage(MQTTAsync client)
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
Queue *pNode = (Queue *)&g_qMsg;
while (!isQueueEmpty(pNode))
{
// 设置消息参数
pubmsg.payload = pNode->head->data;
pubmsg.payloadlen = (int)strlen(pNode->head->data);
pubmsg.qos = QOS;
pubmsg.retained = 0;
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = client;
int rc = 0;
// 发送消息
if ((rc = MQTTAsync_sendMessage(client, pNode->head->topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
// 发送成功,则将消息出队
dequeue(pNode);
printf("Failed to publish message, return code %d\n", rc);
}
else
{
// 修改主题为历史数据,入历史数据队列,并将元素出队
pNode->head->topic;
enqueue(&g_qHistory, pNode->head);
dequeue(pNode);
}
}
}
// 主任务函数管理MQTT连接、订阅和发布
int MQTTAsyncTask(int argc, char *argv[])
{
initQueue(&g_qMsg);
initQueue(&g_qHistory);
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
// 创建MQTT客户端
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
// 设置回调函数
MQTTAsync_setCallbacks(client, NULL, NULL, messageArrived, NULL);
// 设置连接选项
conn_opts.keepAliveInterval = 20; // 保持连接的心跳间隔
conn_opts.cleansession = 1; // 创建一个会话
conn_opts.onSuccess = onConnect; // 设置连接成功的回调
conn_opts.onFailure = onConnectFailure; // 设置连接失败的回调
conn_opts.context = client;
// 发起连接
if (MQTTAsync_connect(client, &conn_opts) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect\n");
return EXIT_FAILURE;
}
// 等待任务完成
while (!finished)
{
usleep(10000L); // 微小延迟用以减少CPU占用
}
//void *msg = NULL;
// 发布消息到主题
publishMessage(client);
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = onDisconnect; // 设置断开成功的回调
// 断开连接
MQTTAsync_disconnect(client, &disc_opts);
// 销毁MQTT客户端
MQTTAsync_destroy(&client);
return EXIT_SUCCESS;
}