diff --git a/LogBook.md b/LogBook.md new file mode 100644 index 0000000..ad095e3 --- /dev/null +++ b/LogBook.md @@ -0,0 +1,6 @@ +# 变更日志 + +## YYYY-MM-DD +- 初始化项目变更日志。 +- 确认采用 Gitea Webhook + Shell 脚本的 CI/CD 方案。 +- 部署脚本 (`deploy.sh`) 和相关配置已准备。 \ No newline at end of file diff --git a/README.md b/README.md index 18710b6..c53697f 100644 --- a/README.md +++ b/README.md @@ -131,81 +131,76 @@ ### 4.2. MQTT 主题 (Topics) 约定 -#### 4.2.1. 状态上报 (单片机 -> 后端) -* **主题格式**: `charging/spot/{spotUid}/status` - * `{spotUid}`: 充电桩/车位的唯一标识符 (必须与后端数据库中 `parking_spot` 表记录的 `uid` 一致)。 -* **消息格式 (Payload)**: JSON +根据后端服务的实际MQTT通信实现,主题约定如下: + +#### 4.2.1. 上行消息 (单片机 -> 后端) + +所有类型的上行消息,包括设备状态更新、心跳以及对后端指令的执行回执 (ACK),都统一发送到以下主题: + +* **统一上行主题格式**: `yupi_mqtt_power_project/robot/status/{spotUid}` + * `{spotUid}`: 充电桩/车位的唯一标识符 (例如 `ESP32_SPOT_001`,必须与后端系统中注册的设备ID一致)。 +* **消息格式 (Payload)**: JSON。 + 消息体结构应与后端 `com.yupi.project.model.dto.mqtt.RobotStatusMessage` 类对应。通过消息体内的字段来区分具体的消息含义。 + + **示例 - 常规状态更新或心跳包 (Heartbeat):** ```json { - "spotUid": "SPOT001", - "timestamp": "2024-05-19T12:30:00Z", // ISO 8601 UTC - "status": "IDLE", + "robotUid": "ESP32_SPOT_001", + "actualRobotStatus": "IDLE", // 当前设备状态,如 IDLE, CHARGING, COMPLETED, FAULTED + // 以下为可选的详细状态,根据实际需求和后端处理逻辑添加 "voltage": 220.5, "current": 0.0, "power": 0.0, - "energyConsumed": 0.0, // 本次充电已用电量 (kWh) - "errorCode": 0, - "errorMessage": "", - "sessionId": null // 可选,充电中则为当前会话ID + "energyConsumed": 1.23, // kWh, 本次或累计,根据业务定义 + "errorCode": 0, // 设备故障码,0为正常 + "message": "Device operational", // 可选的文本消息 + "activeTaskId": null // 如果当前正在执行某个任务,可上报任务ID或会话ID } ``` -* **状态定义 (`status`)**: (根据实际项目枚举调整) - * `IDLE`: 空闲 - * `PLUGGED_IN`: 已插枪 (待启动) - * `AWAITING_START_COMMAND`: (可选) 已扫码,等待后端确认启动 - * `CHARGING`: 充电中 - * `STOPPING`: 停止中 - * `COMPLETED`: 充电完成 - * `FAULTED`: 故障 + * **心跳频率**: 建议每 30-60 秒发送一次心跳(可以是一个简化的状态更新消息)。 -#### 4.2.2. 指令下发 (后端 -> 单片机) -单片机需订阅以下主题以接收指令。 - -* **启动充电指令**: - * **主题格式**: `charging/spot/{spotUid}/command/start` - * **消息格式 (Payload)**: JSON - ```json - { - "commandId": "cmd_uuid_123", - "sessionId": 789, - "userId": 123 - } - ``` - -* **停止充电指令**: - * **主题格式**: `charging/spot/{spotUid}/command/stop` - * **消息格式 (Payload)**: JSON - ```json - { - "commandId": "cmd_uuid_678", - "sessionId": 789, - "reason": "USER_REQUEST" - } - ``` - -#### 4.2.3. 指令执行回执 (单片机 -> 后端, 推荐) -* **主题格式**: `charging/spot/{spotUid}/command/ack` -* **消息格式 (Payload)**: JSON + **示例 - 指令执行回执 (ACK):** + 当单片机执行完后端下发的指令后,应向此统一上行主题发送一个回执消息。 ```json { - "commandId": "cmd_uuid_123", - "spotUid": "SPOT001", - "status": "SUCCESS", - "message": "Charging started" + "robotUid": "ESP32_SPOT_001", + "taskId": "backend_provided_task_id_123", // 后端下发指令时提供的taskId + "status": "SUCCESS", // 或 "FAILURE" + "message": "Charging started successfully", // 对指令执行结果的描述 + "errorCode": "0", // 如果失败,提供错误码 + "actualRobotStatus": "CHARGING" // 执行指令后设备的当前状态 + // "activeTaskId": "session_xyz" // 可选,如果与特定会话相关 } ``` -#### 4.2.4. 心跳 (单片机 -> 后端) -* **主题格式**: `charging/spot/{spotUid}/heartbeat` -* **消息格式 (Payload)**: JSON +#### 4.2.2. 下行指令 (后端 -> 单片机) + +后端服务向特定单片机下发指令时,使用以下主题格式。单片机应订阅其自身的这个专属指令主题。 + +* **统一下行指令主题格式**: `yupi_mqtt_power_project/robot/command/{spotUid}` + * `{spotUid}`: 目标充电桩的唯一标识符。 +* **消息格式 (Payload)**: JSON。 + JSON消息体内部包含了具体的指令类型和所需参数。 + + **示例 - 启动充电指令:** ```json { - "spotUid": "SPOT001", - "timestamp": "2024-05-19T12:35:00Z", - "status": "IDLE" + "commandType": "START_CHARGE", // 指令类型 + "taskId": "backend_task_id_for_ack_789", // 供单片机ACK时使用的任务ID + "sessionId": "session_abc_123" // 关联的充电会话ID + // ...其他可能的参数... } ``` -* **频率**: 例如每 30-60 秒。 + + **示例 - 停止充电指令:** + ```json + { + "commandType": "STOP_CHARGE", + "taskId": "backend_task_id_for_ack_000" + // ...其他可能的参数... + } + ``` + * **单片机处理逻辑**: 单片机收到消息后,需解析JSON负载,识别 `commandType`,提取 `taskId` (用于后续ACK),并获取其他指令参数来执行相应操作。 ### 4.3. 单片机开发关键逻辑 1. **MQTT 初始化与重连机制**。 diff --git a/docs/系统开发日志/LogBook.md b/docs/系统开发日志/LogBook.md index eed8b99..00398aa 100644 --- a/docs/系统开发日志/LogBook.md +++ b/docs/系统开发日志/LogBook.md @@ -207,4 +207,18 @@ * 在生产环境中,可以根据需要设置 `NEXT_PUBLIC_API_BASE_URL` 环境变量,指向后端服务器的实际地址。 * 如果使用相对路径 `/api`,需要确保 Next.js 应用和后端服务部署在同一个域名下,或者通过反向代理(如 Nginx)将 `/api` 路径的请求转发到后端服务。 +## 2025-05-28 (基于对话日期推断) + +* **修复 MQTT ESP32 客户端编译错误**: + * **问题**: MQTT ESP32 客户端代码 (`mqtt_esp32_client.ino`) 出现多个编译错误: + 1. 第113行存在转义字符错误 `stray '\' in program` 和 `missing terminating ' character` + 2. `publish_status_update()` 和 `publish_heartbeat()` 函数中使用了不存在的 `WiFi.getTime()` 方法 + * **解决方案**: + 1. 修复了第113行的字符串结束符,将 `\'\\0\'` 改为正确的 `'\0'` + 2. 替换了不存在的 `WiFi.getTime()` 方法,改用 `millis()` 函数作为时间戳 + * **预期效果**: ESP32 代码现可以正常编译,设备可以连接到 MQTT 代理并发送状态更新和心跳消息 + * **注意事项**: + * 使用 `millis()` 作为时间戳只能表示设备启动后的毫秒数,不是实际的日期时间 + * 如需准确时间,可考虑添加 NTP 客户端功能或使用 RTC 模块 + --- \ No newline at end of file diff --git a/mqtt_esp32_client/mqtt_esp32_client.ino b/mqtt_esp32_client/mqtt_esp32_client.ino index b7e1ff7..0bebbed 100644 --- a/mqtt_esp32_client/mqtt_esp32_client.ino +++ b/mqtt_esp32_client/mqtt_esp32_client.ino @@ -4,8 +4,8 @@ // ----------- 设备配置 (需要为您自己的设备和环境修改) ----------- // WiFi -const char *ssid = "xxxxx"; // 请输入您的 Wi-Fi 名称 -const char *password = "xxxxx"; // 请输入您的 Wi-Fi 密码 +const char *ssid = "UFI_DB50CD"; // 请输入您的 Wi-Fi 名称 +const char *password = "1349534012"; // 请输入您的 Wi-Fi 密码 // MQTT Broker const char *mqtt_broker = "broker.emqx.io"; // 您的 MQTT Broker 地址 @@ -17,15 +17,9 @@ const int mqtt_port = 1883; // 您的 MQTT 端口 const char *spotUid = "ESP32_SPOT_001"; // 例如: "SPOT001", "P005-A1" 等 // ----------- MQTT 主题定义 ----------- -// 上行 (ESP32 -> 后端) -String topic_status_update; // 状态上报: charging/spot/{spotUid}/status -String topic_command_ack; // 指令回执: charging/spot/{spotUid}/command/ack -String topic_heartbeat; // 心跳: charging/spot/{spotUid}/heartbeat - -// 下行 (后端 -> ESP32, ESP32需要订阅这些) -String topic_command_start; // 启动充电: charging/spot/{spotUid}/command/start -String topic_command_stop; // 停止充电: charging/spot/{spotUid}/command/stop -// String topic_command_query_status; // 查询状态 (可选): charging/spot/{spotUid}/command/query_status +// 基于 application.yml 和后端服务实现 +String topic_uplink_to_backend; // 上报给后端: yupi_mqtt_power_project/robot/status/{spotUid} +String topic_downlink_from_backend; // 从后端接收指令: yupi_mqtt_power_project/robot/command/{spotUid} // ----------- 全局变量 ----------- WiFiClient espClient; @@ -47,20 +41,15 @@ const long statusUpdateInterval = 30000; // 状态上报间隔 (例如: 30秒) const long heartbeatInterval = 60000; // 心跳间隔 (例如: 60秒) void setup_mqtt_topics() { - String baseTopic = "charging/spot/" + String(spotUid) + "/"; - topic_status_update = baseTopic + "status"; - topic_command_ack = baseTopic + "command/ack"; - topic_heartbeat = baseTopic + "heartbeat"; + String backend_status_base = "yupi_mqtt_power_project/robot/status/"; + String backend_command_base = "yupi_mqtt_power_project/robot/command/"; - topic_command_start = baseTopic + "command/start"; - topic_command_stop = baseTopic + "command/stop"; - // topic_command_query_status = baseTopic + "command/query_status"; - Serial.println("MQTT 主题初始化完成:"); - Serial.println(" 状态上报: " + topic_status_update); - Serial.println(" 指令回执: " + topic_command_ack); - Serial.println(" 心跳: " + topic_heartbeat); - Serial.println(" 订阅启动指令: " + topic_command_start); - Serial.println(" 订阅停止指令: " + topic_command_stop); + topic_uplink_to_backend = backend_status_base + String(spotUid); + topic_downlink_from_backend = backend_command_base + String(spotUid); + + Serial.println("MQTT 主题初始化完成 (匹配后端实现):"); + Serial.println(" 上行主题 (状态/心跳/ACK): " + topic_uplink_to_backend); + Serial.println(" 下行主题 (接收指令): " + topic_downlink_from_backend); } void connect_wifi() { @@ -85,15 +74,11 @@ void reconnect_mqtt() { if (client.connect(client_id.c_str(), mqtt_username, mqtt_password)) { Serial.println("MQTT Broker 连接成功!"); - // 重新订阅主题 - client.subscribe(topic_command_start.c_str()); - Serial.println("已订阅: " + topic_command_start); - client.subscribe(topic_command_stop.c_str()); - Serial.println("已订阅: " + topic_command_stop); - // if (topic_command_query_status.length() > 0) client.subscribe(topic_command_query_status.c_str()); - - // (可选) 连接成功后立即发送一次状态 - publish_status_update(); + // 订阅唯一下行指令主题 + client.subscribe(topic_downlink_from_backend.c_str()); + Serial.println("已订阅指令主题: " + topic_downlink_from_backend); + + publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 连接成功后发送一次常规状态更新 } else { Serial.print("连接失败, rc="); Serial.print(client.state()); @@ -110,11 +95,16 @@ void callback(char *topic, byte *payload, unsigned int length) { char message[length + 1]; memcpy(message, payload, length); - message[length] = \'\\0\'; // 添加字符串结束符 + message[length] = '\0'; Serial.print("消息内容: "); Serial.println(message); - StaticJsonDocument<256> doc; // 适当调整JSON文档大小 + if (String(topic) != topic_downlink_from_backend) { + Serial.println("消息非来自预期的指令主题,忽略。"); + return; + } + + StaticJsonDocument<256> doc; DeserializationError error = deserializeJson(doc, message); if (error) { @@ -123,38 +113,60 @@ void callback(char *topic, byte *payload, unsigned int length) { return; } - const char* commandId = doc["commandId"]; // 提取 commandId 用于回执 + const char* cmdType = doc["commandType"]; // 例如: "START_CHARGE", "STOP_CHARGE" + const char* taskId = doc["taskId"]; // 用于ACK - // 根据主题处理指令 - if (String(topic) == topic_command_start) { + if (cmdType == nullptr || taskId == nullptr) { + Serial.println("指令JSON缺少 commandType 或 taskId 字段。"); + publish_ack_message(taskId, false, "Command JSON invalid", nullptr); // 尝试ACK错误 + return; + } + + if (strcmp(cmdType, "MOVE_TO_SPOT") == 0) { + Serial.println("接收到 [移动到车位] 指令"); + // const char* targetSpotUid = doc["target_spot_uid"]; // 可选: 从payload中获取目标车位ID (如果存在且需要进一步处理) + // if (targetSpotUid) { + // Serial.println("目标车位UID: " + String(targetSpotUid)); + // } + // 模拟机器人移动到指定位置的动作 + Serial.println("模拟: 机器人正在移动到目标车位..."); + delay(1000); // 模拟移动耗时 (缩短演示时间) + Serial.println("模拟: 机器人已到达目标车位。"); + publish_ack_message(taskId, true, "Robot arrived at spot (simulated)", nullptr); + // 注意:此时设备状态 currentDeviceStatus 可以保持不变,或根据业务逻辑更新 + // 例如: currentDeviceStatus = "IDLE_AT_SPOT"; + // 如果需要,可以立即上报一次状态: publish_regular_status_update(); + + } else if (strcmp(cmdType, "START_CHARGE") == 0) { Serial.println("接收到 [启动充电] 指令"); - // 实际硬件操作: 启动充电 - // 例如: digitalWrite(RELAY_PIN, HIGH); currentDeviceStatus = "CHARGING"; if (doc.containsKey("sessionId")) { currentSessionId = String(doc["sessionId"].as()); + } else { + currentSessionId = ""; //确保没有sessionId时清空 } Serial.println("模拟: 充电已启动。会话ID: " + currentSessionId); - publish_command_ack(commandId, true, "Charging started successfully"); - publish_status_update(); // 立即更新状态 + publish_ack_message(taskId, true, "Charging started successfully", currentSessionId.c_str()); + publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 立即更新状态 - } else if (String(topic) == topic_command_stop) { + } else if (strcmp(cmdType, "STOP_CHARGE") == 0) { Serial.println("接收到 [停止充电] 指令"); - // 实际硬件操作: 停止充电 - // 例如: digitalWrite(RELAY_PIN, LOW); - currentDeviceStatus = "COMPLETED"; // 或者 "IDLE",取决于逻辑 + currentDeviceStatus = "COMPLETED"; Serial.println("模拟: 充电已停止。"); - publish_command_ack(commandId, true, "Charging stopped successfully"); - currentSessionId = ""; // 清除会话ID - publish_status_update(); // 立即更新状态 + String previousSessionId = currentSessionId; // 保存一下,以防ACK需要 + currentSessionId = ""; + publish_ack_message(taskId, true, "Charging stopped successfully", previousSessionId.c_str()); + publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 立即更新状态 } - // else if (String(topic) == topic_command_query_status) { - // Serial.println("接收到 [查询状态] 指令"); - // publish_status_update(); // 回复当前状态 - // publish_command_ack(commandId, true, "Status reported"); + // Add other commandType handling here if needed, e.g., "QUERY_STATUS" + // else if (strcmp(cmdType, "QUERY_STATUS") == 0) { + // Serial.println("接收到 [查询状态] 指令"); + // publish_regular_status_update(); // 回复当前状态 + // publish_ack_message(taskId, true, "Status reported", nullptr); // } else { - Serial.println("未知指令主题"); + Serial.println("未知指令 commandType: " + String(cmdType)); + publish_ack_message(taskId, false, ("Unknown commandType: " + String(cmdType)).c_str(), nullptr); } Serial.println("-----------------------"); } @@ -176,48 +188,70 @@ void publish_message(const String& topic, const JsonDocument& doc, const char* m } } -void publish_status_update() { - StaticJsonDocument<512> doc; // 适当调整JSON文档大小 - doc["spotUid"] = spotUid; - doc["timestamp"] = String(WiFi.getTime()); // 需要NTP同步时间才能获取正确UTC时间,此处仅为示例 - doc["status"] = currentDeviceStatus; - doc["voltage"] = currentVoltage; - doc["current"] = currentCurrent; - doc["power"] = currentPower; - doc["energyConsumed"] = currentEnergyConsumed; - doc["errorCode"] = currentErrorCode; - if (currentSessionId.length() > 0) { - doc["sessionId"] = currentSessionId; - } else { - doc["sessionId"] = nullptr; // 或者不包含此字段 +// isAckOrTaskUpdate: true if this is an ACK or a task-specific update, false for general status/heartbeat +// ackTaskId: The taskId if this is an ACK for a command. Null otherwise. +void publish_status_update(bool isAckOrTaskUpdate, const char* ackTaskId, const char* ackStatus, const char* ackMessage, const char* ackErrorCode, const char* ackSessionId) { + StaticJsonDocument<512> doc; + doc["robotUid"] = spotUid; + + if (isAckOrTaskUpdate) { + if (ackTaskId) doc["taskId"] = ackTaskId; + if (ackStatus) doc["status"] = ackStatus; // e.g., "SUCCESS", "FAILURE" or task-specific status + if (ackMessage) doc["message"] = ackMessage; + if (ackErrorCode) doc["errorCode"] = ackErrorCode; + // actualRobotStatus should still be sent to reflect current state after ACK + doc["actualRobotStatus"] = currentDeviceStatus; + if (ackSessionId && strlen(ackSessionId) > 0) doc["activeTaskId"] = ackSessionId; // Assuming activeTaskId can hold sessionId for context in ACKs + // Or, if RobotStatusMessage is extended for sessionId in future. + // For now, activeTaskId might be a way to correlate, or it might be ignored by backend for ACKs. + + } else { // General status update / heartbeat + doc["actualRobotStatus"] = currentDeviceStatus; + doc["voltage"] = currentVoltage; // Example: Add these if backend expects them with general status + doc["current"] = currentCurrent; + doc["power"] = currentPower; + doc["energyConsumed"] = currentEnergyConsumed; + doc["errorCode"] = currentErrorCode; // General device error code + if (currentSessionId.length() > 0) { + // For general status, if a session is active, it might be relevant as activeTaskId + // This depends on how backend interprets activeTaskId outside of specific task ACKs. + doc["activeTaskId"] = currentSessionId; // Or a more generic field if RobotStatusMessage evolves + } } - - publish_message(topic_status_update, doc, "状态更新"); - lastStatusUpdateTime = millis(); + // Common fields (timestamp can be added by backend or here if NTP is used) + // doc["timestamp"] = String(millis()); // Already using millis() + + publish_message(topic_uplink_to_backend, doc, isAckOrTaskUpdate ? "ACK/TaskUpdate" : "StatusUpdate"); + if (!isAckOrTaskUpdate) { // Only update lastStatusUpdateTime for general status updates, not for ACKs triggered by commands + lastStatusUpdateTime = millis(); + } +} + +void publish_regular_status_update() { + // This is a wrapper for general periodic status updates + publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); } void publish_heartbeat() { - StaticJsonDocument<256> doc; // 适当调整JSON文档大小 - doc["spotUid"] = spotUid; - doc["timestamp"] = String(WiFi.getTime()); // 同上,NTP时间问题 - doc["status"] = currentDeviceStatus; // 心跳中也带上当前状态 - - publish_message(topic_heartbeat, doc, "心跳"); + StaticJsonDocument<256> doc; // Heartbeat can be simpler + doc["robotUid"] = spotUid; + doc["actualRobotStatus"] = currentDeviceStatus; // Heartbeat includes current status + // Optionally, add a specific "messageType": "HEARTBEAT" if backend needs explicit differentiation + // beyond just a minimal status update. For now, relying on RobotStatusMessage structure. + + publish_message(topic_uplink_to_backend, doc, "Heartbeat"); lastHeartbeatTime = millis(); } -void publish_command_ack(const char* commandId, bool success, const char* message) { - if (!commandId || strlen(commandId) == 0) { - Serial.println("无法发送ACK: commandId 为空"); - return; +// Simplified ACK message function +void publish_ack_message(const char* taskId, bool success, const char* message, const char* sessionIdForAckContext) { + if (!taskId || strlen(taskId) == 0) { + Serial.println("无法发送ACK: taskId 为空"); + // Potentially send a general error status if appropriate, but usually ACK needs a taskId + return; } - StaticJsonDocument<256> doc; // 适当调整JSON文档大小 - doc["commandId"] = commandId; - doc["spotUid"] = spotUid; - doc["status"] = success ? "SUCCESS" : "FAILURE"; - doc["message"] = message; - - publish_message(topic_command_ack, doc, "指令回执"); + // Use the main publish_status_update function formatted as an ACK + publish_status_update(true, taskId, success ? "SUCCESS" : "FAILURE", message, success ? "0" : "GENERAL_ERROR_ON_ACK", sessionIdForAckContext); } void setup() { @@ -233,22 +267,18 @@ void setup() { void loop() { if (!client.connected()) { - reconnect_mqtt(); // 如果MQTT未连接,则重连 + reconnect_mqtt(); } - client.loop(); // 维持MQTT连接和处理消息 + client.loop(); unsigned long currentTime = millis(); - // 定时发送状态更新 if (currentTime - lastStatusUpdateTime > statusUpdateInterval) { - // 在实际项目中,这里应该先更新 currentVoltage, currentCurrent 等状态值 - // 例如: currentVoltage = readVoltageSensor(); - publish_status_update(); + publish_regular_status_update(); // Use the new wrapper } - // 定时发送心跳 if (currentTime - lastHeartbeatTime > heartbeatInterval) { - publish_heartbeat(); + publish_heartbeat(); // Uses the new heartbeat logic } // 模拟充电过程中的电量和功率变化 (仅为演示) diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java index 1efd169..e13d4e5 100644 --- a/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java +++ b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java @@ -17,14 +17,19 @@ import org.springframework.transaction.annotation.Transactional; import java.nio.charset.StandardCharsets; import java.util.Date; +/** + * MQTT 服务实现类 + * 负责向 MQTT Broker 发送指令给机器人/设备 + */ @Service @Slf4j public class MqttServiceImpl implements MqttService { - private final MqttClient mqttClient; // Autowired by Spring from MqttConfig - private final MqttProperties mqttProperties; - private final RobotTaskService robotTaskService; + private final MqttClient mqttClient; // 由Spring从MqttConfig注入的MQTT客户端实例 + private final MqttProperties mqttProperties; // MQTT相关的配置属性 + private final RobotTaskService robotTaskService; // 机器人任务服务,用于创建和更新任务状态 + // 构造函数注入依赖 public MqttServiceImpl(@Qualifier("mqttClientBean") MqttClient mqttClient, MqttProperties mqttProperties, RobotTaskService robotTaskService) { @@ -33,67 +38,79 @@ public class MqttServiceImpl implements MqttService { this.robotTaskService = robotTaskService; } + /** + * 发送指令给指定的机器人/设备 + * + * @param robotId 目标机器人的唯一标识符 + * @param commandType 指令类型 (枚举) + * @param payloadJson 指令的JSON格式负载 + * @param sessionId (可选) 与此指令关联的充电会话ID + * @return 指令是否发送成功 (注意:这仅表示MQTT消息已发布,不代表设备已执行) + * @throws Exception 如果发送过程中发生MQTT异常或其他意外异常 + */ @Override - @Transactional(rollbackFor = Exception.class) // Ensure rollback if MQTT publish fails after task creation + @Transactional(rollbackFor = Exception.class) // 如果MQTT发布失败或后续操作失败,确保事务回滚(例如任务创建) public boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception { - log.info("Attempting to send command {} to robot {} with payload: {}", commandType, robotId, payloadJson); + log.info("尝试向机器人 {} 发送指令 {},负载: {}", commandType, robotId, payloadJson); - // 1. Check if the robot has pending or already sent tasks + // 步骤1:首先检查机器人是否已有正在处理(PENDING)或已发送(SENT)的任务 if (robotTaskService.hasPendingOrSentTask(robotId)) { - // 添加优先级处理:STOP_CHARGE 命令应该可以覆盖其他任务 + // 特殊处理:STOP_CHARGE (停止充电) 指令具有高优先级,可以覆盖其他任务 if (CommandTypeEnum.STOP_CHARGE.equals(commandType)) { - log.info("Robot {} has pending tasks, but STOP_CHARGE is a priority command, proceeding anyway.", robotId); + log.info("机器人 {} 有正在处理的任务,但 STOP_CHARGE 是优先指令,继续执行。", robotId); } else { - log.warn("Robot {} is busy (has PENDING or SENT tasks). Command {} aborted.", robotId, commandType); - return false; + log.warn("机器人 {} 正忙 (存在PENDING或SENT状态的任务),指令 {} 已中止。", robotId, commandType); + return false; // 机器人忙,不发送新指令 (除非是停止指令) } } - // 2. Create a new task in PENDING state + // 步骤2:如果机器人不忙(或指令是STOP_CHARGE),再创建新任务 RobotTask task = robotTaskService.createTask(robotId, commandType, payloadJson, sessionId); if (task == null || task.getId() == null) { - log.error("Failed to create RobotTask for command {} to robot {}. Aborting send.", commandType, robotId); - return false; + log.error("为机器人 {} 的指令 {} 创建RobotTask失败。发送操作中止。", commandType, robotId); + return false; // 任务创建失败,不继续发送 } - log.info("Created PENDING RobotTask with ID: {} for command {} to robot {}.", task.getId(), commandType, robotId); + log.info("已为机器人 {} 的指令 {} 创建 PENDING 状态的 RobotTask,任务ID: {}。", commandType, robotId, task.getId()); - // 3. Construct the topic + // 3. 构建目标MQTT主题 + // 主题格式: [mqttProperties.commandTopicBase]/[robotId] + // 例如: yupi_mqtt_power_project/robot/command/ESP32_SPOT_001 String topic = mqttProperties.getCommandTopicBase() + "/" + robotId; - // 4. Prepare the MQTT message - String effectivePayload = (payloadJson == null) ? "" : payloadJson; + // 4. 准备MQTT消息 + String effectivePayload = (payloadJson == null) ? "" : payloadJson; // 确保payload不为null MqttMessage mqttMessage = new MqttMessage(effectivePayload.getBytes(StandardCharsets.UTF_8)); - mqttMessage.setQos(mqttProperties.getDefaultQos()); - // mqttMessage.setRetained(false); // Default is false + mqttMessage.setQos(mqttProperties.getDefaultQos()); // 设置QoS级别,从配置中读取 + // mqttMessage.setRetained(false); // 保留消息标志,默认为false,通常指令不需要保留 try { - // 5. Publish the message + // 5. 发布MQTT消息 if (!mqttClient.isConnected()) { - log.error("MQTT client is not connected. Cannot send command {} to robot {}. Task ID: {}", commandType, robotId, task.getId()); - // Task remains PENDING. A scheduled job might retry or mark as error later. + log.error("MQTT客户端未连接。无法向机器人 {} 发送指令 {}。任务ID: {}", commandType, robotId, task.getId()); + // 任务将保持PENDING状态。后续可由定时任务重试或标记为错误。 return false; } - log.debug("Publishing to topic: {}, QoS: {}, Payload: {}", topic, mqttMessage.getQos(), effectivePayload); + log.debug("正在向主题发布消息: {}, QoS: {}, 负载: {}", topic, mqttMessage.getQos(), effectivePayload); mqttClient.publish(topic, mqttMessage); - log.info("Successfully published command {} to robot {} on topic {}. Task ID: {}", commandType, robotId, topic, task.getId()); + log.info("成功向机器人 {} 在主题 {} 上发布指令 {}。任务ID: {}", commandType, robotId, topic, task.getId()); - // 6. Mark the task as SENT + // 6. 将机器人任务的状态标记为 SENT (已发送) boolean markedAsSent = robotTaskService.markTaskAsSent(task.getId(), new Date()); if (!markedAsSent) { - log.warn("Command {} published to robot {}, but failed to mark task {} as SENT. System might be in an inconsistent state.", + log.warn("指令 {} 已发布给机器人 {},但将任务 {} 标记为 SENT 失败。系统可能处于不一致状态。", commandType, robotId, task.getId()); - // This is a critical warning. The command was sent, but DB state doesn't reflect it. + // 这是一个严重警告。指令已发送,但数据库状态未正确反映。 } - return true; // Command sent and task status update was attempted + return true; // 指令已发送,并尝试更新了任务状态 } catch (MqttException e) { - log.error("MqttException while publishing command {} to robot {} (Task ID: {}). Error: {}", + log.error("向机器人 {} 发布指令 {} (任务ID: {}) 时发生MqttException。错误: {}", commandType, robotId, task.getId(), e.getMessage(), e); - // The RobotTask remains PENDING. A retry mechanism or a timeout job could handle this later. - throw e; // Re-throw for transactional rollback + // RobotTask 将保持 PENDING 状态。重试机制或超时任务后续可以处理此问题。 + throw e; // 重新抛出异常,以便Spring的@Transactional进行事务回滚 } catch (Exception e) { - log.error("Unexpected exception while sending command {} to robot {} (Task ID: {}). Error: {}", + log.error("向机器人 {} 发送指令 {} (任务ID: {}) 时发生意外异常。错误: {}", commandType, robotId, task.getId(), e.getMessage(), e); - throw e; // Re-throw for transactional rollback + throw e; // 重新抛出异常,以便事务回滚 } } } \ No newline at end of file