/***************************************************************************** * @copyright 1997-2050, . POWER SUPPLY CO., LTD. * @file bsp_mqttClient.h * @brief mqtt头文件 * @author mdy * @date 2024-09-29 * @remark *****************************************************************************/ // #pragma once #include "bsp_mqttClient.h" // #define MQTTSELFTEST // #define CLOUDCTRLPRINT // #define MQTTPRINT sem_t g_allowSend; // 控制发送的信号量 sem_t g_MqttwaitAndSaveHistory; // MQTT断线后,等待重连并将数据包入库 static const UT_icd topology_icd = {sizeof(topology_t), NULL, NULL, NULL}; // static const UT_icd uint16_icd = {sizeof(uint16_t), NULL, NULL, NULL}; static const UT_icd brcd_icd = {sizeof(break_record_t), NULL, NULL, NULL}; static const UT_icd bdst_icd = {sizeof(break_data_storage_t), NULL, NULL, NULL}; static const UT_icd brd_icd = {sizeof(break_record_with_data_t), NULL, NULL, NULL}; static const UT_icd pvcfg_icd = {sizeof(pv_date_config_t), NULL, NULL, NULL}; UT_array *g_recordsWithData; // MQTT msgid 由paho.mqtt负责维护,int类型,标识每一次消息传递的状态,在客户端和服务端之间唯一标识一条消息的交付操作。 volatile MQTTClient_deliveryToken deliveredtoken; UT_array *g_topology_storage; //旧的拓扑信息,用于对比数据库中的拓扑 uint64_t g_MqttMsg;//mqtt全局消息 pthread_mutex_t connection_status_mutex = PTHREAD_MUTEX_INITIALIZER;//连接状态互斥锁 pthread_mutex_t record_mutex = PTHREAD_MUTEX_INITIALIZER;//中断记录互斥锁 char emsName[MAX_NAME_BUF_LEN]; char mainEmsSn[MAX_NAME_BUF_LEN]; mqtt_option_map_t g_mqtt_map[kDev_Type_End]; // 接收配置信息的结构体数组 mqtt_option_map_t g_mqtt_map_chg[kDev_Type_End]; // 配置信息的备份 bool g_ChgSend[kDev_Type_End] = {false}; logic_Params g_LP = {0}; BcuBsuMap *g_bcuMap = NULL; long g_mqttSendTag = 0; volatile int period_thread_alive = 1; // 状态变量,监控周期任务工作线程是否活跃 volatile int history_thread_alive = 1; // 状态变量,监控历史数据任务工作线程是否活跃 volatile int chkdata_thread_alive = 1; // 状态变量,监控检测变化任务工作线程是否活跃 volatile int period_thread_haveSendSig = 0; // 状态变量,监控周期任务工作线程是否持有发送资源 volatile int history_thread_haveSendSig = 0; // 状态变量,监控历史数据任务工作线程是否持有发送资源 volatile int chkdata_thread_haveSendSig = 0; // 状态变量,监控检测数据变化监测任务工作线程是否持有发送资源 volatile int period_thread_haveRcdSig = 0; // 状态变量,监控周期任务工作线程是否持有断点记录读写资源 volatile int history_thread_haveRcdSig = 0; // 状态变量,监控历史数据任务工作线程是否持有发送资源 pthread_t g_Tids[D_MAX_THREAD_NUM] = {0}; long long CurtimesMS() { struct timeval te; // CS104_Connection_create("local_host", 2404); gettimeofday(&te, NULL); // 获取当前时间 // MeasuredValueScaled_create; long long milliseconds = te.tv_sec * 1000LL + te.tv_usec / 1000; // 将秒和微秒转换为毫秒 return milliseconds; } /***************************************************************************** * @brief 发送消息给broker时触发的回调函数 * @param[in] context:mqtt连接上下文 * @param[in] dt:token * @return NONE *****************************************************************************/ void deliveredCB(void *context, MQTTClient_deliveryToken dt) { long times = time(NULL); deliveredtoken = dt; long deltaT = abs(times - g_mqttSendTag); printf("Message with token value %d delivery confirmed.msg finish timestamp=%ld,msg send success timestamp=%ld,deltaT=%ld\n", dt, g_mqttSendTag, times, deltaT); }; void printdeltaT() { long times = CurtimesMS(NULL); long deltaT = abs(times - g_mqttSendTag); printf("msg finish timestamp=%ld,msg send success timestamp=%ld,deltaT=%ld\n", g_mqttSendTag, times, deltaT); }; /***************************************************************************** * @brief 根据报文的写配置类型,为参数指针申请相应的内存 * @param[in] modeWord:策略类型枚举 * @param[in] funArg:函数参数指针 * @return NONE *****************************************************************************/ int param_set_select(int modeWord, void *funArg) { int lenth = 0; switch (modeWord) { case E_TACTIC_MODE_DEBUG: // 调试模式 lenth = sizeof(debug_params_t); funArg = (debug_params_t *)malloc(lenth); break; case E_TACTIC_MODE_PEAKVALLY: // 削峰填谷模式 lenth = sizeof(peakvalley_zone_tab_t); funArg = (peakvalley_zone_tab_t *)malloc(lenth); break; // case E_TACTIC_MODE_LOADTRACK: // 负载跟踪模式 // lenth = sizeof(peakvalley_zone_tab_t); // break; case E_TACTIC_MODE_DMDCTRL: // 需量控制 lenth = sizeof(protect_params_t); funArg = (protect_params_t *)malloc(lenth); break; case E_TACTIC_MODE_PFCTRL: // 功率因数 break; default: return 1; // 失败 } return 0; // 成功 } /***************************************************************************** * @brief 将调试模式参数写入SqliteDB * @param[in] arg:调试模式参数结构体指针 * @return NONE *****************************************************************************/ int writeDbgParaToDb(debug_algorithm_t *arg) { int rc = kit_set_debug_algorithm(arg); return rc; } /***************************************************************************** * @brief 将削峰填谷模式参数写入SqliteDB * @param[in] arg:削峰填谷模式参数结构体指针 * @return NONE *****************************************************************************/ int writePvParaToDb(UT_array *arg) { int rc = 0; rc = kit_set_pv_date_time_cfg(&arg); return rc; } /***************************************************************************** * @brief 将负荷跟踪模式参数写入SqliteDB * @param[in] arg:负荷跟踪模式参数结构体指针 * @return NONE *****************************************************************************/ int writeLoadTrckParaToDb(debug_params_t *arg) { return 0; } /***************************************************************************** * @brief 将需量控制模式参数写入SqliteDB * @param[in] arg:负荷跟踪模式参数结构体指针 * @return NONE *****************************************************************************/ int writeDmCtrlParaToDb(protect_params_t *arg) { return 0; } /***************************************************************************** * @brief 将保护参数写入SqliteDB * @param[in] arg:负荷跟踪模式参数结构体指针 * @return NONE *****************************************************************************/ int writeProtecParaToDb(protect_params_t *arg) { // 此处缺失数据赋值 protect_algorithm_t item = {0}; int rc = kit_set_protect_algorithm(&item); return rc; } /***************************************************************************** * @brief 根据配置类型写对应的数据库表 * @param[in] arg:配置参数结构体指针 * @param[in] modeWord:配置类型 * @return NONE *****************************************************************************/ int writeCfgToDb(logic_Params *arg, int modeWord) { int rc = 0; debug_algorithm_t dbarg = {0}; UT_array *ua; utarray_new(ua, &pvcfg_icd); switch (modeWord) { case E_TACTIC_MODE_DEBUG: // 调试模式 dbarg.activePower = arg->debug.activePower; dbarg.reactivePower = arg->debug.reactivePower; dbarg.pcsSwitch = arg->debug.pcsSwitch; dbarg.dbId = 1; rc |= writeDbgParaToDb(&dbarg); break; case E_TACTIC_MODE_PEAKVALLY: // 削峰填谷模式 for (int i = 0; i < arg->pkvly.zoneTabLen; i++) { utarray_push_back(ua, &arg->pkvly.peakItem[i]); } rc |= writePvParaToDb(ua); break; // case E_TACTIC_MODE_LOADTRACK: // 负载跟踪模式 // lenth = sizeof(peakvalley_zone_tab_t); // break; case E_TACTIC_MODE_DMDCTRL: // 需量控制 // writeDmCtrlParaToDb((protect_params_t *)arg); break; case E_TACTIC_MODE_PFCTRL: // 功率因数 break; case 11: // writeProtecParaToDb((protect_params_t *)arg); break; default: rc = 1; } if (ua != NULL) { utarray_free(ua); } return rc; } /***************************************************************************** * @brief 告知云端ems处于本地模式的报文 * @param[in] info:接收消息参数结构体指针 * @return NONE *****************************************************************************/ char *createLocalModeDefualtMsg(arvcfgInfo_ret_t *info) { // 创建根 JSON 对象 cJSON *root = cJSON_CreateObject(); if (root == NULL) { goto end; } // 添加 transaction if (cJSON_AddStringToObject(root, "transaction", info->transaction) == NULL) { goto end; } // 添加 respCode if (cJSON_AddNumberToObject(root, "respCode", 1) == NULL) { goto end; } // 添加 msg if (cJSON_AddStringToObject(root, "msg", "ERROR:ems is running at local mode!") == NULL) { goto end; } // 添加毫秒时间戳 if (cJSON_AddNumberToObject(root, "timestamp", CurtimesMS(NULL)) == NULL) { goto end; } // 打印成JSON字符串 char *json_string = cJSON_Print(root); if (json_string == NULL) { goto end; } // 释放 JSON 对象 cJSON_Delete(root); return json_string; end: cJSON_Delete(root); return NULL; } /***************************************************************************** * @brief 告知云端ems处于本地模式 * @param[in] client: mqtt客户端句柄 * @param[in] info:接收消息参数结构体指针 * @return 0-成功 *****************************************************************************/ int sendLocalStatus(MQTTClient client, arvcfgInfo_ret_t *arg) { signal(SIGPIPE, SIG_IGN); int rc = 0; int cnt = 0; char *msg = (char *)malloc(MAX_MQTT_MSG_LEN * sizeof(char)); MQTTClient_message pubmsg = MQTTClient_message_initializer; msg = createLocalModeDefualtMsg(arg); pubmsg.payload = msg; pubmsg.payloadlen = strlen(msg); pubmsg.qos = kQos_2; pubmsg.retained = 0; char *topic = (char *)NorthProtoTable[kProto_MQTT_Slave].northProtocol.mqttLib.rootTopic; retry:; if (0 == sem_trywait(&g_allowSend)) { rc = MQTTClient_publishMessage(client, topic, &pubmsg, &deliveredtoken); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(client, deliveredtoken, TIMEOUT); } sem_post(&g_allowSend); } else if (cnt > 10) { rc = 1; } else { usleep(50000); cnt++; goto retry; } if (msg != NULL) { free(msg); msg = NULL; } return rc; }; int msgarrvdCB(void *context, char *topicName, int topicLen, MQTTClient_message *message) { // printf("Message arrived\n"); // printf("Topic: %s\n", topicName); // printf("Message: "); ConnContext *ct = (ConnContext *)context; // 解析消息主题 int msgtype = getKeywordsByTopic(topicName); char *msgstr = message->payload; memset(&g_LP, 0, sizeof(logic_Params)); arvcfgInfo_ret_t cfgInfo = parseEmsCfgJson(msgstr, &g_LP); if (cfgInfo.modeWord < E_TACTIC_MODE_DEBUG || cfgInfo.modeWord > E_TACTIC_MODE_PFCTRL) { return -1; } // 打印消息内容 #ifdef MQTTPRINT printf("recv msg is\n%s\n", msgstr); #endif int rc = 0; // 根据modeWord解析云端下发的配置信息,解析后的参数放入结构体指针arg中 MQTTClient_message pubmsg = MQTTClient_message_initializer; // 根据模式从数据库读取对应的配置信息 void *stArg = NULL; // 策略参数 char *ans = NULL; int ifremote = AdvancedSettingTable[kEms_Mode].value; // 读取本地远程设置 // rc = kit_get_localRemote(&ifremote); retry_label:; switch (msgtype) { case kEmsTopicType_control: // while (stlogic.exitTaskFlag != 2) // { // usleep(50000); // } if (ifremote) { rc = writeCfgToDb(&g_LP, cfgInfo.modeWord); if (0 == rc) { // 如果写数据库成功,写算法配置改变信号 writeWebSign(Rtdb_ShMem, kSign_LogicDebug + cfgInfo.modeWord - 1, 1); work_mode_set_t setting; setting.workMode = cfgInfo.modeWord; rc = kit_set_work_mode(&setting); if (0 == rc) { // 如果工作模式写入数据库成功,写工作模式切换信号 writeWebSign(Rtdb_ShMem, kSign_LogicMode, 1); } } getCfgBymodeWord(stArg, cfgInfo.modeWord); ans = createStrategyCfgJsonString(cfgInfo, stArg, 1); pubmsg.payload = ans; pubmsg.payloadlen = strlen(ans); pubmsg.qos = kQos_2; pubmsg.retained = 0; if ((0 == sem_trywait(&g_allowSend)) && MQTTClient_isConnected(ct->client)) { rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.replycontrolTopic, &pubmsg, &deliveredtoken); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(ct->client, &deliveredtoken, TIMEOUT); } sem_post(&g_allowSend); } else { goto retry_label; } } else { sendLocalStatus(ct->client, &cfgInfo); } break; case kEmsTopicType_read: while (stlogic.exitTaskFlag != 2) { usleep(50000); } if ((0 == sem_trywait(&g_allowSend)) && MQTTClient_isConnected(ct->client)) { getCfgBymodeWord(stArg, cfgInfo.modeWord); ans = createStrategyCfgJsonString(cfgInfo, stArg, 0); pubmsg.payload = ans; pubmsg.payloadlen = strlen(ans); pubmsg.qos = kQos_2; pubmsg.retained = 0; rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.replyreadTopic, &pubmsg, &deliveredtoken); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(ct->client, &deliveredtoken, TIMEOUT); } sem_post(&g_allowSend); } else { goto retry_label; } break; default: rc = 1; break; } MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return rc; } int updateBreakRcd(break_record_t *rcd) { // 一次通信中断触发时,更新插入断点数据库的dbid和break_record_t long long id = kit_insert_break_record(rcd); rcd->dbId = id; rcd->isUploaded = 0; strcpy((char *)rcd->reason, MQTT_PERIOD_SEND_FAIL_WORDS); return 0; } int initBreakRcd(break_record_t *rcd) { if (rcd == NULL) { return 1; } rcd->dbId = 0; rcd->isUploaded = 0; strcpy((char *)rcd->reason, MQTT_PERIOD_SEND_FAIL_WORDS); return 0; } /***************************************************************************** * @brief 与broker连接丢失时触发的回调函数 * @param[in] context:mqtt连接上下文 * @param[in] cause:错误信息 * @return NONE *****************************************************************************/ // 假设的方法,用于重连MQTT客户端 int attemptReconnect(MQTTClient client, MQTTClient_connectOptions *connopts) { int rc; // 这里执行重连逻辑 rc = MQTTClient_connect(client, connopts); if (MQTTCLIENT_SUCCESS != rc) { KITPTF(LOG_MQTT_EN, INFO_EN, "MQTT重新连接失败,返回值%d。", rc); return rc; } return rc; } /***************************************************************************** * @brief 从北向配置表的点表中读取单个测点的rtdb查找信息 * @param[in] tar:mqtt2db_t查找表指针 * @param[in] cfg:测点配置信息 * @return NONE *****************************************************************************/ void loadRtdbFindTabByCfg(mqtt2db_t *tar, up_dis_point_t cfg) { tar->dbType = Rtdb_ShMem; tar->devType = cfg.devType; tar->devId = cfg.devId; tar->pointId = cfg.pointId; } /***************************************************************************** * @brief 将EMS测点rtdb参数存入结构体 * @param[in] tar:待写入mqtt查找表 * @param[in] p:源测点 * @return NONE *****************************************************************************/ void loadEmsPoints(mqtt_option_map_t *map) { if (map == NULL) { return; } map[kDev_Type_EMS].devNum++; // strncpy(map[kDev_Type_EMS].sn[0],) for (int i = 0; i < kEms_DataEnd; i++) { map[kDev_Type_EMS].pllist[0].rtdbGetTab[i].dbType = kSign_ShMem; map[kDev_Type_EMS].pllist[0].rtdbGetTab[i].devId = 0; map[kDev_Type_EMS].pllist[0].rtdbGetTab[i].devType = kDev_Type_EMS; map[kDev_Type_EMS].pllist[0].rtdbGetTab[i].pointId = i; map[kDev_Type_EMS].pllist[0].txcount++; } }; /***************************************************************************** * @brief 从自定义配置表的点表中读取mqtt单个测点的rtdb查找信息 * @param[in] tar:mqtt2db_t查找表指针 * @param[in] cfg:测点配置信息 * @return NONE *****************************************************************************/ void loadNorthCfgBySet(mqtt_option_map_t *map) // 配置的测点 { loadEmsPoints(map); if (0 == NorthProtoTable[kProto_MQTT_Slave].disDevNum + NorthProtoTable[kProto_MQTT_Slave].upDevNum) { return; // 若北向协议中的测点列表为空 } for (int i = 0; i < NorthProtoTable[kProto_MQTT_Slave].upDevNum; i++) { dev_type_e type = NorthProtoTable[kProto_MQTT_Slave].upDevArr[i].devType; int n = map[type].devNum; // map[type].devName[n] = NorthProtoTable[kProto_MQTT_Slave].upDevArr[i]; for (int k = 0; k < NorthProtoTable[kProto_MQTT_Slave].upDevArr[i].upDisPointNum; k++) { loadRtdbFindTabByCfg(&map[type].pllist[n].rtdbGetTab[map[type].pllist[n].txcount++], NorthProtoTable[kProto_MQTT_Slave].upDevArr[i].upDisPointArr[k]); } } for (int i = 0; i < NorthProtoTable[kProto_MQTT_Slave].disDevNum; i++) { dev_type_e type = NorthProtoTable[kProto_MQTT_Slave].disDevArr[i].devType; int n = map[type].devNum; // map[type].devName[n] = NorthProtoTable[kProto_MQTT_Slave].upDevArr[i]; for (int k = 0; k < NorthProtoTable[kProto_MQTT_Slave].disDevArr[i].upDisPointNum; k++) { loadRtdbFindTabByCfg(&map[type].pllist[n].rtdbSetTab[map[type].pllist[n].rxcount++], NorthProtoTable[kProto_MQTT_Slave].disDevArr[i].upDisPointArr[k]); } } } /***************************************************************************** * @brief 初始化mqtt表结构 * @param[in] map:mqtt总表指针 * @return NONE *****************************************************************************/ void initMqttMap(mqtt_option_map_t *map) { for (int i = 0; i < kDev_Type_End; i++) { map[i].devNum = 0; map[i].devType = i; for (int k = 0; k < D_MAX_DEV_NUM; k++) { map[i].pllist[k].rxcount = 0; map[i].pllist[k].txcount = 0; for (int j = 0; j < D_MAX_MQTT_DEV_POINT_NUM; j++) { map[i].pllist[k].rtdbGetTab[j].dbType = 0; map[i].pllist[k].rtdbGetTab[j].devId = 0; map[i].pllist[k].rtdbGetTab[j].devType = 0; map[i].pllist[k].rtdbGetTab[j].pointId = 0; map[i].pllist[k].rtdbGetTab[j].value = 0; map[i].pllist[k].rtdbGetTab[j].ifchg = 0; map[i].pllist[k].rtdbSetTab[j].dbType = 0; map[i].pllist[k].rtdbSetTab[j].devId = 0; map[i].pllist[k].rtdbSetTab[j].devType = 0; map[i].pllist[k].rtdbSetTab[j].pointId = 0; map[i].pllist[k].rtdbSetTab[j].value = 0; map[i].pllist[k].rtdbSetTab[j].ifchg = 0; } } } } /***************************************************************************** * @brief 将测点rtdb参数存入结构体 * @param[in] tar:待写入mqtt查找表 * @param[in] p:源测点 * @return NONE *****************************************************************************/ int loadRtdbFindTabBySouthCfg(mqtt2db_t *tar, point_t p) { if (tar == NULL) { return -1; } tar->dbType = Rtdb_ShMem; tar->devType = p.devType; tar->devId = p.devId; tar->pointId = p.pointId; // printf("dbtype=%d,devType=%d,devId=%d,pointId=%d\n",tar->dbType,tar->devType,tar->devId,tar->pointId); return 0; } /***************************************************************************** * @brief 将测点rtdb参数存入结构体 * @param[in] tar:待写入mqtt查找表 * @param[in] p:源测点 * @return NONE *****************************************************************************/ void loadNorthCfgAllptsByProtocol(mqtt_option_map_t *map, proto_dev_point_map_t proto) { if (proto.devNum == 0) { return; } char *str = {0}; for (int k = 0; k < proto.devNum; k++) { dev_type_e devType = proto.devPointMapArr[k].devType; if (devType < 0 || devType > kDev_Type_End) { continue; } str = (char *)proto.devPointMapArr[k].devName; // 其他设备的测点从数据库读取配置 int index = map[devType].devNum; if (map[devType].devName[index] != NULL) { free(map[devType].devName[index]); map[devType].devName[index] = NULL; } map[devType].devName[index] = (char *)malloc(strlen(str)); strncpy(map[devType].devName[index], str, strlen(str)); for (int p = 0; p < proto.devPointMapArr[k].pointNum; p++) { map[devType].pllist[index].txcount++; loadRtdbFindTabBySouthCfg(&map[devType].pllist[index].rtdbGetTab[p], proto.devPointMapArr[k].pointArr[p]); map[devType].pllist[index].rtdbGetTab[p].value = getRtdbPointValue(map[devType].pllist[index].rtdbGetTab[p].dbType, map[devType].pllist[index].rtdbGetTab[p].devType, map[devType].pllist[index].rtdbGetTab[p].devId, map[devType].pllist[index].rtdbGetTab[p].pointId); } map[devType].devNum++; } } void loadvBsuPoints(mqtt_option_map_t *map) { if (gStDevTypeNum[kDev_Type_BSU] == 0 && gStDevTypeNum[kDev_Type_BCU] != 0) { map[kDev_Type_BSU].devNum = 1; init_bcu_bsu_map(&g_bcuMap); // 初始化哈希表 map[kDev_Type_BSU].devName[0] = "1#bsu_v"; int i = 0; BcuBsuMap *current; map[kDev_Type_BSU].pllist[0].txcount = 0; for (current = g_bcuMap; current != NULL; current = current->hh.next) { map[kDev_Type_BSU].pllist[0].rtdbGetTab[i].dbType = Rtdb_ShMem; map[kDev_Type_BSU].pllist[0].rtdbGetTab[i].devType = kDev_Type_BSU; map[kDev_Type_BSU].pllist[0].rtdbGetTab[i].devId = 0; map[kDev_Type_BSU].pllist[0].rtdbGetTab[i].pointId = current->bsupointId; i++; map[kDev_Type_BSU].pllist[0].txcount++; } } }; void loadNorthCfgAllPts(mqtt_option_map_t *map) // 全量测点上传 { initMqttMap(map); #ifdef MQTTPRINT for (int i = 0; i < kDev_Type_End; i++) { if (gStDevTypeNum[i] > 0 && i != kDev_Type_EMS) { printf("devType[%d]=%d\n", i, gStDevTypeNum[i]); } } for (int i = 0; i < kProto_Master_End; i++) { printf("porotodev[%d]=%d\n", i, protoTable[i].devNum); } for (int t = 0; t < protoTable[kProto_ModbusTCP_Master].devNum; t++) { printf("devtype[%d] is %d", t, protoTable[kProto_ModbusTCP_Master].devPointMapArr[t].devType); } #endif loadEmsPoints(map); loadvBsuPoints(map); // 加载虚拟bsu节点测点参数 for (int i = 0; i < kProto_Master_End; i++) { loadNorthCfgAllptsByProtocol(map, protoTable[i]); } } /***************************************************************************** * @brief 将北向配置表中的信息(上传/下发)读入mqtt_option_map_t结构体 * @param[in] map:mqttClient使用的测点配置表 * @return NONE *****************************************************************************/ void loadNorthCfg(mqtt_option_map_t *map) { if (map == NULL) { return; } switch (NorthProtoTable[kProto_MQTT_Slave].configType) { case kNorth_Config_Default: loadNorthCfgAllPts(map); // 从南向设备表读取所有MQTT测点配置 break; case kNorth_Config_Up: loadNorthCfgBySet(map); // 根据北向配置读取MQTT测点配置 break; default: break; } } /***************************************************************************** * @brief 初始化mqttClient参数和mqtt测点配置表 * @param[in] mlt:存放mqtt_lib_t参数 * @param[in] map:存放北向测点配置的map,一般为全局变量 * @return NONE *****************************************************************************/ void setEMSsn() { char serial[128]; int rc = kit_get_ems_sn(serial); if (0 == rc) { strncpy(mainEmsSn, serial, MAX_NAME_BUF_LEN); mainEmsSn[MAX_NAME_BUF_LEN - 1] = '\0'; } } int getMQTTCfg(mqtt_lib_t *mlt) { if (mlt == NULL) { return 1; } // 读取连接配置 memcpy(mlt, &NorthProtoTable[kProto_MQTT_Slave].northProtocol.mqttLib, sizeof(mqtt_lib_t)); return 0; } /***************************************************************************** * @brief 初始化mqttClient参数和mqtt测点配置表 * @param[in] mlt:存放mqtt_lib_t参数 * @param[in] map:存放北向测点配置的map,一般为全局变量 * @return NONE *****************************************************************************/ void updateMqttPara(mqtt_lib_t *mlt, mqtt_option_map_t *map) { // 读取连接配置 getMQTTCfg(mlt); // 将北向协议里的测点读到mqtt结构体中 loadNorthCfg(map); // 设置EMS 的sn号 setEMSsn(); } /***************************************************************************** * @brief 历史数据处理 * @param[in] arg:tcp连接上下文 * @return NONE *****************************************************************************/ void updateConnectionStatus(bool is_connected) { setRtdbPointValue(Rtdb_ShMem, kDev_Type_EMS, 0, kEms_netStatus, is_connected); // 这里可以利用一个全局变量如 g_MqttMsg 来编码连接状态 pthread_mutex_lock(&connection_status_mutex); // 假设此互斥锁存在 if (is_connected) { g_MqttMsg &= ~kEvent_Reconnect_failed; } else { g_MqttMsg |= kEvent_Reconnect_failed; } pthread_mutex_unlock(&connection_status_mutex); } /***************************************************************************** * @brief 判断通信是否断开 * @param[in] arg:tcp连接上下文 * @return 0-中断;1-正常 *****************************************************************************/ bool is_mqtt_connected() { pthread_mutex_lock(&connection_status_mutex); bool connected = !(g_MqttMsg & kEvent_Reconnect_failed); pthread_mutex_unlock(&connection_status_mutex); return connected; } /***************************************************************************** * @brief 处理MQTT连接丢失的回调函数 * * @param[in] context 指向 `ConnContext` 的指针,代表当前连接的上下文信息 * @param[in] cause 导致连接丢失的原因字符串 * * @return NONE * * @details 函数在MQTT连接丢失时被调用。该函数执行以下操作: * 1. 将连接状态更新为断开状态。 * 2. 记录连接丢失的信息,创建一个新的断开记录,并将其加入全局记录数组。 * 3. 执行重连逻辑,直到成功连接为止。 * 4. 在重新连接成功后更新连接状态。 *****************************************************************************/ long g_teststart = 0; int record_is_updated = 0; void updateBrkRcd(char *cause) { pthread_mutex_lock(&record_mutex); // 创建一个新的 `break_record_with_data_t` break_record_with_data_t newRecord; memset(&newRecord, 0, sizeof(break_record_with_data_t)); strcpy((char *)newRecord.record.reason, cause ? cause : "Unknown reason"); // 插入断开记录到数据库并获取其ID newRecord.record.isUploaded = false; newRecord.record.dbId = kit_insert_break_record(&(newRecord.record)); // 初始化数据存储数组 utarray_new(newRecord.dataArray, &bdst_icd); // 将新的记录添加到全局数组 // 首先,确保 g_recordsWithData 中仅有一个元素 if (utarray_len(g_recordsWithData) == 1) { // 用 newRecord 替换现有的唯一一条记录 void *p = utarray_eltptr(g_recordsWithData, 0); // 获取指向第一条记录的指针 if (p != NULL) { memcpy(p, &newRecord, sizeof(break_record_with_data_t)); // 通过内存复制进行替换 } } else { // 如果数组里面不止一条记录,可以选择清空数组并添加 newRecord utarray_clear(g_recordsWithData); // 清空数组 utarray_push_back(g_recordsWithData, &newRecord); // 添加 newRecord } pthread_mutex_unlock(&record_mutex); } void connlostCB(void *context, char *cause) { // 记录网络断开 RECORDNETSTATUS(D_STR_DISCONN_WORD) // pthread_t lockPipefd; // int lock = 0; // pthread_create(&lockPipefd, NULL, lockPipe, &lock); ConnContext *ct = (ConnContext *)context; // 立即更新连接状态为断开 updateConnectionStatus(false); g_MqttMsg &= ~kEvent_has_history_record; if (utarray_len(g_recordsWithData) == 0) { // 创建一个中断记录 cause = "MQTT连接中断"; updateBrkRcd(cause); g_MqttMsg |= kEvent_has_history_record; } // 重连逻辑 int rc = 0; while ((rc = attemptReconnect(ct->client, &ct->connopts)) != MQTTCLIENT_SUCCESS) { KITLOG(LOG_MQTT_EN, WARN_EN, "Reconnection attempt failed, retrying...", NULL); // 重新连接成功,如果尾部记录还有未插入数据库的 数据,插入,且清理尾部记录的内存空间 break_record_with_data_t *record = utarray_back(g_recordsWithData); if (utarray_len(record->dataArray) > 0) { int rc = kit_insert_break_data_storage(record->record.dbId, &record->dataArray); if (1 == rc) { KITLOG(LOG_MQTT_EN, ERROR_EN, "insert history data to DB failed...", NULL); } else if (utarray_len(record->dataArray) > 0) { utarray_clear(record->dataArray); } } sleep(10); // 重试间隔 } // 记录网络恢复 RECORDNETSTATUS(D_STR_CONNECT_WORD) KITLOG(LOG_MQTT_EN, INFO_EN, "Reconnected to the broker successfully.", NULL); // 更新连接状态,连接恢复 // if (utarray_len(g_recordsWithData) > 0) // { // utarray_clear(g_recordsWithData); // } updateConnectionStatus(true); } /***************************************************************************** * @brief MQTT周期发送任务 * @param[in] arg:MQTT连接上下文 * @return NONE *****************************************************************************/ void *periodSendTask(void *arg) { while (!(g_MqttMsg & kEvent_topo_send)) { usleep(50000); } ConnContext *ct = (ConnContext *)arg; int period = ct->mlib.tSendTaskPeriod; if (period == 0) { period = 5; // 默认周期为5秒 } // 忽略 SIGPIPE 信号 signal(SIGPIPE, SIG_IGN); MQTTClient_message pubmsg = MQTTClient_message_initializer; char *msg = (char *)malloc(MAX_MQTT_MSG_LEN * sizeof(char)); if (msg == NULL) { return NULL; } int rc = 0; int cnt = 0; while (1) { cnt++; // if (stlogic.taskStateFlag == LOGIC_SIGN_UPLOAD) // { // } readWebSign(kSign_ShMem, kSign_LogicDebug); for (int i = 0; i < kDev_Type_End; i++) { if (i != kDev_Type_EMS && g_mqtt_map[i].devNum == 0) { usleep(50000); continue; } genDevGrpPeriodPayload(msg, i, &g_mqtt_map[i]); pubmsg.payload = msg; pubmsg.payloadlen = strlen(msg); pubmsg.qos = kQos_0; pubmsg.retained = 0; retry: if ((0 == sem_trywait(&g_allowSend))) { period_thread_haveSendSig = 1; rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.periodTopic, &pubmsg, &deliveredtoken); if (rc != MQTTCLIENT_SUCCESS) { sem_post(&g_allowSend); break_record_with_data_t *test = (break_record_with_data_t *)utarray_back(g_recordsWithData); // 获取或创建当前的中断记录 if (!(g_MqttMsg & kEvent_has_history_record)) { KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT:no historydata record existed!\n", NULL); continue; } break_record_with_data_t *record = (break_record_with_data_t *)utarray_back(g_recordsWithData); if (record != NULL) { if (0 == sem_trywait(&g_MqttwaitAndSaveHistory)) { period_thread_haveRcdSig = 1; break_data_storage_t newData; newData.breakDbId = record->record.dbId; strncpy((char *)newData.content, msg, MAX_JSON_STR_LEN - 1); newData.content[MAX_JSON_STR_LEN - 1] = '\0'; newData.isUploaded = false; utarray_push_back(record->dataArray, &newData); sem_post(&g_MqttwaitAndSaveHistory); } } usleep(50000); } else { rc = MQTTClient_waitForCompletion(ct->client, deliveredtoken, TIMEOUT); sem_post(&g_allowSend); if (MQTTCLIENT_SUCCESS == rc) { #ifdef MQTTPRINT printdeltaT(); #endif } usleep(50000); } } else { usleep(50000); goto retry; } } if (!(g_MqttMsg & kEvent_Period_Task_begin) && cnt < 2) // 周期上送运行2次之后再检测变化 { g_MqttMsg |= kEvent_Period_Task_begin; } sleep(period); } free(msg); return NULL; } /***************************************************************************** * @brief MQTT历史数据处理任务 * @param[in] arg:MQTT连接上下文 * @return NONE *****************************************************************************/ void *historyDataHandle(void *arg) { while (!(g_MqttMsg & kEvent_topo_send)) { usleep(50000); } ConnContext *ct = (ConnContext *)arg; MQTTClient_message pubmsg = MQTTClient_message_initializer; UT_array *breakRcds; UT_array *dataSets; utarray_new(breakRcds, &brcd_icd); utarray_new(dataSets, &bdst_icd); int rc = 0; // 忽略 SIGPIPE 信号 signal(SIGPIPE, SIG_IGN); while (1) { if (0 == sem_trywait(&g_MqttwaitAndSaveHistory)) { period_thread_haveRcdSig = 1; break_record_with_data_t *record = (break_record_with_data_t *)utarray_back(g_recordsWithData); if (record != NULL) { if (utarray_len(record->dataArray) > 0) { int rc = kit_insert_break_data_storage(record->record.dbId, &record->dataArray); if (1 == rc) { KITLOG(LOG_MQTT_EN, ERROR_EN, "insert history data to DB failed...", NULL); } else { utarray_clear(record->dataArray); } } } sem_post(&g_MqttwaitAndSaveHistory); } // if (!is_mqtt_connected()) // 如果连接为断开状态,历史数据线程不工作 // { // continue; // } rc = kit_query_break_records(false, &breakRcds); if (1 == rc) // 如果查询数据库失败,等待10s再操作数据库 { history_thread_alive = 1; sleep(10); continue; } else if (0 == utarray_len(breakRcds)) // 如果短点记录,不运行 { sleep(1); history_thread_alive = 1; continue; } utarray_foreach(breakRcds, break_record_t *, p_rcd) { if (p_rcd->isUploaded == true) { continue; } rc = kit_query_break_data(p_rcd->dbId, &dataSets); if (1 == rc && utarray_len(dataSets) > 0) { utarray_clear(dataSets); continue; } p_rcd->isUploaded = true; // 先将标记位设置为true,一旦有消息发送失败,is_Uploaded就会置为false int length = 0; uint64_t ids[100] = {0}; int retry_cnt = 0; utarray_foreach(dataSets, break_data_storage_t *, p_contents) { if (p_contents == NULL) { continue; } retry_cnt = 0; pubmsg.payload = p_contents->content; pubmsg.payloadlen = strlen((char *)p_contents->content); pubmsg.qos = kQos_0; pubmsg.retained = true; retry_label:; if ((0 == sem_trywait(&g_allowSend))) { history_thread_haveSendSig = 1; rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.historyTopic, &pubmsg, &deliveredtoken); if (rc == MQTTCLIENT_SUCCESS) { rc = MQTTClient_waitForCompletion(ct->client, deliveredtoken, TIMEOUT); ids[length] = p_contents->breakDbId; length++; } else { p_rcd->isUploaded = false; } sem_post(&g_allowSend); history_thread_haveSendSig = 0; usleep(50000); } // else if (retry_cnt > 10) // { // continue; // } else { retry_cnt++; goto retry_label; } } if (utarray_len(dataSets) > 0) { utarray_clear(dataSets); } kit_update_break_record(p_rcd); kit_update_break_data(p_rcd->dbId, ids, length); } history_thread_alive = 1; sleep(ct->mlib.tSendTaskPeriod > 0 ? ct->mlib.tSendTaskPeriod : 5); } return NULL; } /***************************************************************************** * @brief * @param[in] json: json对象 * @param[in] key:键 * @param[in] value:值 * @return NONE *****************************************************************************/ void appendJsonKeyValue(cJSON *json, const char *key, double value) { cJSON_AddNumberToObject(json, key, value); } /***************************************************************************** * @brief 装载周期上送主题有效载荷 * @param[in] topic: mqtt_lib_t中的主题字符串指针 * @param[in] payload:主题类型枚举,枚举与mqtt_lib_t中的成员应对应,请确保payload的缓冲区长度足够 * @return 主题类型枚举 *****************************************************************************/ int getKeywordsByTopic(char *topic) { if (strstr(topic, MQTT_ROOT_TOPIC_KEY)) { return kEmsTopicType_root; } else if (strstr(topic, MQTT_PERIOD_TOPIC_KEY)) { return kEmsTopicType_period; } else if (strstr(topic, MQTT_CHANGE_TOPIC_KEY)) { return kEmsTopicType_change; } else if (strstr(topic, MQTT_HISTORY_TOPIC_KEY)) { return kEmsTopicType_history; } else if (strstr(topic, MQTT_CONTROL_TOPIC_KEY)) { return kEmsTopicType_control; } else if (strstr(topic, MQTT_READ_TOPIC_KEY)) { return kEmsTopicType_read; } else if (strstr(topic, MQTT_CONTROL_TOPIC_KEY) && strstr(topic, MQTT_REPLY_TOPIC_KEY)) { return kEmsTopicType_reply_control; } else if (strstr(topic, MQTT_READ_TOPIC_KEY) && strstr(topic, MQTT_REPLY_TOPIC_KEY)) { return kEmsTopicType_reply_read; } else { return -1; } } /***************************************************************************** * @brief 获取浮点数变化死区 * @return 浮点数变化死区 *****************************************************************************/ double getFpDeadZone() { return 1e-6; }; /***************************************************************************** * @brief 获取整型数变化死区 * @return 整型数变化死区 *****************************************************************************/ int getIntDeadZone() { return 1; } /***************************************************************************** * @brief mqtt查找db表结构 * @return 目标测点值 *****************************************************************************/ double mqttFindDB(mqtt2db_t m) { return getRtdbPointValue(m.dbType, m.devType, m.devId, m.pointId); } /***************************************************************************** * @brief mqtt查找db表结构 * @return 目标测点值 *****************************************************************************/ bool isSamePoint(mqtt2db_t l, mqtt2db_t r) { if (l.devType == r.devType && l.devId == r.devType && l.pointId == r.pointId) { return true; } return false; } /***************************************************************************** * @brief 检测payloadlist的变化事件 * @param[in] now: 当前的mqtt_map_t结构体 * @param[in] old: 存储的mqtt_map_t结构体 * @return 变化事件类型,后期可与装载部分优化成mqtt客户端状态机 *****************************************************************************/ int chkDataChange(mqtt_option_map_t *now, mqtt_option_map_t *old) { return 0; // int rt = 0; // if(0!=strcmp(now->stationName,old->stationName)) // { // rt |= kEvent_rename_station; // } // if(0!=strcmp(now->stationID,old->stationID)) // { // rt |= kEvent_reset_id; // } // if(now->devNum != old->devNum) // { // rt |= kEvent_change_devTopo; // } // for(int i=0;i < D_MAX_DEV_NUM; i++) // { // if((0 != strcmp(now->devType,old->devType))&&(0 != strcmp(now->devName[i],old->devName[i]))) // { // rt |= kEvent_change_devTopo; // break; // } // } // for(int devSeq = 0 ;devSeqpllist[devSeq].txcount != old->pllist[devSeq].txcount) // { // rt |= kEvent_pointlistcfg_change; // } // for(int pointSeq = 0;pointSeqpllist[devSeq].rtdbGetTab[pointSeq],old->pllist[devSeq].rtdbGetTab[pointSeq])) // { // double lval = mqttFindDB(now->pllist[devSeq].rtdbGetTab[pointSeq]); // double rval = mqttFindDB(old->pllist[devSeq].rtdbGetTab[pointSeq]); // if(abs(lval-rval)>getIntDeadZone()) // { // rt |= kEvent_int_change; // } // else if(abs(lval-rval)>getFpDeadZone()) // { // rt |= kEvent_float_change; // } // } // } // } // return rt; } /***************************************************************************** * @brief 打印所有类型设备的数量 * @return NONE *****************************************************************************/ int judgeNodev() { int rc = 1; for (int i = kDev_Type_Start + 1; i < kDev_Type_End; i++) { if (gStDevTypeNum[i] > 0) rc = 0; #ifdef MQTTPRINT // printf("%s:%d台\n", devTypeToString(i), gStDevTypeNum[i]); #endif } return rc; } /***************************************************************************** * @brief 单个设备测点值是否改变 * @param[in] list: 测点列表 * @return false-未改变 true-改变 *****************************************************************************/ bool ifSingledevPointsChg(payloadlist_t *list) { bool res = false; for (int i = 0; i < list->txcount; i++) { res |= list->rtdbGetTab[i].ifchg; } return res; } /***************************************************************************** * @brief 单个设备测点值是否改变 * @param[in] map: 设备组测点列表 * @return false-未改变 true-改变 *****************************************************************************/ bool ifDevGrpPointsChg(mqtt_option_map_t *map) { bool res = false; for (int i = 0; i < map->devNum; i++) { res |= map->bChgFlag; } return res; } /***************************************************************************** * @brief 装载不同设备组周期上送主题有效载荷 * @param[in] topic: mqtt_lib_t中的主题字符串指针 * @param[in] payload:主题类型枚举,枚举与mqtt_lib_t中的成员应对应,请确保payload的缓冲区长度足够 * @return NONE *****************************************************************************/ void genDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *map) { #ifdef MQTTPRINT #endif cJSON *root = cJSON_CreateObject(); cJSON *devDataArray = cJSON_CreateArray(); char *str_type = devTypeToString(type); uint16_t devN[kDev_Type_End] = {0}; memcpy(devN, gStDevTypeNum, sizeof(gStDevTypeNum)); devN[kDev_Type_EMS] = 1; devN[kDev_Type_BSU] = g_mqtt_map[kDev_Type_BSU].devNum; // bool res = false; for (int i = 0; i < devN[type]; i++) { if (map->pllist[i].txcount == 0) { continue; } // 检查设备类型是否匹配给定的类型 if (map->devType == type) { cJSON *devData = cJSON_CreateObject(); cJSON_AddItemToObject(devData, "devType", cJSON_CreateNumber(type)); if (type == kDev_Type_EMS) { cJSON_AddItemToObject(devData, "devName", cJSON_CreateString(emsName)); } else { cJSON_AddItemToObject(devData, "devName", cJSON_CreateString(map->devName[i])); } char *sn_tmp = (char *)malloc(50 * sizeof(char)); if (sn_tmp == NULL) { usleep(50000); continue; } sprintf(sn_tmp, "%s00%d", str_type, i); cJSON_AddItemToObject(devData, "sn", cJSON_CreateString(sn_tmp)); free(sn_tmp); cJSON *data = cJSON_CreateObject(); for (int j = 0; j < map->pllist[i].txcount; j++) { char tag[128]; uint16_t a = map->pllist[i].rtdbGetTab[j].pointId; // printf("a=%hu\n",a); snprintf(tag, sizeof(tag), "%s_%hu", str_type, a); // 生成tag名称 double value = 0; if (type == kDev_Type_BSU) { value = getBsuRTDBPointValue(g_bcuMap, map->pllist[i].rtdbGetTab[j].dbType, map->pllist[i].rtdbGetTab[j].devType, map->pllist[i].rtdbGetTab[j].devId, map->pllist[i].rtdbGetTab[j].pointId); } else { value = mqttFindDB(map->pllist[i].rtdbGetTab[j]); // 根据测点查找表的参数,将value写入到匹配的key } cJSON_AddItemToObject(data, tag, cJSON_CreateNumber(value)); // 预留值 } cJSON_AddItemToObject(devData, "data", data); cJSON_AddItemToArray(devDataArray, devData); } } long t = CurtimesMS(NULL); cJSON_AddItemToObject(root, "timeStamp", cJSON_CreateNumber(t)); // 时间戳 g_mqttSendTag = t; cJSON_AddItemToObject(root, "devData", devDataArray); // 打印生成的JSON char *jsonString = cJSON_Print(root); strcpy(payload, jsonString); // 将生成的JSON字符串复制到payload free(jsonString); cJSON_Delete(root); } /***************************************************************************** * @brief 装载不同设备组变化上送主题有效载荷 * @param[in] topic: mqtt_lib_t中的主题字符串指针 * @param[in] payload:主题类型枚举,枚举与mqtt_lib_t中的成员应对应,请确保payload的缓冲区长度足够 * @return NONE *****************************************************************************/ int genChgDevGrpPeriodPayload(char *payload, dev_type_e type, mqtt_option_map_t *map) { #ifdef MQTTPRINT #endif int rc = 0; cJSON *root = cJSON_CreateObject(); cJSON *devDataArray = cJSON_CreateArray(); char *str_type = devTypeToString(type); uint16_t devN[kDev_Type_End] = {0}; memcpy(devN, gStDevTypeNum, sizeof(gStDevTypeNum)); devN[kDev_Type_EMS] = 1; devN[kDev_Type_BSU] = g_mqtt_map[kDev_Type_BSU].devNum; if (map->bChgFlag == false) // 该类设备无数据变化 { rc = 1; } bool isNULL = true; for (int i = 0; i < devN[type]; i++) { if (map->pllist[i].txcount == 0 || map->pllist[i].bChgFlag == false) { continue; } // 检查设备类型是否匹配给定的类型 if (map->devType == type) { cJSON *devData = cJSON_CreateObject(); cJSON_AddItemToObject(devData, "devType", cJSON_CreateNumber(type)); if (type == kDev_Type_EMS) { cJSON_AddItemToObject(devData, "devName", cJSON_CreateString(emsName)); } else { cJSON_AddItemToObject(devData, "devName", cJSON_CreateString(map->devName[i])); } char *sn_tmp = (char *)malloc(50 * sizeof(char)); if (sn_tmp == NULL) { continue; } sprintf(sn_tmp, "%s00%d", str_type, i); cJSON_AddItemToObject(devData, "sn", cJSON_CreateString(sn_tmp)); free(sn_tmp); cJSON *data = cJSON_CreateObject(); for (int j = 0; j < map->pllist[i].txcount; j++) { if (map->pllist[i].rtdbGetTab[j].ifchg == false) { continue; } char tag[128]; uint16_t a = map->pllist[i].rtdbGetTab[j].pointId; // printf("a=%hu\n",a); snprintf(tag, sizeof(tag), "%s_%hu", str_type, a); // 生成tag名称 double value = 0; // double old, new = 0; if (type == kDev_Type_BSU) { value = getBsuRTDBPointValue(g_bcuMap, map->pllist[i].rtdbGetTab[j].dbType, map->pllist[i].rtdbGetTab[j].devType, map->pllist[i].rtdbGetTab[j].devId, map->pllist[i].rtdbGetTab[j].pointId); } else { value = mqttFindDB(map->pllist[i].rtdbGetTab[j]); // 根据测点查找表的参数,将value写入到匹配的key } map->pllist[i].rtdbGetTab[j].value = value; // printf("dbtype=%d,devType=%d,devId=%d,pointId=%d,old=%f,new=%f\n", map->pllist[i].rtdbGetTab[j].dbType, // map->pllist[i].rtdbGetTab[j].devType, // map->pllist[i].rtdbGetTab[j].devId, // map->pllist[i].rtdbGetTab[j].pointId, old, new); cJSON_AddItemToObject(data, tag, cJSON_CreateNumber(value)); // 预留值 isNULL = false; } cJSON_AddItemToObject(devData, "data", data); cJSON_AddItemToArray(devDataArray, devData); } } long t = CurtimesMS(NULL); cJSON_AddItemToObject(root, "timeStamp", cJSON_CreateNumber(t)); // 时间戳 g_mqttSendTag = t; cJSON_AddItemToObject(root, "devData", devDataArray); // 打印生成的JSON char *jsonString = cJSON_Print(root); strcpy(payload, jsonString); // 将生成的JSON字符串复制到payload if (isNULL == true) { rc = 1; } free(jsonString); cJSON_Delete(root); return rc; } // 将dev_type_e转换为字符串 char *devTypeToString(dev_type_e type) { char *str; switch (type) { case kDev_Type_EMS: str = "ems"; break; case kDev_Type_Pccmeter: str = "pcc"; break; case kDev_Type_Bsmeter: str = "bsm"; break; case kDev_Type_PCS: str = "pcs"; break; case kDev_Type_BSU: str = "bsu"; break; case kDev_Type_BCU: str = "bcu"; break; case kDev_Type_Thsensor: str = "thss"; break; case kDev_Type_DI_DO_Device: str = "dido"; break; case kDev_Type_UPS: str = "ups"; break; case kDev_Type_AirCond_LiquidCool: str = "airlqd"; break; case kDev_Type_WaterThsensor: str = "wlsd"; break; case kDev_Type_Reserve2: str = "Reserve2"; break; case kDev_Type_Reserve3: str = "Reserve3"; break; case kDev_Type_Reserve4: str = "Reserve4"; break; case kDev_Type_Reserve5: str = "Reserve5"; break; default: return "Unknown"; break; } return str; } /***************************************************************************** * @brief 递归添加子节点到 JSON 结构中 * @param[in] parentArray: 父节点的 JSON 数组 * @param[in] parentId: 父节点的 ID * @param[in] topologyById: 存储拓扑信息的哈希表 * @return 无 *****************************************************************************/ void addChildrenToJson(cJSON *parentArray, int parentId, topology_hash_entry_t *topologyById) { topology_hash_entry_t *entry; for (entry = topologyById; entry != NULL; entry = entry->hh.next) { // 遍历整个哈希表 if (entry->topology->parentId == parentId) { cJSON *child = cJSON_CreateObject(); // 创建一个子节点的 JSON 对象 cJSON_AddNumberToObject(child, "menuTree", entry->topology->menuTree); // 添加 menuTree 属性 cJSON_AddStringToObject(child, "name", (char *)entry->topology->name); // 添加 name 属性 cJSON_AddNumberToObject(child, "devType", entry->topology->devType); // 添加 devType 属性 cJSON_AddStringToObject(child, "sn", (char *)entry->topology->sn); // 添加 sn 属性 cJSON *grandchildren = cJSON_CreateArray(); // 创建子节点的子节点数组 cJSON_AddItemToObject(child, "child", grandchildren); // 将子节点数组添加到子节点对象 cJSON_AddItemToArray(parentArray, child); // 将子节点添加到父节点数组 addChildrenToJson(grandchildren, entry->topology->dbId, topologyById); // 递归调用,处理子节点的子节点 } } } /***************************************************************************** * @brief 将数据库中的 EMS 设备拓扑结构数据转换为 JSON 字符串 * @param[out] json: 用于存储生成的 JSON 字符串的缓冲区。确保该缓冲区有足够的空间容纳生成的 JSON 字符串。 * @return 0-成功 1-失败 *****************************************************************************/ int getTopologyJsonByDb(char **json) { UT_array *topologies = NULL; // 初始化 UT_array 指针 topology_hash_entry_t *topologyById = NULL; // 初始化哈希表 // 从数据库获取拓扑数据 if (kit_get_topology_db_data(&topologies) != 0) { // 注意这里需要传递 topologies 的地址 // 处理数据库查询错误 // if (topologies != NULL) // utarray_free(topologies); // 释放内存 return 1; } // 如果拓扑信息发生了改变,发送消息给全局变量 if (false == chkTopoDiff(g_topology_storage, topologies)) { g_MqttMsg |= kEvent_change_devTopo; } else { g_MqttMsg &= ~(1 << 2); } if (g_topology_storage != NULL) { utarray_clear(g_topology_storage); } utarray_new(g_topology_storage, &topology_icd); // 存储当前的拓扑信息,用于检测拓扑发生了变化 utarray_concat(g_topology_storage, topologies); // 将拓扑数据填充到哈希表中,使用 dbId 作为键 for (int i = 0; i < utarray_len(topologies); i++) { topology_t *p_topology = (topology_t *)utarray_eltptr(topologies, i); topology_hash_entry_t *entry = (topology_hash_entry_t *)malloc(sizeof(topology_hash_entry_t)); if (entry == NULL) { // 内存分配失败处理 fprintf(stderr, "内存分配失败!\n"); utarray_free(topologies); return 1; } entry->dbId = p_topology->dbId; entry->topology = p_topology; HASH_ADD_INT(topologyById, dbId, entry); // 将条目添加到哈希表 } cJSON *root = cJSON_CreateObject(); // 创建根 JSON 对象 cJSON *structure = cJSON_CreateObject(); // 创建 structure JSON 对象 // test #ifdef MQTTPRINT for (int i = 0; i < utarray_len(topologies); i++) { // test,打印设备拓扑列表 topology_t *p_topology = (topology_t *)utarray_eltptr(topologies, i); int parentId = p_topology->parentId; printf("dev%d.parent_Id=%d\n", i, parentId); } #endif // 遍历拓扑数据以查找顶级节点 for (int i = 0; i < utarray_len(topologies); i++) { topology_t *p_topology = (topology_t *)utarray_eltptr(topologies, i); int parentId = p_topology->parentId; if (parentId == -999) { // 顶级节点 cJSON_AddNumberToObject(structure, "menuTree", p_topology->menuTree); // 添加 menuTree 属性 cJSON_AddStringToObject(structure, "name", (char *)p_topology->name); // 添加 name 属性 strncpy(emsName, (char *)p_topology->name, MAX_NAME_BUF_LEN - 1); // ems设备名称 emsName[MAX_NAME_BUF_LEN - 1] = '\0'; // 确保最后一个字符是空字符 cJSON_AddNumberToObject(structure, "devType", p_topology->devType); // 添加 devType 属性 cJSON_AddStringToObject(structure, "sn", mainEmsSn); // 添加 sn 属性 cJSON *children = cJSON_CreateArray(); // 创建顶级节点的子节点数组 cJSON_AddItemToObject(structure, "child", children); // 将子节点数组添加到顶级节点对象 addChildrenToJson(children, p_topology->dbId, topologyById); // 调用递归函数添加子节点 break; } } cJSON_AddItemToObject(root, "structure", structure); // 将 structure 添加到 root 对象 // 将 cJSON 对象转换为字符串 char *jsonString = cJSON_Print(root); if (jsonString == NULL) { cJSON_Delete(root); utarray_free(topologies); // 释放哈希表 topology_hash_entry_t *current_entry, *tmp; HASH_ITER(hh, topologyById, current_entry, tmp) { HASH_DEL(topologyById, current_entry); free(current_entry); } return 1; // 处理 cJSON 打印错误 } *json = (char *)malloc((strlen(jsonString) + 1) * sizeof(char)); strncpy(*json, jsonString, strlen(jsonString)); // 将 JSON 字符串复制到输出缓冲区 json[strlen(jsonString)] = '\0'; // printf("%s\n",*json); int lenth = strlen(jsonString); free(jsonString); // 释放 cJSON_Print 分配的字符串 cJSON_Delete(root); // 释放 cJSON 对象 utarray_free(topologies); // 释放 UT_array g_mqttSendTag = CurtimesMS(NULL); // 释放哈希表 topology_hash_entry_t *current_entry, *tmp; HASH_ITER(hh, topologyById, current_entry, tmp) { HASH_DEL(topologyById, current_entry); free(current_entry); } return lenth; // 成功返回 } // 按类型解析策略配置 int parseStrategyJsonObject(cJSON *strategyCfg, int mode, logic_Params *lP) { if (strategyCfg == NULL) { return 1; } switch (mode) { case E_TACTIC_MODE_START: { // 暂未实现 break; } case E_TACTIC_MODE_DEBUG: { cJSON *activepower = cJSON_GetObjectItem(strategyCfg, "P"); cJSON *reactivepower = cJSON_GetObjectItem(strategyCfg, "Q"); cJSON *onoff = cJSON_GetObjectItem(strategyCfg, "onOff"); #ifdef MQTTPRINT printf("Active power: %s, Reactive power: %s, On/Off: %s\n", cJSON_GetStringValue(activepower), // 有功功率 cJSON_GetStringValue(reactivepower), // 无功功率 cJSON_GetStringValue(onoff)); // 开关机 #endif lP->debug.activePower = activepower->valuedouble; lP->debug.reactivePower = reactivepower->valuedouble; lP->debug.pcsSwitch = onoff->valueint; break; } case E_TACTIC_MODE_PEAKVALLY: { if (cJSON_IsObject(strategyCfg)) { cJSON *peakValleyTimeTables = cJSON_GetObjectItem(strategyCfg, "pvTab"); if (cJSON_IsObject(peakValleyTimeTables)) { cJSON *timeTableLength = cJSON_GetObjectItem(peakValleyTimeTables, "dTabN"); lP->pkvly.zoneTabLen = timeTableLength->valueint; #ifdef CLOUDCTRLPRINT printf("Time Table Length: %d\n", lP->pkvly.zoneTabLen); #endif cJSON *peakItem = cJSON_GetObjectItem(peakValleyTimeTables, "dTab"); if (cJSON_IsArray(peakItem)) { int peakItemCount = cJSON_GetArraySize(peakItem); lP->pkvly.peakItem = (pv_date_config_t *)malloc(peakItemCount * sizeof(pv_date_config_t)); for (int i = 0; i < peakItemCount; i++) { cJSON *peak = cJSON_GetArrayItem(peakItem, i); if (cJSON_IsObject(peak)) { cJSON *startDateTime = cJSON_GetObjectItem(peak, "sDay"); cJSON *endDateTime = cJSON_GetObjectItem(peak, "eDay"); memcpy(lP->pkvly.peakItem[i].startDate, (uint8_t *)startDateTime->valuestring, 6); memcpy(lP->pkvly.peakItem[i].endDate, (uint8_t *)endDateTime->valuestring, 6); #ifdef CLOUDCTRLPRINT printf("Start DateTime: %s, End DateTime: %s\n", cJSON_IsString(startDateTime) ? startDateTime->valuestring : "null", cJSON_IsString(endDateTime) ? endDateTime->valuestring : "null"); #endif #ifdef CLOUDCTRLPRINT cJSON *timeSlotTableLength = cJSON_GetObjectItem(peak, "sTabN"); printf("Time Slot Table Length: %d\n", cJSON_IsNumber(timeSlotTableLength) ? timeSlotTableLength->valueint : 0); #endif cJSON *timeSlotTable = cJSON_GetObjectItem(peak, "sTab"); if (cJSON_IsArray(timeSlotTable)) { int timeSlotCount = cJSON_GetArraySize(timeSlotTable); lP->pkvly.peakItem[i].timeCfgTab = (pv_time_config_t *)malloc(timeSlotCount * sizeof(pv_time_config_t)); lP->pkvly.peakItem[i].timeCfgLen = timeSlotCount; for (int j = 0; j < timeSlotCount; j++) { cJSON *timeSlot = cJSON_GetArrayItem(timeSlotTable, j); if (cJSON_IsObject(timeSlot)) { cJSON *startTime = cJSON_GetObjectItem(timeSlot, "sSec"); cJSON *endTime = cJSON_GetObjectItem(timeSlot, "eSec"); cJSON *powerKW = cJSON_GetObjectItem(timeSlot, "pwrKw"); lP->pkvly.peakItem[i].timeCfgTab[j].startTime = atoi(startTime->valuestring); lP->pkvly.peakItem[i].timeCfgTab[j].endTime = atoi(endTime->valuestring); lP->pkvly.peakItem[i].timeCfgTab[j].power = powerKW->valuedouble; #ifdef CLOUDCTRLPRINT printf("Time Slot [%d]: Start: %d, End: %d, Power: %.2f kW\n", j, atoi(startTime->valuestring), atoi(endTime->valuestring), powerKW->valuedouble); #endif } } // 释放资源 if (lP->pkvly.peakItem[i].timeCfgTab != NULL) { free(lP->pkvly.peakItem[i].timeCfgTab); lP->pkvly.peakItem[i].timeCfgTab = NULL; } } } } } } } break; } case E_TACTIC_MODE_DEMANDRES: { break; } case E_TACTIC_MODE_LOADTRACK: { // 暂未实现 break; } case E_TACTIC_MODE_DMDCTRL: { // 暂未实现 break; } case E_TACTIC_MODE_PFCTRL: { // 暂未实现 break; } case KEms_cfg_Protect: { cJSON *transCapacity = cJSON_GetObjectItem(strategyCfg, "transCapacity"); cJSON *olWarnLimitVal = cJSON_GetObjectItem(strategyCfg, "olWarnLimitVal"); cJSON *maxPower = cJSON_GetObjectItem(strategyCfg, "maxPower"); cJSON *olShutdownVal = cJSON_GetObjectItem(strategyCfg, "olShutdownVal"); cJSON *demandSwitch = cJSON_GetObjectItem(strategyCfg, "demandSwitch"); cJSON *targetDemand = cJSON_GetObjectItem(strategyCfg, "targetDemand"); cJSON *deWarnLimitVal = cJSON_GetObjectItem(strategyCfg, "deWarnLimitVal"); cJSON *deShutdownVal = cJSON_GetObjectItem(strategyCfg, "deShutdownVal"); cJSON *backflowSwitch = cJSON_GetObjectItem(strategyCfg, "backflowSwitch"); cJSON *bfWarnLimitVal = cJSON_GetObjectItem(strategyCfg, "bfWarnLimitVal"); cJSON *bfShutdownVal = cJSON_GetObjectItem(strategyCfg, "bfShutdownVal"); cJSON *socForbidCharge = cJSON_GetObjectItem(strategyCfg, "socForbidCharge"); cJSON *socForbidDischarge = cJSON_GetObjectItem(strategyCfg, "socForbidDischarge"); lP->protect.maxActivePower = transCapacity->valuedouble; lP->protect.overFlowLowLimt = olWarnLimitVal->valuedouble; lP->protect.overFlowCloseLimt = olShutdownVal->valuedouble; lP->protect.maxPower = maxPower->valuedouble; lP->protect.demandCtl = demandSwitch->valueint; lP->protect.aimActiveDemand = targetDemand->valuedouble; lP->protect.demandCtrlLowLimt = deWarnLimitVal->valuedouble; lP->protect.demandCtrlCloseLimt = deShutdownVal->valuedouble; lP->protect.backFlow = backflowSwitch->valueint; lP->protect.backFlowLowLimt = bfWarnLimitVal->valueint; lP->protect.backFlowCloseLimt = bfShutdownVal->valuedouble; lP->protect.socForbidCharge = socForbidCharge->valuedouble; lP->protect.socForbidDischarge = socForbidDischarge->valuedouble; break; } case KEms_cfg_END: { break; } default: { printf("Unknown mode: %d\n", mode); break; } } return 0; } /***************************************************************************** * @brief 根据配置信息和模式组装成 JSON 字符串,并包含额外的顶层字段 * @param[in] arvcfgInfo_ret_t cfg 结构包含模式字、事务标识等信息 * @param[in] arg 指向配置数据的指针(例如,指向 logic_Params 的指针) * @param[in] rw 0-read,1-conrtol * @return char* 表示 JSON 字符串(需要调用者释放) *****************************************************************************/ char *createStrategyCfgJsonString(arvcfgInfo_ret_t cfg, const void *arg, int rw) { int mode = cfg.modeWord; char *transaction = cfg.transaction; int rc = 0; char *msg = NULL; work_mode_set_t curmode = {0}; rc = kit_get_work_mode_set(&curmode); if (1 == rc) { msg = "db fault"; } // 如果是控制指令且参数为空 if (arg == NULL && rw == 1) { rc = 1; msg = "param Analysis failed!"; // 参数解析失败 } // 获取当前时间戳 time_t timeStamp = time(NULL); // 创建一个新的 JSON 对象来包含所有信息 cJSON *root = cJSON_CreateObject(); cJSON *strategyCfg = cJSON_CreateObject(); if (strategyCfg == NULL) { rc = 1; msg = "program fault"; // 内存错误 } debug_algorithm_t p1 = {0}; protect_params_t p7 = {0}; UT_array *pvDateConfigs; // 添加顶层字段 cJSON_AddStringToObject(root, "transaction", transaction); cJSON_AddNumberToObject(root, "timeStamp", CurtimesMS(NULL)); cJSON_AddNumberToObject(root, "curMode", curmode.workMode); cJSON_AddNumberToObject(root, "modeWord", mode); cJSON_AddItemToObject(root, "strategyCfg", strategyCfg); switch (mode) { case E_TACTIC_MODE_START: // 暂未实现,如有需要,可以在这里设置相关字段 break; case E_TACTIC_MODE_DEBUG: rc = kit_get_debug_algorithm(&p1); if (rc == 1) { msg = "db fault"; break; } cJSON_AddNumberToObject(strategyCfg, "activepower", p1.activePower); cJSON_AddNumberToObject(strategyCfg, "reactivepower", p1.reactivePower); cJSON_AddNumberToObject(strategyCfg, "onOff", p1.pcsSwitch); break; case E_TACTIC_MODE_PEAKVALLY: rc = kit_get_pv_date_cfg_db_data(&pvDateConfigs); if (rc == 1) { msg = "db fault"; break; } // pv_date_config_t *pv_elem = (pv_date_config_t *)malloc(sizeof(pv_date_config_t)); cJSON *peakValleyTimeTables = cJSON_CreateObject(); cJSON_AddItemToObject(strategyCfg, "pvTab", peakValleyTimeTables); cJSON_AddNumberToObject(peakValleyTimeTables, "dTabN", utarray_len(pvDateConfigs)); cJSON *peakItem = cJSON_CreateArray(); cJSON_AddItemToObject(peakValleyTimeTables, "dTab", peakItem); utarray_foreach(pvDateConfigs, pv_date_config_t *, pv_elem) { cJSON *peak = cJSON_CreateObject(); cJSON_AddItemToArray(peakItem, peak); cJSON_AddStringToObject(peak, "sDay", (char *)pv_elem->startDate); cJSON_AddStringToObject(peak, "eDay", (char *)pv_elem->endDate); cJSON_AddNumberToObject(peak, "sTabN", pv_elem->timeCfgLen); cJSON *timeSlotTable = cJSON_CreateArray(); cJSON_AddItemToObject(peak, "sTab", timeSlotTable); for (int j = 0; j < pv_elem->timeCfgLen; j++) { cJSON *timeSlot = cJSON_CreateObject(); cJSON_AddItemToArray(timeSlotTable, timeSlot); cJSON_AddNumberToObject(timeSlot, "sSec", pv_elem->timeCfgTab[j].startTime); cJSON_AddNumberToObject(timeSlot, "eSec", pv_elem->timeCfgTab[j].endTime); cJSON_AddNumberToObject(timeSlot, "pwrKw", pv_elem->timeCfgTab[j].power); } } break; case E_TACTIC_MODE_DEMANDRES: // 暂未实现 break; case E_TACTIC_MODE_LOADTRACK: // 暂未实现 break; case E_TACTIC_MODE_DMDCTRL: // 暂未实现 break; case E_TACTIC_MODE_PFCTRL: // 暂未实现 break; case KEms_cfg_Protect: if (rw == 0) { // rc = kit_get_protect_algorithm(&p7); if (rc == 1) { msg = "db fault"; } } else { memcpy(&p7, arg, sizeof(protect_params_t)); } cJSON_AddNumberToObject(strategyCfg, "transCapacity", p7.maxActivePower); cJSON_AddNumberToObject(strategyCfg, "olWarnLimitVal", p7.overFlowLowLimt); cJSON_AddNumberToObject(strategyCfg, "maxPower", p7.maxPower); cJSON_AddNumberToObject(strategyCfg, "olShutdownVal", p7.overFlowCloseLimt); cJSON_AddNumberToObject(strategyCfg, "demandSwitch", p7.demandCtl); cJSON_AddNumberToObject(strategyCfg, "targetDemand", p7.aimActiveDemand); cJSON_AddNumberToObject(strategyCfg, "deWarnLimitVal", p7.demandCtrlLowLimt); cJSON_AddNumberToObject(strategyCfg, "deShutdownVal", p7.demandCtrlCloseLimt); cJSON_AddNumberToObject(strategyCfg, "backflowSwitch", p7.backFlow); cJSON_AddNumberToObject(strategyCfg, "bfWarnLimitVal", p7.backFlowLowLimt); cJSON_AddNumberToObject(strategyCfg, "bfShutdownVal", p7.backFlowCloseLimt); cJSON_AddNumberToObject(strategyCfg, "socForbidCharge", p7.socForbidCharge); cJSON_AddNumberToObject(strategyCfg, "socForbidDischarge", p7.socForbidDischarge); // 暂未实现,如有需要,可以在这里设置相关字段 break; case KEms_cfg_END: // 暂未实现,如有需要,可以在这里设置相关字段 break; default: printf("Unknown mode: %d\n", mode); cJSON_Delete(strategyCfg); return NULL; } cJSON_AddNumberToObject(root, "respCode", rc); if (rc == 0) { msg = "success"; } cJSON_AddStringToObject(root, "respMsg", msg); char *jsonString = cJSON_Print(strategyCfg); cJSON_Delete(strategyCfg); // 清理 JSON 对象 return jsonString; // 返回生成的 JSON 字符串 } /***************************************************************************** * @brief 解析云端的配置消息 * @param[in] jsonString:云端消息字符串 * @param[in] lP:算法配置联合体指针 * @return 事务符、mode *****************************************************************************/ arvcfgInfo_ret_t parseEmsCfgJson(const char *jsonString, logic_Params *lP) { arvcfgInfo_ret_t rc = {0}; cJSON *json = cJSON_Parse(jsonString); if (json == NULL) { printf("JSON parsing failed.\n"); return rc; } cJSON *transaction = cJSON_GetObjectItem(json, "transaction"); cJSON *timeStamp = cJSON_GetObjectItem(json, "timeStamp"); cJSON *modeWord = cJSON_GetObjectItem(json, "modeWord"); cJSON *strategyCfg = cJSON_GetObjectItem(json, "strategyCfg"); rc.transaction = transaction->valuestring; rc.modeWord = modeWord->valueint; if (transaction && timeStamp && modeWord && strategyCfg) { printf("Transaction: %s, Timestamp: %s\n", cJSON_GetStringValue(transaction), timeStamp->valuedouble); // 跟据算法枚举值解析json中的startegy元素,解析后的算法参数放入logic_Params中 parseStrategyJsonObject(strategyCfg, rc.modeWord, lP); } cJSON_Delete(json); return rc; } /***************************************************************************** * @brief 获取时间戳 * @return NONE *****************************************************************************/ char *getTmstr2() { time_t t = time(NULL); struct tm *tm = localtime(&t); char *time_str = malloc(20); // 动态分配内存 if (time_str == NULL) { return NULL; // 检查内存分配是否成功 } strftime(time_str, 20, "%Y-%m-%d %H:%M:%S", tm); return time_str; } /***************************************************************************** * @brief 获取时间戳 * @return NONE *****************************************************************************/ time_t strToTimeT(char *time_str) { struct tm tm; // 初始化tm结构体 memset(&tm, 0, sizeof(struct tm)); // 解析日期时间字符串 if (strftime(time_str, strlen(time_str), "%Y-%m-%d %H:%M:%S", &tm) == 0) { // 解析失败 return (time_t)-1; } // 转换为time_t return mktime(&tm); } /***************************************************************************** * @brief 从时间戳获取距离当前的时间差 * @return NONE *****************************************************************************/ void getTmDiffByStr(char *strdate) { time_t past_time = strToTimeT(strdate); if (past_time == (time_t)-1) { printf("Failed to convert string to time_t\n"); } time_t current_time = time(NULL); double difference = difftime(current_time, past_time); // 计算差异的天数、小时数、分钟数和秒数 int days = difference / (60 * 60 * 24); int hours = ((int)difference % (60 * 60 * 24)) / (60 * 60); int minutes = ((int)difference % (60 * 60)) / 60; int seconds = (int)difference % 60; printf("Time difference: %d days, %d hours, %d minutes, %d seconds\n", days, hours, minutes, seconds); } /***************************************************************************** * @brief 比较两个设备节点是否相同 * @param[in] a:左操作数 * @param[in] b:左操作数 * @return 0-不同,1-相同 *****************************************************************************/ bool ifSameTopoElem(const topology_t *a, const topology_t *b) { bool result = true; if (a->menuTree != b->menuTree) { // printf("menuTree differs: %d vs %d\n", a->menuTree, b->menuTree); result = false; } if (memcmp(a->name, b->name, MAX_NAME_BUF_LEN) != 0) { // printf("name differs: %s vs %s\n", a->name, b->name); result = false; } if (memcmp(a->sn, b->sn, MAX_CODE_BUF_LEN) != 0) { // printf("sn differs: %s vs %s\n", a->sn, b->sn); result = false; } if (a->dbId != b->dbId) { // printf("dbId differs: %d vs %d\n", a->dbId, b->dbId); result = false; } if (a->parentId != b->parentId) { // printf("parentId differs: %d vs %d\n", a->parentId, b->parentId); result = false; } if (a->sortOrder != b->sortOrder) { // printf("sortOrder differs: %d vs %d\n", a->sortOrder, b->sortOrder); result = false; } if (a->devType != b->devType) { // printf("devType differs: %d vs %d\n", a->devType, b->devType); result = false; } return result; } /***************************************************************************** * @brief 比较两个设备拓扑是否相同 * @param[in] arr1:左操作数 * @param[in] arr2:左操作数 * @return 0-不同,1-相同 *****************************************************************************/ bool chkTopoDiff(UT_array *arr1, UT_array *arr2) { if (arr1 == NULL || arr2 == NULL) { return false; } if (utarray_len(arr1) != utarray_len(arr2)) { return false; } topology_t *el1, *el2; for (el1 = (topology_t *)utarray_front(arr1), el2 = (topology_t *)utarray_front(arr2); el1 != NULL && el2 != NULL; el1 = (topology_t *)utarray_next(arr1, el1), el2 = (topology_t *)utarray_next(arr2, el2)) { if (!ifSameTopoElem(el1, el2)) { return false; } } return true; } int getCfgBymodeWord(void *arg, int modeWord) { debug_algorithm_t *pDbA = (debug_algorithm_t *)malloc(sizeof(debug_algorithm_t)); debug_params_t *pDbg = (debug_params_t *)malloc(sizeof(debug_params_t)); int rc = 0; switch (modeWord) { case E_TACTIC_MODE_DEBUG: // 调试模式 rc = kit_get_debug_algorithm(pDbA); if (rc != 0) { return 1; } pDbg->activePower = pDbA->activePower; pDbg->reactivePower = pDbA->reactivePower; pDbg->pcsSwitch = pDbA->pcsSwitch; arg = (debug_params_t *)malloc(sizeof(debug_params_t)); memcpy(arg, pDbg, sizeof(debug_params_t)); break; case E_TACTIC_MODE_PEAKVALLY: // 削峰填谷模式 break; // case E_TACTIC_MODE_LOADTRACK: // 负载跟踪模式 // lenth = sizeof(peakvalley_zone_tab_t); // break; case E_TACTIC_MODE_DMDCTRL: // 需量控制 break; case E_TACTIC_MODE_PFCTRL: // 功率因数 break; default: break; // 失败 } if (pDbA != NULL) { free(pDbA); } if (pDbg != NULL) { free(pDbg); } return 0; } /***************************************************************************** * @brief 更新上云测点的变化状态 * @param[in] map:mqtt测点配置表 * @return NONE *****************************************************************************/ void updatePointsChgStat(mqtt_option_map_t *map) { map->bChgFlag = false; for (int i = 0; i < map->devNum; i++) { map->pllist->bChgFlag = false; for (int j = 0; j < map->pllist[i].txcount; j++) { double old = map->pllist[i].rtdbGetTab[j].value; double new = 0; if (map->devType == kDev_Type_BSU) { new = getBsuRTDBPointValue(g_bcuMap, map->pllist[i].rtdbGetTab[j].dbType, map->pllist[i].rtdbGetTab[j].devType, map->pllist[i].rtdbGetTab[j].devId, map->pllist[i].rtdbGetTab[j].pointId); } else { new = mqttFindDB(map->pllist[i].rtdbGetTab[j]); } map->pllist[i].rtdbGetTab[j].value = new; if (double_equal(old, new)) { map->pllist[i].rtdbGetTab[j].ifchg = false; } else { map->pllist[i].rtdbGetTab[j].ifchg = true; #ifdef MQTTPRINT printf("dbtype=%d,devType=%d,devId=%d,pointId=%d,old=%f,new=%f\n", map->pllist[i].rtdbGetTab[j].dbType, map->pllist[i].rtdbGetTab[j].devType, map->pllist[i].rtdbGetTab[j].devId, map->pllist[i].rtdbGetTab[j].pointId, old, new); #endif } map->pllist[i].bChgFlag |= map->pllist[i].rtdbGetTab[j].ifchg; } map->bChgFlag |= map->pllist[i].bChgFlag; } } char *getDbgLocalSetMsg(debug_algorithm_t *dbg, work_mode_set_t *wm) { // 为字符串申请内存 char *jsonStr = malloc(256); if (jsonStr == NULL) { return NULL; } // 构建json snprintf(jsonStr, 256, "{" "\"timeStamp\":0," "\"transaction\":\"0\"," "\"curMode\":\"%d\"," "\"modeWord\":\"1\"," "\"strategyCfg\":{" "\"activePower\":\"%f\"," "\"reactivePower\":\"%f\"," "\"switch\":\"%d\"" "}" "}", wm->workMode, dbg->activePower, dbg->reactivePower, dbg->pcsSwitch); return jsonStr; } void setdbgCB(void *arg) { // 忽略 SIGPIPE 信号 signal(SIGPIPE, SIG_IGN); ConnContext *ct = (ConnContext *)arg; int rc = 0; MQTTClient_message pubmsg = MQTTClient_message_initializer; char *msg = (char *)malloc(MAX_MQTT_MSG_LEN * sizeof(char)); debug_algorithm_t dbgArg = {0}; rc = kit_get_debug_algorithm(&dbgArg); work_mode_set_t workmode = {0}; kit_get_work_mode_set(&workmode); if (0 == rc) { msg = getDbgLocalSetMsg(&dbgArg, &workmode); pubmsg.payload = msg; pubmsg.payloadlen = strlen(msg); pubmsg.qos = kQos_2; pubmsg.retained = 0; retry:; if (0 == sem_trywait(&g_allowSend)) { rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.changeTopic, &pubmsg, NULL); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(ct->client, deliveredtoken, TIMEOUT); sem_post(&g_allowSend); } else { sem_post(&g_allowSend); goto retry; } } else { usleep(50000); goto retry; } } return NULL; }; void setPkvlCB(void *arg) { return; }; void setDmdRspCB(void *arg) { return; }; void setLoadTrackCB(void *arg) { return; }; void setDmdCtrlCB(void *arg) { return; }; void setPFCtrlCB(void *arg) { return; }; setCfgCB setCfgCBGrp[] = { setdbgCB, setPkvlCB, setDmdRspCB, setLoadTrackCB, setDmdCtrlCB, setPFCtrlCB, }; void *dataChgMonitor(void *arg) { while (!(g_MqttMsg & kEvent_topo_send)) { usleep(50000); } // 忽略 SIGPIPE 信号 signal(SIGPIPE, SIG_IGN); ConnContext *ct = (ConnContext *)arg; int rc = 0; MQTTClient_message pubmsg = MQTTClient_message_initializer; char *msg = (char *)malloc(MAX_MQTT_MSG_LEN * sizeof(char)); if (msg == NULL) { return NULL; } while (1) { for (int cfgN = kSign_LogicDebug; cfgN < kSign_Logic_End; cfgN++) { rc = readWebSign(kSign_ShMem, cfgN); if (true == rc) { setCfgCBGrp[cfgN - kSign_Logic_Start - 1](ct); } } if (g_MqttMsg & kEvent_Period_Task_begin) { for (int i = 0; i < kDev_Type_End; i++) { if (g_mqtt_map[i].bChgFlag == false) { updatePointsChgStat(&g_mqtt_map[i]); usleep(50000); continue; } else { rc = genChgDevGrpPeriodPayload(msg, i, &g_mqtt_map[i]); updatePointsChgStat(&g_mqtt_map[i]); if (0 == strlen(msg) || g_mqtt_map[i].devNum < 1 || 1 == rc) // rc==1代表该包数据无测点信息 { continue; } pubmsg.payload = msg; pubmsg.payloadlen = strlen(msg); pubmsg.qos = kQos_2; pubmsg.retained = 0; retry_label:; if (0 == sem_trywait(&g_allowSend)) { chkdata_thread_haveSendSig = 1; rc = MQTTClient_publishMessage(ct->client, (char *)ct->mlib.changeTopic, &pubmsg, NULL); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(ct->client, deliveredtoken, TIMEOUT); sem_post(&g_allowSend); chkdata_thread_haveSendSig = 0; } else { sem_post(&g_allowSend); chkdata_thread_haveSendSig = 0; } } else { usleep(50000); goto retry_label; } updatePointsChgStat(&g_mqtt_map[i]); } } chkdata_thread_alive = 1; usleep(50000); } else { usleep(50000); } } } void pointListener(mqtt2db_t *tab) { double old = tab->value; double new = mqttFindDB(*tab); if (!double_equal(old, new)) { tab->ifchg = 1; } } /***************************************************************************** * @brief 线程监视函数 * @param[in] arg: 可选参数 * @return NONE *****************************************************************************/ void *threadGuard(void *arg) { pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_t worker = 0; mqtt_lib_t mlt = {0}; getMQTTCfg(&mlt); // 传递给重连回调函数的结构体 ConnContext connContext; MQTTClient_connectOptions connopts_tmp = MQTTClient_connectOptions_initializer; memcpy(&connContext.mlib, &mlt, sizeof(mqtt_lib_t)); connContext.connopts = connopts_tmp; connContext.connopts.username = (char *)mlt.username; connContext.connopts.password = (char *)mlt.password; connContext.connopts.keepAliveInterval = 60; // 设置心跳间隔为 60 秒 connContext.connopts.cleansession = 0; int period = connContext.mlib.tSendTaskPeriod > 0 ? 2 * connContext.mlib.tSendTaskPeriod : 10; int cnt = 0; while (1) { sleep(period); // 检查频率,为2倍周期线程的周期 cnt = 0; periodT_rechk:; if (!period_thread_alive) { if (cnt < 20) { cnt++; goto periodT_rechk; } if (period_thread_haveSendSig) { sem_post(&g_allowSend); } if (period_thread_haveRcdSig) { sem_post(&g_MqttwaitAndSaveHistory); } pthread_cancel(g_Tids[D_PERIODSEND_T_SEQ]); // 尝试取消阻塞或无响应的线程 pthread_create(&worker, NULL, periodSendTask, &connContext); // 创建新的线程 g_Tids[D_PERIODSEND_T_SEQ] = worker; // 存储新的线程id } worker = 0; cnt = 0; history_rechk:; if (!history_thread_alive) { if (cnt < 20) { cnt++; goto history_rechk; } if (history_thread_haveSendSig) { sem_post(&g_allowSend); } if (history_thread_haveRcdSig) { sem_post(&g_MqttwaitAndSaveHistory); } pthread_cancel(g_Tids[D_HISTORY_T_SEQ]); // 尝试取消阻塞或无响应的线程 pthread_create(&worker, NULL, historyDataHandle, &connContext); // 创建新的线程 g_Tids[D_HISTORY_T_SEQ] = worker; // 存储新的线程id } worker = 0; chg_rechk:; if (!chkdata_thread_alive) { if (cnt < 20) { cnt++; goto chg_rechk; } if (chkdata_thread_haveSendSig) { sem_post(&g_allowSend); } pthread_cancel(g_Tids[D_DATACHK_T_SEQ]); // 尝试取消阻塞或无响应的线程 pthread_create(&worker, NULL, dataChgMonitor, &connContext); // 创建新的线程 g_Tids[D_DATACHK_T_SEQ] = worker; // 存储新的线程id } period_thread_alive = 0; // 重置 history_thread_alive = 0; chkdata_thread_alive = 0; } return NULL; } /***************************************************************************** * @brief 本地设置调试模式的参数 * @param[in] onoff:开关机 * @param[in] actiePower:有功功率 * @param[in] reactivePower:无功功率 * @return 控制报文json串 *****************************************************************************/ char *LocalCtrlDebugParam(int onoff, float activePower, float reactivePower) { // 为字符串申请内存 char *jsonStr = malloc(256); if (jsonStr == NULL) { return NULL; } // 构建json snprintf(jsonStr, 256, "{" "\"transaction\":\"0\"," "\"timeStamp\":\"0\"," "\"modeWord\":\"1\"," "\"strategyCfg\":{" "\"P\":%f," "\"Q\":%f," "\"onOff\":%d" "}" "}", activePower, reactivePower, onoff); return jsonStr; } /***************************************************************************** * @brief 本地设置削峰填谷模式的参数 * @return 控制报文json串 *****************************************************************************/ char *LocalPkvlyParams() { char *json_string = "{\n" " \"transaction\": \"0\",\n" " \"timeStamp\": \"0\",\n" " \"modeWord\": 2,\n" " \"strategyCfg\": {\n" " \"pvTab\": {\n" " \"dTabN\": 2,\n" " \"dTab\": [\n" " {\n" " \"sDay\": \"2024-09-24\",\n" " \"eDay\": \"2024-09-30\",\n" " \"sTabN\": 3,\n" " \"sTab\": [\n" " {\n" " \"sSec\": 1000,\n" " \"eSec\": 1000,\n" " \"pwrKw\": 7.0\n" " },\n" " {\n" " \"sSec\": 4000,\n" " \"eSec\": 10000,\n" " \"pwrKw\": 9\n" " },\n" " {\n" " \"sSec\": 1000,\n" " \"eSec\": 20000,\n" " \"pwrKw\": 18\n" " }\n" " ]\n" " },\n" " {\n" " \"sDay\": \"2024-10-01\",\n" " \"eDay\": \"2024-10-07\",\n" " \"sTabN\": 2,\n" " \"sTab\": [\n" " {\n" " \"sSec\": 1500,\n" " \"eSec\": 2500,\n" " \"pwrKw\": 45\n" " },\n" " {\n" " \"sSec\": 2500,\n" " \"eSec\": 3500,\n" " \"pwrKw\": -55\n" " }\n" " ]\n" " }\n" " ]\n" " }\n" " }\n" "}"; return json_string; } /***************************************************************************** * @brief 发送拓扑结构的函数 * @param[in] client:客户端句柄 * @return none *****************************************************************************/ void sendDevTopo(MQTTClient client) { signal(SIGPIPE, SIG_IGN); static int finish = 0; if (finish == 1) { return; } int rc = 0; char *msg = (char *)malloc(MAX_MQTT_MSG_LEN * sizeof(char)); if ((g_MqttMsg & kEvent_change_devTopo) || (finish == 0)) { g_MqttMsg &= ~kEvent_topo_send; MQTTClient_message pubmsg = MQTTClient_message_initializer; getTopologyJsonByDb(&msg); pubmsg.payload = msg; pubmsg.payloadlen = strlen(msg); pubmsg.qos = kQos_2; pubmsg.retained = 0; char *topic = (char *)NorthProtoTable[kProto_MQTT_Slave].northProtocol.mqttLib.rootTopic; retry:; if (0 == sem_trywait(&g_allowSend)) { rc = MQTTClient_publishMessage(client, topic, &pubmsg, &deliveredtoken); if (MQTTCLIENT_SUCCESS == rc) { rc = MQTTClient_waitForCompletion(client, deliveredtoken, TIMEOUT); sem_post(&g_allowSend); if (MQTTCLIENT_SUCCESS == rc) { g_MqttMsg |= kEvent_topo_send; finish = 1; } else { finish = 0; } } else { sem_post(&g_allowSend); finish = 0; } } else { usleep(50000); goto retry; } } return; }; /***************************************************************************** * @brief mqtt主函数 * @param[in] arg: 可选参数 * @return NONE *****************************************************************************/ void *MqttTask(void *arg) { // 忽略 SIGPIPE 信号 signal(SIGPIPE, SIG_IGN); // printf("mqtt task begin!\n"); int rc; g_MqttMsg = 0; mqtt_lib_t mlt; retry: updateMqttPara(&mlt, g_mqtt_map); // 初始化历史数据数组 utarray_new(g_recordsWithData, &brd_icd); MQTTClient_connectOptions connopts_tmp = MQTTClient_connectOptions_initializer; // 传递给重连回调函数的结构体 ConnContext connContext; memcpy(&connContext.mlib, &mlt, sizeof(mqtt_lib_t)); connContext.connopts = connopts_tmp; connContext.connopts.username = (char *)mlt.username; connContext.connopts.password = (char *)mlt.password; connContext.connopts.keepAliveInterval = 10; // 设置心跳间隔为 10 秒 connContext.connopts.cleansession = 0; // connContext.connopts.retryInterval = 1; // 创建MQTT客户端 rc = MQTTClient_create(&connContext.client, (char *)mlt.url, (char *)mlt.clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); // 设置回调函数 rc = MQTTClient_setCallbacks(connContext.client, (void *)&connContext, connlostCB, msgarrvdCB, NULL); // 连接到MQTT代理 if ((rc = MQTTClient_connect(connContext.client, &connContext.connopts)) != MQTTCLIENT_SUCCESS) { char *cause = "MQTT Broker连接失败。"; updateBrkRcd(cause); updateConnectionStatus(false); KITPTF(LOG_MQTT_EN, INFO_EN, "Failed to connect, return code%d\n.", rc); KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT Broker连接失败。", NULL); // 记录网络断开 RECORDNETSTATUS(D_STR_DISCONN_WORD) goto retry; } else { updateConnectionStatus(true); KITPTF(LOG_MQTT_EN, INFO_EN, "successfully connected to MQTT broker!\n.", rc); KITLOG(LOG_MQTT_EN, INFO_EN, "successfully connected to MQTT broker!\n", NULL); // 记录网络连接成功 RECORDNETSTATUS(D_STR_CONNECT_WORD) } // 订阅EMS control/read主题以接收消息 rc = MQTTClient_subscribe(connContext.client, (char *)mlt.controlTopic, mlt.qos); if (MQTTCLIENT_SUCCESS == rc) { // printf("Successfully subscribed to the %s topic!\n", mlt.controlTopic); } rc = MQTTClient_subscribe(connContext.client, (char *)mlt.readTopic, mlt.qos); if (MQTTCLIENT_SUCCESS == rc) { // printf("Successfully subscribed to the %s topic!\n", mlt.readTopic); } // 初始化消息发送互斥信号量 sem_init(&g_allowSend, 0, 1); // 初始化历史数据存储和发送互斥量 sem_init(&g_MqttwaitAndSaveHistory, 0, 1); pthread_t periodfd; pthread_create(&periodfd, NULL, periodSendTask, &connContext); // 创建周期发送线程 g_Tids[D_PERIODSEND_T_SEQ] = periodfd; pthread_t historyHandlefd; pthread_create(&historyHandlefd, NULL, historyDataHandle, &connContext); // 创建历史数据处理线程 g_Tids[D_HISTORY_T_SEQ] = historyHandlefd; pthread_t dataChgMonitorfd; pthread_create(&dataChgMonitorfd, NULL, dataChgMonitor, &connContext); // 创建变化检测处理线程 g_Tids[D_DATACHK_T_SEQ] = dataChgMonitorfd; // pthread_t thread_guardfd; // pthread_create(&thread_guardfd, NULL,threadGuard, &connContext); // 创建线程管理线程 while (1) { // 在首次开始发送任务或者设备拓扑改动时,发送设备拓扑 sendDevTopo(connContext.client); sleep(1); } // 断开连接并销毁客户端 MQTTClient_disconnect(connContext.client, 10000); MQTTClient_destroy(&connContext.client); } /***************************************************************************** * @brief 创建MQTT线程函数 * @return NONE *****************************************************************************/ void creatNetMqttTaskEntry() { int nodevToSend = judgeNodev(); if (NorthProtoTable[kProto_MQTT_Slave].configType == 0) { if (nodevToSend == 1) { KITPTF(LOG_MQTT_EN, INFO_EN, "MQTT:failed to get mqtt config!\n.", NULL); KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT:failed to get mqtt config!\n", NULL); return; } } else if (NorthProtoTable[kProto_MQTT_Slave].configType == 1) { if (NorthProtoTable[kProto_MQTT_Slave].upDevNum + NorthProtoTable[kProto_MQTT_Slave].disDevNum == 0) { KITPTF(LOG_MQTT_EN, INFO_EN, "MQTT:failed to get mqtt config!\n.", NULL); KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT:failed to get mqtt config!\n", NULL); return; } } pthread_t mqttfd; if (pthread_create(&mqttfd, NULL, MqttTask, NULL) != 0) { KITPTF(LOG_MQTT_EN, INFO_EN, "MQTT任务创建失败。", NULL); KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT任务创建失败。", NULL); // printf("fail to connect!"); } else { KITPTF(LOG_MQTT_EN, INFO_EN, "MQTT任务创建成功。", NULL); KITLOG(LOG_MQTT_EN, INFO_EN, "MQTT任务创建成功。", NULL); // printf("success to connect!\n"); } };