mqtt消息记录开发完成

This commit is contained in:
2025-05-23 11:03:42 +08:00
parent abf14bd42f
commit 49f1220310
27 changed files with 1787 additions and 66 deletions

View File

@@ -0,0 +1,105 @@
package com.yupi.project.aop;
import com.yupi.project.config.properties.MqttLoggerProperties;
import com.yupi.project.model.entity.RobotTask;
import com.yupi.project.service.MqttCommunicationLogService;
import com.yupi.project.service.RobotTaskService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Aspect
@Component
@Slf4j
public class MqttPublishLogAspect {
private final MqttCommunicationLogService logService;
private final MqttLoggerProperties loggerProperties;
private final RobotTaskService robotTaskService;
private final MqttClient mainMqttClient;
@Autowired
public MqttPublishLogAspect(MqttCommunicationLogService logService,
MqttLoggerProperties loggerProperties,
RobotTaskService robotTaskService,
@Qualifier("mqttClientBean") MqttClient mainMqttClient) {
this.logService = logService;
this.loggerProperties = loggerProperties;
this.robotTaskService = robotTaskService;
this.mainMqttClient = mainMqttClient;
}
@Around("execution(* com.yupi.project.service.impl.MqttServiceImpl.sendCommand(..)) || execution(* com.yupi.project.service.MqttService.sendCommand(..))")
public Object logMqttSendCommand(ProceedingJoinPoint joinPoint) throws Throwable {
if (!loggerProperties.isEnabled()) {
return joinPoint.proceed();
}
Object[] args = joinPoint.getArgs();
String robotIdArg = null;
String commandTypeArg = null;
String payloadJsonArg = null;
Long sessionIdArg = null;
Long taskIdForLog = null;
if (args.length >= 3) {
robotIdArg = (String) args[0];
commandTypeArg = args[1].toString();
payloadJsonArg = (String) args[2];
if (args.length >= 4 && args[3] instanceof Long) {
sessionIdArg = (Long) args[3];
}
}
String topic = null;
Integer qos = null;
Boolean retained = false;
String messageIdForLog = UUID.randomUUID().toString();
Object result = null;
try {
result = joinPoint.proceed();
} catch (Throwable throwable) {
logService.asyncLogDownstreamMessage(
"UNKNOWN_TOPIC_PRE_SEND_FAILURE",
payloadJsonArg,
qos,
retained,
mainMqttClient.getClientId(),
messageIdForLog,
sessionIdArg,
null
);
log.error("Exception during sendCommand, logged with placeholder data. RobotId: {}, Command: {}", robotIdArg, commandTypeArg, throwable);
throw throwable;
}
if (robotIdArg != null && sessionIdArg !=null) {
RobotTask latestTask = robotTaskService.findLatestTaskByRobotIdAndSessionId(robotIdArg,sessionIdArg);
if(latestTask != null){
taskIdForLog = latestTask.getId();
}
}
logService.asyncLogDownstreamMessage(
args.length > 0 ? ("CMD_TO_" + args[0]) : "UNKNOWN_TOPIC_POST_SEND",
payloadJsonArg,
1,
false,
mainMqttClient.getClientId(),
messageIdForLog,
sessionIdArg,
taskIdForLog
);
return result;
}
}

View File

@@ -0,0 +1,39 @@
package com.yupi.project.config;
import com.yupi.project.config.properties.MqttLoggerProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
@RequiredArgsConstructor
public class MqttLoggerAsyncConfigurer implements AsyncConfigurer {
private final MqttLoggerProperties mqttLoggerProperties;
@Override
@Bean(name = "mqttLoggerThreadPoolTaskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
MqttLoggerProperties.Async asyncProps = mqttLoggerProperties.getAsync();
executor.setCorePoolSize(asyncProps.getCorePoolSize());
executor.setMaxPoolSize(asyncProps.getMaxPoolSize());
executor.setQueueCapacity(asyncProps.getQueueCapacity());
executor.setThreadNamePrefix(asyncProps.getThreadNamePrefix());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}

View File

@@ -0,0 +1,45 @@
package com.yupi.project.config;
import com.yupi.project.config.properties.MqttLoggerProperties;
import com.yupi.project.config.properties.MqttProperties;
import com.yupi.project.mqtt.MqttLoggerCallbackHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.UUID;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttLoggerClientConfig {
private final MqttProperties mainMqttProperties;
private final MqttLoggerProperties mqttLoggerProperties;
private final MqttConnectOptions mqttConnectOptions;
@Bean(name = "mqttLoggerClient")
@Lazy
public MqttClient mqttLoggerClient(@Qualifier("mqttLoggerCallbackHandler") MqttLoggerCallbackHandler loggerCallbackHandler)
throws MqttException {
if (!mqttLoggerProperties.isEnabled()) {
log.info("MQTT Logger Client is disabled via configuration.");
return null;
}
String clientId = mqttLoggerProperties.getClientIdPrefix() + UUID.randomUUID().toString().replace("-", "");
log.info("Initializing MQTT Logger Client with Broker URL: {} and Client ID: {}", mainMqttProperties.getBrokerUrl(), clientId);
MqttClient client = new MqttClient(mainMqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
client.setCallback(loggerCallbackHandler);
loggerCallbackHandler.setMqttClient(client);
return client;
}
}

View File

@@ -0,0 +1,32 @@
package com.yupi.project.config.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Data
@Component
@ConfigurationProperties(prefix = "mqtt.logger")
public class MqttLoggerProperties {
private boolean enabled = false;
private String clientIdPrefix;
private List<String> topics;
private Async async = new Async();
private Retention retention = new Retention();
@Data
public static class Async {
private int corePoolSize = 1;
private int maxPoolSize = 5;
private int queueCapacity = 100;
private String threadNamePrefix = "mqtt-log-async-";
}
@Data
public static class Retention {
private int days = 30;
}
}

View File

@@ -0,0 +1,114 @@
package com.yupi.project.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yupi.project.common.BaseResponse;
import com.yupi.project.common.ErrorCode;
import com.yupi.project.common.ResultUtils;
import com.yupi.project.exception.BusinessException;
import com.yupi.project.model.dto.mqttlog.MqttLogQueryRequest;
import com.yupi.project.model.entity.MqttCommunicationLog;
import com.yupi.project.service.MqttCommunicationLogService;
import com.yupi.project.annotation.AuthCheck;
import com.yupi.project.constant.UserConstant;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.Date;
@RestController
@RequestMapping("/admin/mqtt-log")
@RequiredArgsConstructor
@Slf4j
public class MqttCommunicationLogController {
private final MqttCommunicationLogService mqttCommunicationLogService;
/**
* 分页获取MQTT通信日志列表
*
* @param mqttLogQueryRequest
* @param request
* @return
*/
@PostMapping("/list/page")
@AuthCheck(mustRole = UserConstant.ADMIN_ROLE)
public BaseResponse<Page<MqttCommunicationLog>> listMqttCommunicationLogsByPage(@RequestBody MqttLogQueryRequest mqttLogQueryRequest, HttpServletRequest request) {
if (mqttLogQueryRequest == null) {
throw new BusinessException(ErrorCode.PARAMS_ERROR);
}
long current = mqttLogQueryRequest.getCurrent();
long size = mqttLogQueryRequest.getPageSize();
// 限制爬虫
if (size > 50) {
throw new BusinessException(ErrorCode.PARAMS_ERROR, "请求数据量过大");
}
QueryWrapper<MqttCommunicationLog> queryWrapper = getQueryWrapper(mqttLogQueryRequest);
Page<MqttCommunicationLog> logPage = mqttCommunicationLogService.page(new Page<>(current, size), queryWrapper);
return ResultUtils.success(logPage);
}
private QueryWrapper<MqttCommunicationLog> getQueryWrapper(MqttLogQueryRequest queryRequest) {
QueryWrapper<MqttCommunicationLog> queryWrapper = new QueryWrapper<>();
if (queryRequest == null) {
return queryWrapper;
}
String messageId = queryRequest.getMessageId();
String direction = queryRequest.getDirection();
String clientId = queryRequest.getClientId();
String topic = queryRequest.getTopic();
String payloadContains = queryRequest.getPayloadContains();
Integer qos = queryRequest.getQos();
Date startTime = queryRequest.getStartTime();
Date endTime = queryRequest.getEndTime();
Long relatedSessionId = queryRequest.getRelatedSessionId();
Long relatedTaskId = queryRequest.getRelatedTaskId();
String sortField = queryRequest.getSortField();
String sortOrder = queryRequest.getSortOrder();
if (StringUtils.isNotBlank(messageId)) {
queryWrapper.like("message_id", messageId);
}
if (StringUtils.isNotBlank(direction)) {
queryWrapper.eq("direction", direction);
}
if (StringUtils.isNotBlank(clientId)) {
queryWrapper.like("client_id", clientId);
}
if (StringUtils.isNotBlank(topic)) {
queryWrapper.like("topic", topic);
}
if (StringUtils.isNotBlank(payloadContains)) {
queryWrapper.like("payload", payloadContains);
}
if (qos != null) {
queryWrapper.eq("qos", qos);
}
if (startTime != null) {
queryWrapper.ge("log_timestamp", startTime);
}
if (endTime != null) {
queryWrapper.le("log_timestamp", endTime);
}
if (relatedSessionId != null) {
queryWrapper.eq("related_session_id", relatedSessionId);
}
if (relatedTaskId != null) {
queryWrapper.eq("related_task_id", relatedTaskId);
}
// 默认按日志时间降序排序
if (StringUtils.isNotBlank(sortField)) {
boolean isAsc = "asc".equalsIgnoreCase(sortOrder);
queryWrapper.orderBy(true, isAsc, sortField);
} else {
queryWrapper.orderByDesc("log_timestamp");
}
return queryWrapper;
}
}

View File

@@ -0,0 +1,7 @@
package com.yupi.project.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.project.model.entity.MqttCommunicationLog;
public interface MqttCommunicationLogMapper extends BaseMapper<MqttCommunicationLog> {
}

View File

@@ -0,0 +1,32 @@
package com.yupi.project.model.dto.mqttlog;
import com.yupi.project.common.PageRequest;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.format.annotation.DateTimeFormat;
import java.io.Serializable;
import java.util.Date;
@EqualsAndHashCode(callSuper = true)
@Data
public class MqttLogQueryRequest extends PageRequest implements Serializable {
private String messageId;
private String direction;
private String clientId;
private String topic;
private String payloadContains;
private Integer qos;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date startTime;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date endTime;
private Long relatedSessionId;
private Long relatedTaskId;
private static final long serialVersionUID = 1L;
}

View File

@@ -0,0 +1,46 @@
package com.yupi.project.model.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@TableName(value = "mqtt_communication_log")
@Data
public class MqttCommunicationLog implements Serializable {
@TableId(type = IdType.AUTO)
private Long id;
private String messageId;
private String direction; // UPSTREAM, DOWNSTREAM
private String clientId;
private String topic;
private String payloadFormat; // TEXT, JSON, BINARY
private String payload;
private Integer qos;
private Boolean isRetained;
private Date logTimestamp;
private String backendProcessingStatus; // RECEIVED, PROCESSING, SUCCESS, FAILED
private String backendProcessingInfo;
private Long relatedSessionId;
private Long relatedTaskId;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}

View File

@@ -1,24 +1,31 @@
package com.yupi.project.mqtt;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttConnectionManager implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
private final MqttClient mqttClient;
private final MqttConnectOptions mqttConnectOptions;
// private final MqttProperties mqttProperties; // Injected if needed for logging brokerUrl, or get from mqttClient.getServerURI()
@Autowired
public MqttConnectionManager(@Qualifier("mqttClientBean") MqttClient mqttClient,
MqttConnectOptions mqttConnectOptions) {
this.mqttClient = mqttClient;
this.mqttConnectOptions = mqttConnectOptions;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// Ensure this logic runs only once, for the root application context

View File

@@ -0,0 +1,91 @@
package com.yupi.project.mqtt;
import com.yupi.project.config.properties.MqttLoggerProperties;
import com.yupi.project.config.properties.MqttProperties; // 主业务的 properties获取QoS等
import com.yupi.project.service.MqttCommunicationLogService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.UUID;
@Slf4j
@Component("mqttLoggerCallbackHandler") // 指定Bean名称
public class MqttLoggerCallbackHandler implements MqttCallbackExtended {
private final MqttCommunicationLogService logService;
private final MqttLoggerProperties loggerProperties;
private final MqttProperties mainMqttProperties; // 用于获取默认QoS等
private MqttClient mqttLoggerClient; // 通过setter注入
public MqttLoggerCallbackHandler(MqttCommunicationLogService logService,
MqttLoggerProperties loggerProperties,
MqttProperties mainMqttProperties) {
this.logService = logService;
this.loggerProperties = loggerProperties;
this.mainMqttProperties = mainMqttProperties;
}
public void setMqttClient(@Qualifier("mqttLoggerClient") MqttClient mqttLoggerClient) {
this.mqttLoggerClient = mqttLoggerClient;
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT Logger Client connection {} to broker: {}", reconnect ? "re-established" : "established", serverURI);
if (!loggerProperties.isEnabled() || mqttLoggerClient == null) {
log.warn("MQTT Logger is disabled or client not initialized. Cannot subscribe to topics for logging.");
return;
}
try {
List<String> topicsToLog = loggerProperties.getTopics();
if (topicsToLog != null && !topicsToLog.isEmpty()) {
for (String topicFilter : topicsToLog) {
// 使用主配置的QoS等级进行订阅或者在loggerProperties中单独配置
mqttLoggerClient.subscribe(topicFilter, mainMqttProperties.getDefaultQos());
log.info("MQTT Logger Client subscribed to topic: {} for logging.", topicFilter);
}
} else {
log.warn("No topics configured for MQTT Logger Client to subscribe.");
}
} catch (Exception e) {
log.error("Error subscribing MQTT Logger Client to topics: ", e);
}
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT Logger Client connection lost!", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (!loggerProperties.isEnabled()) {
return; // 如果日志功能被禁用,则不处理
}
// 异步记录上行消息 (UPSTREAM)
// clientId 可以从 mqttLoggerClient.getClientId() 获取,表示是日志客户端收到的
// messageIdStr可以尝试从 MQTTv5 属性获取,如果使用的是 MQTTv3Paho 的 message.getId() 是内部ID对于 broker 可能无意义
// 对于日志记录,如果 message.getId() > 0可以用它否则生成一个UUID
String messageIdStr = message.getId() > 0 ? String.valueOf(message.getId()) : UUID.randomUUID().toString();
logService.asyncLogMessage(topic, message, "UPSTREAM", mqttLoggerClient.getClientId(), messageIdStr, null, null, null, null);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Logger client typically does not publish, so this might not be very relevant
// unless it's used for some diagnostic publishing.
try {
if (token != null && token.isComplete() && token.getMessage() != null) {
log.trace("MQTT Logger Client: Delivery complete for message ID: {}", token.getMessageId());
}
} catch (Exception e) {
log.error("Error in MQTT Logger Client deliveryComplete: ", e);
}
}
}

View File

@@ -0,0 +1,76 @@
package com.yupi.project.mqtt;
import com.yupi.project.config.properties.MqttLoggerProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttLoggerConnectionManager implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
@Qualifier("mqttLoggerClient")
private final MqttClient mqttLoggerClient; // 注入日志专用客户端
private final MqttConnectOptions mqttConnectOptions; // 可复用主连接配置
private final MqttLoggerProperties mqttLoggerProperties;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) { // Ensure root context
if (mqttLoggerProperties.isEnabled() && mqttLoggerClient != null) {
connectToMqtt();
}
}
}
private void connectToMqtt() {
try {
if (!mqttLoggerClient.isConnected()) {
log.info("Attempting to connect MQTT Logger Client: {} to broker: {}", mqttLoggerClient.getClientId(), mqttLoggerClient.getServerURI());
mqttLoggerClient.connect(mqttConnectOptions);
// Subscription logic is handled by MqttLoggerCallbackHandler.connectComplete
} else {
log.info("MQTT Logger Client {} is already connected.", mqttLoggerClient.getClientId());
}
} catch (MqttException e) {
log.error("Error connecting MQTT Logger Client: ", e);
} catch (Exception e) {
log.error("Unexpected error during MQTT Logger Client connection: ", e);
}
}
@Override
public void destroy() throws Exception {
if (mqttLoggerProperties.isEnabled() && mqttLoggerClient != null) {
disconnectFromMqtt();
}
}
private void disconnectFromMqtt() {
try {
if (mqttLoggerClient.isConnected()) {
log.info("Disconnecting MQTT Logger Client: {} from broker: {}", mqttLoggerClient.getClientId(), mqttLoggerClient.getServerURI());
mqttLoggerClient.disconnect();
log.info("MQTT Logger Client {} disconnected successfully.", mqttLoggerClient.getClientId());
}
} catch (MqttException e) {
log.error("Error disconnecting MQTT Logger Client: ", e);
} finally {
try {
log.info("Closing MQTT Logger Client: {}", mqttLoggerClient.getClientId());
mqttLoggerClient.close();
log.info("MQTT Logger Client {} closed successfully.", mqttLoggerClient.getClientId());
} catch (MqttException e) {
log.error("Error closing MQTT Logger Client: ", e);
}
}
}
}

View File

@@ -0,0 +1,31 @@
package com.yupi.project.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.yupi.project.model.entity.MqttCommunicationLog;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public interface MqttCommunicationLogService extends IService<MqttCommunicationLog> {
/**
* 异步记录MQTT消息
*
* @param topic 主题
* @param message MQTT消息对象
* @param direction 方向 (UPSTREAM/DOWNSTREAM)
* @param clientId 客户端ID
* @param messageIdStr MQTT v5 的 Message ID 或应用生成的UUID
* @param backendProcessingStatus 后端处理状态 (可选)
* @param backendProcessingInfo 后端处理信息 (可选)
* @param relatedSessionId 关联的会话ID (可选)
* @param relatedTaskId 关联的任务ID (可选)
*/
void asyncLogMessage(String topic, MqttMessage message, String direction, String clientId,
String messageIdStr, String backendProcessingStatus,
String backendProcessingInfo, Long relatedSessionId, Long relatedTaskId);
/**
* 异步记录出站 (DOWNSTREAM) MQTT消息的简化版本。
* 通常在消息发布后调用。
*/
void asyncLogDownstreamMessage(String topic, String payload, Integer qos, Boolean retained, String clientId, String messageIdStr, Long relatedSessionId, Long relatedTaskId);
}

View File

@@ -101,4 +101,13 @@ public interface RobotTaskService extends IService<RobotTask> {
*/
boolean markTaskAsFailed(Long taskId, String errorMessage, Date failedTime);
/**
* 根据机器人ID和会话ID查找最新的任务。
*
* @param robotId 机器人ID
* @param sessionId 会话ID
* @return 如果找到,则返回任务实体,否则返回 null
*/
RobotTask findLatestTaskByRobotIdAndSessionId(String robotId, Long sessionId);
}

View File

@@ -0,0 +1,105 @@
package com.yupi.project.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.project.mapper.MqttCommunicationLogMapper;
import com.yupi.project.model.entity.MqttCommunicationLog;
import com.yupi.project.service.MqttCommunicationLogService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;
@Service
@Slf4j
public class MqttCommunicationLogServiceImpl extends ServiceImpl<MqttCommunicationLogMapper, MqttCommunicationLog>
implements MqttCommunicationLogService {
private static final String DIRECTION_UPSTREAM = "UPSTREAM";
private static final String DIRECTION_DOWNSTREAM = "DOWNSTREAM";
private static final String PAYLOAD_FORMAT_TEXT = "TEXT";
private static final String PAYLOAD_FORMAT_JSON = "JSON"; // Assume JSON if parseable
private static final String PAYLOAD_FORMAT_BINARY = "BINARY";
@Override
@Async("mqttLoggerThreadPoolTaskExecutor") // Ensure this bean name matches your AsyncConfigurer
public void asyncLogMessage(String topic, MqttMessage message, String direction, String clientId,
String messageIdStr, String backendProcessingStatus,
String backendProcessingInfo, Long relatedSessionId, Long relatedTaskId) {
try {
MqttCommunicationLog logEntry = new MqttCommunicationLog();
logEntry.setMessageId(messageIdStr != null ? messageIdStr : (message.getId() > 0 ? String.valueOf(message.getId()) : null) );
logEntry.setDirection(direction);
logEntry.setClientId(clientId);
logEntry.setTopic(topic);
byte[] payloadBytes = message.getPayload();
String payloadString = new String(payloadBytes, StandardCharsets.UTF_8);
logEntry.setPayload(payloadString); // Store as string
// Basic payload format detection (can be enhanced)
if (isJson(payloadString)) {
logEntry.setPayloadFormat(PAYLOAD_FORMAT_JSON);
} else {
logEntry.setPayloadFormat(PAYLOAD_FORMAT_TEXT); // Default to TEXT, could be BINARY if not UTF-8 decodable well
}
// For true binary, would need different handling or indication
logEntry.setQos(message.getQos());
logEntry.setIsRetained(message.isRetained());
logEntry.setLogTimestamp(new Date()); // Record time when log entry is created
logEntry.setBackendProcessingStatus(backendProcessingStatus);
logEntry.setBackendProcessingInfo(backendProcessingInfo);
logEntry.setRelatedSessionId(relatedSessionId);
logEntry.setRelatedTaskId(relatedTaskId);
this.save(logEntry);
} catch (Exception e) {
log.error("Failed to asynchronously log MQTT message. Topic: {}, Direction: {}, ClientId: {}", topic, direction, clientId, e);
}
}
@Override
@Async("mqttLoggerThreadPoolTaskExecutor")
public void asyncLogDownstreamMessage(String topic, String payload, Integer qos, Boolean retained, String clientId, String messageIdStr, Long relatedSessionId, Long relatedTaskId) {
try {
MqttCommunicationLog logEntry = new MqttCommunicationLog();
logEntry.setMessageId(messageIdStr != null ? messageIdStr : UUID.randomUUID().toString()); // Generate UUID if not provided
logEntry.setDirection(DIRECTION_DOWNSTREAM);
logEntry.setClientId(clientId);
logEntry.setTopic(topic);
logEntry.setPayload(payload);
if (isJson(payload)) {
logEntry.setPayloadFormat(PAYLOAD_FORMAT_JSON);
} else {
logEntry.setPayloadFormat(PAYLOAD_FORMAT_TEXT);
}
logEntry.setQos(qos);
logEntry.setIsRetained(retained != null ? retained : false);
logEntry.setLogTimestamp(new Date());
// For downstream, processing status/info is typically not set at the moment of logging send attempt
// It might be updated later if a response is expected and tracked.
logEntry.setRelatedSessionId(relatedSessionId);
logEntry.setRelatedTaskId(relatedTaskId);
this.save(logEntry);
} catch (Exception e) {
log.error("Failed to asynchronously log downstream MQTT message. Topic: {}, ClientId: {}", topic, clientId, e);
}
}
private boolean isJson(String str) {
if (str == null || str.isEmpty()) {
return false;
}
String trimmedStr = str.trim();
return (trimmedStr.startsWith("{") && trimmedStr.endsWith("}")) || (trimmedStr.startsWith("[") && trimmedStr.endsWith("]"));
}
}

View File

@@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -18,13 +19,20 @@ import java.util.Date;
@Service
@Slf4j
@RequiredArgsConstructor
public class MqttServiceImpl implements MqttService {
private final MqttClient mqttClient; // Autowired by Spring from MqttConfig
private final MqttProperties mqttProperties;
private final RobotTaskService robotTaskService;
public MqttServiceImpl(@Qualifier("mqttClientBean") MqttClient mqttClient,
MqttProperties mqttProperties,
RobotTaskService robotTaskService) {
this.mqttClient = mqttClient;
this.mqttProperties = mqttProperties;
this.robotTaskService = robotTaskService;
}
@Override
@Transactional(rollbackFor = Exception.class) // Ensure rollback if MQTT publish fails after task creation
public boolean sendCommand(String robotId, CommandTypeEnum commandType, String payloadJson, Long sessionId) throws Exception {

View File

@@ -316,4 +316,25 @@ public class RobotTaskServiceImpl extends ServiceImpl<RobotTaskMapper, RobotTask
}
return updated;
}
@Override
public RobotTask findLatestTaskByRobotIdAndSessionId(String robotId, Long sessionId) {
if (robotId == null || sessionId == null) {
log.warn("Cannot find latest task: robotId or sessionId is null.");
return null;
}
QueryWrapper<RobotTask> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("robot_id", robotId)
.eq("related_session_id", sessionId)
.orderByDesc("create_time")
.last("LIMIT 1");
RobotTask task = this.getOne(queryWrapper);
if (task != null) {
log.info("Found latest task with ID {} for Robot ID: {} and Session ID: {}.", task.getId(), robotId, sessionId);
} else {
log.debug("No task found for Robot ID: {} and Session ID: {}.", robotId, sessionId);
}
return task;
}
}

View File

@@ -62,4 +62,20 @@ mqtt:
status-topic-base: yupi_mqtt_power_project/robot/status # Prefixed base topic for receiving status
task: # Task specific configurations
timeoutSeconds: 300 # Default 300 seconds (5 minutes) for a task to be considered timed out
timeoutCheckRateMs: 60000 # Default 60000 ms (1 minute) for how often to check for timed out tasks
timeoutCheckRateMs: 60000 # Default 60000 ms (1 minute) for how often to check for timed out tasks
logger:
enabled: true
client-id-prefix: backend-yupi-mqtt-power-logger- # Ensure this is different from the main client-id-prefix
# Topics to subscribe to for logging. Use '#' for all sub-topics.
# These should cover all topics the main application interacts with for upstream messages.
topics:
- "yupi_mqtt_power_project/robot/status/#" # Example: status messages from robots
# Add other topics for upstream messages (e.g., heartbeats, specific acknowledgements if not covered by status)
# - "yupi_mqtt_power_project/robot/heartbeat/#" # If heartbeats are on a separate root
async: # Async properties for logging tasks
core-pool-size: 2
max-pool-size: 5
queue-capacity: 1000 # Queue for logging tasks
thread-name-prefix: "mqtt-log-async-"
retention:
days: 30 # Log retention period in days (for future cleanup tasks)