9.0 KiB
9.0 KiB
MQTT通信日志系统开发方案
1. 项目概述
1.1 目标与意义
开发MQTT通信日志系统,用于完整记录单片机与服务器系统之间的所有MQTT消息通信。该系统具有以下价值:
- 通信可视化:直观展示单片机与服务器之间的通信细节和消息内容
- 故障诊断:为消息处理失败、设备离线等异常场景提供完整的调试信息
- 性能分析:通过日志记录消息量、响应时间等指标,评估系统性能
- 安全审计:记录所有通信活动,便于安全审计和问题追踪
- 业务分析:通过通信数据分析设备使用模式和业务流程
1.2 系统定位
该系统为纯日志记录系统,不干扰正常的业务流程:
- 不消费消息队列中的数据
- 不修改消息内容或状态
- 仅作为"旁路监听者"记录通信过程
- 以非阻塞方式运行,不影响主业务性能
2. 技术架构设计
2.1 总体架构
系统采用"监听者模式",不直接参与消息处理流程:
- MQTT消息监听层:订阅与主业务相同的Topic,但只读取不处理
- 日志记录层:异步将消息内容写入数据库
- 查询展示层:提供日志查询、过滤、导出功能的管理界面
2.2 数据库结构
已有数据表 mqtt_communication_log 包含了足够的字段设计:
CREATE TABLE `mqtt_communication_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '日志ID',
`message_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '消息的唯一标识',
`direction` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '消息方向: UPSTREAM (设备->服务器), DOWNSTREAM (服务器->设备)',
`client_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '相关的客户端ID',
`topic` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'MQTT 主题',
`payload_format` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT 'TEXT' COMMENT 'Payload 格式',
`payload` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '消息原文',
`qos` tinyint(4) NULL DEFAULT NULL COMMENT '消息QoS级别',
`is_retained` tinyint(1) NULL DEFAULT NULL COMMENT '是否为保留消息',
`log_timestamp` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '日志记录时间戳',
`backend_processing_status` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '后端处理状态',
`backend_processing_info` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '后端处理附加信息',
`related_session_id` bigint(20) NULL DEFAULT NULL COMMENT '关联的充电会话ID',
`related_task_id` bigint(20) NULL DEFAULT NULL COMMENT '关联的机器人任务ID',
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_log_timestamp`(`log_timestamp`) USING BTREE,
INDEX `idx_topic`(`topic`(255)) USING BTREE,
INDEX `idx_client_id`(`client_id`) USING BTREE,
INDEX `idx_related_session_id`(`related_session_id`) USING BTREE,
INDEX `idx_related_task_id`(`related_task_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = 'MQTT通信日志表'
2.3 核心组件设计
2.3.1 MQTT日志监听器
@Component
public class MqttCommunicationLogger {
// 依赖注入日志服务和MQTT客户端
}
2.3.2 日志记录服务
@Service
public class MqttLogService {
// 异步记录消息到数据库
}
2.3.3 日志查询Controller
@RestController
@RequestMapping("/api/admin/mqtt-logs")
public class MqttLogController {
// 提供日志查询API
}
3. 实现步骤
3.1 后端实现
-
创建数据访问层
- 实现
MqttCommunicationLogMapper接口 - 实现
MqttCommunicationLogService接口及其实现类
- 实现
-
实现消息监听与记录
- 创建独立的 MQTT 客户端用于日志记录
- 配置该客户端订阅所有业务相关的主题
- 实现监听回调,异步记录收到的消息
- 为发送的消息添加拦截记录逻辑
-
实现关联解析
- 解析消息内容,提取关联的会话ID、任务ID
- 根据消息主题和内容推断消息类型和处理状态
-
实现管理API
- 创建日志查询Controller
- 实现分页查询、条件过滤、时间范围查询
- 实现日志导出功能
3.2 前端实现
-
日志查询页面
- 创建管理员专用的日志查询页面
- 实现条件筛选表单(时间范围、客户端ID、主题等)
- 实现分页表格展示日志记录
-
日志详情展示
- 实现日志详情模态框
- 格式化JSON消息内容,便于阅读
- 显示相关的业务信息(会话、任务)链接
-
日志分析功能
- 实现简单的统计分析(消息量、成功率等)
- 提供日志导出功能
4. 技术实现细节
4.1 MQTT日志客户端配置
@Configuration
public class MqttLoggerConfig {
@Bean(name = "mqttLoggerClient")
public MqttClient mqttLoggerClient() {
// 配置单独的MQTT客户端用于日志记录
// 与主业务使用相同的broker但不同的clientId
}
}
4.2 消息记录实现
@Component
public class MqttLogSubscriber implements MqttCallback {
@Async
public void messageArrived(String topic, MqttMessage message) {
// 1. 解析消息,提取元数据
// 2. 构建日志记录对象
// 3. 异步保存到数据库
// 注意:完全不干扰消息的正常传递
}
}
4.3 发送消息记录
@Aspect
@Component
public class MqttPublishAspect {
@Around("execution(* com.yupi.project.mqtt.MqttService.publish*(..)) || execution(* com.yupi.project.mqtt.MqttService.sendCommand*(..))")
public Object logMqttPublish(ProceedingJoinPoint joinPoint) throws Throwable {
// 前置:记录发送前信息
Object result = joinPoint.proceed(); // 执行原方法
// 后置:记录发送后信息
return result;
}
}
4.4 业务关联解析
@Component
public class MqttPayloadParser {
public MqttLogBusinessInfo parsePayload(String topic, String payload) {
// 根据主题和内容解析出业务相关信息
// 例如:会话ID、任务ID、指令类型等
}
}
5. 部署与配置
5.1 配置项
在 application.yml 中添加MQTT日志系统的配置:
mqtt:
logger:
enabled: true
client-id-prefix: logger-
topics:
- charging/spot/+/status
- charging/spot/+/command/#
- charging/spot/+/heartbeat
async:
core-pool-size: 2
max-pool-size: 5
queue-capacity: 500
retention:
days: 30 # 日志保留天数
5.2 数据库维护
- 设计日志清理策略,避免数据过度膨胀
- 实现定时任务,清理超过保留期的日志数据
- 考虑按时间分区表策略,优化大量日志的存储和查询性能
6. 测试计划
6.1 功能测试
- 验证各类消息的正确记录
- 验证消息内容完整性
- 验证不同格式消息的解析效果
- 验证业务关联信息的正确提取
6.2 性能测试
- 高并发消息下的记录性能
- 验证日志记录对主业务的性能影响(应小于5%)
- 验证大数据量下的查询性能
6.3 稳定性测试
- 长时间运行测试
- 异常情况下的日志记录可靠性测试
- 系统重启后的连续性测试
7. 注意事项与风险
7.1 技术风险
-
性能影响:确保日志记录不成为系统瓶颈
- 解决方案:异步记录,独立线程池,必要时考虑消息缓冲
-
存储膨胀:MQTT消息量大可能导致数据库快速增长
- 解决方案:设置合理的日志保留策略,定期清理或归档
-
消息完整性:某些异常情况可能导致日志不完整
- 解决方案:增加重试机制,添加监控告警
7.2 业务风险
-
敏感信息:日志可能包含敏感业务数据
- 解决方案:实现字段级别的脱敏,严格控制日志访问权限
-
一致性:日志状态与实际业务状态可能不一致
- 解决方案:明确标注日志仅用于参考,不作为业务状态的权威来源
8. 后续优化方向
- 实现更复杂的消息分析功能
- 添加可视化图表,直观展示通信模式和趋势
- 实现异常模式的自动检测和告警
- 与系统监控平台集成,提供更全面的系统健康视图
9. 项目计划
| 阶段 | 内容 | 时间估计 |
|---|---|---|
| 设计与准备 | 详细设计、环境准备 | 2天 |
| 后端开发 | 核心记录功能实现 | 3天 |
| 前端开发 | 日志查询界面实现 | 3天 |
| 测试与优化 | 功能测试、性能优化 | 2天 |
| 文档与部署 | 编写文档、系统部署 | 1天 |
总计划时间:约11个工作日