From 5ea13d6dea5e1c3ef1c8b00706e24529fb596a3a Mon Sep 17 00:00:00 2001 From: lingyunxsh Date: Tue, 13 May 2025 22:19:18 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E9=9B=86=E6=88=90=E5=88=9D=E6=AD=A5?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- LogBook.md | 44 ++- .../stage_2_mqtt_integration.md | 4 +- .../stage_3_core_charging_logic.md | 7 + .../java/com/yupi/project/MyApplication.java | 2 + .../yupi/project/mapper/RobotTaskMapper.java | 14 + .../model/dto/mqtt/RobotStatusMessage.java | 13 + .../yupi/project/model/entity/RobotTask.java | 51 +++ .../project/model/enums/CommandTypeEnum.java | 22 ++ .../model/enums/RobotTaskStatusEnum.java | 39 ++ .../project/mqtt/MqttCallbackHandler.java | 25 +- .../com/yupi/project/service/MqttService.java | 23 ++ .../project/service/RobotTaskService.java | 105 ++++++ .../service/impl/MqttMessageHandler.java | 107 ++++++ .../project/service/impl/MqttServiceImpl.java | 86 +++++ .../service/impl/RobotTaskServiceImpl.java | 333 ++++++++++++++++++ .../service/impl/TaskTimeoutHandler.java | 50 +++ .../src/main/resources/application.yml | 5 +- 17 files changed, 897 insertions(+), 33 deletions(-) create mode 100644 springboot-init-main/src/main/java/com/yupi/project/mapper/RobotTaskMapper.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/model/dto/mqtt/RobotStatusMessage.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/model/entity/RobotTask.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/model/enums/CommandTypeEnum.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/model/enums/RobotTaskStatusEnum.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/MqttService.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/RobotTaskService.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttMessageHandler.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/impl/RobotTaskServiceImpl.java create mode 100644 springboot-init-main/src/main/java/com/yupi/project/service/impl/TaskTimeoutHandler.java diff --git a/LogBook.md b/LogBook.md index f8f8147..0b3ce38 100644 --- a/LogBook.md +++ b/LogBook.md @@ -13,19 +13,37 @@ - 创建了 `springboot-init-main/src/main/java/com/yupi/project/config/properties/MqttProperties.java` 来映射 MQTT 配置。 3. **更新开发文档**: - 修改了 `springboot-init-main/doc/development_stages/stage_2_mqtt_integration.md`,反映了公共 Broker 的使用、Topic 唯一性策略以及应用层鉴权的重要性。 - 4. **实现 MQTT 客户端核心配置 (`MqttConfig.java` 和 `MqttCallbackHandler.java`)**: + 4. **实现 MQTT 客户端核心配置 (`MqttConfig.java`, `MqttCallbackHandler.java`, `MqttConnectionManager.java`)**: - 创建了 `com.yupi.project.mqtt.MqttCallbackHandler` 类,实现 `MqttCallbackExtended`接口,用于处理连接事件和初步的消息接收(日志记录)。在 `connectComplete` 中实现订阅状态主题 `yupi_mqtt_power_project/robot/status/+` 的逻辑。 - - 创建了 `com.yupi.project.config.MqttConfig` 配置类: - - 注入 `MqttProperties` 和 `MqttCallbackHandler`。 - - 定义了 `MqttConnectOptions` Bean,配置连接参数(如自动重连、Clean Session、超时等)。 - - 定义了 `MqttClient` Bean,使用唯一的客户端ID,并设置 `MqttCallbackHandler`。 - - 使用 `@PostConstruct` 实现了应用启动时自动连接到 MQTT Broker 的逻辑。 - - 使用 `@PreDestroy` 实现了应用关闭时断开 MQTT 连接的逻辑。 + - 创建了 `com.yupi.project.config.MqttConfig` 配置类,定义 `MqttConnectOptions` 和 `MqttClient` Beans。 + - 创建了 `com.yupi.project.mqtt.MqttConnectionManager` 类,实现 `ApplicationListener` 和 `DisposableBean`,在应用启动完成后连接 MQTT,并在应用关闭前断开连接。解决了 MqttClient 初始化和连接时序问题。 + 5. **创建 `RobotTask` 管理基础结构**: + - 在 `com.yupi.project.model.enums` 包下创建了 `CommandTypeEnum.java` 和 `TaskStatusEnum.java`。 + - 在 `com.yupi.project.model.entity` 包下创建了 `RobotTask.java` 实体类,包含 MyBatis-Plus 注解。 + - 在 `com.yupi.project.mapper` 包下创建了 `RobotTaskMapper.java` 接口。 + - 在 `com.yupi.project.service` 包下创建了 `RobotTaskService.java` 接口,定义了任务管理的核心方法。 + - 在 `com.yupi.project.service.impl` 包下创建了 `RobotTaskServiceImpl.java` 类,并为接口方法提供了最小化占位实现。 + 6. **详细实现 `RobotTaskServiceImpl` 中的核心业务方法**: + - `createTask(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId)`: 创建新的机器人任务,初始状态为 PENDING。 + - `hasPendingOrSentTask(String robotId)`: 检查机器人是否有 PENDING 或 SENT 状态的任务。 + - `markTaskAsSent(Long taskId, Date sentTime)`: 将任务状态从 PENDING 更新为 SENT,并记录发送时间。 + - `findLatestSentTaskByRobotId(String robotId)`: 查找指定机器人最近一个 SENT 状态的任务。 + - `markTaskAsAcknowledged(Long taskId, boolean success, String errorMessage, Date ackTime)`: 根据机器人响应更新任务状态为 ACKNOWLEDGED_SUCCESS 或 ACKNOWLEDGED_FAILURE,并记录确认时间和错误信息。 + - `findAndMarkTimedOutTasks(int timeoutSeconds)`: 查找并标记已发送但超时的任务为 TIMED_OUT。 - **下一步计划 (依据 `stage_2_mqtt_integration.md`)**: - 1. **实现 `RobotTask` 管理 (`RobotTaskService`)**: - - 创建 `RobotTask` 实体、Mapper 接口。 - - 定义并实现 `RobotTaskService` 接口中的方法 (如创建任务、更新任务状态、查找任务等)。 - 2. **实现消息发布 (`MqttService`)**: 用于向机器人发送指令,并与 `RobotTaskService` 交互记录任务。 - 3. **实现消息处理 (`MqttMessageHandler`)**: 详细处理从机器人接收到的状态更新,并与 `RobotTaskService` 交互更新任务状态。 - 4. **实现任务超时处理 (`TaskTimeoutHandler`)**: 定时检查并处理超时的 MQTT 任务。 \ No newline at end of file + 1. **实现消息发布 (`MqttService`)**: + - 创建 `MqttService` 接口和 `MqttServiceImpl` 实现类。 + - 实现 `sendCommand(...)` 方法,该方法会调用 `RobotTaskService.hasPendingOrSentTask` 进行检查,调用 `RobotTaskService.createTask` 创建任务,然后通过 `MqttClient` 发布指令,最后调用 `RobotTaskService.markTaskAsSent` 更新任务状态。 + 2. **实现消息处理 (`MqttMessageHandler`)**: + - 创建 `MqttMessageHandler` 接口和 `MqttMessageHandlerImpl` 实现类 (之前 `MqttCallbackHandler` 中有占位,现在需要具体实现)。 + - 实现 `handleStatusUpdate(String topic, String payload)` 方法,解析机器人状态,查找关联的 `RobotTask`,并调用 `RobotTaskService.markTaskAsAcknowledged` 更新任务。 + - 根据机器人状态执行后续业务逻辑 (此阶段可留空或简单日志记录)。 + 3. **实现任务超时处理 (`TaskTimeoutHandler`)**: + - 创建 `TaskTimeoutHandler` 类,使用 `@Scheduled` 定时调用 `RobotTaskService.findAndMarkTimedOutTasks`。 + - (可选)根据超时任务更新关联的业务实体状态 (如 `ChargingRobot`, `ChargingSession`)。 + +- 更正了 `springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java` 的实现,确保 `sendCommand` 方法的逻辑完整和正确。 +- 实现 `TaskTimeoutHandler.java`,使用 `@Scheduled` 定时调用 `RobotTaskService.findAndMarkTimedOutTasks` 处理任务超时。 +- 在 `MyApplication.java` 中添加 `@EnableScheduling` 以启用定时任务。 +- 在 `application.yml` 中添加了 `mqtt.task.timeoutSeconds` 和 `mqtt.task.timeoutCheckRateMs` 配置项。 \ No newline at end of file diff --git a/springboot-init-main/doc/development_stages/stage_2_mqtt_integration.md b/springboot-init-main/doc/development_stages/stage_2_mqtt_integration.md index fd763f2..58045bb 100644 --- a/springboot-init-main/doc/development_stages/stage_2_mqtt_integration.md +++ b/springboot-init-main/doc/development_stages/stage_2_mqtt_integration.md @@ -120,13 +120,13 @@ * 处理 JSON 解析异常、任务未找到等情况。 7. **实现任务超时处理 (`TaskTimeoutHandler`)**: * 创建 `src/main/java/com/yupi/project/schedule/TaskTimeoutHandler.java` (或放在 service 包)。 - * 注入 `RobotTaskService`, 以及可能需要更新状态的 `ChargingRobotService`, `ChargingSessionService`。 + * 注入 `RobotTaskService`。 * 添加 `@Component` 和 `@EnableScheduling` (在主启动类或配置类上)。 * 创建一个方法,使用 `@Scheduled(fixedRate = 60000)` (例如每分钟执行一次)。 * 在该方法中: * 调用 `robotTaskService.findAndMarkTimedOutTasks(timeoutSeconds)` (例如超时设为 120 秒)。 * 获取超时的任务列表。 - * 对每个超时任务,根据业务逻辑更新关联的 `charging_robot` (如设为 `offline` 或 `error`) 和 `charging_session` (如设为 `error`) 的状态。 + * **注意**: 对每个超时任务,根据业务逻辑更新关联的 `charging_robot` 和 `charging_session` 状态的逻辑,将依赖于第三阶段实现的 `ChargingRobotService` 和 `ChargingSessionService`。本阶段仅标记 `RobotTask` 自身状态。 * 添加日志记录。 ## 4. 前端 (移动端 App) 开发详解 diff --git a/springboot-init-main/doc/development_stages/stage_3_core_charging_logic.md b/springboot-init-main/doc/development_stages/stage_3_core_charging_logic.md index cc6d586..641d8e2 100644 --- a/springboot-init-main/doc/development_stages/stage_3_core_charging_logic.md +++ b/springboot-init-main/doc/development_stages/stage_3_core_charging_logic.md @@ -31,6 +31,13 @@ * **Entity**: 创建 `ChargingSession.java`。 * **Mapper**: 创建 `ChargingSessionMapper.java`。 * **Service**: 创建 `ChargingSessionService.java` 接口和实现类。包含创建、更新状态、记录时间/费用、查询用户历史记录等方法。 + * **注意**: 实现此 Service 后,需要回顾第二阶段的 `TaskTimeoutHandler.java`,补充当 `RobotTask` 超时后,调用此 Service 更新关联 `ChargingSession` 状态的逻辑。 + * **创建 `ChargingController`**: + * 注入 `ChargingService`。 + * 实现 `POST /api/charging/request` 接口: 调用 `chargingService.requestCharging(...)`。 + * 实现 `POST /api/charging/stop` 接口: 调用 `chargingService.stopChargingByUser(...)`。 + * 实现 `GET /api/charging/history` 接口: 调用 `chargingSessionService.getUserHistory(...)`。 + * 实现 `GET /api/charging/sessions` 接口 (管理员): 调用 `chargingSessionService.getAllSessions(...)`。 4. **充电流程实现**: * **创建 `ChargingService` (核心业务编排)**: 创建接口 `ChargingService.java` 和实现 `ChargingServiceImpl.java`。 * 注入 `UserService`, `ChargingRobotService`, `ParkingSpotService`, `ChargingSessionService`, `MqttService`, `RobotTaskService`。 diff --git a/springboot-init-main/src/main/java/com/yupi/project/MyApplication.java b/springboot-init-main/src/main/java/com/yupi/project/MyApplication.java index 1797977..850309a 100644 --- a/springboot-init-main/src/main/java/com/yupi/project/MyApplication.java +++ b/springboot-init-main/src/main/java/com/yupi/project/MyApplication.java @@ -3,9 +3,11 @@ package com.yupi.project; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @MapperScan("com.yupi.project.mapper") +@EnableScheduling public class MyApplication { public static void main(String[] args) { diff --git a/springboot-init-main/src/main/java/com/yupi/project/mapper/RobotTaskMapper.java b/springboot-init-main/src/main/java/com/yupi/project/mapper/RobotTaskMapper.java new file mode 100644 index 0000000..b718790 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/mapper/RobotTaskMapper.java @@ -0,0 +1,14 @@ +package com.yupi.project.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.yupi.project.model.entity.RobotTask; + +/** +* @author Yupi +* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Mapper +* @createDate 2023-12-02 22:00:00 +* @Entity com.yupi.project.model.entity.RobotTask +*/ +public interface RobotTaskMapper extends BaseMapper { + +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/model/dto/mqtt/RobotStatusMessage.java b/springboot-init-main/src/main/java/com/yupi/project/model/dto/mqtt/RobotStatusMessage.java new file mode 100644 index 0000000..ea6ba42 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/model/dto/mqtt/RobotStatusMessage.java @@ -0,0 +1,13 @@ +package com.yupi.project.model.dto.mqtt; + +import lombok.Data; + +@Data +public class RobotStatusMessage { + + private Long taskId; + private String status; // e.g., "PROCESSING", "COMPLETED", "FAILED" + private String message; + private String errorCode; + +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/model/entity/RobotTask.java b/springboot-init-main/src/main/java/com/yupi/project/model/entity/RobotTask.java new file mode 100644 index 0000000..c51509f --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/model/entity/RobotTask.java @@ -0,0 +1,51 @@ +package com.yupi.project.model.entity; + +import com.baomidou.mybatisplus.annotation.*; +import com.yupi.project.model.enums.CommandTypeEnum; +import com.yupi.project.model.enums.RobotTaskStatusEnum; +import lombok.Data; +import java.io.Serializable; +import java.util.Date; + +/** + * 机器人指令任务表实体 + */ +@TableName(value ="robot_task") +@Data +public class RobotTask implements Serializable { + + @TableId(type = IdType.AUTO) + private Long id; + + private String robotId; + + // Store enum as String in DB, but handle as Enum in Java + // Consider using MyBatis-Plus EnumTypeHandler.VARCHAR if direct mapping is problematic, + // or ensure the String value from the enum is used for persistence. + private CommandTypeEnum commandType; + + private String commandPayload; + + // Store enum as String in DB + private RobotTaskStatusEnum status; + + private Date sentTime; + + private Date ackTime; + + private Long relatedSessionId; + + private String errorMessage; + + @TableField(fill = FieldFill.INSERT) + private Date createTime; + + @TableField(fill = FieldFill.INSERT_UPDATE) + private Date updateTime; + + @TableLogic // If soft delete is desired, though not specified in DDL + @TableField(select = false) // By default, don't select the delete flag unless explicitly queried + private Integer isDelete; // Common practice for @TableLogic + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/model/enums/CommandTypeEnum.java b/springboot-init-main/src/main/java/com/yupi/project/model/enums/CommandTypeEnum.java new file mode 100644 index 0000000..430a960 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/model/enums/CommandTypeEnum.java @@ -0,0 +1,22 @@ +package com.yupi.project.model.enums; + +import lombok.Getter; + +/** + * 机器人指令任务的命令类型枚举 + */ +@Getter +public enum CommandTypeEnum { + MOVE_TO_SPOT("MOVE_TO_SPOT", "移动到指定点"), + START_CHARGE("START_CHARGE", "开始充电"), + STOP_CHARGE("STOP_CHARGE", "停止充电"), + QUERY_STATUS("QUERY_STATUS", "查询状态"); + + private final String value; + private final String description; + + CommandTypeEnum(String value, String description) { + this.value = value; + this.description = description; + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/model/enums/RobotTaskStatusEnum.java b/springboot-init-main/src/main/java/com/yupi/project/model/enums/RobotTaskStatusEnum.java new file mode 100644 index 0000000..ef67f6b --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/model/enums/RobotTaskStatusEnum.java @@ -0,0 +1,39 @@ +package com.yupi.project.model.enums; + +import lombok.Getter; + +import java.util.Arrays; + +/** + * 机器人任务的状态枚举 + */ +@Getter +public enum RobotTaskStatusEnum { + PENDING("PENDING", "待处理"), + SENT("SENT", "已发送"), + PROCESSING("PROCESSING", "处理中"), + COMPLETED("COMPLETED", "已完成"), + FAILED("FAILED", "已失败"), + TIMED_OUT("TIMED_OUT", "已超时"); + + private final String value; + private final String description; + + RobotTaskStatusEnum(String value, String description) { + this.value = value; + this.description = description; + } + + /** + * 根据 value 获取枚举 + * + * @param value + * @return + */ + public static RobotTaskStatusEnum fromValue(String value) { + return Arrays.stream(values()) + .filter(enumInstance -> enumInstance.value.equalsIgnoreCase(value)) + .findFirst() + .orElse(null); + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/mqtt/MqttCallbackHandler.java b/springboot-init-main/src/main/java/com/yupi/project/mqtt/MqttCallbackHandler.java index ae1b646..c52fb5f 100644 --- a/springboot-init-main/src/main/java/com/yupi/project/mqtt/MqttCallbackHandler.java +++ b/springboot-init-main/src/main/java/com/yupi/project/mqtt/MqttCallbackHandler.java @@ -1,5 +1,6 @@ package com.yupi.project.mqtt; +import com.yupi.project.service.impl.MqttMessageHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; @@ -7,7 +8,6 @@ import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; -// import com.yupi.project.service.MqttMessageHandler; // Will be uncommented and used later import com.yupi.project.config.properties.MqttProperties; @Slf4j @@ -15,9 +15,9 @@ import com.yupi.project.config.properties.MqttProperties; @RequiredArgsConstructor public class MqttCallbackHandler implements MqttCallbackExtended { - // private final MqttMessageHandler mqttMessageHandler; // Will be uncommented and used later + private final MqttMessageHandler mqttMessageHandler; private final MqttProperties mqttProperties; - private MqttClient mqttClient; // Setter needed or passed in constructor/method + private MqttClient mqttClient; public void setMqttClient(MqttClient mqttClient) { this.mqttClient = mqttClient; @@ -26,12 +26,11 @@ public class MqttCallbackHandler implements MqttCallbackExtended { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT connection {} to broker: {}", reconnect ? "re-established" : "established", serverURI); - // Subscribe to the status topic upon connection/reconnection try { if (mqttClient != null && mqttClient.isConnected()) { - String statusTopic = mqttProperties.getStatusTopicBase() + "/+"; // Subscribe to all robot statuses - mqttClient.subscribe(statusTopic, mqttProperties.getDefaultQos()); - log.info("Subscribed to MQTT topic: {}", statusTopic); + String statusTopic = mqttProperties.getStatusTopicBase() + "/+"; + mqttClient.subscribe(statusTopic, mqttProperties.getDefaultQos(), mqttMessageHandler); + log.info("Subscribed to MQTT topic: {} with MqttMessageHandler", statusTopic); } else { log.warn("MQTT client not available or not connected, cannot subscribe to topic."); } @@ -43,21 +42,13 @@ public class MqttCallbackHandler implements MqttCallbackExtended { @Override public void connectionLost(Throwable cause) { log.error("MQTT connection lost!", cause); - // Automatic reconnect is handled by MqttConnectOptions } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload()); - log.debug("MQTT message arrived. Topic: {}, QoS: {}, Payload:\n{}", topic, message.getQos(), payload); - - // TODO: Implement application-level authentication/validation of the message payload here or in MqttMessageHandler - - // try { - // mqttMessageHandler.handleStatusUpdate(topic, payload); // Will be uncommented and used later - // } catch (Exception e) { - // log.error("Error handling MQTT message for topic {}: ", topic, e); - // } + log.warn("Unexpected MQTT message arrived in MqttCallbackHandler. Topic: {}, QoS: {}, Payload:\n{}. " + + "This should ideally be handled by a specific MqttMessageHandler.", topic, message.getQos(), payload); } @Override diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/MqttService.java b/springboot-init-main/src/main/java/com/yupi/project/service/MqttService.java new file mode 100644 index 0000000..d535708 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/MqttService.java @@ -0,0 +1,23 @@ +package com.yupi.project.service; + +import com.yupi.project.model.enums.CommandTypeEnum; + +/** + * MQTT 消息发布服务接口。 + */ +public interface MqttService { + + /** + * 向指定的机器人发送指令。 + * + * @param robotId 机器人ID + * @param commandType 指令类型 + * @param payloadJson 指令的参数内容 (JSON格式字符串)。对于无参数指令,可以为 null 或空字符串。 + * @param sessionId 关联的充电会话ID (可选, 主要用于充电相关指令) + * @return 如果指令成功进入发送流程(任务已创建并尝试发布),返回 true; + * 如果机器人正忙或创建任务失败等原因导致无法发送,返回 false。 + * @throws Exception 如果MQTT发布过程中发生异常 (例如 MqttException) + */ + boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception; + +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/RobotTaskService.java b/springboot-init-main/src/main/java/com/yupi/project/service/RobotTaskService.java new file mode 100644 index 0000000..300b805 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/RobotTaskService.java @@ -0,0 +1,105 @@ +package com.yupi.project.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.yupi.project.model.entity.RobotTask; +import com.yupi.project.model.enums.CommandTypeEnum; + +import java.util.Date; +import java.util.List; + +/** +* @author Yupi +* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Service +* @createDate 2023-12-02 22:00:00 +*/ +public interface RobotTaskService extends IService { + + /** + * 检查指定机器人是否有正在处理(PENDING 或 SENT)的任务。 + * + * @param robotId 机器人ID + * @return 如果有待处理或已发送任务,则返回 true,否则 false + */ + boolean hasPendingOrSentTask(String robotId); + + /** + * 创建一个新的机器人指令任务。 + * + * @param robotId 机器人ID + * @param commandType 命令类型 + * @param payloadJson 命令参数 (JSON格式) + * @param sessionId 关联的充电会话ID (可选) + * @return 创建的任务实体 + */ + RobotTask createTask(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId); + + /** + * 将任务标记为已发送。 + * + * @param taskId 任务ID + * @param sentTime 命令发送时间 + * @return 是否成功标记 + */ + boolean markTaskAsSent(Long taskId, Date sentTime); + + /** + * 根据机器人ID查找最近一个已发送(SENT)但尚未收到响应的任务。 + * + * @param robotId 机器人ID + * @return 如果找到,则返回任务实体,否则返回 null + */ + RobotTask findLatestSentTaskByRobotId(String robotId); + + /** + * 将任务标记为已确认(成功或失败)。 + * 注意:此方法后续可能被更具体的状态更新方法 (如 markTaskAsCompleted, markTaskAsFailed) 替代或补充。 + * + * @param taskId 任务ID + * @param success 是否成功 + * @param errorMessage 失败时的错误信息 (如果 success 为 true,则此参数可为 null) + * @param ackTime 命令确认时间 + * @return 是否成功标记 + */ + boolean markTaskAsAcknowledged(Long taskId, boolean success, String errorMessage, Date ackTime); + + /** + * 查找并标记超时的已发送任务。 + * 将状态为 SENT 且发送时间早于 (当前时间 - timeoutSeconds) 的任务标记为 TIMED_OUT。 + * + * @param timeoutSeconds 超时秒数 + * @return 被标记为超时的任务列表 + */ + List findAndMarkTimedOutTasks(int timeoutSeconds); + + /** + * 将任务标记为处理中。 + * + * @param taskId 任务ID + * @param ackTime 确认处理中的时间 (可选) + * @param message 附加信息 (可选) + * @return 是否成功标记 + */ + boolean markTaskAsProcessing(Long taskId, Date ackTime, String message); + + /** + * 将任务标记为已完成。 + * + * @param taskId 任务ID + * @param ackTime 完成时间 (可选) + * @param message 附加信息 (可选, 例如成功消息) + * @return 是否成功标记 + */ + boolean markTaskAsCompleted(Long taskId, Date ackTime, String message); + + /** + * 将任务标记为失败。 + * + * @param taskId 任务ID + * @param ackTime 失败确认时间 (可选) + * @param errorCode 错误码 (可选) + * @param errorMessage 错误信息 (可选) + * @return 是否成功标记 + */ + boolean markTaskAsFailed(Long taskId, Date ackTime, String errorCode, String errorMessage); + +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttMessageHandler.java b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttMessageHandler.java new file mode 100644 index 0000000..8b9b0b5 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttMessageHandler.java @@ -0,0 +1,107 @@ +package com.yupi.project.service.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yupi.project.config.properties.MqttProperties; +import com.yupi.project.model.dto.mqtt.RobotStatusMessage; +import com.yupi.project.model.enums.RobotTaskStatusEnum; +import com.yupi.project.service.RobotTaskService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.Date; + +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttMessageHandler implements IMqttMessageListener { + + private final RobotTaskService robotTaskService; + private final MqttProperties mqttProperties; + private final ObjectMapper objectMapper; // For JSON parsing + + /** + * This method is called when a message arrives from the server. + * + * @param topic Name of the topic on the message was published to + * @param message The actual message. + * @throws Exception if a terminal error occurs, this call will be client driven + * re-delivery processes rely on the client storing messages which is not implemented. + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + log.info("MQTT Message Arrived. Topic: {}, QoS: {}, Payload: {}", topic, message.getQos(), payload); + + // Extract robotId from topic. Example: "robot/status/+/robot123" or "robot/command_response/robot123" + // This logic might need adjustment based on the exact topic structure. + String[] topicParts = topic.split("/"); + if (topicParts.length == 0) { + log.error("Received message on invalid topic format: {}", topic); + return; + } + String robotId = topicParts[topicParts.length - 1]; // Assuming robotId is the last part + + // Determine message type based on topic structure (e.g., status update vs command response) + // For now, let's assume all messages on subscribed topics are status updates. + // A more robust solution would use different listeners for different topic patterns or inspect the payload. + + if (topic.startsWith(mqttProperties.getStatusTopicBase())) { + handleRobotStatusUpdate(robotId, payload); + } else { + log.warn("Received message on unhandled topic structure: {}. Ignoring.", topic); + } + } + + private void handleRobotStatusUpdate(String robotId, String payloadJson) { + try { + RobotStatusMessage statusMessage = objectMapper.readValue(payloadJson, RobotStatusMessage.class); + log.info("Parsed RobotStatusMessage for robot {}: {}", robotId, statusMessage); + + if (statusMessage.getTaskId() == null) { + log.warn("RobotStatusMessage for robot {} does not contain a taskId. Payload: {}", robotId, payloadJson); + // Optionally, handle general status updates not tied to a specific task + return; + } + + // Update the RobotTask based on the status message + RobotTaskStatusEnum newStatus = RobotTaskStatusEnum.fromValue(statusMessage.getStatus()); + if (newStatus == null) { + log.error("Invalid status '{}' received from robot {}. Payload: {}", statusMessage.getStatus(), robotId, payloadJson); + return; + } + + boolean updated; + switch (newStatus) { + case PROCESSING: + updated = robotTaskService.markTaskAsProcessing(statusMessage.getTaskId(), new Date(), statusMessage.getMessage()); + break; + case COMPLETED: + updated = robotTaskService.markTaskAsCompleted(statusMessage.getTaskId(), new Date(), statusMessage.getMessage()); + break; + case FAILED: + updated = robotTaskService.markTaskAsFailed(statusMessage.getTaskId(), new Date(), statusMessage.getErrorCode(), statusMessage.getMessage()); + break; + default: + log.warn("Received unhandled RobotTaskStatusEnum: {} for task {} from robot {}. Ignoring.", + newStatus, statusMessage.getTaskId(), robotId); + return; // Don't attempt to update with SENT or PENDING from robot + } + + if (updated) { + log.info("Successfully updated task {} to status {} for robot {} based on MQTT message.", + statusMessage.getTaskId(), newStatus, robotId); + } else { + log.warn("Failed to update task {} to status {} for robot {} (or task not found/invalid state transition). " + + "Message: {}", statusMessage.getTaskId(), newStatus, robotId, statusMessage.getMessage()); + } + + } catch (Exception e) { + log.error("Error processing robot status update for robot {}. Payload: {}. Error: {}", + robotId, payloadJson, e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java new file mode 100644 index 0000000..4ecc667 --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/impl/MqttServiceImpl.java @@ -0,0 +1,86 @@ +package com.yupi.project.service.impl; + +import com.yupi.project.config.properties.MqttProperties; +import com.yupi.project.model.entity.RobotTask; +import com.yupi.project.model.enums.CommandTypeEnum; +import com.yupi.project.service.MqttService; +import com.yupi.project.service.RobotTaskService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.nio.charset.StandardCharsets; +import java.util.Date; + +@Service +@Slf4j +@RequiredArgsConstructor +public class MqttServiceImpl implements MqttService { + + private final MqttClient mqttClient; // Autowired by Spring from MqttConfig + private final MqttProperties mqttProperties; + private final RobotTaskService robotTaskService; + + @Override + @Transactional(rollbackFor = Exception.class) // Ensure rollback if MQTT publish fails after task creation + public boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception { + log.info("Attempting to send command {} to robot {} with payload: {}", commandType, robotId, payloadJson); + + // 1. Check if the robot has pending or already sent tasks + if (robotTaskService.hasPendingOrSentTask(robotId)) { + log.warn("Robot {} is busy (has PENDING or SENT tasks). Command {} aborted.", robotId, commandType); + return false; + } + + // 2. Create a new task in PENDING state + RobotTask task = robotTaskService.createTask(robotId, commandType, payloadJson, sessionId); + if (task == null || task.getId() == null) { + log.error("Failed to create RobotTask for command {} to robot {}. Aborting send.", commandType, robotId); + return false; + } + log.info("Created PENDING RobotTask with ID: {} for command {} to robot {}.", task.getId(), commandType, robotId); + + // 3. Construct the topic + String topic = mqttProperties.getCommandTopicBase() + "/" + robotId; + + // 4. Prepare the MQTT message + String effectivePayload = (payloadJson == null) ? "" : payloadJson; + MqttMessage mqttMessage = new MqttMessage(effectivePayload.getBytes(StandardCharsets.UTF_8)); + mqttMessage.setQos(mqttProperties.getDefaultQos()); + // mqttMessage.setRetained(false); // Default is false + + try { + // 5. Publish the message + if (!mqttClient.isConnected()) { + log.error("MQTT client is not connected. Cannot send command {} to robot {}. Task ID: {}", commandType, robotId, task.getId()); + // Task remains PENDING. A scheduled job might retry or mark as error later. + return false; + } + log.debug("Publishing to topic: {}, QoS: {}, Payload: {}", topic, mqttMessage.getQos(), effectivePayload); + mqttClient.publish(topic, mqttMessage); + log.info("Successfully published command {} to robot {} on topic {}. Task ID: {}", commandType, robotId, topic, task.getId()); + + // 6. Mark the task as SENT + boolean markedAsSent = robotTaskService.markTaskAsSent(task.getId(), new Date()); + if (!markedAsSent) { + log.warn("Command {} published to robot {}, but failed to mark task {} as SENT. System might be in an inconsistent state.", + commandType, robotId, task.getId()); + // This is a critical warning. The command was sent, but DB state doesn't reflect it. + } + return true; // Command sent and task status update was attempted + } catch (MqttException e) { + log.error("MqttException while publishing command {} to robot {} (Task ID: {}). Error: {}", + commandType, robotId, task.getId(), e.getMessage(), e); + // The RobotTask remains PENDING. A retry mechanism or a timeout job could handle this later. + throw e; // Re-throw for transactional rollback + } catch (Exception e) { + log.error("Unexpected exception while sending command {} to robot {} (Task ID: {}). Error: {}", + commandType, robotId, task.getId(), e.getMessage(), e); + throw e; // Re-throw for transactional rollback + } + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/impl/RobotTaskServiceImpl.java b/springboot-init-main/src/main/java/com/yupi/project/service/impl/RobotTaskServiceImpl.java new file mode 100644 index 0000000..ebf8c7c --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/impl/RobotTaskServiceImpl.java @@ -0,0 +1,333 @@ +package com.yupi.project.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.yupi.project.model.entity.RobotTask; +import com.yupi.project.model.enums.CommandTypeEnum; +import com.yupi.project.model.enums.RobotTaskStatusEnum; +import com.yupi.project.service.RobotTaskService; +import com.yupi.project.mapper.RobotTaskMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** +* @author Yupi +* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Service实现 +* @createDate 2023-12-02 22:00:00 +*/ +@Service +@Slf4j +public class RobotTaskServiceImpl extends ServiceImpl + implements RobotTaskService { + + @Override + public boolean hasPendingOrSentTask(String robotId) { + if (robotId == null) { + log.warn("Cannot check for pending/sent tasks: robotId is null."); + return false; // Or throw an IllegalArgumentException + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("robot_id", robotId) + .in("status", RobotTaskStatusEnum.PENDING, RobotTaskStatusEnum.SENT); + // .eq("is_delete", 0); // Add if @TableLogic is strictly used and you want to exclude logically deleted + + long count = this.count(queryWrapper); + if (count > 0) { + log.info("Robot {} has {} PENDING or SENT tasks.", robotId, count); + return true; + } + return false; + } + + @Override + @Transactional + public RobotTask createTask(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) { + if (robotId == null || commandType == null) { + log.error("Cannot create task: robotId or commandType is null."); + // Or throw an IllegalArgumentException + return null; + } + RobotTask task = new RobotTask(); + task.setRobotId(robotId); + task.setCommandType(commandType); + task.setCommandPayload(payloadJson); + task.setRelatedSessionId(sessionId); + task.setStatus(RobotTaskStatusEnum.PENDING); // Default status + // createTime and updateTime will be auto-filled by MyBatis-Plus + // isDelete will be handled by @TableLogic if enabled (default 0) + + boolean saved = this.save(task); + if (saved) { + log.info("Created new RobotTask with ID: {}, Robot ID: {}, Command: {}, Status: PENDING", + task.getId(), robotId, commandType); + return task; + } else { + log.error("Failed to save new RobotTask for Robot ID: {}, Command: {}", robotId, commandType); + return null; + } + } + + @Override + @Transactional + public boolean markTaskAsSent(Long taskId, Date sentTime) { + if (taskId == null || sentTime == null) { + log.error("Cannot mark task as sent: taskId or sentTime is null."); + return false; + } + RobotTask task = this.getById(taskId); + if (task == null) { + log.warn("Cannot mark task as sent: Task with ID {} not found.", taskId); + return false; + } + + if (task.getStatus() != RobotTaskStatusEnum.PENDING) { + log.warn("Cannot mark task {} as SENT. Current status is {}, expected PENDING.", taskId, task.getStatus()); + return false; + } + + RobotTask updateTask = new RobotTask(); + updateTask.setId(taskId); + updateTask.setStatus(RobotTaskStatusEnum.SENT); + updateTask.setSentTime(sentTime); + // updateTime will be auto-filled by MyBatis-Plus + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked RobotTask with ID: {} as SENT at {}.", taskId, sentTime); + } else { + log.error("Failed to mark RobotTask with ID: {} as SENT.", taskId); + } + return updated; + } + + @Override + public RobotTask findLatestSentTaskByRobotId(String robotId) { + if (robotId == null) { + log.warn("Cannot find latest sent task: robotId is null."); + return null; + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("robot_id", robotId) + .eq("status", RobotTaskStatusEnum.SENT) + // .eq("is_delete", 0) // If using soft delete + .orderByDesc("create_time") // Or sent_time, depending on desired logic for "latest" + .last("LIMIT 1"); // Get only the most recent one + + RobotTask task = this.getOne(queryWrapper); + if (task != null) { + log.info("Found latest SENT task with ID {} for Robot ID: {}.", task.getId(), robotId); + } else { + log.debug("No SENT task found for Robot ID: {}.", robotId); + } + return task; + } + + @Override + @Transactional + public boolean markTaskAsAcknowledged(Long taskId, boolean success, String errorMessage, Date ackTime) { + if (taskId == null || ackTime == null) { + log.error("Cannot mark task as acknowledged: taskId or ackTime is null."); + return false; + } + RobotTask task = this.getById(taskId); + if (task == null) { + log.warn("Cannot mark task as acknowledged: Task with ID {} not found.", taskId); + return false; + } + + // Optionally, check if current status allows this transition (e.g., from SENT) + if (task.getStatus() != RobotTaskStatusEnum.SENT) { + log.warn("Cannot mark task {} as acknowledged. Current status is {}, expected SENT.", taskId, task.getStatus()); + // Depending on logic, might still allow ack if it's a late response to a PENDING task that never got marked SENT by our system + // For now, strict check from SENT. + return false; + } + + RobotTask updateTask = new RobotTask(); + updateTask.setId(taskId); + updateTask.setStatus(success ? RobotTaskStatusEnum.COMPLETED : RobotTaskStatusEnum.FAILED); + updateTask.setAckTime(ackTime); + if (!success && errorMessage != null) { + updateTask.setErrorMessage(errorMessage); + } else if (success) { + updateTask.setErrorMessage(null); // Clear any previous error message on success + } + // updateTime will be auto-filled + + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked RobotTask with ID: {} as {}{}.", taskId, + (success ? RobotTaskStatusEnum.COMPLETED.getValue() : RobotTaskStatusEnum.FAILED.getValue()), + (success ? "" : " with error: " + errorMessage)); + } else { + log.error("Failed to mark RobotTask with ID: {} as acknowledged.", taskId); + } + return updated; + } + + @Override + @Transactional + public List findAndMarkTimedOutTasks(int timeoutSeconds) { + if (timeoutSeconds <= 0) { + log.warn("Invalid timeoutSeconds: {}. Must be positive.", timeoutSeconds); + return Collections.emptyList(); + } + + // Calculate the time before which tasks are considered timed out + // Use Calendar to subtract seconds, more robust for date arithmetic + java.util.Calendar cal = java.util.Calendar.getInstance(); + cal.add(java.util.Calendar.SECOND, -timeoutSeconds); + Date timeoutThreshold = cal.getTime(); + + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("status", RobotTaskStatusEnum.SENT) + .lt("sent_time", timeoutThreshold); + // .eq("is_delete", 0); // If using soft delete + + List timedOutTasks = this.list(queryWrapper); + + if (timedOutTasks.isEmpty()) { + log.debug("No tasks found that timed out (threshold: {}s, before {}).", timeoutSeconds, timeoutThreshold); + return Collections.emptyList(); + } + + log.info("Found {} tasks that timed out (threshold: {}s, sent before {}). Attempting to mark them.", + timedOutTasks.size(), timeoutSeconds, timeoutThreshold); + + for (RobotTask task : timedOutTasks) { + RobotTask updateTask = new RobotTask(); + updateTask.setId(task.getId()); + updateTask.setStatus(RobotTaskStatusEnum.TIMED_OUT); + updateTask.setErrorMessage("Task timed out after " + timeoutSeconds + " seconds."); + // ackTime might not be relevant for a system-initiated timeout, or could be set to now + updateTask.setAckTime(new Date()); // Or null, depending on definition + + // updateTime will be auto-filled + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked task {} (Robot ID: {}) as TIMED_OUT.", task.getId(), task.getRobotId()); + // TODO: Potentially trigger further actions for a timed-out task (e.g., notify admin, update related session) + } else { + log.error("Failed to mark task {} (Robot ID: {}) as TIMED_OUT.", task.getId(), task.getRobotId()); + // Add to a list of tasks that failed to update, if needed for retry or reporting + } + } + return timedOutTasks; // Return the list of tasks that were identified as timed out + } + + @Override + @Transactional + public boolean markTaskAsProcessing(Long taskId, Date ackTime, String message) { + if (taskId == null) { + log.error("Cannot mark task as processing: taskId is null."); + return false; + } + RobotTask task = this.getById(taskId); + if (task == null) { + log.warn("Cannot mark task as processing: Task with ID {} not found.", taskId); + return false; + } + + // Allow transition from PENDING or SENT to PROCESSING + if (task.getStatus() != RobotTaskStatusEnum.PENDING && task.getStatus() != RobotTaskStatusEnum.SENT) { + log.warn("Cannot mark task {} as PROCESSING. Current status is {}, expected PENDING or SENT.", taskId, task.getStatus()); + return false; + } + + RobotTask updateTask = new RobotTask(); + updateTask.setId(taskId); + updateTask.setStatus(RobotTaskStatusEnum.PROCESSING); + if (ackTime != null) { // ackTime might be the time processing was confirmed + updateTask.setAckTime(ackTime); + } + if (message != null) { + updateTask.setErrorMessage(message); // Using errorMessage field for general status messages during processing + } + + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked RobotTask with ID: {} as PROCESSING{}.", taskId, (message != null ? " with message: " + message : "")); + } else { + log.error("Failed to mark RobotTask with ID: {} as PROCESSING.", taskId); + } + return updated; + } + + @Override + @Transactional + public boolean markTaskAsCompleted(Long taskId, Date ackTime, String message) { + if (taskId == null) { + log.error("Cannot mark task as completed: taskId is null."); + return false; + } + RobotTask task = this.getById(taskId); + if (task == null) { + log.warn("Cannot mark task as completed: Task with ID {} not found.", taskId); + return false; + } + + // Allow transition from PENDING, SENT, or PROCESSING to COMPLETED + if (task.getStatus() != RobotTaskStatusEnum.PENDING && task.getStatus() != RobotTaskStatusEnum.SENT && task.getStatus() != RobotTaskStatusEnum.PROCESSING) { + log.warn("Cannot mark task {} as COMPLETED. Current status is {}, expected PENDING, SENT or PROCESSING.", taskId, task.getStatus()); + return false; + } + + RobotTask updateTask = new RobotTask(); + updateTask.setId(taskId); + updateTask.setStatus(RobotTaskStatusEnum.COMPLETED); + if (ackTime != null) { + updateTask.setAckTime(ackTime); + } + updateTask.setErrorMessage(message); // Can be a success message or null + + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked RobotTask with ID: {} as COMPLETED{}.", taskId, (message != null ? " with message: " + message : "")); + } else { + log.error("Failed to mark RobotTask with ID: {} as COMPLETED.", taskId); + } + return updated; + } + + @Override + @Transactional + public boolean markTaskAsFailed(Long taskId, Date ackTime, String errorCode, String errorMessage) { + if (taskId == null) { + log.error("Cannot mark task as failed: taskId is null."); + return false; + } + RobotTask task = this.getById(taskId); + if (task == null) { + log.warn("Cannot mark task as failed: Task with ID {} not found.", taskId); + return false; + } + + // Allow transition from any non-terminal state (PENDING, SENT, PROCESSING) to FAILED + if (task.getStatus() == RobotTaskStatusEnum.COMPLETED || task.getStatus() == RobotTaskStatusEnum.FAILED || task.getStatus() == RobotTaskStatusEnum.TIMED_OUT) { + log.warn("Cannot mark task {} as FAILED. Task is already in a terminal state: {}.", taskId, task.getStatus()); + return false; + } + + RobotTask updateTask = new RobotTask(); + updateTask.setId(taskId); + updateTask.setStatus(RobotTaskStatusEnum.FAILED); + if (ackTime != null) { + updateTask.setAckTime(ackTime); + } + // Construct a comprehensive error message if both are provided + String finalErrorMessage = (errorCode != null ? "Code [" + errorCode + "] " : "") + (errorMessage != null ? errorMessage : ""); + updateTask.setErrorMessage(finalErrorMessage.isEmpty() ? null : finalErrorMessage); + + boolean updated = this.updateById(updateTask); + if (updated) { + log.info("Marked RobotTask with ID: {} as FAILED. Error: {}", taskId, finalErrorMessage); + } else { + log.error("Failed to mark RobotTask with ID: {} as FAILED.", taskId); + } + return updated; + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/java/com/yupi/project/service/impl/TaskTimeoutHandler.java b/springboot-init-main/src/main/java/com/yupi/project/service/impl/TaskTimeoutHandler.java new file mode 100644 index 0000000..b7a830b --- /dev/null +++ b/springboot-init-main/src/main/java/com/yupi/project/service/impl/TaskTimeoutHandler.java @@ -0,0 +1,50 @@ +package com.yupi.project.service.impl; + +import com.yupi.project.model.entity.RobotTask; +import com.yupi.project.service.RobotTaskService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Component +@RequiredArgsConstructor +public class TaskTimeoutHandler { + + private final RobotTaskService robotTaskService; + + @Value("${mqtt.task.timeoutSeconds:300}") // 默认5分钟超时 + private int taskTimeoutSeconds; + + // 默认每分钟检查一次,可以根据实际需求调整 + // cron = "秒 分 时 日 月 周" + // 例如 "0 * * * * ?" 每分钟的第0秒执行 + // fixedRate = 60000 (毫秒) 每隔60秒执行一次,从上一次任务开始时计时 + // fixedDelay = 60000 (毫秒) 每隔60秒执行一次,从上一次任务完成时计时 + @Scheduled(fixedRateString = "${mqtt.task.timeoutCheckRateMs:60000}") // 默认每60秒执行一次 + public void checkAndHandleTimedOutTasks() { + log.info("Scheduled task: Checking for timed-out robot tasks (timeout: {}s).", taskTimeoutSeconds); + try { + List timedOutTasks = robotTaskService.findAndMarkTimedOutTasks(taskTimeoutSeconds); + if (timedOutTasks != null && !timedOutTasks.isEmpty()) { + log.warn("Marked {} tasks as TIMED_OUT: {}", timedOutTasks.size(), timedOutTasks.stream().map(RobotTask::getId).collect(Collectors.toList())); + // 可选:在这里添加进一步处理逻辑,例如通知、更新其他关联实体的状态等 + // 这部分逻辑将在第三阶段 ChargingSession 模块实现后补充。 + // for (RobotTask task : timedOutTasks) { + // if (task.getRelatedSessionId() != null) { + // // chargingSessionService.markSessionAsTaskTimedOut(task.getRelatedSessionId()); + // } + // } + } else { + log.info("No tasks found to be timed out during this check."); + } + } catch (Exception e) { + log.error("Error during scheduled task execution of checkAndHandleTimedOutTasks: ", e); + } + } +} \ No newline at end of file diff --git a/springboot-init-main/src/main/resources/application.yml b/springboot-init-main/src/main/resources/application.yml index 4651005..a85d210 100644 --- a/springboot-init-main/src/main/resources/application.yml +++ b/springboot-init-main/src/main/resources/application.yml @@ -59,4 +59,7 @@ mqtt: connection-timeout: 30 # Connection timeout in seconds keep-alive-interval: 60 # Keep alive interval in seconds command-topic-base: yupi_mqtt_power_project/robot/command # Prefixed base topic for sending commands - status-topic-base: yupi_mqtt_power_project/robot/status # Prefixed base topic for receiving status \ No newline at end of file + status-topic-base: yupi_mqtt_power_project/robot/status # Prefixed base topic for receiving status + task: # Task specific configurations + timeoutSeconds: 300 # Default 300 seconds (5 minutes) for a task to be considered timed out + timeoutCheckRateMs: 60000 # Default 60000 ms (1 minute) for how often to check for timed out tasks \ No newline at end of file