367 lines
16 KiB
C
367 lines
16 KiB
C
/*****************************************************************************
|
||
* @copyright 1997-2050, . POWER SUPPLY CO., LTD.
|
||
* @file bsp_mqttClient.c
|
||
* @brief mqtt通信客户端程序
|
||
* @author mdy
|
||
* @date 2024-09-29
|
||
* @remark
|
||
*****************************************************************************/
|
||
#pragma once
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <string.h>
|
||
#include <pthread.h>
|
||
#include <assert.h>
|
||
#include "MQTTClient.h"
|
||
#include "MQTTAsync.h"
|
||
#include <cjson/cJSON.h>
|
||
#include "bsp_rtdb.h"
|
||
#include "bsp_comm.h"
|
||
#include <semaphore.h>
|
||
#include <signal.h>
|
||
#include "kit_db.h"
|
||
#include "kit_core.h"
|
||
#include "logic_bcu2bsu.h"
|
||
#include "logic_main.h"
|
||
#include "logic_dido.h"
|
||
|
||
#define D_MAX_MQTT_DEV_POINT_NUM (2000) // 每类设备最大的上云测点个数
|
||
#define D_MAX_DEV_NUM (100) // 子设备最大数量
|
||
#define D_MAX_DEV_NAME_LEN (100)
|
||
#define D_MAX_DEV_KEYWORD (11)
|
||
#define D_MAX_IPV4_LENGTH (16)
|
||
#define D_MAX_USERNAME_LENGTH (20)
|
||
#define D_MAX_PASSWORD_LENGTH (20)
|
||
#define QOS 1
|
||
#define TIMEOUT 10000L
|
||
#define MQTT_ROOT_TOPIC_KEY "root"
|
||
#define MQTT_PERIOD_TOPIC_KEY "period"
|
||
#define MQTT_CHANGE_TOPIC_KEY "change"
|
||
#define MQTT_HISTORY_TOPIC_KEY "history"
|
||
#define MQTT_CONTROL_TOPIC_KEY "control"
|
||
#define MQTT_READ_TOPIC_KEY "read"
|
||
#define MQTT_REPLY_TOPIC_KEY "reply"
|
||
#define MQTT_PAYLOAD_MAX_LEN (1024 * 1024) // 能源云限制报文长度最大为1MB
|
||
#define MQTT_PERIOD_SEND_FAIL_WORDS "period send fail." // 周期发送失败
|
||
#define D_MAX_TOPIC_NUM (7)
|
||
#define MQTT_STATUS_DONT_TRANS -100 // 还未执行完策略,当前的配置参数不上传
|
||
#define MQTT_STATUS_CFG_UPLOADED 100 // 策略配置传送完毕
|
||
#define MAX_MQTT_HISTORY_MSG_NUM 1000
|
||
#define D_PERIODSEND_T_SEQ (0)
|
||
#define D_HISTORY_T_SEQ (1)
|
||
#define D_DATACHK_T_SEQ (2)
|
||
#define D_MAX_THREAD_NUM (3)
|
||
#define D_STR_CONNECT_WORD "已连接"
|
||
#define D_STR_DISCONN_WORD "已断开"
|
||
#define RECORDNETSTATUS(word) kit_insert_lost_contact_record(word);
|
||
|
||
// MQTT主题枚举定义
|
||
typedef enum
|
||
{
|
||
kEmsTopicType_Start,
|
||
kEmsTopicType_root = kEmsTopicType_Start, // 初始化主题
|
||
kEmsTopicType_period, // 周期上报主题
|
||
kEmsTopicType_change, // 突变上报主题
|
||
kEmsTopicType_history, // 历史数据上报主题
|
||
kEmsTopicType_control, // 控制指令下发主题
|
||
kEmsTopicType_read, // 读参数主题
|
||
kEmsTopicType_reply_control, // 回复控制指令下发主题
|
||
kEmsTopicType_reply_read, // 回复读参数主题
|
||
kEmsTopicType_End = kEmsTopicType_reply_read,
|
||
} ems_topic_type_e;
|
||
|
||
// 数据变化事件分类
|
||
typedef enum
|
||
{
|
||
kEvent_topo_send = 1, // 拓扑发送完成
|
||
kEvent_has_history_record = 2, // 存在历史记录record
|
||
kEvent_change_devTopo = 4, // 设备列表变化
|
||
kEvent_rename_anydev = 8, // 重命名子设备
|
||
kEvent_int_change = 16, // 子设备整型测点值变化越死区
|
||
kEvent_float_change = 32, // 子设备浮点型测点值变化越死区
|
||
kEvent_pointlistcfg_change = 64, // 设备测点配置改变
|
||
kEvent_Reconnect_success = 128, // MQTT重连成功
|
||
kEvent_Reconnect_failed = 256, // MQTT重连失败
|
||
kEvent_Period_Task_begin = 512, // 周期发送任务启动完成
|
||
|
||
} kEvent_ems_data;
|
||
|
||
// EMS总的工作模式
|
||
typedef enum
|
||
{
|
||
KEms_cfg_Start, // 无
|
||
KEms_cfg_DEBUG, // 调试模式
|
||
KEms_cfg_PEAKVALLY, // 削峰填谷模式
|
||
KEms_cfg_DEMANDRES, // 需求响应模式
|
||
KEms_cfg_LOADTRACK, // 负载跟踪模式
|
||
KEms_cfg_DMDCTRL, // 需量控制
|
||
KEms_cfg_PFCTRL, // 功率因数
|
||
KEms_cfg_Protect, // 保护策略
|
||
KEms_cfg_END,
|
||
} cfgType_e;
|
||
typedef enum
|
||
{
|
||
kQos_0,//最多交付一次
|
||
kQos_1,//至少交付一次
|
||
kQos_2,//只交付一次
|
||
}Qos_e;
|
||
// 拓扑结构结构体
|
||
typedef struct
|
||
{
|
||
int dbId; // 键:数据库中的 ID
|
||
topology_t *topology; // 值:指向 topology_t 结构体的指针
|
||
UT_hash_handle hh; // uthash 处理句柄,使结构体可哈希
|
||
} topology_hash_entry_t;
|
||
|
||
// MQTT单个测点的db查找表
|
||
typedef struct
|
||
{
|
||
// 取测点使用参数
|
||
rtdb_type_e dbType; // 数据库类型
|
||
uint16_t devType; // 设备类型
|
||
uint16_t devId; // 区分同类型不同设备实体的标识
|
||
uint16_t pointId; // 测点id
|
||
up_dis_config_type_e txOrRx; // 是上传的测点还是接收云端指令0-仅上报 1-下发
|
||
bool whether; // 接收用户输入,1-上传:0-不上传
|
||
double value; // 该测点值的备份用于检测变化事件
|
||
bool ifchg; // 是否需要变化上送
|
||
} mqtt2db_t;
|
||
typedef mqtt2db_t mqttGetDBElem;
|
||
typedef mqtt2db_t mqttSetDBElem;
|
||
|
||
void pointListener(mqtt2db_t *tab);
|
||
// 一个设备实体的测点信息表结构体
|
||
typedef struct
|
||
{
|
||
mqttGetDBElem rtdbGetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应上传点位
|
||
mqttSetDBElem rtdbSetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应相应云端控制的下发点位,云端写RTDB操作只能操作该表中的点
|
||
bool bChgFlag; // 变化标志
|
||
int txcount; // 记录加入payload的测点数量
|
||
int rxcount;
|
||
} payloadlist_t;
|
||
|
||
// mqtt通信参数结构体
|
||
typedef struct
|
||
{
|
||
char stationName[100]; // 站点名称
|
||
char stationID[20]; // 站点id
|
||
dev_type_e devType; // 设备类别
|
||
char *devName[D_MAX_DEV_NAME_LEN]; // 设备名称
|
||
char sn[D_MAX_DEV_NUM][60]; // 设备SN
|
||
char identifier[60]; // 消息服务标识
|
||
int devNum; // 设备数量
|
||
payloadlist_t pllist[D_MAX_DEV_NUM]; // 该类设备下,每个设备实体一张mqtt2db关联表,依据该表中的元素操作RTDB
|
||
bool bChgFlag;
|
||
} mqtt_option_map_t;
|
||
// payloadlists
|
||
|
||
// 保留给回调函数的上下文结构体
|
||
typedef struct
|
||
{
|
||
MQTTClient client;
|
||
MQTTClient_connectOptions connopts;
|
||
mqtt_lib_t mlib;
|
||
int modeword;
|
||
} ConnContext;
|
||
|
||
//所有算法配置的联合体
|
||
typedef union
|
||
{
|
||
peakvalley_zone_tab_t pkvly; // 削峰填谷参数 IN
|
||
demandRes_params_t demand; // 需求响应参数
|
||
loadTrack_params_t loadTrack; // 负载跟踪参数
|
||
debug_params_t debug; // 调试参数 IN
|
||
protect_params_t protect; // 保护参数 IN
|
||
power_distr_t distr; // 功率分配参数 IN
|
||
} logic_Params;
|
||
|
||
//中断记录与历史数据数组关联的结构体
|
||
typedef struct
|
||
{
|
||
break_record_t record; // 中断记录
|
||
UT_array *dataArray; // 其关联的数据存储集合
|
||
} break_record_with_data_t;
|
||
|
||
//从云端MQTT报文返回的信息结构体
|
||
typedef struct
|
||
{
|
||
char *transaction;
|
||
int modeWord;
|
||
} arvcfgInfo_ret_t;
|
||
typedef void (*setCfgCB)(void *);
|
||
void setdbgCB(void *arg);
|
||
void setPkvlCB(void *arg);
|
||
void setDmdRspCB(void *arg);
|
||
void setLoadTrackCB(void *arg);
|
||
void setDmdCtrlCB(void *arg);
|
||
void setPFCtrlCB(void *arg);
|
||
/*****************************************************************************
|
||
* @brief 消息传送成功触发的回调函数
|
||
* @param[in] context: MQTT上下文结构体
|
||
* @param[in] dt: 标识符
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void deliveredCB(void *context, MQTTClient_deliveryToken dt);
|
||
|
||
/*****************************************************************************
|
||
* @brief 消息接收成功触发的回调函数
|
||
* @param[in] context: MQTT上下文结构体
|
||
* @param[in] topicName:主题
|
||
* @param[in] topicLen:主题长度
|
||
* @return
|
||
*****************************************************************************/
|
||
int msgarrvdCB(void *context, char *topicName, int topicLen, MQTTClient_message *message);
|
||
|
||
/*****************************************************************************
|
||
* @brief 连接丢失触发的回调函数
|
||
* @param[in] context: MQTT上下文结构体
|
||
* @param[in] cause:中断原因
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void connlostCB(void *context, char *cause);
|
||
|
||
/*****************************************************************************
|
||
* @brief 更新MQTT参数
|
||
* @param[in] mlt: MQTT连接配置
|
||
* @param[in] map:测点配置表
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void updateMqttPara(mqtt_lib_t *mlt, mqtt_option_map_t *map);
|
||
|
||
/*****************************************************************************
|
||
* @brief 更新MQTT参数
|
||
* @param[in] mlt: MQTT连接配置
|
||
* @param[in] map:测点配置表
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
int chkDataChange(mqtt_option_map_t *now, mqtt_option_map_t *old);
|
||
|
||
/*****************************************************************************
|
||
* @brief MQTT周期发送线程
|
||
* @param[in] arg: 可选参数
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void *periodSendTask(void *arg);
|
||
/*****************************************************************************
|
||
* @brief 数据变化监视器
|
||
* @param[in] arg: 可选参数
|
||
* @return NONE
|
||
************************************************************************/
|
||
void *dataChgMonitor(void *arg);
|
||
/*****************************************************************************
|
||
* @brief 从MQTT字符串中获取关键字返回主题类型
|
||
* @param[in] topic: MQTT主题字符串
|
||
* @return 主题类型枚举值
|
||
*****************************************************************************/
|
||
int getKeywordsByTopic(char *topic);
|
||
|
||
/*****************************************************************************
|
||
* @brief 根据设备大类组包
|
||
* @param[in] payload:目标字符串指针
|
||
* @param[in] type:设备类型
|
||
* @param[in] tot:设备点表结构体
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void genDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
|
||
|
||
/*****************************************************************************
|
||
* @brief 递归添加子节点到 JSON 结构中
|
||
* @param[in] parentArray: 父节点的 JSON 数组
|
||
* @param[in] parentId: 父节点的 ID
|
||
* @param[in] topologyById: 存储拓扑信息的哈希表
|
||
* @return 无
|
||
*****************************************************************************/
|
||
void addChildrenToJson(cJSON *parentArray, int parentId, topology_hash_entry_t *topologyById);
|
||
|
||
/*****************************************************************************
|
||
* @brief 将数据库中的 EMS 设备拓扑结构数据转换为 JSON 字符串
|
||
* @param[out] json: 用于存储生成的 JSON 字符串的缓冲区。确保该缓冲区有足够的空间容纳生成的 JSON 字符串。
|
||
* @return 0-成功 1-失败
|
||
*****************************************************************************/
|
||
int getTopologyJsonByDb(char **json);
|
||
|
||
/*****************************************************************************
|
||
* @brief 将设备类型枚举值转换为与云端通信协议中的字符串
|
||
* @param[out] type:设备类型枚举值
|
||
* @return 设备类型缩写字符串
|
||
*****************************************************************************/
|
||
char *devTypeToString(dev_type_e type);
|
||
|
||
/*****************************************************************************
|
||
* @brief 比较两个设备节点是否相同
|
||
* @param[in] a:左操作数
|
||
* @param[in] b:左操作数
|
||
* @return 0-不同,1-相同
|
||
*****************************************************************************/
|
||
bool ifSameTopoElem(const topology_t *a, const topology_t *b);
|
||
|
||
/*****************************************************************************
|
||
* @brief 比较两个设备拓扑是否相同
|
||
* @param[in] arr1:左操作数
|
||
* @param[in] arr2:左操作数
|
||
* @return 0-不同,1-相同
|
||
*****************************************************************************/
|
||
bool chkTopoDiff(UT_array *arr1, UT_array *arr2);
|
||
|
||
/*****************************************************************************
|
||
* @brief 获得当前时间戳字符串
|
||
* @return 当前时间戳字符串
|
||
*****************************************************************************/
|
||
char *getTmstr2();
|
||
/*****************************************************************************
|
||
* @brief 解析云端消息
|
||
* @param[in] jsonString:云端消息json串
|
||
* @param[in] arr:指向配置参数的指针
|
||
* @return 0-成功;1-失败
|
||
*****************************************************************************/
|
||
arvcfgInfo_ret_t parseEmsCfgJson(const char *jsonString, logic_Params *lP);
|
||
/*****************************************************************************
|
||
* @brief 单个设备测点值是否改变
|
||
* @param[in] tot: mqtt_lib_t中的主题字符串指针
|
||
* @return false-未改变 true-改变
|
||
*****************************************************************************/
|
||
bool ifSingledevPointsChg(payloadlist_t *list);
|
||
/*****************************************************************************
|
||
* @brief 单个设备测点值是否改变
|
||
* @param[in] map: 设备组测点列表
|
||
* @return false-未改变 true-改变
|
||
*****************************************************************************/
|
||
bool ifDevGrpPointsChg(mqtt_option_map_t *map);
|
||
/*****************************************************************************
|
||
* @brief 装载不同设备组变化上送主题有效载荷
|
||
* @param[in] topic: mqtt_lib_t中的主题字符串指针
|
||
* @param[in] payload:主题类型枚举,枚举与mqtt_lib_t中的成员应对应,请确保payload的缓冲区长度足够
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
int genChgDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
|
||
|
||
/*****************************************************************************
|
||
* @brief 根据云端下发的配置信息,将参数写入联合体
|
||
* @return 设备类型字符串
|
||
*****************************************************************************/
|
||
int parseStrategyJsonObject(cJSON *json, int modeword, logic_Params *lp);
|
||
/*****************************************************************************
|
||
* @brief 根据配置信息和模式组装成 JSON 字符串,并包含额外的顶层字段
|
||
* @param[in] arvcfgInfo_ret_t cfg 结构包含模式字、交易标识等信息
|
||
* void *arg 指向配置数据的指针(例如,指向 logic_Params 的指针)
|
||
* @return char* 表示 JSON 字符串(需要调用者释放)
|
||
*****************************************************************************/
|
||
char *createStrategyCfgJsonString(arvcfgInfo_ret_t cfg, const void *arg, int rw);
|
||
/*****************************************************************************
|
||
* @brief 判断通信是否断开
|
||
* @param[in] arg:tcp连接上下文
|
||
* @return 0-中断;1-正常
|
||
*****************************************************************************/
|
||
bool is_mqtt_connected();
|
||
/*****************************************************************************
|
||
* @brief MQTT主函数
|
||
* @param[in] arg: 可选参数
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void *MqttTask(void *arg);
|
||
|
||
/*****************************************************************************
|
||
* @brief MQTT转发任务入口
|
||
* @return NONE
|
||
*****************************************************************************/
|
||
void creatNetMqttTaskEntry(); |