Files
mqtt_power/mqtt_esp32_client/mqtt_esp32_client.ino
2025-06-07 16:28:02 +08:00

434 lines
20 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h> // 用于JSON操作
// ----------- Device configuration (edit for your own device and environment) -----------
// NOTE(review): Wi-Fi and broker credentials are hard-coded in this source file;
// consider keeping them out of version control.
// WiFi
const char *ssid = "WEIHANG3718"; // Wi-Fi network name (SSID)
const char *password = "@05Tu146"; // Wi-Fi password
// MQTT Broker
const char *mqtt_broker = "yuyun-hk1.stormrain.cn"; // MQTT broker address
const char *mqtt_username = "emqx"; // MQTT user name (if required)
const char *mqtt_password = "public"; // MQTT password (if required)
const int mqtt_port = 1883; // MQTT broker port
// Unique device identifier (very important: must match the one registered with the backend)
const char *spotUid = "ESP32_SPOT_001"; // e.g. "SPOT001", "P005-A1"
// ----------- MQTT topic definitions -----------
// Based on application.yml and the backend service implementation.
// Both strings are filled in by setup_mqtt_topics().
String topic_uplink_to_backend; // Reported to the backend: yupi_mqtt_power_project/robot/status/{spotUid}
String topic_downlink_from_backend; // Commands received from the backend: yupi_mqtt_power_project/robot/command/{spotUid}
// ----------- Global variables -----------
WiFiClient espClient;
PubSubClient client(espClient);
// Simulated hardware state (a real project would obtain these from sensors or hardware logic)
const char* currentDeviceStatus = "IDLE"; // Current device status: IDLE, CHARGING, COMPLETED, FAULTED, ...
float currentVoltage = 220.0;
float currentCurrent = 0.0;
float currentPower = 0.0;
float currentEnergyConsumed = 0.0;
int currentErrorCode = 0;
String currentSessionId = ""; // ID of the current charging session
String currentSimulatedLocation = "BASE_STATION"; // Simulated current location
int currentSimulatedBattery = 95; // Simulated battery level
String currentTargetSpot = ""; // Target spot while in the moving state
String currentSpotId = ""; // Spot the device is currently at (used for charging and similar states)
unsigned long chargeStartTimeMillis = 0; // millis() value when charging started; 0 means no charge in progress
// Periodic publishing (all intervals are compared against millis() differences)
unsigned long lastStatusUpdateTime = 0;
unsigned long lastHeartbeatTime = 0;
const long statusUpdateInterval = 30000; // Status report interval (e.g. 30 seconds)
const long heartbeatInterval = 60000; // Heartbeat interval (e.g. 60 seconds)
unsigned long lastBatteryUpdateTime = 0; // Timestamp of the last simulated battery-level change
const long batteryUpdateInterval = 5000; // Simulated battery update interval (e.g. 5 seconds)
void setup_mqtt_topics() {
String backend_status_base = "yupi_mqtt_power_project/robot/status/";
String backend_command_base = "yupi_mqtt_power_project/robot/command/";
topic_uplink_to_backend = backend_status_base + String(spotUid);
topic_downlink_from_backend = backend_command_base + String(spotUid);
Serial.println("MQTT 主题初始化完成 (匹配后端实现):");
Serial.println(" 上行主题 (状态/心跳/ACK): " + topic_uplink_to_backend);
Serial.println(" 下行主题 (接收指令): " + topic_downlink_from_backend);
}
// Starts the Wi-Fi connection with the configured SSID/password and blocks
// until the station reports WL_CONNECTED, printing a dot every 500 ms while
// waiting. Logs the assigned IP address once connected.
// Note: there is no timeout; this never returns if the network is unreachable.
void connect_wifi() {
  Serial.println("正在连接 Wi-Fi...");
  WiFi.begin(ssid, password);

  while (true) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("\nWi-Fi 连接成功!");
  Serial.print("IP 地址: ");
  Serial.println(WiFi.localIP());
}
// Blocks until the MQTT client is connected AND subscribed to the command
// topic, retrying every 2 seconds.
//
// The client ID is "esp32-client-<spotUid>-<MAC address>". After a successful
// connect the downlink command topic is subscribed; if that subscription
// fails the connection is dropped and the whole sequence is retried, because
// a connected-but-unsubscribed device would never receive any command.
// Once subscribed, one regular status update is published.
void reconnect_mqtt() {
  while (!client.connected()) {
    Serial.print("尝试连接 MQTT Broker: ");
    Serial.println(mqtt_broker);
    String client_id = "esp32-client-" + String(spotUid) + "-";
    client_id += String(WiFi.macAddress());
    Serial.printf("客户端 ID: %s \n", client_id.c_str());

    if (!client.connect(client_id.c_str(), mqtt_username, mqtt_password)) {
      Serial.print("连接失败, rc=");
      Serial.print(client.state());
      Serial.println(" 2秒后重试...");
      delay(2000);
      continue;
    }
    Serial.println("MQTT Broker 连接成功!");

    // Subscribe to the single downlink command topic and verify the result
    // instead of assuming success.
    if (!client.subscribe(topic_downlink_from_backend.c_str())) {
      Serial.println("订阅指令主题失败: " + topic_downlink_from_backend + " 2秒后重试...");
      client.disconnect();  // forces the while condition to retry connect + subscribe
      delay(2000);
      continue;
    }
    Serial.println("已订阅指令主题: " + topic_downlink_from_backend);

    // Announce the current state once the link is fully usable.
    publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
  }
}
void callback(char *topic, byte *payload, unsigned int length) {
Serial.println("-----------------------");
Serial.print("消息抵达, 主题: ");
Serial.println(topic);
char message[length + 1];
memcpy(message, payload, length);
message[length] = '\0';
Serial.print("消息内容: ");
Serial.println(message);
if (String(topic) != topic_downlink_from_backend) {
Serial.println("消息非来自预期的指令主题,忽略。");
return;
}
StaticJsonDocument<256> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
Serial.print("JSON 解析失败: ");
Serial.println(error.f_str());
return;
}
const char* cmdType = nullptr;
if (doc.containsKey("commandType")) {
cmdType = doc["commandType"];
} else if (doc.containsKey("command")) {
cmdType = doc["command"];
}
const char* taskId = doc["taskId"];
if (cmdType == nullptr || taskId == nullptr) {
Serial.println("指令JSON缺少 commandType/command 或 taskId 字段。");
publish_ack_message(taskId, false, "Command JSON invalid (missing commandType/command or taskId)", nullptr);
return;
}
const char* spotIdFromCommand = nullptr;
if (doc.containsKey("spotId")) {
spotIdFromCommand = doc["spotId"].as<const char*>();
} else if (doc.containsKey("target_spot_uid")) {
spotIdFromCommand = doc["target_spot_uid"].as<const char*>();
}
if (strcmp(cmdType, "MOVE_TO_SPOT") == 0 || strcmp(cmdType, "MOVE") == 0) {
Serial.println("接收到 [移动] 指令 (MOVE_TO_SPOT 或 MOVE)");
if (spotIdFromCommand) {
currentTargetSpot = String(spotIdFromCommand);
currentSpotId = "";
} else {
currentTargetSpot = "UNKNOWN_SPOT";
}
currentDeviceStatus = "MOVING";
currentSimulatedLocation = "EN_ROUTE_TO_" + currentTargetSpot;
Serial.println("状态更新: MOVING (前往目标车位: " + currentTargetSpot + ")");
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
Serial.println("模拟: 机器人正在移动到目标车位 " + currentTargetSpot + "...");
// 模拟移动过程中的电量消耗,更细致的可以在 loop 中做
if (currentSimulatedBattery > 10) currentSimulatedBattery -= 5; // 假设移动固定消耗一些电
else currentSimulatedBattery = 5; // 最低电量
delay(3000);
Serial.println("模拟: 机器人已到达目标车位 " + currentTargetSpot + "");
currentDeviceStatus = "CHARGING";
currentSpotId = currentTargetSpot;
currentTargetSpot = "";
currentSimulatedLocation = currentSpotId;
chargeStartTimeMillis = millis();
Serial.println("状态更新: CHARGING (已到达目标车位 " + currentSpotId + ",视为开始充电)");
publish_ack_message(taskId, true, "Robot arrived at spot and started charging (simulated)", currentSpotId.c_str());
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
} else if (strcmp(cmdType, "START_CHARGE") == 0) {
Serial.println("接收到 [启动充电] 指令");
if (strcmp(currentDeviceStatus, "CHARGING") != 0) { // 仅当未在充电时才响应
currentDeviceStatus = "CHARGING";
if (spotIdFromCommand) {
currentSpotId = String(spotIdFromCommand);
} else if (currentSpotId.length() == 0) {
currentSpotId = "DEFAULT_SPOT"; // 如果没有从指令获取且之前也没有,给个默认值
}
currentSimulatedLocation = currentSpotId;
chargeStartTimeMillis = millis();
Serial.println("状态更新: CHARGING (指令启动于 " + currentSpotId + ")");
} else {
Serial.println("设备已在充电中,忽略 START_CHARGE 指令。");
}
if (doc.containsKey("sessionId")) {
currentSessionId = String(doc["sessionId"].as<const char*>());
} else {
currentSessionId = "";
}
Serial.println("模拟: 充电已启动。会话ID: " + currentSessionId + ", 车位: " + currentSpotId);
publish_ack_message(taskId, true, "Charging started successfully", currentSessionId.c_str());
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
} else if (strcmp(cmdType, "STOP_CHARGE") == 0) {
Serial.println("接收到 [停止充电] 指令");
bool wasCharging = strcmp(currentDeviceStatus, "CHARGING") == 0;
currentDeviceStatus = "COMPLETED";
currentSimulatedLocation = currentSpotId;
unsigned long chargeDuration = 0;
if (chargeStartTimeMillis > 0 && wasCharging) {
chargeDuration = (millis() - chargeStartTimeMillis) / 1000;
}
chargeStartTimeMillis = 0;
Serial.println("模拟: 充电已停止。车位: " + currentSpotId + ", 本次充电时长约: " + String(chargeDuration) + "s");
String previousSessionId = currentSessionId;
currentSessionId = "";
// 在ACK中上报准确的充电时长如果需要的话可以通过修改 publish_ack_message 或在 message 字段中添加
// For now, the generic ACK is sent.
publish_ack_message(taskId, true, ("Charging stopped. Duration: " + String(chargeDuration) + "s").c_str(), previousSessionId.c_str());
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
}
// Add other commandType handling here if needed, e.g., "QUERY_STATUS"
// else if (strcmp(cmdType, "QUERY_STATUS") == 0) {
// Serial.println("接收到 [查询状态] 指令");
// publish_regular_status_update(); // 回复当前状态
// publish_ack_message(taskId, true, "Status reported", nullptr);
// }
else {
Serial.println("未知指令 commandType: " + String(cmdType));
publish_ack_message(taskId, false, ("Unknown commandType: " + String(cmdType)).c_str(), nullptr);
}
Serial.println("-----------------------");
}
void publish_message(const String& topic, const JsonDocument& doc, const char* message_type) {
String jsonBuffer;
serializeJson(doc, jsonBuffer);
Serial.print("发送 ");
Serial.print(message_type);
Serial.print(" 到主题 [");
Serial.print(topic);
Serial.print("]: ");
Serial.println(jsonBuffer);
if (client.publish(topic.c_str(), jsonBuffer.c_str())) {
Serial.println(String(message_type) + " 发送成功");
} else {
Serial.println(String(message_type) + " 发送失败");
}
}
// Builds one JSON message describing the device and publishes it on the
// uplink topic. Two shapes are produced:
//
//   ACK / task update (isAckOrTaskUpdate == true):
//     robotUid, [taskId + activeTaskId], [status], [message],
//     actualRobotStatus, [spotId]
//   General status (isAckOrTaskUpdate == false):
//     robotUid, actualRobotStatus, location, battery, plus
//     targetSpot (MOVING) / spotId + durationSeconds (CHARGING) /
//     spotId (COMPLETED)
//
// isAckOrTaskUpdate - true for an ACK or task-specific update, false for a
//                     general status update
// ackTaskId         - task ID being acknowledged; written to both "taskId"
//                     and "activeTaskId". May be null.
// ackStatus         - value for "status"; omitted when null
// ackMessage        - value for "message"; omitted when null
// ackErrorCode      - currently unused: no errorCode is sent
// ackSessionId      - context string; "spotId" is only added to an ACK when
//                     this is non-empty
//
// Only general status updates reset lastStatusUpdateTime.
void publish_status_update(bool isAckOrTaskUpdate, const char* ackTaskId, const char* ackStatus, const char* ackMessage, const char* ackErrorCode, const char* ackSessionId) {
  (void)ackErrorCode;  // errorCode is intentionally not sent

  const bool isCharging = strcmp(currentDeviceStatus, "CHARGING") == 0;
  const bool isCompleted = strcmp(currentDeviceStatus, "COMPLETED") == 0;
  const bool hasSpot = currentSpotId.length() > 0;

  StaticJsonDocument<512> doc;
  doc["robotUid"] = spotUid;

  if (isAckOrTaskUpdate) {
    if (ackTaskId) {
      // Both fields carry the same ID: "taskId" is for the robot_task table
      // status update, "activeTaskId" for advancing the charging_session flow.
      doc["taskId"] = ackTaskId;
      doc["activeTaskId"] = ackTaskId;
    }
    if (ackStatus) {
      doc["status"] = ackStatus;
    }
    if (ackMessage) {
      doc["message"] = ackMessage;
    }
    doc["actualRobotStatus"] = currentDeviceStatus;

    // The spot is attached only to charging-related ACKs that came with a
    // non-empty context string.
    const bool hasContext = ackSessionId != nullptr && ackSessionId[0] != '\0';
    const bool isArrivalAck = ackMessage != nullptr && strstr(ackMessage, "arrived") != nullptr;
    if (hasContext && (isCharging || isCompleted || isArrivalAck) && hasSpot) {
      doc["spotId"] = currentSpotId;
    }
  } else {
    // General status update.
    doc["actualRobotStatus"] = currentDeviceStatus;
    doc["location"] = currentSimulatedLocation;
    doc["battery"] = currentSimulatedBattery;

    // Status-specific fields.
    if (strcmp(currentDeviceStatus, "MOVING") == 0) {
      if (currentTargetSpot.length() > 0) {
        doc["targetSpot"] = currentTargetSpot;
      }
    } else if (isCharging) {
      if (hasSpot) {
        doc["spotId"] = currentSpotId;
      }
      unsigned long elapsedSeconds = 0;
      if (chargeStartTimeMillis > 0) {
        elapsedSeconds = (millis() - chargeStartTimeMillis) / 1000;
      }
      doc["durationSeconds"] = elapsedSeconds;
    } else if (isCompleted) {
      // No total duration is reported here; the STOP_CHARGE ACK message
      // carries the final duration instead.
      if (hasSpot) {
        doc["spotId"] = currentSpotId;
      }
    }
  }

  publish_message(topic_uplink_to_backend, doc, isAckOrTaskUpdate ? "ACK/TaskUpdate" : "StatusUpdate");

  // ACKs triggered by commands must not postpone the periodic status report.
  if (!isAckOrTaskUpdate) {
    lastStatusUpdateTime = millis();
  }
}
// Publishes a general (non-ACK) status update; used for the periodic report.
void publish_regular_status_update() {
  publish_status_update(/*isAckOrTaskUpdate=*/false,
                        /*ackTaskId=*/nullptr,
                        /*ackStatus=*/nullptr,
                        /*ackMessage=*/nullptr,
                        /*ackErrorCode=*/nullptr,
                        /*ackSessionId=*/nullptr);
}
void publish_heartbeat() {
StaticJsonDocument<256> doc; // Heartbeat can be simpler
doc["robotUid"] = spotUid;
doc["actualRobotStatus"] = currentDeviceStatus; // Heartbeat includes current status
// Optionally, add a specific "messageType": "HEARTBEAT" if backend needs explicit differentiation
// beyond just a minimal status update. For now, relying on RobotStatusMessage structure.
publish_message(topic_uplink_to_backend, doc, "Heartbeat");
lastHeartbeatTime = millis();
}
// Sends an acknowledgement for a command by delegating to
// publish_status_update in ACK mode.
//
// taskId      - ID of the task being acknowledged; nothing is sent when it
//               is null or empty
// success     - true reports status "COMPLETED", false reports "FAILED"
// message     - human-readable text placed in the ACK
// contextInfo - context string (callers pass a spot ID or session ID);
//               forwarded as the ackSessionId argument
//
// No error code is sent: the ackErrorCode argument is always passed as null.
void publish_ack_message(const char* taskId, bool success, const char* message, const char* contextInfo) {
  const bool hasTaskId = taskId != nullptr && taskId[0] != '\0';
  if (!hasTaskId) {
    Serial.println("无法发送ACK: taskId 为空");
    return;
  }

  const char* ackStatus = success ? "COMPLETED" : "FAILED";
  publish_status_update(true, taskId, ackStatus, message, nullptr, contextInfo);
}
// Arduino entry point, run once at boot: opens the serial console, builds the
// MQTT topic names, connects to Wi-Fi and configures the MQTT client
// (server, packet buffer size, message callback). The actual MQTT connection
// is established later from loop() via reconnect_mqtt().
void setup() {
  Serial.begin(115200);
  Serial.println("\nESP32 充电桩模拟客户端启动...");
  setup_mqtt_topics();  // initialise MQTT topic names
  connect_wifi();       // connect to Wi-Fi (blocks until connected)
  client.setServer(mqtt_broker, mqtt_port);

  // Topic + JSON payload must fit into the client's packet buffer, otherwise
  // publish() fails and oversized incoming commands are dropped. The ACK
  // messages built in this sketch (long status topic, duplicated task ID,
  // free-text message) can exceed PubSubClient's stock 256-byte buffer, so
  // size it explicitly.
  const uint16_t mqttBufferSize = 1024;
  if (!client.setBufferSize(mqttBufferSize)) {
    Serial.println("MQTT 缓冲区调整失败, 较长的消息可能无法发送");
  }

  client.setCallback(callback);  // register the incoming-message handler
}
void loop() {
if (!client.connected()) {
reconnect_mqtt();
}
client.loop();
unsigned long currentTime = millis();
if (currentTime - lastStatusUpdateTime > statusUpdateInterval) {
publish_regular_status_update(); // Use the new wrapper
}
if (currentTime - lastHeartbeatTime > heartbeatInterval) {
publish_heartbeat(); // Uses the new heartbeat logic
}
// 模拟充电过程中的电量和功率变化 (仅为演示)
if (String(currentDeviceStatus) == "CHARGING") {
currentEnergyConsumed += 0.01; // 假设每秒消耗0.01kWh (不精确,仅为演示)
currentCurrent = 5.0; // 假设充电电流5A
currentPower = (currentVoltage * currentCurrent) / 1000.0; // kW
// 注意:实际项目中这些值应来自传感器或充电控制器
// 这里为了演示,每隔一段时间简单更新一下
} else {
currentCurrent = 0.0;
currentPower = 0.0;
}
delay(100); // 短暂延时避免loop过于频繁给其他任务一点时间 (可选)
// 模拟电量消耗和位置变化 (更符合需求文档)
if (strcmp(currentDeviceStatus, "CHARGING") == 0) {
if (currentSimulatedBattery > 0) { // 充电时电量可以缓慢增加,或保持不变,取决于模拟逻辑
// currentSimulatedBattery = min(100, currentSimulatedBattery + 1); // 简单模拟充电增加
}
// durationSeconds is calculated in publish_status_update
} else if (strcmp(currentDeviceStatus, "MOVING") == 0) {
if (currentSimulatedBattery > 5) { // 移动时消耗电量
// currentSimulatedBattery--; // 简单模拟电量消耗
}
} else { // IDLE, COMPLETED, ERROR
//电量可能不变或缓慢消耗
}
// 简单模拟位置更新 (可以更复杂)
// currentSimulatedLocation = ... ; // 可以在特定事件中更新
// --- 动态模拟数据更新 ---
if (currentTime - lastBatteryUpdateTime > batteryUpdateInterval) {
if (strcmp(currentDeviceStatus, "CHARGING") == 0) {
if (currentSimulatedBattery < 100) {
currentSimulatedBattery += 1; // 每 batteryUpdateInterval 增加 1% 电量
Serial.println("模拟: 电量增加至 " + String(currentSimulatedBattery) + "%");
} else {
currentSimulatedBattery = 100; // 防止超过100
}
} else if (strcmp(currentDeviceStatus, "MOVING") == 0) {
if (currentSimulatedBattery > 2) {
currentSimulatedBattery -= 2; // 每 batteryUpdateInterval 消耗 2% 电量
Serial.println("模拟: 电量消耗至 " + String(currentSimulatedBattery) + "%");
} else {
currentSimulatedBattery = 2; // 防止低于2极端情况
}
} else { // IDLE, COMPLETED, FAULTED
if (currentSimulatedBattery > 1) {
// 非常缓慢的自然消耗,或者不消耗
// currentSimulatedBattery -= 1;
}
}
lastBatteryUpdateTime = currentTime;
}
// 位置模拟: 通常在状态转换时callback中已经更新了主要位置。
// loop中可以添加更细致的移动中的位置更新但目前保持简单依赖callback中的设定。
// 例如: if (strcmp(currentDeviceStatus, "MOVING") == 0) { /* update location based on time/progress */ }
delay(100);
}