Add mqtt send and recv and json para

This commit is contained in:
wangk 2024-11-22 16:44:38 +08:00
parent ba2c443ffe
commit 595f17127e
9 changed files with 86 additions and 111 deletions

View File

@ -3,9 +3,9 @@
#define BUFFER_SIZE 1024 #define BUFFER_SIZE 1024
#define LOCAL "10.244.216.42" #define LOCAL "10.244.216.42"
void client(char interface[], char server_addr[], int server_port) { void client(char server_addr[], int server_port) {
int sock = 0; int sock = 0;
struct ifreq ifr; // struct ifreq ifr;
char buffer[BUFFER_SIZE]; char buffer[BUFFER_SIZE];
@ -48,17 +48,17 @@ void client(char interface[], char server_addr[], int server_port) {
// } // }
// 设置要绑定的网卡接口名称 // 设置要绑定的网卡接口名称
strcpy(ifr.ifr_name, interface); // strcpy(ifr.ifr_name, interface);
// 将套接字绑定到指定的网卡 // // 将套接字绑定到指定的网卡
if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &ifr, sizeof(ifr)) < 0) { // if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &ifr, sizeof(ifr)) < 0) {
perror("setsockopt"); // perror("setsockopt");
close(sock); // close(sock);
exit(1); // exit(1);
} // }
else{ // else{
printf("connect the local interface:%s\n", ifr.ifr_name); // printf("connect the local interface:%s\n", ifr.ifr_name);
} // }
//绑定服务端的ip //绑定服务端的ip
// 设置服务器地址结构体 // 设置服务器地址结构体

View File

@ -16,10 +16,10 @@
#include <pthread.h> #include <pthread.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#define INTERFACE "mlan0" // #define INTERFACE "mlan0"
#define SERVER_ADDR "47.120.14.45" #define SERVER_ADDR "47.120.14.45"
#define PORT 3001 #define PORT 3001
void client(char interface[], char server_addr[], int server_port); void client(char server_addr[], int server_port);
#endif #endif

View File

@ -69,7 +69,7 @@ static int split_new_file()
if ( dir != NULL ) if ( dir != NULL )
{ {
char file_date_min[MAX_FILE_PATH_LEN] = "~~"; char file_date_min[MAX_FILE_PATH_LEN] = "~~";
boolean is_find = false; boolean1 is_find = false;
struct dirent *entry; struct dirent *entry;
int file_num = 0; int file_num = 0;
while ( (entry = readdir(dir)) != NULL ) while ( (entry = readdir(dir)) != NULL )
@ -162,7 +162,7 @@ void logger_init(char *log_path)
// reuse // reuse
DIR *dir = opendir(log_config->log_dir); DIR *dir = opendir(log_config->log_dir);
char file_date_max[MAX_FILE_PATH_LEN] = " "; char file_date_max[MAX_FILE_PATH_LEN] = " ";
boolean log_exist = false; boolean1 log_exist = false;
if ( dir != NULL ) if ( dir != NULL )
{ {
struct dirent *entry; struct dirent *entry;

View File

@ -21,7 +21,7 @@ extern long syscall(long pid, ...);
#define FAILURE -1 #define FAILURE -1
#define SUCCESS 0 #define SUCCESS 0
#define boolean int #define boolean1 int
#define true 1 #define true 1
#define false 0 #define false 0

View File

@ -9,36 +9,6 @@
int finished = 0; int finished = 0;
void closeInterface()
{
int status = system("ifconfig eth0 down");
if (status == -1)
{
perror("system");
}
int status1 = system("ifconfig usb0 down");
if (status1 == -1)
{
perror("system1");
}
}
void openInterface()
{
int status = system("ifconfig eth0 up");
if (status == -1)
{
perror("system");
}
int status1 = system("ifconfig usb0 up");
if (status1 == -1)
{
perror("system1");
}
}
void connlost(void *context, char *cause) void connlost(void *context, char *cause)
{ {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
@ -117,22 +87,39 @@ void onConnect(void* context, MQTTAsync_successData* response)
{ {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc; int rc;
printf("Successful connection\n"); printf("Successful connection\n");
opts.onSuccess = onSend;
opts.onFailure = onSendFailure; cJSON *json_object = cJSON_CreateObject();
opts.context = client; cJSON_AddStringToObject(json_object, "manage", "xudex");
pubmsg.payload = PAYLOAD; cJSON_AddStringToObject(json_object, "employee", "wangk");
pubmsg.payloadlen = (int)strlen(PAYLOAD); cJSON_AddBoolToObject(json_object, "ok or not", 0);
pubmsg.qos = QOS;
pubmsg.retained = 0; char *json_string = cJSON_Print(json_object);
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) if (json_string == NULL) {
{ fprintf(stderr, "Failed to print cJSON.\n");
printf("Failed to start sendMessage, return code %d\n", rc); exit(EXIT_FAILURE);
exit(EXIT_FAILURE); }
}
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions resp_opts = MQTTAsync_responseOptions_initializer;
resp_opts.onSuccess = onSend;
resp_opts.onFailure = onSendFailure;
resp_opts.context = client;
pubmsg.payload = (void *)json_string;
pubmsg.payloadlen = strlen(json_string);
pubmsg.qos = QOS;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(client, "aa", &pubmsg, &resp_opts);
if (rc != MQTTASYNC_SUCCESS) {
fprintf(stderr, "Failed to send message, return code %d\n", rc);
exit(EXIT_FAILURE);
}
free(json_string);
cJSON_Delete(json_object);
} }
int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
@ -143,7 +130,6 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_messa
int pubClient() int pubClient()
{ {
closeInterface();
MQTTAsync client; MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc; int rc;
@ -171,16 +157,14 @@ int pubClient()
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
printf("Waiting for publication of %s\n" printf("Waiting\n"
"on topic %s for client with ClientID: %s\n", "on topic %s for client with ClientID: %s\n", TOPIC, CLIENTID);
PAYLOAD, TOPIC, CLIENTID);
while (!finished) while (!finished)
{ {
sleep(1); sleep(1);
} }
MQTTAsync_destroy(&client); MQTTAsync_destroy(&client);
openInterface();
return rc; return rc;
} }

View File

@ -4,11 +4,11 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "libpaho-mqtt/include/MQTTClient.h" // #include "libpaho-mqtt/include/MQTTClient.h"
#include "libpaho-mqtt/include/MQTTAsync.h" #include "libpaho-mqtt/include/MQTTAsync.h"
#include "libcjson/include/cjson/cJSON.h"
void closeInterface();
void openInterface();
void connlost(void *context, char *cause); void connlost(void *context, char *cause);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response); void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response); void onDisconnect(void* context, MQTTAsync_successData* response);

View File

@ -11,36 +11,6 @@ int disc_finished = 0;
int subscribed = 0; int subscribed = 0;
int finished1 = 0; int finished1 = 0;
void closeInterface1()
{
int status = system("ifconfig eth0 down");
if (status == -1)
{
perror("system");
}
int status1 = system("ifconfig usb0 down");
if (status1 == -1)
{
perror("system1");
}
}
void openInterface1()
{
int status = system("ifconfig eth0 up");
if (status == -1)
{
perror("system");
}
int status1 = system("ifconfig usb0 up");
if (status1 == -1)
{
perror("system1");
}
}
void connlost1(void *context, char *cause) void connlost1(void *context, char *cause)
{ {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
@ -66,6 +36,30 @@ void connlost1(void *context, char *cause)
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message) int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{ {
//解析MQTT的json格式
// 将payload转换为C字符串并确保为其分配足够的空间
char* payload_str = (char*)malloc(message->payloadlen + 1);
if (payload_str) {
memcpy(payload_str, message->payload, message->payloadlen);
payload_str[message->payloadlen] = '\0'; // 确保字符串以空字符结尾
cJSON *json = cJSON_Parse(payload_str);
if (json == NULL) {
// 处理解析错误
const char *error_ptr = cJSON_GetErrorPtr();
if (error_ptr != NULL) {
fprintf(stderr, "Error before: %s\n", error_ptr);
}
} else {
// 成功解析,访问对象
const char *name = cJSON_GetStringValue(cJSON_GetObjectItem(json, "msg"));
if (name) {
printf("解析后的json格式msg: %s\n", name);
}
cJSON_Delete(json);
}
free(payload_str);
}
printf("Message arrived\n"); printf("Message arrived\n");
printf(" topic: %s\n", topicName); printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
@ -129,7 +123,6 @@ void onConnect1(void* context, MQTTAsync_successData* response)
int subClient() int subClient()
{ {
closeInterface1();
MQTTAsync client; MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
@ -197,7 +190,6 @@ int subClient()
destroy_exit: destroy_exit:
MQTTAsync_destroy(&client); MQTTAsync_destroy(&client);
openInterface1();
exit: exit:
return rc; return rc;
} }

View File

@ -5,9 +5,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "libpaho-mqtt/include/MQTTAsync.h" #include "libpaho-mqtt/include/MQTTAsync.h"
#include "libcjson/include/cjson/cJSON.h"
void closeInterface1();
void openInterface1();
void connlost(void *context, char *cause); void connlost(void *context, char *cause);
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message); int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response); void onDisconnectFailure(void* context, MQTTAsync_failureData* response);

View File

@ -149,7 +149,7 @@ void *test4GThread(void *arg)
{ {
logger_level_printf(LOGGER_DEBUG_LEVEL, arg); logger_level_printf(LOGGER_DEBUG_LEVEL, arg);
printf("client test!\n"); printf("client test!\n");
client(INTERFACE, SERVER_ADDR, PORT); client(SERVER_ADDR, PORT);
} }
//Can接收功能测试线程 //Can接收功能测试线程
@ -192,15 +192,15 @@ void *testMQTTThread(void *arg)
{ {
logger_level_printf(LOGGER_DEBUG_LEVEL, arg); logger_level_printf(LOGGER_DEBUG_LEVEL, arg);
printf("MQTT test!\n"); printf("MQTT test!\n");
// int rc = pubClient(); int rc = pubClient();
// if (rc < 0)
// {
// printf("MQTT wrong\n");
// }
int rc = subClient();
if (rc < 0) if (rc < 0)
{ {
printf("MQTT wrong\n"); printf("MQTT wrong\n");
} }
// int rc = subClient();
// if (rc < 0)
// {
// printf("MQTT wrong\n");
// }
} }