Files
mqtt_power/springboot-init-main/doc/mqtt_communication_log_plan.md

9.0 KiB
Raw Permalink Blame History

MQTT通信日志系统开发方案

1. 项目概述

1.1 目标与意义

开发MQTT通信日志系统用于完整记录单片机与服务器系统之间的所有MQTT消息通信。该系统具有以下价值

  • 通信可视化:直观展示单片机与服务器之间的通信细节和消息内容
  • 故障诊断:为消息处理失败、设备离线等异常场景提供完整的调试信息
  • 性能分析:通过日志记录消息量、响应时间等指标,评估系统性能
  • 安全审计:记录所有通信活动,便于安全审计和问题追踪
  • 业务分析:通过通信数据分析设备使用模式和业务流程

1.2 系统定位

该系统为纯日志记录系统,不干扰正常的业务流程:

  • 不消费消息队列中的数据
  • 不修改消息内容或状态
  • 仅作为"旁路监听者"记录通信过程
  • 以非阻塞方式运行,不影响主业务性能

2. 技术架构设计

2.1 总体架构

MQTT日志系统架构

系统采用"监听者模式",不直接参与消息处理流程:

  • MQTT消息监听层订阅与主业务相同的Topic但只读取不处理
  • 日志记录层:异步将消息内容写入数据库
  • 查询展示层:提供日志查询、过滤、导出功能的管理界面

2.2 数据库结构

已有数据表 mqtt_communication_log 包含了足够的字段设计:

CREATE TABLE `mqtt_communication_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '日志ID',
  `message_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '消息的唯一标识',
  `direction` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '消息方向: UPSTREAM (设备->服务器), DOWNSTREAM (服务器->设备)',
  `client_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '相关的客户端ID',
  `topic` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'MQTT 主题',
  `payload_format` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT 'TEXT' COMMENT 'Payload 格式',
  `payload` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '消息原文',
  `qos` tinyint(4) NULL DEFAULT NULL COMMENT '消息QoS级别',
  `is_retained` tinyint(1) NULL DEFAULT NULL COMMENT '是否为保留消息',
  `log_timestamp` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '日志记录时间戳',
  `backend_processing_status` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '后端处理状态',
  `backend_processing_info` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '后端处理附加信息',
  `related_session_id` bigint(20) NULL DEFAULT NULL COMMENT '关联的充电会话ID',
  `related_task_id` bigint(20) NULL DEFAULT NULL COMMENT '关联的机器人任务ID',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `idx_log_timestamp`(`log_timestamp`) USING BTREE,
  INDEX `idx_topic`(`topic`(255)) USING BTREE,
  INDEX `idx_client_id`(`client_id`) USING BTREE,
  INDEX `idx_related_session_id`(`related_session_id`) USING BTREE,
  INDEX `idx_related_task_id`(`related_task_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = 'MQTT通信日志表'

2.3 核心组件设计

2.3.1 MQTT日志监听器

@Component
public class MqttCommunicationLogger {
    // 依赖注入日志服务和MQTT客户端
}

2.3.2 日志记录服务

@Service
public class MqttLogService {
    // 异步记录消息到数据库
}

2.3.3 日志查询Controller

@RestController
@RequestMapping("/api/admin/mqtt-logs")
public class MqttLogController {
    // 提供日志查询API
}

3. 实现步骤

3.1 后端实现

  1. 创建数据访问层

    • 实现 MqttCommunicationLogMapper 接口
    • 实现 MqttCommunicationLogService 接口及其实现类
  2. 实现消息监听与记录

    • 创建独立的 MQTT 客户端用于日志记录
    • 配置该客户端订阅所有业务相关的主题
    • 实现监听回调,异步记录收到的消息
    • 为发送的消息添加拦截记录逻辑
  3. 实现关联解析

    • 解析消息内容提取关联的会话ID、任务ID
    • 根据消息主题和内容推断消息类型和处理状态
  4. 实现管理API

    • 创建日志查询Controller
    • 实现分页查询、条件过滤、时间范围查询
    • 实现日志导出功能

3.2 前端实现

  1. 日志查询页面

    • 创建管理员专用的日志查询页面
    • 实现条件筛选表单时间范围、客户端ID、主题等
    • 实现分页表格展示日志记录
  2. 日志详情展示

    • 实现日志详情模态框
    • 格式化JSON消息内容便于阅读
    • 显示相关的业务信息(会话、任务)链接
  3. 日志分析功能

    • 实现简单的统计分析(消息量、成功率等)
    • 提供日志导出功能

4. 技术实现细节

4.1 MQTT日志客户端配置

@Configuration
public class MqttLoggerConfig {
    @Bean(name = "mqttLoggerClient")
    public MqttClient mqttLoggerClient() {
        // 配置单独的MQTT客户端用于日志记录
        // 与主业务使用相同的broker但不同的clientId
    }
}

4.2 消息记录实现

@Component
public class MqttLogSubscriber implements MqttCallback {
    
    @Async
    public void messageArrived(String topic, MqttMessage message) {
        // 1. 解析消息,提取元数据
        // 2. 构建日志记录对象
        // 3. 异步保存到数据库
        // 注意:完全不干扰消息的正常传递
    }
}

4.3 发送消息记录

@Aspect
@Component
public class MqttPublishAspect {
    
    @Around("execution(* com.yupi.project.mqtt.MqttService.publish*(..)) || execution(* com.yupi.project.mqtt.MqttService.sendCommand*(..))")
    public Object logMqttPublish(ProceedingJoinPoint joinPoint) throws Throwable {
        // 前置:记录发送前信息
        Object result = joinPoint.proceed(); // 执行原方法
        // 后置:记录发送后信息
        return result;
    }
}

4.4 业务关联解析

@Component
public class MqttPayloadParser {
    
    public MqttLogBusinessInfo parsePayload(String topic, String payload) {
        // 根据主题和内容解析出业务相关信息
        // 例如会话ID、任务ID、指令类型等
    }
}

5. 部署与配置

5.1 配置项

application.yml 中添加MQTT日志系统的配置

mqtt:
  logger:
    enabled: true
    client-id-prefix: logger-
    topics: 
      - charging/spot/+/status
      - charging/spot/+/command/#
      - charging/spot/+/heartbeat
    async:
      core-pool-size: 2
      max-pool-size: 5
      queue-capacity: 500
    retention:
      days: 30  # 日志保留天数

5.2 数据库维护

  • 设计日志清理策略,避免数据过度膨胀
  • 实现定时任务,清理超过保留期的日志数据
  • 考虑按时间分区表策略,优化大量日志的存储和查询性能

6. 测试计划

6.1 功能测试

  • 验证各类消息的正确记录
  • 验证消息内容完整性
  • 验证不同格式消息的解析效果
  • 验证业务关联信息的正确提取

6.2 性能测试

  • 高并发消息下的记录性能
  • 验证日志记录对主业务的性能影响应小于5%
  • 验证大数据量下的查询性能

6.3 稳定性测试

  • 长时间运行测试
  • 异常情况下的日志记录可靠性测试
  • 系统重启后的连续性测试

7. 注意事项与风险

7.1 技术风险

  • 性能影响:确保日志记录不成为系统瓶颈

    • 解决方案:异步记录,独立线程池,必要时考虑消息缓冲
  • 存储膨胀MQTT消息量大可能导致数据库快速增长

    • 解决方案:设置合理的日志保留策略,定期清理或归档
  • 消息完整性:某些异常情况可能导致日志不完整

    • 解决方案:增加重试机制,添加监控告警

7.2 业务风险

  • 敏感信息:日志可能包含敏感业务数据

    • 解决方案:实现字段级别的脱敏,严格控制日志访问权限
  • 一致性:日志状态与实际业务状态可能不一致

    • 解决方案:明确标注日志仅用于参考,不作为业务状态的权威来源

8. 后续优化方向

  • 实现更复杂的消息分析功能
  • 添加可视化图表,直观展示通信模式和趋势
  • 实现异常模式的自动检测和告警
  • 与系统监控平台集成,提供更全面的系统健康视图

9. 项目计划

阶段 内容 时间估计
设计与准备 详细设计、环境准备 2天
后端开发 核心记录功能实现 3天
前端开发 日志查询界面实现 3天
测试与优化 功能测试、性能优化 2天
文档与部署 编写文档、系统部署 1天

总计划时间约11个工作日