153 lines
4.4 KiB
C
153 lines
4.4 KiB
C
|
||
#include "bsp_mqttAsync.h"
|
||
|
||
volatile int finished = 0;
|
||
Queue g_qMsg;
|
||
Queue g_qHistory;
|
||
// 当连接成功时的回调函数
|
||
void onConnect(void *context, MQTTAsync_successData *response)
|
||
{
|
||
|
||
printf("Connected successfully\n");
|
||
MQTTAsync client = (MQTTAsync)context;
|
||
int rc;
|
||
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
||
|
||
// 设置响应选项
|
||
opts.onSuccess = NULL;
|
||
opts.onFailure = NULL;
|
||
opts.context = client;
|
||
|
||
// 订阅指定的主题
|
||
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
|
||
{
|
||
printf("Failed to start subscribe, return code %d\n", rc);
|
||
}
|
||
|
||
printf("Subscribed to topic %s with QOS %d\n", TOPIC, QOS);
|
||
}
|
||
|
||
// 当连接失败时的回调函数
|
||
void onConnectFailure(void *context, MQTTAsync_failureData *response)
|
||
{
|
||
// 创建一个新的断点记录
|
||
|
||
// 尝试重新连接
|
||
|
||
while (1)
|
||
{
|
||
// 尝试重新连接
|
||
|
||
// 将存储的历史数据入数据库
|
||
}
|
||
}
|
||
|
||
// 当成功断开连接时的回调函数
|
||
void onDisconnect(void *context, MQTTAsync_successData *response)
|
||
{
|
||
printf("Disconnected successfully\n");
|
||
finished = 1; // 设置完成标志以终止任务
|
||
}
|
||
|
||
// 当订阅失败时的回调函数
|
||
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
|
||
{
|
||
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
|
||
finished = 1; // 设置完成标志以终止任务
|
||
}
|
||
|
||
// 当消息到达时的回调函数
|
||
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
|
||
{
|
||
printf("Message arrived on topic %s\n", topicName);
|
||
printf("Message: %.*s\n", message->payloadlen, (char *)message->payload);
|
||
|
||
// 释放消息和主题名内存
|
||
MQTTAsync_freeMessage(&message);
|
||
MQTTAsync_free(topicName);
|
||
return 1; // 消息处理成功
|
||
}
|
||
|
||
// 发布消息的函数
|
||
void publishMessage(MQTTAsync client)
|
||
{
|
||
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
|
||
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
||
Queue *pNode = (Queue *)&g_qMsg;
|
||
while (!isQueueEmpty(pNode))
|
||
{
|
||
|
||
// 设置消息参数
|
||
pubmsg.payload = pNode->head->data;
|
||
pubmsg.payloadlen = (int)strlen(pNode->head->data);
|
||
pubmsg.qos = QOS;
|
||
pubmsg.retained = 0;
|
||
opts.onSuccess = NULL;
|
||
opts.onFailure = NULL;
|
||
opts.context = client;
|
||
|
||
int rc = 0;
|
||
// 发送消息
|
||
if ((rc = MQTTAsync_sendMessage(client, pNode->head->topic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
|
||
{
|
||
// 发送成功,则将消息出队
|
||
dequeue(pNode);
|
||
printf("Failed to publish message, return code %d\n", rc);
|
||
}
|
||
else
|
||
{
|
||
// 修改主题为历史数据,入历史数据队列,并将元素出队
|
||
pNode->head->topic;
|
||
enqueue(&g_qHistory, pNode->head);
|
||
dequeue(pNode);
|
||
}
|
||
}
|
||
}
|
||
|
||
// 主任务函数,管理MQTT连接、订阅和发布
|
||
int MQTTAsyncTask(int argc, char *argv[])
|
||
{
|
||
initQueue(&g_qMsg);
|
||
initQueue(&g_qHistory);
|
||
MQTTAsync client;
|
||
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
|
||
|
||
// 创建MQTT客户端
|
||
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||
|
||
// 设置回调函数
|
||
MQTTAsync_setCallbacks(client, NULL, NULL, messageArrived, NULL);
|
||
|
||
// 设置连接选项
|
||
conn_opts.keepAliveInterval = 20; // 保持连接的心跳间隔
|
||
conn_opts.cleansession = 1; // 创建一个会话
|
||
conn_opts.onSuccess = onConnect; // 设置连接成功的回调
|
||
conn_opts.onFailure = onConnectFailure; // 设置连接失败的回调
|
||
conn_opts.context = client;
|
||
|
||
// 发起连接
|
||
if (MQTTAsync_connect(client, &conn_opts) != MQTTASYNC_SUCCESS)
|
||
{
|
||
printf("Failed to start connect\n");
|
||
return EXIT_FAILURE;
|
||
}
|
||
|
||
// 等待任务完成
|
||
while (!finished)
|
||
{
|
||
usleep(10000L); // 微小延迟,用以减少CPU占用
|
||
}
|
||
//void *msg = NULL;
|
||
// 发布消息到主题
|
||
publishMessage(client);
|
||
|
||
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
|
||
disc_opts.onSuccess = onDisconnect; // 设置断开成功的回调
|
||
|
||
// 断开连接
|
||
MQTTAsync_disconnect(client, &disc_opts);
|
||
|
||
// 销毁MQTT客户端
|
||
MQTTAsync_destroy(&client);
|
||
return EXIT_SUCCESS;
|
||
} |