#include #include #include // 用于JSON操作 // ----------- 设备配置 (需要为您自己的设备和环境修改) ----------- // WiFi const char *ssid = "WEIHANG3718"; // 请输入您的 Wi-Fi 名称 const char *password = "@05Tu146"; // 请输入您的 Wi-Fi 密码 // MQTT Broker const char *mqtt_broker = "yuyun-hk1.stormrain.cn"; // 您的 MQTT Broker 地址 const char *mqtt_username = "emqx"; // 您的 MQTT 用户名 (如果需要) const char *mqtt_password = "public"; // 您的 MQTT 密码 (如果需要) const int mqtt_port = 1883; // 您的 MQTT 端口 // 设备唯一标识符 (非常重要, 必须与后端注册的一致) const char *spotUid = "ESP32_SPOT_001"; // 例如: "SPOT001", "P005-A1" 等 // ----------- MQTT 主题定义 ----------- // 基于 application.yml 和后端服务实现 String topic_uplink_to_backend; // 上报给后端: yupi_mqtt_power_project/robot/status/{spotUid} String topic_downlink_from_backend; // 从后端接收指令: yupi_mqtt_power_project/robot/command/{spotUid} // ----------- 全局变量 ----------- WiFiClient espClient; PubSubClient client(espClient); // 模拟硬件状态 (实际项目中需要从传感器或硬件逻辑获取) const char* currentDeviceStatus = "IDLE"; // 设备当前状态: IDLE, CHARGING, COMPLETED, FAULTED 等 float currentVoltage = 220.0; float currentCurrent = 0.0; float currentPower = 0.0; float currentEnergyConsumed = 0.0; int currentErrorCode = 0; String currentSessionId = ""; // 当前充电会话ID String currentSimulatedLocation = "BASE_STATION"; // 新增:模拟当前位置 int currentSimulatedBattery = 95; // 新增:模拟当前电量 String currentTargetSpot = ""; // 新增:用于移动状态的目标车位 String currentSpotId = ""; // 新增:当前所在车位 (用于充电等状态) unsigned long chargeStartTimeMillis = 0; // 新增:用于计算充电时长 // 定时发送相关 unsigned long lastStatusUpdateTime = 0; unsigned long lastHeartbeatTime = 0; const long statusUpdateInterval = 30000; // 状态上报间隔 (例如: 30秒) const long heartbeatInterval = 60000; // 心跳间隔 (例如: 60秒) unsigned long lastBatteryUpdateTime = 0; // 新增:用于模拟电池电量变化的时间戳 const long batteryUpdateInterval = 5000; // 新增:电池状态更新间隔(例如5秒) void setup_mqtt_topics() { String backend_status_base = "yupi_mqtt_power_project/robot/status/"; String backend_command_base = "yupi_mqtt_power_project/robot/command/"; topic_uplink_to_backend = backend_status_base + String(spotUid); topic_downlink_from_backend = backend_command_base + String(spotUid); Serial.println("MQTT 主题初始化完成 (匹配后端实现):"); Serial.println(" 上行主题 (状态/心跳/ACK): " + topic_uplink_to_backend); Serial.println(" 下行主题 (接收指令): " + topic_downlink_from_backend); } void connect_wifi() { Serial.println("正在连接 Wi-Fi..."); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } Serial.println("\nWi-Fi 连接成功!"); Serial.print("IP 地址: "); Serial.println(WiFi.localIP()); } void reconnect_mqtt() { while (!client.connected()) { Serial.print("尝试连接 MQTT Broker: "); Serial.println(mqtt_broker); String client_id = "esp32-client-" + String(spotUid) + "-"; client_id += String(WiFi.macAddress()); Serial.printf("客户端 ID: %s \n", client_id.c_str()); if (client.connect(client_id.c_str(), mqtt_username, mqtt_password)) { Serial.println("MQTT Broker 连接成功!"); // 订阅唯一下行指令主题 client.subscribe(topic_downlink_from_backend.c_str()); Serial.println("已订阅指令主题: " + topic_downlink_from_backend); publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 连接成功后发送一次常规状态更新 } else { Serial.print("连接失败, rc="); Serial.print(client.state()); Serial.println(" 2秒后重试..."); delay(2000); } } } void callback(char *topic, byte *payload, unsigned int length) { Serial.println("-----------------------"); Serial.print("消息抵达, 主题: "); Serial.println(topic); char message[length + 1]; memcpy(message, payload, length); message[length] = '\0'; Serial.print("消息内容: "); Serial.println(message); if (String(topic) != topic_downlink_from_backend) { Serial.println("消息非来自预期的指令主题,忽略。"); return; } StaticJsonDocument<256> doc; DeserializationError error = deserializeJson(doc, message); if (error) { Serial.print("JSON 解析失败: "); Serial.println(error.f_str()); return; } const char* cmdType = nullptr; if (doc.containsKey("commandType")) { cmdType = doc["commandType"]; } else if (doc.containsKey("command")) { cmdType = doc["command"]; } const char* taskId = doc["taskId"]; if (cmdType == nullptr || taskId == nullptr) { Serial.println("指令JSON缺少 commandType/command 或 taskId 字段。"); publish_ack_message(taskId, false, "Command JSON invalid (missing commandType/command or taskId)", nullptr); return; } const char* spotIdFromCommand = nullptr; if (doc.containsKey("spotId")) { spotIdFromCommand = doc["spotId"].as(); } else if (doc.containsKey("target_spot_uid")) { spotIdFromCommand = doc["target_spot_uid"].as(); } if (strcmp(cmdType, "MOVE_TO_SPOT") == 0 || strcmp(cmdType, "MOVE") == 0) { Serial.println("接收到 [移动] 指令 (MOVE_TO_SPOT 或 MOVE)"); if (spotIdFromCommand) { currentTargetSpot = String(spotIdFromCommand); currentSpotId = ""; } else { currentTargetSpot = "UNKNOWN_SPOT"; } currentDeviceStatus = "MOVING"; currentSimulatedLocation = "EN_ROUTE_TO_" + currentTargetSpot; Serial.println("状态更新: MOVING (前往目标车位: " + currentTargetSpot + ")"); publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); Serial.println("模拟: 机器人正在移动到目标车位 " + currentTargetSpot + "..."); // 模拟移动过程中的电量消耗,更细致的可以在 loop 中做 if (currentSimulatedBattery > 10) currentSimulatedBattery -= 5; // 假设移动固定消耗一些电 else currentSimulatedBattery = 5; // 最低电量 delay(3000); Serial.println("模拟: 机器人已到达目标车位 " + currentTargetSpot + "。"); currentDeviceStatus = "CHARGING"; currentSpotId = currentTargetSpot; currentTargetSpot = ""; currentSimulatedLocation = currentSpotId; chargeStartTimeMillis = millis(); Serial.println("状态更新: CHARGING (已到达目标车位 " + currentSpotId + ",视为开始充电)"); publish_ack_message(taskId, true, "Robot arrived at spot and started charging (simulated)", currentSpotId.c_str()); publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); } else if (strcmp(cmdType, "START_CHARGE") == 0) { Serial.println("接收到 [启动充电] 指令"); if (strcmp(currentDeviceStatus, "CHARGING") != 0) { // 仅当未在充电时才响应 currentDeviceStatus = "CHARGING"; if (spotIdFromCommand) { currentSpotId = String(spotIdFromCommand); } else if (currentSpotId.length() == 0) { currentSpotId = "DEFAULT_SPOT"; // 如果没有从指令获取且之前也没有,给个默认值 } currentSimulatedLocation = currentSpotId; chargeStartTimeMillis = millis(); Serial.println("状态更新: CHARGING (指令启动于 " + currentSpotId + ")"); } else { Serial.println("设备已在充电中,忽略 START_CHARGE 指令。"); } if (doc.containsKey("sessionId")) { currentSessionId = String(doc["sessionId"].as()); } else { currentSessionId = ""; } Serial.println("模拟: 充电已启动。会话ID: " + currentSessionId + ", 车位: " + currentSpotId); publish_ack_message(taskId, true, "Charging started successfully", currentSessionId.c_str()); publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); } else if (strcmp(cmdType, "STOP_CHARGE") == 0) { Serial.println("接收到 [停止充电] 指令"); bool wasCharging = strcmp(currentDeviceStatus, "CHARGING") == 0; currentDeviceStatus = "COMPLETED"; currentSimulatedLocation = currentSpotId; unsigned long chargeDuration = 0; if (chargeStartTimeMillis > 0 && wasCharging) { chargeDuration = (millis() - chargeStartTimeMillis) / 1000; } chargeStartTimeMillis = 0; Serial.println("模拟: 充电已停止。车位: " + currentSpotId + ", 本次充电时长约: " + String(chargeDuration) + "s"); String previousSessionId = currentSessionId; currentSessionId = ""; // 在ACK中上报准确的充电时长,如果需要的话,可以通过修改 publish_ack_message 或在 message 字段中添加 // For now, the generic ACK is sent. publish_ack_message(taskId, true, ("Charging stopped. Duration: " + String(chargeDuration) + "s").c_str(), previousSessionId.c_str()); publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); } // Add other commandType handling here if needed, e.g., "QUERY_STATUS" // else if (strcmp(cmdType, "QUERY_STATUS") == 0) { // Serial.println("接收到 [查询状态] 指令"); // publish_regular_status_update(); // 回复当前状态 // publish_ack_message(taskId, true, "Status reported", nullptr); // } else { Serial.println("未知指令 commandType: " + String(cmdType)); publish_ack_message(taskId, false, ("Unknown commandType: " + String(cmdType)).c_str(), nullptr); } Serial.println("-----------------------"); } void publish_message(const String& topic, const JsonDocument& doc, const char* message_type) { String jsonBuffer; serializeJson(doc, jsonBuffer); Serial.print("发送 "); Serial.print(message_type); Serial.print(" 到主题 ["); Serial.print(topic); Serial.print("]: "); Serial.println(jsonBuffer); if (client.publish(topic.c_str(), jsonBuffer.c_str())) { Serial.println(String(message_type) + " 发送成功"); } else { Serial.println(String(message_type) + " 发送失败"); } } // isAckOrTaskUpdate: true if this is an ACK or a task-specific update, false for general status/heartbeat // ackTaskId: The taskId if this is an ACK for a command. Null otherwise. void publish_status_update(bool isAckOrTaskUpdate, const char* ackTaskId, const char* ackStatus, const char* ackMessage, const char* ackErrorCode, const char* ackSessionId) { StaticJsonDocument<512> doc; doc["robotUid"] = spotUid; if (isAckOrTaskUpdate) { if (ackTaskId) { // 关键修复:同时提供 taskId 和 activeTaskId 字段 // taskId 用于 robot_task 表的状态更新 // activeTaskId 用于 charging_session 的业务流程推进 doc["taskId"] = ackTaskId; doc["activeTaskId"] = ackTaskId; } if (ackStatus) doc["status"] = ackStatus; if (ackMessage) doc["message"] = ackMessage; // 根据用户要求,ACK中不发送errorCode // if (ackErrorCode) doc["errorCode"] = ackErrorCode; doc["actualRobotStatus"] = currentDeviceStatus; if (ackSessionId && strlen(ackSessionId) > 0) { // For ACKs, if we have a sessionId or a spotId that's relevant for context, we can add it. // Let's use "spotId" as suggested by requirements for charging related ACKs if (strcmp(currentDeviceStatus, "CHARGING") == 0 || strcmp(currentDeviceStatus, "COMPLETED") == 0 || (ackMessage && strstr(ackMessage, "arrived"))) { if(currentSpotId.length() > 0) doc["spotId"] = currentSpotId; } } } else { // General status update / heartbeat doc["actualRobotStatus"] = currentDeviceStatus; // Add fields as per requirements.md doc["location"] = currentSimulatedLocation; doc["battery"] = currentSimulatedBattery; // Example value, should be updated by a battery sim function // doc["errorCode"] = currentErrorCode; // 根据用户要求,常规状态下不发送errorCode // Fields specific to certain statuses if (strcmp(currentDeviceStatus, "MOVING") == 0) { if (currentTargetSpot.length() > 0) doc["targetSpot"] = currentTargetSpot; } else if (strcmp(currentDeviceStatus, "CHARGING") == 0) { if (currentSpotId.length() > 0) doc["spotId"] = currentSpotId; if (chargeStartTimeMillis > 0) { doc["durationSeconds"] = (millis() - chargeStartTimeMillis) / 1000; } else { doc["durationSeconds"] = 0; } } else if (strcmp(currentDeviceStatus, "COMPLETED") == 0) { if (currentSpotId.length() > 0) doc["spotId"] = currentSpotId; // totalDurationSeconds would typically be set upon actual completion event, // for now, a regular status update might not have final total, or we can omit. // Let's assume for now that if status is COMPLETED, we send a placeholder or last known duration. // A more robust solution is needed for totalDurationSeconds. // For now, publish_ack_message for STOP_CHARGE should probably carry the final duration. } else if (strcmp(currentDeviceStatus, "ERROR") == 0) { // errorCode is already included. message for error could be added if available. // doc["message"] = "Simulated error description"; // if we have one } } // Common fields (timestamp can be added by backend or here if NTP is used) // doc["timestamp"] = String(millis()); // Already using millis() publish_message(topic_uplink_to_backend, doc, isAckOrTaskUpdate ? "ACK/TaskUpdate" : "StatusUpdate"); if (!isAckOrTaskUpdate) { // Only update lastStatusUpdateTime for general status updates, not for ACKs triggered by commands lastStatusUpdateTime = millis(); } } void publish_regular_status_update() { // This is a wrapper for general periodic status updates publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); } void publish_heartbeat() { StaticJsonDocument<256> doc; // Heartbeat can be simpler doc["robotUid"] = spotUid; doc["actualRobotStatus"] = currentDeviceStatus; // Heartbeat includes current status // Optionally, add a specific "messageType": "HEARTBEAT" if backend needs explicit differentiation // beyond just a minimal status update. For now, relying on RobotStatusMessage structure. publish_message(topic_uplink_to_backend, doc, "Heartbeat"); lastHeartbeatTime = millis(); } // Simplified ACK message function void publish_ack_message(const char* taskId, bool success, const char* message, const char* contextInfo) { if (!taskId || strlen(taskId) == 0) { Serial.println("无法发送ACK: taskId 为空"); return; } // Use the main publish_status_update function formatted as an ACK // For contextInfo, we can pass spotId if relevant, or sessionId if that's what backend expects for ACKs. // The 'true' indicates it's an ACK. // The ackErrorCode field in publish_status_update will be set to "SUCCESS_ACK" or "FAILURE_ACK" // 根据用户要求,ACK中的errorCode也暂时简化或移除。如果保留,确保含义清晰。 publish_status_update(true, taskId, success ? "COMPLETED" : "FAILED", message, nullptr, contextInfo); } void setup() { Serial.begin(115200); Serial.println("\nESP32 充电桩模拟客户端启动..."); setup_mqtt_topics(); // 初始化MQTT主题 connect_wifi(); // 连接Wi-Fi client.setServer(mqtt_broker, mqtt_port); client.setCallback(callback); // 设置消息回调函数 } void loop() { if (!client.connected()) { reconnect_mqtt(); } client.loop(); unsigned long currentTime = millis(); if (currentTime - lastStatusUpdateTime > statusUpdateInterval) { publish_regular_status_update(); // Use the new wrapper } if (currentTime - lastHeartbeatTime > heartbeatInterval) { publish_heartbeat(); // Uses the new heartbeat logic } // 模拟充电过程中的电量和功率变化 (仅为演示) if (String(currentDeviceStatus) == "CHARGING") { currentEnergyConsumed += 0.01; // 假设每秒消耗0.01kWh (不精确,仅为演示) currentCurrent = 5.0; // 假设充电电流5A currentPower = (currentVoltage * currentCurrent) / 1000.0; // kW // 注意:实际项目中这些值应来自传感器或充电控制器 // 这里为了演示,每隔一段时间简单更新一下 } else { currentCurrent = 0.0; currentPower = 0.0; } delay(100); // 短暂延时,避免loop过于频繁,给其他任务一点时间 (可选) // 模拟电量消耗和位置变化 (更符合需求文档) if (strcmp(currentDeviceStatus, "CHARGING") == 0) { if (currentSimulatedBattery > 0) { // 充电时电量可以缓慢增加,或保持不变,取决于模拟逻辑 // currentSimulatedBattery = min(100, currentSimulatedBattery + 1); // 简单模拟充电增加 } // durationSeconds is calculated in publish_status_update } else if (strcmp(currentDeviceStatus, "MOVING") == 0) { if (currentSimulatedBattery > 5) { // 移动时消耗电量 // currentSimulatedBattery--; // 简单模拟电量消耗 } } else { // IDLE, COMPLETED, ERROR //电量可能不变或缓慢消耗 } // 简单模拟位置更新 (可以更复杂) // currentSimulatedLocation = ... ; // 可以在特定事件中更新 // --- 动态模拟数据更新 --- if (currentTime - lastBatteryUpdateTime > batteryUpdateInterval) { if (strcmp(currentDeviceStatus, "CHARGING") == 0) { if (currentSimulatedBattery < 100) { currentSimulatedBattery += 1; // 每 batteryUpdateInterval 增加 1% 电量 Serial.println("模拟: 电量增加至 " + String(currentSimulatedBattery) + "%"); } else { currentSimulatedBattery = 100; // 防止超过100 } } else if (strcmp(currentDeviceStatus, "MOVING") == 0) { if (currentSimulatedBattery > 2) { currentSimulatedBattery -= 2; // 每 batteryUpdateInterval 消耗 2% 电量 Serial.println("模拟: 电量消耗至 " + String(currentSimulatedBattery) + "%"); } else { currentSimulatedBattery = 2; // 防止低于2,极端情况 } } else { // IDLE, COMPLETED, FAULTED if (currentSimulatedBattery > 1) { // 非常缓慢的自然消耗,或者不消耗 // currentSimulatedBattery -= 1; } } lastBatteryUpdateTime = currentTime; } // 位置模拟: 通常在状态转换时(callback中)已经更新了主要位置。 // loop中可以添加更细致的移动中的位置更新,但目前保持简单,依赖callback中的设定。 // 例如: if (strcmp(currentDeviceStatus, "MOVING") == 0) { /* update location based on time/progress */ } delay(100); }