Files
mqtt_power/esp32_robot_firmware/mqtt_handler.cpp

233 lines
7.9 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mqtt_handler.h"
#include "config.h" // 确保config.h被包含以获取定义
#include <WiFi.h>
// ----------- 全局MQTT相关变量 -----------
WiFiClient espClient;
PubSubClient mqttClient(espClient);
CommandCallback global_command_callback = nullptr;
// 主题字符串
String topic_uplink_to_backend;
String topic_downlink_from_backend;
// 将RobotStatus枚举转换为字符串 (与后端DTO的actualRobotStatus对应)
const char* robotStatusToString(RobotStatus status) {
switch (status) {
case IDLE: return "IDLE";
case MOVING: return "MOVING";
case CHARGING: return "CHARGING";
case COMPLETED: return "COMPLETED";
case FAULTED: return "FAULTED"; // 或 "ERROR"
default: return "UNKNOWN";
}
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("消息抵达, 主题: ");
Serial.println(topic);
char message[length + 1];
memcpy(message, payload, length);
message[length] = '\0';
Serial.print("消息内容: ");
Serial.println(message);
if (String(topic) != topic_downlink_from_backend) {
Serial.println("消息非来自预期的指令主题,忽略。");
return;
}
StaticJsonDocument<256> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
Serial.print("JSON 解析失败: ");
Serial.println(error.f_str());
// 可以在这里发送一个格式错误的ACK但通常不建议对无法解析的taskId发ACK
return;
}
const char* cmdType = nullptr;
if (doc.containsKey("commandType")) {
cmdType = doc["commandType"];
} else if (doc.containsKey("command")) {
cmdType = doc["command"];
}
const char* taskId = doc["taskId"];
if (cmdType == nullptr || taskId == nullptr) {
Serial.println("指令JSON缺少 commandType/command 或 taskId 字段。");
// 尝试发送ACK即使taskId可能为null但通常最好有taskId
mqtt_publish_ack(taskId, false, "Command JSON invalid (missing commandType/command or taskId)", IDLE /*随便给个状态*/, nullptr);
return;
}
const char* targetSpot = nullptr;
if (doc.containsKey("spotId")) {
targetSpot = doc["spotId"].as<const char*>();
} else if (doc.containsKey("target_spot_uid")) {
targetSpot = doc["target_spot_uid"].as<const char*>();
}
Serial.printf("解析到指令: Type=%s, TaskID=%s, TargetSpot=%s\n", cmdType, taskId, targetSpot ? targetSpot : "N/A");
if (global_command_callback) {
global_command_callback(cmdType, taskId, targetSpot);
} else {
Serial.println("错误: 指令回调函数未设置!");
}
}
void connect_wifi() {
Serial.println("正在连接 Wi-Fi...");
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("\nWi-Fi 连接成功!");
Serial.print("IP 地址: ");
Serial.println(WiFi.localIP());
}
void reconnect_mqtt() {
while (!mqttClient.connected()) {
Serial.print("尝试连接 MQTT Broker: ");
Serial.println(MQTT_BROKER_HOST);
String clientId = "esp32-robot-" + String(DEVICE_UID) + "-" + String(random(0xffff), HEX);
Serial.printf("客户端 ID: %s \n", clientId.c_str());
if (mqttClient.connect(clientId.c_str(), MQTT_USERNAME, MQTT_PASSWORD)) {
Serial.println("MQTT Broker 连接成功!");
mqttClient.subscribe(topic_downlink_from_backend.c_str());
Serial.println("已订阅指令主题: " + topic_downlink_from_backend);
// 连接成功后可以发送一次初始状态,但这通常由主逻辑决定
} else {
Serial.print("连接失败, rc=");
Serial.print(mqttClient.state());
Serial.println(" 2秒后重试...");
delay(2000);
}
}
}
// ----------- Public Functions -----------
void mqtt_setup(CommandCallback cmd_callback) {
global_command_callback = cmd_callback;
// 构建主题字符串
String base_topic_path = MQTT_TOPIC_BASE;
topic_uplink_to_backend = base_topic_path + "/status/" + String(DEVICE_UID);
topic_downlink_from_backend = base_topic_path + "/command/" + String(DEVICE_UID);
Serial.println("MQTT 主题初始化完成:");
Serial.println(" 上行主题: " + topic_uplink_to_backend);
Serial.println(" 下行主题: " + topic_downlink_from_backend);
connect_wifi();
mqttClient.setServer(MQTT_BROKER_HOST, MQTT_BROKER_PORT);
mqttClient.setCallback(callback);
}
void mqtt_loop() {
if (!mqttClient.connected()) {
reconnect_mqtt();
}
mqttClient.loop();
}
void mqtt_publish_status_update(
RobotStatus current_status,
const char* location,
int battery_level,
const char* current_task_id, // 这个字段在我们的状态消息中通常不直接用,但保留参数以备将来
const char* target_spot,
const char* current_spot,
unsigned long charge_duration_seconds,
int error_code,
const char* error_message
) {
StaticJsonDocument<512> doc;
doc["robotUid"] = DEVICE_UID;
doc["actualRobotStatus"] = robotStatusToString(current_status);
if (location) doc["location"] = location;
doc["battery"] = battery_level;
// 根据状态添加特定字段
switch (current_status) {
case MOVING:
if (target_spot) doc["targetSpot"] = target_spot;
break;
case CHARGING:
if (current_spot) doc["spotId"] = current_spot;
doc["durationSeconds"] = charge_duration_seconds;
break;
case COMPLETED:
if (current_spot) doc["spotId"] = current_spot;
// totalDurationSeconds 通常在特定完成事件的ACK中上报更准确
break;
case FAULTED:
doc["errorCode"] = error_code; // 仅在FAULTED状态下发送错误码
if (error_message) doc["message"] = error_message;
break;
default:
break;
}
// 注意移除了常规状态更新中的errorCode除非状态是FAULTED
String jsonBuffer;
serializeJson(doc, jsonBuffer);
Serial.print("发送 StatusUpdate 到主题 [");
Serial.print(topic_uplink_to_backend);
Serial.print("]: ");
Serial.println(jsonBuffer);
if (mqttClient.publish(topic_uplink_to_backend.c_str(), jsonBuffer.c_str())) {
Serial.println("StatusUpdate 发送成功");
} else {
Serial.println("StatusUpdate 发送失败");
}
}
void mqtt_publish_ack(
const char* task_id,
bool success,
const char* message,
RobotStatus current_robot_status_at_ack,
const char* ack_context_spot_id
) {
if (!task_id) {
Serial.println("无法发送ACK: taskId 为空");
return;
}
StaticJsonDocument<256> doc;
doc["robotUid"] = DEVICE_UID;
doc["taskId"] = task_id;
doc["status"] = success ? "SUCCESS" : "FAILURE";
if (message) doc["message"] = message;
doc["actualRobotStatus"] = robotStatusToString(current_robot_status_at_ack);
if (ack_context_spot_id) {
doc["spotId"] = ack_context_spot_id; // 在ACK上下文中关联spotId
}
// 根据之前的讨论ACK中不发送errorCode字段
String jsonBuffer;
serializeJson(doc, jsonBuffer);
Serial.print("发送 ACK/TaskUpdate 到主题 [");
Serial.print(topic_uplink_to_backend);
Serial.print("]: ");
Serial.println(jsonBuffer);
if (mqttClient.publish(topic_uplink_to_backend.c_str(), jsonBuffer.c_str())) {
Serial.println("ACK/TaskUpdate 发送成功");
} else {
Serial.println("ACK/TaskUpdate 发送失败");
}
}
// 这个函数可以移除或重新设计,因为状态现在由主逻辑管理并通过参数传递给 publish 函数
// RobotStatus mqtt_get_current_robot_status_commanded() {
// return IDLE; // Placeholder
// }