ems/ems_c/bsp/bsp_mqttClient.h

367 lines
16 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*****************************************************************************
* @copyright 1997-2050, . POWER SUPPLY CO., LTD.
* @file bsp_mqttClient.c
* @brief mqtt通信客户端程序
* @author mdy
* @date 2024-09-29
* @remark
*****************************************************************************/
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include "MQTTClient.h"
#include "MQTTAsync.h"
#include <cjson/cJSON.h>
#include "bsp_rtdb.h"
#include "bsp_comm.h"
#include <semaphore.h>
#include <signal.h>
#include "kit_db.h"
#include "kit_core.h"
#include "logic_bcu2bsu.h"
#include "logic_main.h"
#include "logic_dido.h"
#define D_MAX_MQTT_DEV_POINT_NUM (2000) // 每类设备最大的上云测点个数
#define D_MAX_DEV_NUM (100) // 子设备最大数量
#define D_MAX_DEV_NAME_LEN (100)
#define D_MAX_DEV_KEYWORD (11)
#define D_MAX_IPV4_LENGTH (16)
#define D_MAX_USERNAME_LENGTH (20)
#define D_MAX_PASSWORD_LENGTH (20)
#define QOS 1
#define TIMEOUT 10000L
#define MQTT_ROOT_TOPIC_KEY "root"
#define MQTT_PERIOD_TOPIC_KEY "period"
#define MQTT_CHANGE_TOPIC_KEY "change"
#define MQTT_HISTORY_TOPIC_KEY "history"
#define MQTT_CONTROL_TOPIC_KEY "control"
#define MQTT_READ_TOPIC_KEY "read"
#define MQTT_REPLY_TOPIC_KEY "reply"
#define MQTT_PAYLOAD_MAX_LEN (1024 * 1024) // 能源云限制报文长度最大为1MB
#define MQTT_PERIOD_SEND_FAIL_WORDS "period send fail." // 周期发送失败
#define D_MAX_TOPIC_NUM (7)
#define MQTT_STATUS_DONT_TRANS -100 // 还未执行完策略,当前的配置参数不上传
#define MQTT_STATUS_CFG_UPLOADED 100 // 策略配置传送完毕
#define MAX_MQTT_HISTORY_MSG_NUM 1000
#define D_PERIODSEND_T_SEQ (0)
#define D_HISTORY_T_SEQ (1)
#define D_DATACHK_T_SEQ (2)
#define D_MAX_THREAD_NUM (3)
#define D_STR_CONNECT_WORD "已连接"
#define D_STR_DISCONN_WORD "已断开"
#define RECORDNETSTATUS(word) kit_insert_lost_contact_record(word);
// MQTT主题枚举定义
typedef enum
{
kEmsTopicType_Start,
kEmsTopicType_root = kEmsTopicType_Start, // 初始化主题
kEmsTopicType_period, // 周期上报主题
kEmsTopicType_change, // 突变上报主题
kEmsTopicType_history, // 历史数据上报主题
kEmsTopicType_control, // 控制指令下发主题
kEmsTopicType_read, // 读参数主题
kEmsTopicType_reply_control, // 回复控制指令下发主题
kEmsTopicType_reply_read, // 回复读参数主题
kEmsTopicType_End = kEmsTopicType_reply_read,
} ems_topic_type_e;
// 数据变化事件分类
typedef enum
{
kEvent_topo_send = 1, // 拓扑发送完成
kEvent_has_history_record = 2, // 存在历史记录record
kEvent_change_devTopo = 4, // 设备列表变化
kEvent_rename_anydev = 8, // 重命名子设备
kEvent_int_change = 16, // 子设备整型测点值变化越死区
kEvent_float_change = 32, // 子设备浮点型测点值变化越死区
kEvent_pointlistcfg_change = 64, // 设备测点配置改变
kEvent_Reconnect_success = 128, // MQTT重连成功
kEvent_Reconnect_failed = 256, // MQTT重连失败
kEvent_Period_Task_begin = 512, // 周期发送任务启动完成
} kEvent_ems_data;
// EMS总的工作模式
typedef enum
{
KEms_cfg_Start, // 无
KEms_cfg_DEBUG, // 调试模式
KEms_cfg_PEAKVALLY, // 削峰填谷模式
KEms_cfg_DEMANDRES, // 需求响应模式
KEms_cfg_LOADTRACK, // 负载跟踪模式
KEms_cfg_DMDCTRL, // 需量控制
KEms_cfg_PFCTRL, // 功率因数
KEms_cfg_Protect, // 保护策略
KEms_cfg_END,
} cfgType_e;
typedef enum
{
kQos_0,//最多交付一次
kQos_1,//至少交付一次
kQos_2,//只交付一次
}Qos_e;
// 拓扑结构结构体
typedef struct
{
int dbId; // 键:数据库中的 ID
topology_t *topology; // 值:指向 topology_t 结构体的指针
UT_hash_handle hh; // uthash 处理句柄,使结构体可哈希
} topology_hash_entry_t;
// MQTT单个测点的db查找表
typedef struct
{
// 取测点使用参数
rtdb_type_e dbType; // 数据库类型
uint16_t devType; // 设备类型
uint16_t devId; // 区分同类型不同设备实体的标识
uint16_t pointId; // 测点id
up_dis_config_type_e txOrRx; // 是上传的测点还是接收云端指令0-仅上报 1-下发
bool whether; // 接收用户输入1-上传0-不上传
double value; // 该测点值的备份用于检测变化事件
bool ifchg; // 是否需要变化上送
} mqtt2db_t;
typedef mqtt2db_t mqttGetDBElem;
typedef mqtt2db_t mqttSetDBElem;
void pointListener(mqtt2db_t *tab);
// 一个设备实体的测点信息表结构体
typedef struct
{
mqttGetDBElem rtdbGetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应上传点位
mqttSetDBElem rtdbSetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应相应云端控制的下发点位,云端写RTDB操作只能操作该表中的点
bool bChgFlag; // 变化标志
int txcount; // 记录加入payload的测点数量
int rxcount;
} payloadlist_t;
// mqtt通信参数结构体
typedef struct
{
char stationName[100]; // 站点名称
char stationID[20]; // 站点id
dev_type_e devType; // 设备类别
char *devName[D_MAX_DEV_NAME_LEN]; // 设备名称
char sn[D_MAX_DEV_NUM][60]; // 设备SN
char identifier[60]; // 消息服务标识
int devNum; // 设备数量
payloadlist_t pllist[D_MAX_DEV_NUM]; // 该类设备下每个设备实体一张mqtt2db关联表依据该表中的元素操作RTDB
bool bChgFlag;
} mqtt_option_map_t;
// payloadlists
// 保留给回调函数的上下文结构体
typedef struct
{
MQTTClient client;
MQTTClient_connectOptions connopts;
mqtt_lib_t mlib;
int modeword;
} ConnContext;
//所有算法配置的联合体
typedef union
{
peakvalley_zone_tab_t pkvly; // 削峰填谷参数 IN
demandRes_params_t demand; // 需求响应参数
loadTrack_params_t loadTrack; // 负载跟踪参数
debug_params_t debug; // 调试参数 IN
protect_params_t protect; // 保护参数 IN
power_distr_t distr; // 功率分配参数 IN
} logic_Params;
//中断记录与历史数据数组关联的结构体
typedef struct
{
break_record_t record; // 中断记录
UT_array *dataArray; // 其关联的数据存储集合
} break_record_with_data_t;
//从云端MQTT报文返回的信息结构体
typedef struct
{
char *transaction;
int modeWord;
} arvcfgInfo_ret_t;
typedef void (*setCfgCB)(void *);
void setdbgCB(void *arg);
void setPkvlCB(void *arg);
void setDmdRspCB(void *arg);
void setLoadTrackCB(void *arg);
void setDmdCtrlCB(void *arg);
void setPFCtrlCB(void *arg);
/*****************************************************************************
* @brief 消息传送成功触发的回调函数
* @param[in] context: MQTT上下文结构体
* @param[in] dt: 标识符
* @return NONE
*****************************************************************************/
void deliveredCB(void *context, MQTTClient_deliveryToken dt);
/*****************************************************************************
* @brief 消息接收成功触发的回调函数
* @param[in] context: MQTT上下文结构体
* @param[in] topicName:主题
* @param[in] topicLen主题长度
* @return
*****************************************************************************/
int msgarrvdCB(void *context, char *topicName, int topicLen, MQTTClient_message *message);
/*****************************************************************************
* @brief 连接丢失触发的回调函数
* @param[in] context: MQTT上下文结构体
* @param[in] cause:中断原因
* @return NONE
*****************************************************************************/
void connlostCB(void *context, char *cause);
/*****************************************************************************
* @brief 更新MQTT参数
* @param[in] mlt: MQTT连接配置
* @param[in] map:测点配置表
* @return NONE
*****************************************************************************/
void updateMqttPara(mqtt_lib_t *mlt, mqtt_option_map_t *map);
/*****************************************************************************
* @brief 更新MQTT参数
* @param[in] mlt: MQTT连接配置
* @param[in] map:测点配置表
* @return NONE
*****************************************************************************/
int chkDataChange(mqtt_option_map_t *now, mqtt_option_map_t *old);
/*****************************************************************************
* @brief MQTT周期发送线程
* @param[in] arg: 可选参数
* @return NONE
*****************************************************************************/
void *periodSendTask(void *arg);
/*****************************************************************************
* @brief 数据变化监视器
* @param[in] arg: 可选参数
* @return NONE
************************************************************************/
void *dataChgMonitor(void *arg);
/*****************************************************************************
* @brief 从MQTT字符串中获取关键字返回主题类型
* @param[in] topic: MQTT主题字符串
* @return 主题类型枚举值
*****************************************************************************/
int getKeywordsByTopic(char *topic);
/*****************************************************************************
* @brief 根据设备大类组包
* @param[in] payload目标字符串指针
* @param[in] type设备类型
* @param[in] tot设备点表结构体
* @return NONE
*****************************************************************************/
void genDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
/*****************************************************************************
* @brief 递归添加子节点到 JSON 结构中
* @param[in] parentArray: 父节点的 JSON 数组
* @param[in] parentId: 父节点的 ID
* @param[in] topologyById: 存储拓扑信息的哈希表
* @return 无
*****************************************************************************/
void addChildrenToJson(cJSON *parentArray, int parentId, topology_hash_entry_t *topologyById);
/*****************************************************************************
* @brief 将数据库中的 EMS 设备拓扑结构数据转换为 JSON 字符串
* @param[out] json: 用于存储生成的 JSON 字符串的缓冲区。确保该缓冲区有足够的空间容纳生成的 JSON 字符串。
* @return 0-成功 1-失败
*****************************************************************************/
int getTopologyJsonByDb(char **json);
/*****************************************************************************
* @brief 将设备类型枚举值转换为与云端通信协议中的字符串
* @param[out] type设备类型枚举值
* @return 设备类型缩写字符串
*****************************************************************************/
char *devTypeToString(dev_type_e type);
/*****************************************************************************
* @brief 比较两个设备节点是否相同
* @param[in] a左操作数
* @param[in] b左操作数
* @return 0-不同1-相同
*****************************************************************************/
bool ifSameTopoElem(const topology_t *a, const topology_t *b);
/*****************************************************************************
* @brief 比较两个设备拓扑是否相同
* @param[in] arr1左操作数
* @param[in] arr2左操作数
* @return 0-不同1-相同
*****************************************************************************/
bool chkTopoDiff(UT_array *arr1, UT_array *arr2);
/*****************************************************************************
* @brief 获得当前时间戳字符串
* @return 当前时间戳字符串
*****************************************************************************/
char *getTmstr2();
/*****************************************************************************
* @brief 解析云端消息
* @param[in] jsonString云端消息json串
* @param[in] arr指向配置参数的指针
* @return 0-成功1-失败
*****************************************************************************/
arvcfgInfo_ret_t parseEmsCfgJson(const char *jsonString, logic_Params *lP);
/*****************************************************************************
* @brief 单个设备测点值是否改变
* @param[in] tot: mqtt_lib_t中的主题字符串指针
* @return false-未改变 true-改变
*****************************************************************************/
bool ifSingledevPointsChg(payloadlist_t *list);
/*****************************************************************************
* @brief 单个设备测点值是否改变
* @param[in] map: 设备组测点列表
* @return false-未改变 true-改变
*****************************************************************************/
bool ifDevGrpPointsChg(mqtt_option_map_t *map);
/*****************************************************************************
* @brief 装载不同设备组变化上送主题有效载荷
* @param[in] topic: mqtt_lib_t中的主题字符串指针
* @param[in] payload:主题类型枚举枚举与mqtt_lib_t中的成员应对应请确保payload的缓冲区长度足够
* @return NONE
*****************************************************************************/
int genChgDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
/*****************************************************************************
* @brief 根据云端下发的配置信息,将参数写入联合体
* @return 设备类型字符串
*****************************************************************************/
int parseStrategyJsonObject(cJSON *json, int modeword, logic_Params *lp);
/*****************************************************************************
* @brief 根据配置信息和模式组装成 JSON 字符串,并包含额外的顶层字段
* @param[in] arvcfgInfo_ret_t cfg 结构包含模式字、交易标识等信息
* void *arg 指向配置数据的指针(例如,指向 logic_Params 的指针)
* @return char* 表示 JSON 字符串(需要调用者释放)
*****************************************************************************/
char *createStrategyCfgJsonString(arvcfgInfo_ret_t cfg, const void *arg, int rw);
/*****************************************************************************
* @brief 判断通信是否断开
* @param[in] argtcp连接上下文
* @return 0-中断1-正常
*****************************************************************************/
bool is_mqtt_connected();
/*****************************************************************************
* @brief MQTT主函数
* @param[in] arg: 可选参数
* @return NONE
*****************************************************************************/
void *MqttTask(void *arg);
/*****************************************************************************
* @brief MQTT转发任务入口
* @return NONE
*****************************************************************************/
void creatNetMqttTaskEntry();