mqtt集成初步实现

This commit is contained in:
2025-05-13 22:19:18 +08:00
parent 9e32234ca4
commit 5ea13d6dea
17 changed files with 897 additions and 33 deletions

View File

@@ -120,13 +120,13 @@
* 处理 JSON 解析异常、任务未找到等情况。
7. **实现任务超时处理 (`TaskTimeoutHandler`)**:
* 创建 `src/main/java/com/yupi/project/schedule/TaskTimeoutHandler.java` (或放在 service 包)。
* 注入 `RobotTaskService`, 以及可能需要更新状态的 `ChargingRobotService`, `ChargingSessionService`
* 注入 `RobotTaskService`。
* 添加 `@Component` 和 `@EnableScheduling` (在主启动类或配置类上)。
* 创建一个方法,使用 `@Scheduled(fixedRate = 60000)` (例如每分钟执行一次)。
* 在该方法中:
* 调用 `robotTaskService.findAndMarkTimedOutTasks(timeoutSeconds)` (例如超时设为 120 秒)。
* 获取超时的任务列表。
* 对每个超时任务,根据业务逻辑更新关联的 `charging_robot` (如设为 `offline` 或 `error`) 和 `charging_session` (如设为 `error`) 的状态。
* **注意**: 对每个超时任务,根据业务逻辑更新关联的 `charging_robot` 和 `charging_session` 状态的逻辑,将依赖于第三阶段实现的 `ChargingRobotService` 和 `ChargingSessionService`。本阶段仅标记 `RobotTask` 自身状态。
* 添加日志记录。
## 4. 前端 (移动端 App) 开发详解

View File

@@ -31,6 +31,13 @@
* **Entity**: 创建 `ChargingSession.java`。
* **Mapper**: 创建 `ChargingSessionMapper.java`。
* **Service**: 创建 `ChargingSessionService.java` 接口和实现类。包含创建、更新状态、记录时间/费用、查询用户历史记录等方法。
* **注意**: 实现此 Service 后,需要回顾第二阶段的 `TaskTimeoutHandler.java`,补充当 `RobotTask` 超时后,调用此 Service 更新关联 `ChargingSession` 状态的逻辑。
* **创建 `ChargingController`**:
* 注入 `ChargingService`。
* 实现 `POST /api/charging/request` 接口: 调用 `chargingService.requestCharging(...)`。
* 实现 `POST /api/charging/stop` 接口: 调用 `chargingService.stopChargingByUser(...)`。
* 实现 `GET /api/charging/history` 接口: 调用 `chargingSessionService.getUserHistory(...)`。
* 实现 `GET /api/charging/sessions` 接口 (管理员): 调用 `chargingSessionService.getAllSessions(...)`。
4. **充电流程实现**:
* **创建 `ChargingService` (核心业务编排)**: 创建接口 `ChargingService.java` 和实现 `ChargingServiceImpl.java`。
* 注入 `UserService`, `ChargingRobotService`, `ParkingSpotService`, `ChargingSessionService`, `MqttService`, `RobotTaskService`。

View File

@@ -3,9 +3,11 @@ package com.yupi.project;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@MapperScan("com.yupi.project.mapper")
@EnableScheduling
public class MyApplication {
public static void main(String[] args) {

View File

@@ -0,0 +1,14 @@
package com.yupi.project.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.project.model.entity.RobotTask;
/**
* @author Yupi
* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Mapper
* @createDate 2023-12-02 22:00:00
* @Entity com.yupi.project.model.entity.RobotTask
*/
public interface RobotTaskMapper extends BaseMapper<RobotTask> {
}

View File

@@ -0,0 +1,13 @@
package com.yupi.project.model.dto.mqtt;
import lombok.Data;
@Data
public class RobotStatusMessage {
private Long taskId;
private String status; // e.g., "PROCESSING", "COMPLETED", "FAILED"
private String message;
private String errorCode;
}

View File

@@ -0,0 +1,51 @@
package com.yupi.project.model.entity;
import com.baomidou.mybatisplus.annotation.*;
import com.yupi.project.model.enums.CommandTypeEnum;
import com.yupi.project.model.enums.RobotTaskStatusEnum;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* 机器人指令任务表实体
*/
@TableName(value ="robot_task")
@Data
public class RobotTask implements Serializable {
@TableId(type = IdType.AUTO)
private Long id;
private String robotId;
// Store enum as String in DB, but handle as Enum in Java
// Consider using MyBatis-Plus EnumTypeHandler.VARCHAR if direct mapping is problematic,
// or ensure the String value from the enum is used for persistence.
private CommandTypeEnum commandType;
private String commandPayload;
// Store enum as String in DB
private RobotTaskStatusEnum status;
private Date sentTime;
private Date ackTime;
private Long relatedSessionId;
private String errorMessage;
@TableField(fill = FieldFill.INSERT)
private Date createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
@TableLogic // If soft delete is desired, though not specified in DDL
@TableField(select = false) // By default, don't select the delete flag unless explicitly queried
private Integer isDelete; // Common practice for @TableLogic
private static final long serialVersionUID = 1L;
}

View File

@@ -0,0 +1,22 @@
package com.yupi.project.model.enums;
import lombok.Getter;
/**
* 机器人指令任务的命令类型枚举
*/
@Getter
public enum CommandTypeEnum {
MOVE_TO_SPOT("MOVE_TO_SPOT", "移动到指定点"),
START_CHARGE("START_CHARGE", "开始充电"),
STOP_CHARGE("STOP_CHARGE", "停止充电"),
QUERY_STATUS("QUERY_STATUS", "查询状态");
private final String value;
private final String description;
CommandTypeEnum(String value, String description) {
this.value = value;
this.description = description;
}
}

View File

@@ -0,0 +1,39 @@
package com.yupi.project.model.enums;
import lombok.Getter;
import java.util.Arrays;
/**
* 机器人任务的状态枚举
*/
@Getter
public enum RobotTaskStatusEnum {
PENDING("PENDING", "待处理"),
SENT("SENT", "已发送"),
PROCESSING("PROCESSING", "处理中"),
COMPLETED("COMPLETED", "已完成"),
FAILED("FAILED", "已失败"),
TIMED_OUT("TIMED_OUT", "已超时");
private final String value;
private final String description;
RobotTaskStatusEnum(String value, String description) {
this.value = value;
this.description = description;
}
/**
* 根据 value 获取枚举
*
* @param value
* @return
*/
public static RobotTaskStatusEnum fromValue(String value) {
return Arrays.stream(values())
.filter(enumInstance -> enumInstance.value.equalsIgnoreCase(value))
.findFirst()
.orElse(null);
}
}

View File

@@ -1,5 +1,6 @@
package com.yupi.project.mqtt;
import com.yupi.project.service.impl.MqttMessageHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -7,7 +8,6 @@ import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
// import com.yupi.project.service.MqttMessageHandler; // Will be uncommented and used later
import com.yupi.project.config.properties.MqttProperties;
@Slf4j
@@ -15,9 +15,9 @@ import com.yupi.project.config.properties.MqttProperties;
@RequiredArgsConstructor
public class MqttCallbackHandler implements MqttCallbackExtended {
// private final MqttMessageHandler mqttMessageHandler; // Will be uncommented and used later
private final MqttMessageHandler mqttMessageHandler;
private final MqttProperties mqttProperties;
private MqttClient mqttClient; // Setter needed or passed in constructor/method
private MqttClient mqttClient;
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
@@ -26,12 +26,11 @@ public class MqttCallbackHandler implements MqttCallbackExtended {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT connection {} to broker: {}", reconnect ? "re-established" : "established", serverURI);
// Subscribe to the status topic upon connection/reconnection
try {
if (mqttClient != null && mqttClient.isConnected()) {
String statusTopic = mqttProperties.getStatusTopicBase() + "/+"; // Subscribe to all robot statuses
mqttClient.subscribe(statusTopic, mqttProperties.getDefaultQos());
log.info("Subscribed to MQTT topic: {}", statusTopic);
String statusTopic = mqttProperties.getStatusTopicBase() + "/+";
mqttClient.subscribe(statusTopic, mqttProperties.getDefaultQos(), mqttMessageHandler);
log.info("Subscribed to MQTT topic: {} with MqttMessageHandler", statusTopic);
} else {
log.warn("MQTT client not available or not connected, cannot subscribe to topic.");
}
@@ -43,21 +42,13 @@ public class MqttCallbackHandler implements MqttCallbackExtended {
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT connection lost!", cause);
// Automatic reconnect is handled by MqttConnectOptions
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.debug("MQTT message arrived. Topic: {}, QoS: {}, Payload:\n{}", topic, message.getQos(), payload);
// TODO: Implement application-level authentication/validation of the message payload here or in MqttMessageHandler
// try {
// mqttMessageHandler.handleStatusUpdate(topic, payload); // Will be uncommented and used later
// } catch (Exception e) {
// log.error("Error handling MQTT message for topic {}: ", topic, e);
// }
log.warn("Unexpected MQTT message arrived in MqttCallbackHandler. Topic: {}, QoS: {}, Payload:\n{}. " +
"This should ideally be handled by a specific MqttMessageHandler.", topic, message.getQos(), payload);
}
@Override

View File

@@ -0,0 +1,23 @@
package com.yupi.project.service;
import com.yupi.project.model.enums.CommandTypeEnum;
/**
* MQTT 消息发布服务接口。
*/
public interface MqttService {
/**
* 向指定的机器人发送指令。
*
* @param robotId 机器人ID
* @param commandType 指令类型
* @param payloadJson 指令的参数内容 (JSON格式字符串)。对于无参数指令,可以为 null 或空字符串。
* @param sessionId 关联的充电会话ID (可选, 主要用于充电相关指令)
* @return 如果指令成功进入发送流程(任务已创建并尝试发布),返回 true
* 如果机器人正忙或创建任务失败等原因导致无法发送,返回 false。
* @throws Exception 如果MQTT发布过程中发生异常 (例如 MqttException)
*/
boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception;
}

View File

@@ -0,0 +1,105 @@
package com.yupi.project.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.yupi.project.model.entity.RobotTask;
import com.yupi.project.model.enums.CommandTypeEnum;
import java.util.Date;
import java.util.List;
/**
* @author Yupi
* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Service
* @createDate 2023-12-02 22:00:00
*/
public interface RobotTaskService extends IService<RobotTask> {
/**
* 检查指定机器人是否有正在处理PENDING 或 SENT的任务。
*
* @param robotId 机器人ID
* @return 如果有待处理或已发送任务,则返回 true否则 false
*/
boolean hasPendingOrSentTask(String robotId);
/**
* 创建一个新的机器人指令任务。
*
* @param robotId 机器人ID
* @param commandType 命令类型
* @param payloadJson 命令参数 (JSON格式)
* @param sessionId 关联的充电会话ID (可选)
* @return 创建的任务实体
*/
RobotTask createTask(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId);
/**
* 将任务标记为已发送。
*
* @param taskId 任务ID
* @param sentTime 命令发送时间
* @return 是否成功标记
*/
boolean markTaskAsSent(Long taskId, Date sentTime);
/**
* 根据机器人ID查找最近一个已发送SENT但尚未收到响应的任务。
*
* @param robotId 机器人ID
* @return 如果找到,则返回任务实体,否则返回 null
*/
RobotTask findLatestSentTaskByRobotId(String robotId);
/**
* 将任务标记为已确认(成功或失败)。
* 注意:此方法后续可能被更具体的状态更新方法 (如 markTaskAsCompleted, markTaskAsFailed) 替代或补充。
*
* @param taskId 任务ID
* @param success 是否成功
* @param errorMessage 失败时的错误信息 (如果 success 为 true则此参数可为 null)
* @param ackTime 命令确认时间
* @return 是否成功标记
*/
boolean markTaskAsAcknowledged(Long taskId, boolean success, String errorMessage, Date ackTime);
/**
* 查找并标记超时的已发送任务。
* 将状态为 SENT 且发送时间早于 (当前时间 - timeoutSeconds) 的任务标记为 TIMED_OUT。
*
* @param timeoutSeconds 超时秒数
* @return 被标记为超时的任务列表
*/
List<RobotTask> findAndMarkTimedOutTasks(int timeoutSeconds);
/**
* 将任务标记为处理中。
*
* @param taskId 任务ID
* @param ackTime 确认处理中的时间 (可选)
* @param message 附加信息 (可选)
* @return 是否成功标记
*/
boolean markTaskAsProcessing(Long taskId, Date ackTime, String message);
/**
* 将任务标记为已完成。
*
* @param taskId 任务ID
* @param ackTime 完成时间 (可选)
* @param message 附加信息 (可选, 例如成功消息)
* @return 是否成功标记
*/
boolean markTaskAsCompleted(Long taskId, Date ackTime, String message);
/**
* 将任务标记为失败。
*
* @param taskId 任务ID
* @param ackTime 失败确认时间 (可选)
* @param errorCode 错误码 (可选)
* @param errorMessage 错误信息 (可选)
* @return 是否成功标记
*/
boolean markTaskAsFailed(Long taskId, Date ackTime, String errorCode, String errorMessage);
}

View File

@@ -0,0 +1,107 @@
package com.yupi.project.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yupi.project.config.properties.MqttProperties;
import com.yupi.project.model.dto.mqtt.RobotStatusMessage;
import com.yupi.project.model.enums.RobotTaskStatusEnum;
import com.yupi.project.service.RobotTaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttMessageHandler implements IMqttMessageListener {
private final RobotTaskService robotTaskService;
private final MqttProperties mqttProperties;
private final ObjectMapper objectMapper; // For JSON parsing
/**
* This method is called when a message arrives from the server.
*
* @param topic Name of the topic on the message was published to
* @param message The actual message.
* @throws Exception if a terminal error occurs, this call will be client driven
* re-delivery processes rely on the client storing messages which is not implemented.
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
log.info("MQTT Message Arrived. Topic: {}, QoS: {}, Payload: {}", topic, message.getQos(), payload);
// Extract robotId from topic. Example: "robot/status/+/robot123" or "robot/command_response/robot123"
// This logic might need adjustment based on the exact topic structure.
String[] topicParts = topic.split("/");
if (topicParts.length == 0) {
log.error("Received message on invalid topic format: {}", topic);
return;
}
String robotId = topicParts[topicParts.length - 1]; // Assuming robotId is the last part
// Determine message type based on topic structure (e.g., status update vs command response)
// For now, let's assume all messages on subscribed topics are status updates.
// A more robust solution would use different listeners for different topic patterns or inspect the payload.
if (topic.startsWith(mqttProperties.getStatusTopicBase())) {
handleRobotStatusUpdate(robotId, payload);
} else {
log.warn("Received message on unhandled topic structure: {}. Ignoring.", topic);
}
}
private void handleRobotStatusUpdate(String robotId, String payloadJson) {
try {
RobotStatusMessage statusMessage = objectMapper.readValue(payloadJson, RobotStatusMessage.class);
log.info("Parsed RobotStatusMessage for robot {}: {}", robotId, statusMessage);
if (statusMessage.getTaskId() == null) {
log.warn("RobotStatusMessage for robot {} does not contain a taskId. Payload: {}", robotId, payloadJson);
// Optionally, handle general status updates not tied to a specific task
return;
}
// Update the RobotTask based on the status message
RobotTaskStatusEnum newStatus = RobotTaskStatusEnum.fromValue(statusMessage.getStatus());
if (newStatus == null) {
log.error("Invalid status '{}' received from robot {}. Payload: {}", statusMessage.getStatus(), robotId, payloadJson);
return;
}
boolean updated;
switch (newStatus) {
case PROCESSING:
updated = robotTaskService.markTaskAsProcessing(statusMessage.getTaskId(), new Date(), statusMessage.getMessage());
break;
case COMPLETED:
updated = robotTaskService.markTaskAsCompleted(statusMessage.getTaskId(), new Date(), statusMessage.getMessage());
break;
case FAILED:
updated = robotTaskService.markTaskAsFailed(statusMessage.getTaskId(), new Date(), statusMessage.getErrorCode(), statusMessage.getMessage());
break;
default:
log.warn("Received unhandled RobotTaskStatusEnum: {} for task {} from robot {}. Ignoring.",
newStatus, statusMessage.getTaskId(), robotId);
return; // Don't attempt to update with SENT or PENDING from robot
}
if (updated) {
log.info("Successfully updated task {} to status {} for robot {} based on MQTT message.",
statusMessage.getTaskId(), newStatus, robotId);
} else {
log.warn("Failed to update task {} to status {} for robot {} (or task not found/invalid state transition). " +
"Message: {}", statusMessage.getTaskId(), newStatus, robotId, statusMessage.getMessage());
}
} catch (Exception e) {
log.error("Error processing robot status update for robot {}. Payload: {}. Error: {}",
robotId, payloadJson, e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,86 @@
package com.yupi.project.service.impl;
import com.yupi.project.config.properties.MqttProperties;
import com.yupi.project.model.entity.RobotTask;
import com.yupi.project.model.enums.CommandTypeEnum;
import com.yupi.project.service.MqttService;
import com.yupi.project.service.RobotTaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.nio.charset.StandardCharsets;
import java.util.Date;
@Service
@Slf4j
@RequiredArgsConstructor
public class MqttServiceImpl implements MqttService {
private final MqttClient mqttClient; // Autowired by Spring from MqttConfig
private final MqttProperties mqttProperties;
private final RobotTaskService robotTaskService;
@Override
@Transactional(rollbackFor = Exception.class) // Ensure rollback if MQTT publish fails after task creation
public boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception {
log.info("Attempting to send command {} to robot {} with payload: {}", commandType, robotId, payloadJson);
// 1. Check if the robot has pending or already sent tasks
if (robotTaskService.hasPendingOrSentTask(robotId)) {
log.warn("Robot {} is busy (has PENDING or SENT tasks). Command {} aborted.", robotId, commandType);
return false;
}
// 2. Create a new task in PENDING state
RobotTask task = robotTaskService.createTask(robotId, commandType, payloadJson, sessionId);
if (task == null || task.getId() == null) {
log.error("Failed to create RobotTask for command {} to robot {}. Aborting send.", commandType, robotId);
return false;
}
log.info("Created PENDING RobotTask with ID: {} for command {} to robot {}.", task.getId(), commandType, robotId);
// 3. Construct the topic
String topic = mqttProperties.getCommandTopicBase() + "/" + robotId;
// 4. Prepare the MQTT message
String effectivePayload = (payloadJson == null) ? "" : payloadJson;
MqttMessage mqttMessage = new MqttMessage(effectivePayload.getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(mqttProperties.getDefaultQos());
// mqttMessage.setRetained(false); // Default is false
try {
// 5. Publish the message
if (!mqttClient.isConnected()) {
log.error("MQTT client is not connected. Cannot send command {} to robot {}. Task ID: {}", commandType, robotId, task.getId());
// Task remains PENDING. A scheduled job might retry or mark as error later.
return false;
}
log.debug("Publishing to topic: {}, QoS: {}, Payload: {}", topic, mqttMessage.getQos(), effectivePayload);
mqttClient.publish(topic, mqttMessage);
log.info("Successfully published command {} to robot {} on topic {}. Task ID: {}", commandType, robotId, topic, task.getId());
// 6. Mark the task as SENT
boolean markedAsSent = robotTaskService.markTaskAsSent(task.getId(), new Date());
if (!markedAsSent) {
log.warn("Command {} published to robot {}, but failed to mark task {} as SENT. System might be in an inconsistent state.",
commandType, robotId, task.getId());
// This is a critical warning. The command was sent, but DB state doesn't reflect it.
}
return true; // Command sent and task status update was attempted
} catch (MqttException e) {
log.error("MqttException while publishing command {} to robot {} (Task ID: {}). Error: {}",
commandType, robotId, task.getId(), e.getMessage(), e);
// The RobotTask remains PENDING. A retry mechanism or a timeout job could handle this later.
throw e; // Re-throw for transactional rollback
} catch (Exception e) {
log.error("Unexpected exception while sending command {} to robot {} (Task ID: {}). Error: {}",
commandType, robotId, task.getId(), e.getMessage(), e);
throw e; // Re-throw for transactional rollback
}
}
}

View File

@@ -0,0 +1,333 @@
package com.yupi.project.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.project.model.entity.RobotTask;
import com.yupi.project.model.enums.CommandTypeEnum;
import com.yupi.project.model.enums.RobotTaskStatusEnum;
import com.yupi.project.service.RobotTaskService;
import com.yupi.project.mapper.RobotTaskMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.Date;
import java.util.List;
/**
* @author Yupi
* @description 针对表【robot_task(机器人指令任务表)】的数据库操作Service实现
* @createDate 2023-12-02 22:00:00
*/
@Service
@Slf4j
public class RobotTaskServiceImpl extends ServiceImpl<RobotTaskMapper, RobotTask>
implements RobotTaskService {
@Override
public boolean hasPendingOrSentTask(String robotId) {
if (robotId == null) {
log.warn("Cannot check for pending/sent tasks: robotId is null.");
return false; // Or throw an IllegalArgumentException
}
QueryWrapper<RobotTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("robot_id", robotId)
.in("status", RobotTaskStatusEnum.PENDING, RobotTaskStatusEnum.SENT);
// .eq("is_delete", 0); // Add if @TableLogic is strictly used and you want to exclude logically deleted
long count = this.count(queryWrapper);
if (count > 0) {
log.info("Robot {} has {} PENDING or SENT tasks.", robotId, count);
return true;
}
return false;
}
@Override
@Transactional
public RobotTask createTask(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) {
if (robotId == null || commandType == null) {
log.error("Cannot create task: robotId or commandType is null.");
// Or throw an IllegalArgumentException
return null;
}
RobotTask task = new RobotTask();
task.setRobotId(robotId);
task.setCommandType(commandType);
task.setCommandPayload(payloadJson);
task.setRelatedSessionId(sessionId);
task.setStatus(RobotTaskStatusEnum.PENDING); // Default status
// createTime and updateTime will be auto-filled by MyBatis-Plus
// isDelete will be handled by @TableLogic if enabled (default 0)
boolean saved = this.save(task);
if (saved) {
log.info("Created new RobotTask with ID: {}, Robot ID: {}, Command: {}, Status: PENDING",
task.getId(), robotId, commandType);
return task;
} else {
log.error("Failed to save new RobotTask for Robot ID: {}, Command: {}", robotId, commandType);
return null;
}
}
@Override
@Transactional
public boolean markTaskAsSent(Long taskId, Date sentTime) {
if (taskId == null || sentTime == null) {
log.error("Cannot mark task as sent: taskId or sentTime is null.");
return false;
}
RobotTask task = this.getById(taskId);
if (task == null) {
log.warn("Cannot mark task as sent: Task with ID {} not found.", taskId);
return false;
}
if (task.getStatus() != RobotTaskStatusEnum.PENDING) {
log.warn("Cannot mark task {} as SENT. Current status is {}, expected PENDING.", taskId, task.getStatus());
return false;
}
RobotTask updateTask = new RobotTask();
updateTask.setId(taskId);
updateTask.setStatus(RobotTaskStatusEnum.SENT);
updateTask.setSentTime(sentTime);
// updateTime will be auto-filled by MyBatis-Plus
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked RobotTask with ID: {} as SENT at {}.", taskId, sentTime);
} else {
log.error("Failed to mark RobotTask with ID: {} as SENT.", taskId);
}
return updated;
}
@Override
public RobotTask findLatestSentTaskByRobotId(String robotId) {
if (robotId == null) {
log.warn("Cannot find latest sent task: robotId is null.");
return null;
}
QueryWrapper<RobotTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("robot_id", robotId)
.eq("status", RobotTaskStatusEnum.SENT)
// .eq("is_delete", 0) // If using soft delete
.orderByDesc("create_time") // Or sent_time, depending on desired logic for "latest"
.last("LIMIT 1"); // Get only the most recent one
RobotTask task = this.getOne(queryWrapper);
if (task != null) {
log.info("Found latest SENT task with ID {} for Robot ID: {}.", task.getId(), robotId);
} else {
log.debug("No SENT task found for Robot ID: {}.", robotId);
}
return task;
}
@Override
@Transactional
public boolean markTaskAsAcknowledged(Long taskId, boolean success, String errorMessage, Date ackTime) {
if (taskId == null || ackTime == null) {
log.error("Cannot mark task as acknowledged: taskId or ackTime is null.");
return false;
}
RobotTask task = this.getById(taskId);
if (task == null) {
log.warn("Cannot mark task as acknowledged: Task with ID {} not found.", taskId);
return false;
}
// Optionally, check if current status allows this transition (e.g., from SENT)
if (task.getStatus() != RobotTaskStatusEnum.SENT) {
log.warn("Cannot mark task {} as acknowledged. Current status is {}, expected SENT.", taskId, task.getStatus());
// Depending on logic, might still allow ack if it's a late response to a PENDING task that never got marked SENT by our system
// For now, strict check from SENT.
return false;
}
RobotTask updateTask = new RobotTask();
updateTask.setId(taskId);
updateTask.setStatus(success ? RobotTaskStatusEnum.COMPLETED : RobotTaskStatusEnum.FAILED);
updateTask.setAckTime(ackTime);
if (!success && errorMessage != null) {
updateTask.setErrorMessage(errorMessage);
} else if (success) {
updateTask.setErrorMessage(null); // Clear any previous error message on success
}
// updateTime will be auto-filled
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked RobotTask with ID: {} as {}{}.", taskId,
(success ? RobotTaskStatusEnum.COMPLETED.getValue() : RobotTaskStatusEnum.FAILED.getValue()),
(success ? "" : " with error: " + errorMessage));
} else {
log.error("Failed to mark RobotTask with ID: {} as acknowledged.", taskId);
}
return updated;
}
@Override
@Transactional
public List<RobotTask> findAndMarkTimedOutTasks(int timeoutSeconds) {
if (timeoutSeconds <= 0) {
log.warn("Invalid timeoutSeconds: {}. Must be positive.", timeoutSeconds);
return Collections.emptyList();
}
// Calculate the time before which tasks are considered timed out
// Use Calendar to subtract seconds, more robust for date arithmetic
java.util.Calendar cal = java.util.Calendar.getInstance();
cal.add(java.util.Calendar.SECOND, -timeoutSeconds);
Date timeoutThreshold = cal.getTime();
QueryWrapper<RobotTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("status", RobotTaskStatusEnum.SENT)
.lt("sent_time", timeoutThreshold);
// .eq("is_delete", 0); // If using soft delete
List<RobotTask> timedOutTasks = this.list(queryWrapper);
if (timedOutTasks.isEmpty()) {
log.debug("No tasks found that timed out (threshold: {}s, before {}).", timeoutSeconds, timeoutThreshold);
return Collections.emptyList();
}
log.info("Found {} tasks that timed out (threshold: {}s, sent before {}). Attempting to mark them.",
timedOutTasks.size(), timeoutSeconds, timeoutThreshold);
for (RobotTask task : timedOutTasks) {
RobotTask updateTask = new RobotTask();
updateTask.setId(task.getId());
updateTask.setStatus(RobotTaskStatusEnum.TIMED_OUT);
updateTask.setErrorMessage("Task timed out after " + timeoutSeconds + " seconds.");
// ackTime might not be relevant for a system-initiated timeout, or could be set to now
updateTask.setAckTime(new Date()); // Or null, depending on definition
// updateTime will be auto-filled
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked task {} (Robot ID: {}) as TIMED_OUT.", task.getId(), task.getRobotId());
// TODO: Potentially trigger further actions for a timed-out task (e.g., notify admin, update related session)
} else {
log.error("Failed to mark task {} (Robot ID: {}) as TIMED_OUT.", task.getId(), task.getRobotId());
// Add to a list of tasks that failed to update, if needed for retry or reporting
}
}
return timedOutTasks; // Return the list of tasks that were identified as timed out
}
@Override
@Transactional
public boolean markTaskAsProcessing(Long taskId, Date ackTime, String message) {
if (taskId == null) {
log.error("Cannot mark task as processing: taskId is null.");
return false;
}
RobotTask task = this.getById(taskId);
if (task == null) {
log.warn("Cannot mark task as processing: Task with ID {} not found.", taskId);
return false;
}
// Allow transition from PENDING or SENT to PROCESSING
if (task.getStatus() != RobotTaskStatusEnum.PENDING && task.getStatus() != RobotTaskStatusEnum.SENT) {
log.warn("Cannot mark task {} as PROCESSING. Current status is {}, expected PENDING or SENT.", taskId, task.getStatus());
return false;
}
RobotTask updateTask = new RobotTask();
updateTask.setId(taskId);
updateTask.setStatus(RobotTaskStatusEnum.PROCESSING);
if (ackTime != null) { // ackTime might be the time processing was confirmed
updateTask.setAckTime(ackTime);
}
if (message != null) {
updateTask.setErrorMessage(message); // Using errorMessage field for general status messages during processing
}
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked RobotTask with ID: {} as PROCESSING{}.", taskId, (message != null ? " with message: " + message : ""));
} else {
log.error("Failed to mark RobotTask with ID: {} as PROCESSING.", taskId);
}
return updated;
}
@Override
@Transactional
public boolean markTaskAsCompleted(Long taskId, Date ackTime, String message) {
if (taskId == null) {
log.error("Cannot mark task as completed: taskId is null.");
return false;
}
RobotTask task = this.getById(taskId);
if (task == null) {
log.warn("Cannot mark task as completed: Task with ID {} not found.", taskId);
return false;
}
// Allow transition from PENDING, SENT, or PROCESSING to COMPLETED
if (task.getStatus() != RobotTaskStatusEnum.PENDING && task.getStatus() != RobotTaskStatusEnum.SENT && task.getStatus() != RobotTaskStatusEnum.PROCESSING) {
log.warn("Cannot mark task {} as COMPLETED. Current status is {}, expected PENDING, SENT or PROCESSING.", taskId, task.getStatus());
return false;
}
RobotTask updateTask = new RobotTask();
updateTask.setId(taskId);
updateTask.setStatus(RobotTaskStatusEnum.COMPLETED);
if (ackTime != null) {
updateTask.setAckTime(ackTime);
}
updateTask.setErrorMessage(message); // Can be a success message or null
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked RobotTask with ID: {} as COMPLETED{}.", taskId, (message != null ? " with message: " + message : ""));
} else {
log.error("Failed to mark RobotTask with ID: {} as COMPLETED.", taskId);
}
return updated;
}
@Override
@Transactional
public boolean markTaskAsFailed(Long taskId, Date ackTime, String errorCode, String errorMessage) {
if (taskId == null) {
log.error("Cannot mark task as failed: taskId is null.");
return false;
}
RobotTask task = this.getById(taskId);
if (task == null) {
log.warn("Cannot mark task as failed: Task with ID {} not found.", taskId);
return false;
}
// Allow transition from any non-terminal state (PENDING, SENT, PROCESSING) to FAILED
if (task.getStatus() == RobotTaskStatusEnum.COMPLETED || task.getStatus() == RobotTaskStatusEnum.FAILED || task.getStatus() == RobotTaskStatusEnum.TIMED_OUT) {
log.warn("Cannot mark task {} as FAILED. Task is already in a terminal state: {}.", taskId, task.getStatus());
return false;
}
RobotTask updateTask = new RobotTask();
updateTask.setId(taskId);
updateTask.setStatus(RobotTaskStatusEnum.FAILED);
if (ackTime != null) {
updateTask.setAckTime(ackTime);
}
// Construct a comprehensive error message if both are provided
String finalErrorMessage = (errorCode != null ? "Code [" + errorCode + "] " : "") + (errorMessage != null ? errorMessage : "");
updateTask.setErrorMessage(finalErrorMessage.isEmpty() ? null : finalErrorMessage);
boolean updated = this.updateById(updateTask);
if (updated) {
log.info("Marked RobotTask with ID: {} as FAILED. Error: {}", taskId, finalErrorMessage);
} else {
log.error("Failed to mark RobotTask with ID: {} as FAILED.", taskId);
}
return updated;
}
}

View File

@@ -0,0 +1,50 @@
package com.yupi.project.service.impl;
import com.yupi.project.model.entity.RobotTask;
import com.yupi.project.service.RobotTaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskTimeoutHandler {
private final RobotTaskService robotTaskService;
@Value("${mqtt.task.timeoutSeconds:300}") // 默认5分钟超时
private int taskTimeoutSeconds;
// 默认每分钟检查一次,可以根据实际需求调整
// cron = "秒 分 时 日 月 周"
// 例如 "0 * * * * ?" 每分钟的第0秒执行
// fixedRate = 60000 (毫秒) 每隔60秒执行一次从上一次任务开始时计时
// fixedDelay = 60000 (毫秒) 每隔60秒执行一次从上一次任务完成时计时
@Scheduled(fixedRateString = "${mqtt.task.timeoutCheckRateMs:60000}") // 默认每60秒执行一次
public void checkAndHandleTimedOutTasks() {
log.info("Scheduled task: Checking for timed-out robot tasks (timeout: {}s).", taskTimeoutSeconds);
try {
List<RobotTask> timedOutTasks = robotTaskService.findAndMarkTimedOutTasks(taskTimeoutSeconds);
if (timedOutTasks != null && !timedOutTasks.isEmpty()) {
log.warn("Marked {} tasks as TIMED_OUT: {}", timedOutTasks.size(), timedOutTasks.stream().map(RobotTask::getId).collect(Collectors.toList()));
// 可选:在这里添加进一步处理逻辑,例如通知、更新其他关联实体的状态等
// 这部分逻辑将在第三阶段 ChargingSession 模块实现后补充。
// for (RobotTask task : timedOutTasks) {
// if (task.getRelatedSessionId() != null) {
// // chargingSessionService.markSessionAsTaskTimedOut(task.getRelatedSessionId());
// }
// }
} else {
log.info("No tasks found to be timed out during this check.");
}
} catch (Exception e) {
log.error("Error during scheduled task execution of checkAndHandleTimedOutTasks: ", e);
}
}
}

View File

@@ -59,4 +59,7 @@ mqtt:
connection-timeout: 30 # Connection timeout in seconds
keep-alive-interval: 60 # Keep alive interval in seconds
command-topic-base: yupi_mqtt_power_project/robot/command # Prefixed base topic for sending commands
status-topic-base: yupi_mqtt_power_project/robot/status # Prefixed base topic for receiving status
status-topic-base: yupi_mqtt_power_project/robot/status # Prefixed base topic for receiving status
task: # Task specific configurations
timeoutSeconds: 300 # Default 300 seconds (5 minutes) for a task to be considered timed out
timeoutCheckRateMs: 60000 # Default 60000 ms (1 minute) for how often to check for timed out tasks