ems/ems_c/bsp/bsp_mqttClient.h

367 lines
16 KiB
C
Raw Permalink Normal View History

2025-05-13 17:49:49 +08:00
/*****************************************************************************
* @copyright 1997-2050, . POWER SUPPLY CO., LTD.
* @file bsp_mqttClient.c
* @brief mqtt通信客户端程序
* @author mdy
* @date 2024-09-29
* @remark
*****************************************************************************/
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include "MQTTClient.h"
#include "MQTTAsync.h"
#include <cjson/cJSON.h>
#include "bsp_rtdb.h"
#include "bsp_comm.h"
#include <semaphore.h>
#include <signal.h>
#include "kit_db.h"
#include "kit_core.h"
#include "logic_bcu2bsu.h"
#include "logic_main.h"
#include "logic_dido.h"
#define D_MAX_MQTT_DEV_POINT_NUM (2000) // 每类设备最大的上云测点个数
#define D_MAX_DEV_NUM (100) // 子设备最大数量
#define D_MAX_DEV_NAME_LEN (100)
#define D_MAX_DEV_KEYWORD (11)
#define D_MAX_IPV4_LENGTH (16)
#define D_MAX_USERNAME_LENGTH (20)
#define D_MAX_PASSWORD_LENGTH (20)
#define QOS 1
#define TIMEOUT 10000L
#define MQTT_ROOT_TOPIC_KEY "root"
#define MQTT_PERIOD_TOPIC_KEY "period"
#define MQTT_CHANGE_TOPIC_KEY "change"
#define MQTT_HISTORY_TOPIC_KEY "history"
#define MQTT_CONTROL_TOPIC_KEY "control"
#define MQTT_READ_TOPIC_KEY "read"
#define MQTT_REPLY_TOPIC_KEY "reply"
#define MQTT_PAYLOAD_MAX_LEN (1024 * 1024) // 能源云限制报文长度最大为1MB
#define MQTT_PERIOD_SEND_FAIL_WORDS "period send fail." // 周期发送失败
#define D_MAX_TOPIC_NUM (7)
#define MQTT_STATUS_DONT_TRANS -100 // 还未执行完策略,当前的配置参数不上传
#define MQTT_STATUS_CFG_UPLOADED 100 // 策略配置传送完毕
#define MAX_MQTT_HISTORY_MSG_NUM 1000
#define D_PERIODSEND_T_SEQ (0)
#define D_HISTORY_T_SEQ (1)
#define D_DATACHK_T_SEQ (2)
#define D_MAX_THREAD_NUM (3)
#define D_STR_CONNECT_WORD "已连接"
#define D_STR_DISCONN_WORD "已断开"
#define RECORDNETSTATUS(word) kit_insert_lost_contact_record(word);
// MQTT主题枚举定义
typedef enum
{
kEmsTopicType_Start,
kEmsTopicType_root = kEmsTopicType_Start, // 初始化主题
kEmsTopicType_period, // 周期上报主题
kEmsTopicType_change, // 突变上报主题
kEmsTopicType_history, // 历史数据上报主题
kEmsTopicType_control, // 控制指令下发主题
kEmsTopicType_read, // 读参数主题
kEmsTopicType_reply_control, // 回复控制指令下发主题
kEmsTopicType_reply_read, // 回复读参数主题
kEmsTopicType_End = kEmsTopicType_reply_read,
} ems_topic_type_e;
// 数据变化事件分类
typedef enum
{
kEvent_topo_send = 1, // 拓扑发送完成
kEvent_has_history_record = 2, // 存在历史记录record
kEvent_change_devTopo = 4, // 设备列表变化
kEvent_rename_anydev = 8, // 重命名子设备
kEvent_int_change = 16, // 子设备整型测点值变化越死区
kEvent_float_change = 32, // 子设备浮点型测点值变化越死区
kEvent_pointlistcfg_change = 64, // 设备测点配置改变
kEvent_Reconnect_success = 128, // MQTT重连成功
kEvent_Reconnect_failed = 256, // MQTT重连失败
kEvent_Period_Task_begin = 512, // 周期发送任务启动完成
} kEvent_ems_data;
// EMS总的工作模式
typedef enum
{
KEms_cfg_Start, // 无
KEms_cfg_DEBUG, // 调试模式
KEms_cfg_PEAKVALLY, // 削峰填谷模式
KEms_cfg_DEMANDRES, // 需求响应模式
KEms_cfg_LOADTRACK, // 负载跟踪模式
KEms_cfg_DMDCTRL, // 需量控制
KEms_cfg_PFCTRL, // 功率因数
KEms_cfg_Protect, // 保护策略
KEms_cfg_END,
} cfgType_e;
typedef enum
{
kQos_0,//最多交付一次
kQos_1,//至少交付一次
kQos_2,//只交付一次
}Qos_e;
// 拓扑结构结构体
typedef struct
{
int dbId; // 键:数据库中的 ID
topology_t *topology; // 值:指向 topology_t 结构体的指针
UT_hash_handle hh; // uthash 处理句柄,使结构体可哈希
} topology_hash_entry_t;
// MQTT单个测点的db查找表
typedef struct
{
// 取测点使用参数
rtdb_type_e dbType; // 数据库类型
uint16_t devType; // 设备类型
uint16_t devId; // 区分同类型不同设备实体的标识
uint16_t pointId; // 测点id
up_dis_config_type_e txOrRx; // 是上传的测点还是接收云端指令0-仅上报 1-下发
bool whether; // 接收用户输入1-上传0-不上传
double value; // 该测点值的备份用于检测变化事件
bool ifchg; // 是否需要变化上送
} mqtt2db_t;
typedef mqtt2db_t mqttGetDBElem;
typedef mqtt2db_t mqttSetDBElem;
void pointListener(mqtt2db_t *tab);
// 一个设备实体的测点信息表结构体
typedef struct
{
mqttGetDBElem rtdbGetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应上传点位
mqttSetDBElem rtdbSetTab[D_MAX_MQTT_DEV_POINT_NUM]; // 实时数据库查找表,对应相应云端控制的下发点位,云端写RTDB操作只能操作该表中的点
bool bChgFlag; // 变化标志
int txcount; // 记录加入payload的测点数量
int rxcount;
} payloadlist_t;
// mqtt通信参数结构体
typedef struct
{
char stationName[100]; // 站点名称
char stationID[20]; // 站点id
dev_type_e devType; // 设备类别
char *devName[D_MAX_DEV_NAME_LEN]; // 设备名称
char sn[D_MAX_DEV_NUM][60]; // 设备SN
char identifier[60]; // 消息服务标识
int devNum; // 设备数量
payloadlist_t pllist[D_MAX_DEV_NUM]; // 该类设备下每个设备实体一张mqtt2db关联表依据该表中的元素操作RTDB
bool bChgFlag;
} mqtt_option_map_t;
// payloadlists
// 保留给回调函数的上下文结构体
typedef struct
{
MQTTClient client;
MQTTClient_connectOptions connopts;
mqtt_lib_t mlib;
int modeword;
} ConnContext;
//所有算法配置的联合体
typedef union
{
peakvalley_zone_tab_t pkvly; // 削峰填谷参数 IN
demandRes_params_t demand; // 需求响应参数
loadTrack_params_t loadTrack; // 负载跟踪参数
debug_params_t debug; // 调试参数 IN
protect_params_t protect; // 保护参数 IN
power_distr_t distr; // 功率分配参数 IN
} logic_Params;
//中断记录与历史数据数组关联的结构体
typedef struct
{
break_record_t record; // 中断记录
UT_array *dataArray; // 其关联的数据存储集合
} break_record_with_data_t;
//从云端MQTT报文返回的信息结构体
typedef struct
{
char *transaction;
int modeWord;
} arvcfgInfo_ret_t;
typedef void (*setCfgCB)(void *);
void setdbgCB(void *arg);
void setPkvlCB(void *arg);
void setDmdRspCB(void *arg);
void setLoadTrackCB(void *arg);
void setDmdCtrlCB(void *arg);
void setPFCtrlCB(void *arg);
/*****************************************************************************
* @brief
* @param[in] context: MQTT上下文结构体
* @param[in] dt:
* @return NONE
*****************************************************************************/
void deliveredCB(void *context, MQTTClient_deliveryToken dt);
/*****************************************************************************
* @brief
* @param[in] context: MQTT上下文结构体
* @param[in] topicName:
* @param[in] topicLen
* @return
*****************************************************************************/
int msgarrvdCB(void *context, char *topicName, int topicLen, MQTTClient_message *message);
/*****************************************************************************
* @brief
* @param[in] context: MQTT上下文结构体
* @param[in] cause:
* @return NONE
*****************************************************************************/
void connlostCB(void *context, char *cause);
/*****************************************************************************
* @brief MQTT参数
* @param[in] mlt: MQTT连接配置
* @param[in] map:
* @return NONE
*****************************************************************************/
void updateMqttPara(mqtt_lib_t *mlt, mqtt_option_map_t *map);
/*****************************************************************************
* @brief MQTT参数
* @param[in] mlt: MQTT连接配置
* @param[in] map:
* @return NONE
*****************************************************************************/
int chkDataChange(mqtt_option_map_t *now, mqtt_option_map_t *old);
/*****************************************************************************
* @brief MQTT周期发送线程
* @param[in] arg:
* @return NONE
*****************************************************************************/
void *periodSendTask(void *arg);
/*****************************************************************************
* @brief
* @param[in] arg:
* @return NONE
************************************************************************/
void *dataChgMonitor(void *arg);
/*****************************************************************************
* @brief MQTT字符串中获取关键字返回主题类型
* @param[in] topic: MQTT主题字符串
* @return
*****************************************************************************/
int getKeywordsByTopic(char *topic);
/*****************************************************************************
* @brief
* @param[in] payload
* @param[in] type
* @param[in] tot
* @return NONE
*****************************************************************************/
void genDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
/*****************************************************************************
* @brief JSON
* @param[in] parentArray: JSON
* @param[in] parentId: ID
* @param[in] topologyById:
* @return
*****************************************************************************/
void addChildrenToJson(cJSON *parentArray, int parentId, topology_hash_entry_t *topologyById);
/*****************************************************************************
* @brief EMS JSON
* @param[out] json: JSON JSON
* @return 0- 1-
*****************************************************************************/
int getTopologyJsonByDb(char **json);
/*****************************************************************************
* @brief
* @param[out] type
* @return
*****************************************************************************/
char *devTypeToString(dev_type_e type);
/*****************************************************************************
* @brief
* @param[in] a
* @param[in] b
* @return 0-1-
*****************************************************************************/
bool ifSameTopoElem(const topology_t *a, const topology_t *b);
/*****************************************************************************
* @brief
* @param[in] arr1
* @param[in] arr2
* @return 0-1-
*****************************************************************************/
bool chkTopoDiff(UT_array *arr1, UT_array *arr2);
/*****************************************************************************
* @brief
* @return
*****************************************************************************/
char *getTmstr2();
/*****************************************************************************
* @brief
* @param[in] jsonStringjson串
* @param[in] arr
* @return 0-1-
*****************************************************************************/
arvcfgInfo_ret_t parseEmsCfgJson(const char *jsonString, logic_Params *lP);
/*****************************************************************************
* @brief
* @param[in] tot: mqtt_lib_t中的主题字符串指针
* @return false- true-
*****************************************************************************/
bool ifSingledevPointsChg(payloadlist_t *list);
/*****************************************************************************
* @brief
* @param[in] map:
* @return false- true-
*****************************************************************************/
bool ifDevGrpPointsChg(mqtt_option_map_t *map);
/*****************************************************************************
* @brief
* @param[in] topic: mqtt_lib_t中的主题字符串指针
* @param[in] payload:mqtt_lib_t中的成员应对应payload的缓冲区长度足够
* @return NONE
*****************************************************************************/
int genChgDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *tot);
/*****************************************************************************
* @brief
* @return
*****************************************************************************/
int parseStrategyJsonObject(cJSON *json, int modeword, logic_Params *lp);
/*****************************************************************************
* @brief JSON
* @param[in] arvcfgInfo_ret_t cfg
* void *arg logic_Params
* @return char* JSON
*****************************************************************************/
char *createStrategyCfgJsonString(arvcfgInfo_ret_t cfg, const void *arg, int rw);
/*****************************************************************************
* @brief
* @param[in] argtcp连接上下文
* @return 0-1-
*****************************************************************************/
bool is_mqtt_connected();
/*****************************************************************************
* @brief MQTT主函数
* @param[in] arg:
* @return NONE
*****************************************************************************/
void *MqttTask(void *arg);
/*****************************************************************************
* @brief MQTT转发任务入口
* @return NONE
*****************************************************************************/
void creatNetMqttTaskEntry();