From ba2c443ffe1d7c8a3c8063243a79565e18d0a60c Mon Sep 17 00:00:00 2001 From: wangk <2865709459@qq.com> Date: Fri, 22 Nov 2024 09:44:21 +0800 Subject: [PATCH] Add MQTT sub and pub function --- CMakeLists.txt | 4 +- drv/drv_4g.h | 2 +- kit/kit_mqtt.c | 186 +++++++++++++++++++++++++++++++++++++++++ kit/kit_mqtt.h | 21 +++++ kit/kit_mqtt_sub.c | 203 +++++++++++++++++++++++++++++++++++++++++++++ kit/kit_mqtt_sub.h | 21 +++++ test/test.c | 28 ++++++- test/test.h | 2 + 8 files changed, 462 insertions(+), 5 deletions(-) create mode 100644 kit/kit_mqtt_sub.c create mode 100644 kit/kit_mqtt_sub.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 40579a1..4e6662c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ set(CMAKE_BUILD_TYPE "Debug") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g") # 设置一个字符串变量,用与编译文件名称生成 -set(ProjectName "EMS_C_V_1.0.0") +set(ProjectName "EMS_C_V") project(${ProjectName} LANGUAGES C) @@ -61,6 +61,8 @@ set(KIT_SOURCE # Logger kit源文件 ${PROJECT_SOURCE_DIR}/kit/kit_logger.c ${PROJECT_SOURCE_DIR}/kit/kit_safe_memcpy.c + ${PROJECT_SOURCE_DIR}/kit/kit_mqtt.c + ${PROJECT_SOURCE_DIR}/kit/kit_mqtt_sub.c ) # 添加库头文件 diff --git a/drv/drv_4g.h b/drv/drv_4g.h index 68e33b4..19a7d65 100644 --- a/drv/drv_4g.h +++ b/drv/drv_4g.h @@ -16,7 +16,7 @@ #include #include -#define INTERFACE "usb0" +#define INTERFACE "mlan0" #define SERVER_ADDR "47.120.14.45" #define PORT 3001 diff --git a/kit/kit_mqtt.c b/kit/kit_mqtt.c index e69de29..cc37c2c 100644 --- a/kit/kit_mqtt.c +++ b/kit/kit_mqtt.c @@ -0,0 +1,186 @@ +#include "kit_mqtt.h" + +#define ADDRESS "tcp://47.120.14.45:3011" +#define CLIENTID "ExampleClientPub" +#define TOPIC "aa" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +int finished = 0; + +void closeInterface() +{ + int status = system("ifconfig eth0 down"); + if (status == -1) + { + perror("system"); + } + + int status1 = system("ifconfig usb0 down"); + if (status1 == -1) + { + perror("system1"); + } +} + +void openInterface() +{ + int status = system("ifconfig eth0 up"); + if (status == -1) + { + perror("system"); + } + + int status1 = system("ifconfig usb0 up"); + if (status1 == -1) + { + perror("system1"); + } +} + +void connlost(void *context, char *cause) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + printf("\nConnection lost\n"); + if (cause) + printf(" cause: %s\n", cause); + + printf("Reconnecting\n"); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + finished = 1; + } +} + +void onDisconnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Disconnect failed\n"); + finished = 1; +} + +void onDisconnect(void* context, MQTTAsync_successData* response) +{ + printf("Successful disconnection\n"); + finished = 1; +} + +void onSendFailure(void* context, MQTTAsync_failureData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; + int rc; + + printf("Message send failed token %d error code %d\n", response->token, response->code); + opts.onSuccess = onDisconnect; + opts.onFailure = onDisconnectFailure; + opts.context = client; + if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + +void onSend(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; + int rc; + + printf("Message with token value %d delivery confirmed\n", response->token); + opts.onSuccess = onDisconnect; + opts.onFailure = onDisconnectFailure; + opts.context = client; + if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + + +void onConnectFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response ? response->code : 0); + finished = 1; +} + + +void onConnect(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; + MQTTAsync_message pubmsg = MQTTAsync_message_initializer; + int rc; + + printf("Successful connection\n"); + opts.onSuccess = onSend; + opts.onFailure = onSendFailure; + opts.context = client; + pubmsg.payload = PAYLOAD; + pubmsg.payloadlen = (int)strlen(PAYLOAD); + pubmsg.qos = QOS; + pubmsg.retained = 0; + if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start sendMessage, return code %d\n", rc); + exit(EXIT_FAILURE); + } +} + +int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) +{ + /* not expecting any messages */ + return 1; +} + +int pubClient() +{ + closeInterface(); + MQTTAsync client; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) + { + printf("Failed to create client object, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + if ((rc = MQTTAsync_setCallbacks(client, client, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS) + { + printf("Failed to set callback, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect; + conn_opts.onFailure = onConnectFailure; + conn_opts.context = client; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + exit(EXIT_FAILURE); + } + + printf("Waiting for publication of %s\n" + "on topic %s for client with ClientID: %s\n", + PAYLOAD, TOPIC, CLIENTID); + while (!finished) + { + sleep(1); + } + + MQTTAsync_destroy(&client); + openInterface(); + return rc; +} + diff --git a/kit/kit_mqtt.h b/kit/kit_mqtt.h index e69de29..b56faaf 100644 --- a/kit/kit_mqtt.h +++ b/kit/kit_mqtt.h @@ -0,0 +1,21 @@ +#ifndef __KIT_MQTT_H_ +#define __KIT_MQTT_H_ + +#include +#include +#include +#include "libpaho-mqtt/include/MQTTClient.h" +#include "libpaho-mqtt/include/MQTTAsync.h" + +void closeInterface(); +void openInterface(); +void connlost(void *context, char *cause); +void onDisconnectFailure(void* context, MQTTAsync_failureData* response); +void onDisconnect(void* context, MQTTAsync_successData* response); +void onSendFailure(void* context, MQTTAsync_failureData* response); +void onSend(void* context, MQTTAsync_successData* response); +void onConnectFailure(void* context, MQTTAsync_failureData* response); +void onConnect(void* context, MQTTAsync_successData* response); +int pubClient(); + +#endif diff --git a/kit/kit_mqtt_sub.c b/kit/kit_mqtt_sub.c new file mode 100644 index 0000000..f9e2247 --- /dev/null +++ b/kit/kit_mqtt_sub.c @@ -0,0 +1,203 @@ +#include "kit_mqtt_sub.h" + +#define ADDRESS "tcp://47.120.14.45:3011" +#define CLIENTID "ExampleClientSub" +#define TOPIC "aa" +#define PAYLOAD "Hello World!" +#define QOS 1 +#define TIMEOUT 10000L + +int disc_finished = 0; +int subscribed = 0; +int finished1 = 0; + +void closeInterface1() +{ + int status = system("ifconfig eth0 down"); + if (status == -1) + { + perror("system"); + } + + int status1 = system("ifconfig usb0 down"); + if (status1 == -1) + { + perror("system1"); + } +} + +void openInterface1() +{ + int status = system("ifconfig eth0 up"); + if (status == -1) + { + perror("system"); + } + + int status1 = system("ifconfig usb0 up"); + if (status1 == -1) + { + perror("system1"); + } +} + +void connlost1(void *context, char *cause) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + int rc; + + printf("\nConnection lost\n"); + if (cause) + printf(" cause: %s\n", cause); + + printf("Reconnecting\n"); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect1; + conn_opts.onFailure = onConnectFailure1; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + finished1 = 1; + } +} + + +int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) +{ + printf("Message arrived\n"); + printf(" topic: %s\n", topicName); + printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); + MQTTAsync_freeMessage(&message); + MQTTAsync_free(topicName); + return 1; +} + +void onDisconnectFailure1(void* context, MQTTAsync_failureData* response) +{ + printf("Disconnect failed, rc %d\n", response->code); + disc_finished = 1; +} + +void onDisconnect1(void* context, MQTTAsync_successData* response) +{ + printf("Successful disconnection\n"); + disc_finished = 1; +} + +void onSubscribe(void* context, MQTTAsync_successData* response) +{ + printf("Subscribe succeeded\n"); + subscribed = 1; +} + +void onSubscribeFailure(void* context, MQTTAsync_failureData* response) +{ + printf("Subscribe failed, rc %d\n", response->code); + finished1= 1; +} + + +void onConnectFailure1(void* context, MQTTAsync_failureData* response) +{ + printf("Connect failed, rc %d\n", response->code); + finished1 = 1; +} + + +void onConnect1(void* context, MQTTAsync_successData* response) +{ + MQTTAsync client = (MQTTAsync)context; + MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; + int rc; + + printf("Successful connection\n"); + + printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" + "Press Q to quit\n\n", TOPIC, CLIENTID, QOS); + opts.onSuccess = onSubscribe; + opts.onFailure = onSubscribeFailure; + opts.context = client; + if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start subscribe, return code %d\n", rc); + finished1 = 1; + } +} + + +int subClient() +{ + closeInterface1(); + MQTTAsync client; + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; + int rc; + int ch; + + if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) + != MQTTASYNC_SUCCESS) + { + printf("Failed to create client, return code %d\n", rc); + rc = EXIT_FAILURE; + goto exit; + } + else{ + printf("create success\n"); + } + + if ((rc = MQTTAsync_setCallbacks(client, client, connlost1, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) + { + printf("Failed to set callbacks, return code %d\n", rc); + rc = EXIT_FAILURE; + goto destroy_exit; + } + + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + conn_opts.onSuccess = onConnect1; + conn_opts.onFailure = onConnectFailure1; + conn_opts.context = client; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start connect, return code %d\n", rc); + rc = EXIT_FAILURE; + goto destroy_exit; + } + else{ + printf("connect successful\n"); + } + + while (!subscribed && !finished1) + { + sleep(1); + } + + if (finished1) + goto exit; + + do + { + ch = getchar(); + } while (ch!='Q' && ch != 'q'); + + disc_opts.onSuccess = onDisconnect1; + disc_opts.onFailure = onDisconnectFailure1; + if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) + { + printf("Failed to start disconnect, return code %d\n", rc); + rc = EXIT_FAILURE; + goto destroy_exit; + } + while (!disc_finished) + { + sleep(1); + } + +destroy_exit: + MQTTAsync_destroy(&client); + openInterface1(); +exit: + return rc; +} diff --git a/kit/kit_mqtt_sub.h b/kit/kit_mqtt_sub.h new file mode 100644 index 0000000..538a8a4 --- /dev/null +++ b/kit/kit_mqtt_sub.h @@ -0,0 +1,21 @@ +#ifndef __KIT_MQTT_SUB_H_ +#define __KIT_MQTT_SUB_H_ + +#include +#include +#include +#include "libpaho-mqtt/include/MQTTAsync.h" + +void closeInterface1(); +void openInterface1(); +void connlost(void *context, char *cause); +int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message); +void onDisconnectFailure(void* context, MQTTAsync_failureData* response); +void onDisconnect1(void* context, MQTTAsync_successData* response); +void onSubscribe(void* context, MQTTAsync_successData* response); +void onSubscribeFailure1(void* context, MQTTAsync_failureData* response); +void onConnectFailure1(void* context, MQTTAsync_failureData* response); +void onConnect1(void* context, MQTTAsync_successData* response); +int subClient(); + +# endif diff --git a/test/test.c b/test/test.c index b86cc06..a903c71 100644 --- a/test/test.c +++ b/test/test.c @@ -9,7 +9,9 @@ #include "drv_can.h" #include "drv_tcp.h" #include "drv_4g.h" -#include "libmodbus/include/modbus-rtu.h" +// #include "libmodbus/include/modbus-rtu.h" +#include "kit_mqtt.h" +#include "kit_mqtt_sub.h" // #include "MQTTAsync.h" @@ -27,7 +29,7 @@ void testCreatThreadTask() { printf("testThreadTask\n"); logger_init("./log"); - pthread_t tTestLogger, tTestDIDetect, tTestUart, tTestTcp, tTest4G, tTestCanSend, tTestCanRecv, tTestModbus; + pthread_t tTestLogger, tTestDIDetect, tTestUart, tTestTcp, tTest4G, tTestCanSend, tTestCanRecv, tTestModbus, tTestMQTT; // pthread_create(&tTestLogger, NULL, testLoggerThread, "testLoggerThread"); // pthread_join(tTestLogger, NULL); // int ret = pthread_create(&tTestDIDetect, NULL, testDIDetectThread, "testDIDetectThread"); @@ -45,6 +47,8 @@ void testCreatThreadTask() // pthread_join(tTestCanRecv, NULL); // pthread_create(&tTestModbus, NULL, testModbusThread, "testModbusThread"); // pthread_join(tTestModbus, NULL); + pthread_create(&tTestMQTT, NULL, testMQTTThread, "testMQTTThread"); + pthread_join(tTestMQTT, NULL); logger_destroy(); } @@ -180,5 +184,23 @@ void *testModbusThread(void *arg) { logger_level_printf(LOGGER_DEBUG_LEVEL, arg); // MQTTAsync_malloc(12); - modbus_new_rtu("dev", 9600, '1', '8', 1); + // modbus_new_rtu("dev", 9600, '1', '8', 1); +} + +//MQTT测试线程 +void *testMQTTThread(void *arg) +{ + logger_level_printf(LOGGER_DEBUG_LEVEL, arg); + printf("MQTT test!\n"); + // int rc = pubClient(); + // if (rc < 0) + // { + // printf("MQTT wrong\n"); + // } + + int rc = subClient(); + if (rc < 0) + { + printf("MQTT wrong\n"); + } } diff --git a/test/test.h b/test/test.h index 39fcf2a..ee26a60 100644 --- a/test/test.h +++ b/test/test.h @@ -14,4 +14,6 @@ void *test4GThread(void *arg); void *testCanSendThread(void *arg); void *testCanRecvThread(void *arg); void *testModbusThread(void *arg); +void *testMQTTThread(void *arg); + #endif