EMS/kit/kit_mqtt_sub.c

196 lines
5.0 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "kit_mqtt_sub.h"
/* Broker URI handed to MQTTAsync_create() in subClient(). */
#define ADDRESS "tcp://47.120.14.45:3011"
/* Client identifier handed to MQTTAsync_create() and echoed by onConnect1(). */
#define CLIENTID "ExampleClientSub"
/* Topic that onConnect1() subscribes to. */
#define TOPIC "aa"
/* Not referenced in the visible part of this file. */
#define PAYLOAD "Hello World!"
/* QoS level requested in the MQTTAsync_subscribe() call made by onConnect1(). */
#define QOS 1
/* Not referenced in the visible part of this file. */
#define TIMEOUT 10000L
/* Set to 1 by onDisconnect1()/onDisconnectFailure1(); subClient() polls it
 * after it has requested a disconnect. */
int disc_finished = 0;
/* Set to 1 by onSubscribe(); subClient() polls it while waiting for the
 * subscription to be acknowledged. */
int subscribed = 0;
/* Set to 1 by connlost1(), onConnect1(), onConnectFailure1() and
 * onSubscribeFailure() when a connect/subscribe step fails; subClient()
 * polls it to stop waiting. */
int finished1 = 0;
void connlost1(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect1;
conn_opts.onFailure = onConnectFailure1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished1 = 1;
}
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
//解析MQTT的json格式
// 将payload转换为C字符串并确保为其分配足够的空间
char* payload_str = (char*)malloc(message->payloadlen + 1);
if (payload_str) {
memcpy(payload_str, message->payload, message->payloadlen);
payload_str[message->payloadlen] = '\0'; // 确保字符串以空字符结尾
cJSON *json = cJSON_Parse(payload_str);
if (json == NULL) {
// 处理解析错误
const char *error_ptr = cJSON_GetErrorPtr();
if (error_ptr != NULL) {
fprintf(stderr, "Error before: %s\n", error_ptr);
}
} else {
// 成功解析,访问对象
const char *name = cJSON_GetStringValue(cJSON_GetObjectItem(json, "msg"));
if (name) {
printf("解析后的json格式msg: %s\n", name);
}
cJSON_Delete(json);
}
free(payload_str);
}
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
/*
 * Failure callback for the disconnect request issued by subClient().
 *
 * context:  unused.
 * response: failure details; treated as optional (a NULL pointer is reported
 *           as code 0 instead of being dereferenced).
 *
 * Logs the failure code and sets disc_finished so subClient() stops waiting.
 */
void onDisconnectFailure1(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    printf("Disconnect failed, rc %d\n", response ? response->code : 0);
    disc_finished = 1;
}
/*
 * Success callback for the disconnect request issued by subClient():
 * logs the event and sets disc_finished so subClient() stops waiting.
 * Neither argument is used.
 */
void onDisconnect1(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Successful disconnection\n");
    disc_finished = 1;
}
/*
 * Success callback for the subscribe request made in onConnect1():
 * logs the event and sets the `subscribed` flag that subClient() polls.
 * Neither argument is used.
 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe succeeded\n");
    subscribed = 1;
}
/*
 * Failure callback for the subscribe request made in onConnect1().
 *
 * context:  unused.
 * response: failure details; treated as optional (a NULL pointer is reported
 *           as code 0 instead of being dereferenced).
 *
 * Logs the failure code and sets finished1 so subClient() stops waiting.
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    printf("Subscribe failed, rc %d\n", response ? response->code : 0);
    finished1 = 1;
}
/*
 * Failure callback for the connect requests started by subClient() and
 * connlost1().
 *
 * context:  unused.
 * response: failure details; treated as optional (a NULL pointer is reported
 *           as code 0 instead of being dereferenced).
 *
 * Logs the failure code and sets finished1 so subClient() stops waiting.
 */
void onConnectFailure1(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    printf("Connect failed, rc %d\n", response ? response->code : 0);
    finished1 = 1;
}
/*
 * Success callback for the connect requests started by subClient() and
 * connlost1().
 *
 * context:  cast to the MQTTAsync client handle and used for the subscribe.
 * response: unused.
 *
 * Announces the connection and requests a subscription to TOPIC at QOS,
 * with onSubscribe()/onSubscribeFailure() as completion callbacks. Sets
 * finished1 when the subscribe request cannot be started.
 */
void onConnect1(void* context, MQTTAsync_successData* response)
{
    MQTTAsync handle = (MQTTAsync)context;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    int status;

    (void)response;

    printf("Successful connection\n");
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);

    sub_opts.context = handle;
    sub_opts.onSuccess = onSubscribe;
    sub_opts.onFailure = onSubscribeFailure;

    status = MQTTAsync_subscribe(handle, TOPIC, QOS, &sub_opts);
    if (status == MQTTASYNC_SUCCESS)
        return;

    printf("Failed to start subscribe, return code %d\n", status);
    finished1 = 1;
}
/*
 * Runs an interactive MQTT subscriber session.
 *
 * Creates a client for ADDRESS/CLIENTID, registers connlost1() and
 * msgarrvd() as callbacks, starts an asynchronous connect (onConnect1()
 * subscribes to TOPIC once connected), waits until the subscription is
 * acknowledged or a failure is flagged, then blocks until the user types
 * 'Q' or 'q' on stdin, disconnects and destroys the client.
 *
 * Returns EXIT_FAILURE when the client cannot be created or configured, a
 * connect/disconnect request cannot be started, or a connect/subscribe
 * failure was flagged through finished1; otherwise the return code of the
 * MQTTAsync_disconnect() call.
 */
int subClient()
{
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc;
    int ch;

    if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
            != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto exit;
    }
    else
    {
        printf("create success\n");
    }

    /* The client handle doubles as the callback context; connlost1() casts
     * it back to MQTTAsync. */
    if ((rc = MQTTAsync_setCallbacks(client, client, connlost1, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect1;
    conn_opts.onFailure = onConnectFailure1;
    conn_opts.context = client;
    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
    else
    {
        /* Only the request was accepted here; the outcome is delivered to
         * onConnect1()/onConnectFailure1(). */
        printf("connect successful\n");
    }

    /* Wait for the callbacks to report either a subscription or a failure. */
    while (!subscribed && !finished1)
    {
        sleep(1);
    }

    if (finished1)
    {
        /* A callback flagged a failure: report it to the caller and make
         * sure the client created above is released (jumping straight to
         * `exit` would skip MQTTAsync_destroy() and return the stale
         * success code of MQTTAsync_connect()). */
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    do
    {
        ch = getchar();
        /* Once stdin is at end-of-file getchar() returns immediately;
         * pause between polls so the loop does not spin at full speed.
         * The session still only ends on 'Q'/'q'. */
        if (ch == EOF)
            sleep(1);
    } while (ch != 'Q' && ch != 'q');

    disc_opts.onSuccess = onDisconnect1;
    disc_opts.onFailure = onDisconnectFailure1;
    if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start disconnect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    /* Wait for onDisconnect1()/onDisconnectFailure1(). */
    while (!disc_finished)
    {
        sleep(1);
    }

destroy_exit:
    MQTTAsync_destroy(&client);
exit:
    return rc;
}