Files
mqtt_power/mqtt_esp32_client/mqtt_esp32_client.ino

297 lines
13 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h> // 用于JSON操作
// ----------- Device configuration (adjust for your own device and environment) -----------
// NOTE(review): the Wi-Fi and MQTT credentials below are stored in plain text in
// this sketch; consider moving them to an untracked header before sharing the file.
// Wi-Fi
const char *ssid = "UFI_DB50CD"; // Wi-Fi network name (SSID)
const char *password = "1349534012"; // Wi-Fi password
// MQTT broker
const char *mqtt_broker = "broker.emqx.io"; // MQTT broker address
const char *mqtt_username = "emqx"; // MQTT user name (if the broker requires one)
const char *mqtt_password = "public"; // MQTT password (if the broker requires one)
const int mqtt_port = 1883; // MQTT broker port
// Unique device identifier (very important: must match the one registered with the backend)
const char *spotUid = "ESP32_SPOT_001"; // e.g. "SPOT001", "P005-A1"
// ----------- MQTT topic definitions -----------
// Based on application.yml and the backend service implementation.
// Both topics are filled in by setup_mqtt_topics().
String topic_uplink_to_backend; // reported to the backend: yupi_mqtt_power_project/robot/status/{spotUid}
String topic_downlink_from_backend; // commands from the backend: yupi_mqtt_power_project/robot/command/{spotUid}
// ----------- Global objects and state -----------
WiFiClient espClient;
PubSubClient client(espClient);
// Simulated hardware state (a real project would read these from sensors or hardware logic)
const char* currentDeviceStatus = "IDLE"; // current device status: IDLE, CHARGING, FAULTED, ...
float currentVoltage = 220.0;
float currentCurrent = 0.0;
float currentPower = 0.0;
float currentEnergyConsumed = 0.0;
int currentErrorCode = 0;
String currentSessionId = ""; // ID of the charging session in progress (empty when none)
// Periodic publishing bookkeeping (all values are millis() based)
unsigned long lastStatusUpdateTime = 0;
unsigned long lastHeartbeatTime = 0;
// Intervals are unsigned so they compare against millis() differences without a
// signed/unsigned mix.
const unsigned long statusUpdateInterval = 30000; // status report interval (e.g. 30 seconds)
const unsigned long heartbeatInterval = 60000; // heartbeat interval (e.g. 60 seconds)
void setup_mqtt_topics() {
String backend_status_base = "yupi_mqtt_power_project/robot/status/";
String backend_command_base = "yupi_mqtt_power_project/robot/command/";
topic_uplink_to_backend = backend_status_base + String(spotUid);
topic_downlink_from_backend = backend_command_base + String(spotUid);
Serial.println("MQTT 主题初始化完成 (匹配后端实现):");
Serial.println(" 上行主题 (状态/心跳/ACK): " + topic_uplink_to_backend);
Serial.println(" 下行主题 (接收指令): " + topic_downlink_from_backend);
}
// Starts the Wi-Fi connection with the configured SSID/password and blocks,
// printing a dot every 500 ms, until the station reports WL_CONNECTED.
// Logs the assigned IP address once connected.
void connect_wifi() {
  Serial.println("正在连接 Wi-Fi...");
  WiFi.begin(ssid, password);

  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("\nWi-Fi 连接成功!");
  Serial.print("IP 地址: ");
  Serial.println(WiFi.localIP());
}
// Blocks until the MQTT client is connected AND subscribed to the command topic.
//
// Each attempt builds a client ID from spotUid and the Wi-Fi MAC address and
// connects with the configured user name/password. A failed connect is retried
// after 2 seconds. After a successful connect the command topic is subscribed
// and one general status update is published.
void reconnect_mqtt() {
  while (!client.connected()) {
    Serial.print("尝试连接 MQTT Broker: ");
    Serial.println(mqtt_broker);
    String client_id = "esp32-client-" + String(spotUid) + "-";
    client_id += String(WiFi.macAddress());
    Serial.printf("客户端 ID: %s \n", client_id.c_str());

    if (!client.connect(client_id.c_str(), mqtt_username, mqtt_password)) {
      Serial.print("连接失败, rc=");
      Serial.print(client.state());
      Serial.println(" 2秒后重试...");
      delay(2000);
      continue;
    }
    Serial.println("MQTT Broker 连接成功!");

    // Subscribe to the single downlink command topic. subscribe() reports
    // failure through its return value; without the subscription the device
    // would stay connected but never receive a command, so drop the session
    // and let the loop retry instead of logging a false success.
    if (!client.subscribe(topic_downlink_from_backend.c_str())) {
      Serial.print("订阅指令主题失败: ");
      Serial.print(topic_downlink_from_backend);
      Serial.println(" 2秒后重试...");
      client.disconnect();
      delay(2000);
      continue;
    }
    Serial.println("已订阅指令主题: " + topic_downlink_from_backend);

    // Send one regular status update right after connecting.
    publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr);
  }
}
void callback(char *topic, byte *payload, unsigned int length) {
Serial.println("-----------------------");
Serial.print("消息抵达, 主题: ");
Serial.println(topic);
char message[length + 1];
memcpy(message, payload, length);
message[length] = '\0';
Serial.print("消息内容: ");
Serial.println(message);
if (String(topic) != topic_downlink_from_backend) {
Serial.println("消息非来自预期的指令主题,忽略。");
return;
}
StaticJsonDocument<256> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
Serial.print("JSON 解析失败: ");
Serial.println(error.f_str());
return;
}
const char* cmdType = doc["commandType"]; // 例如: "START_CHARGE", "STOP_CHARGE"
const char* taskId = doc["taskId"]; // 用于ACK
if (cmdType == nullptr || taskId == nullptr) {
Serial.println("指令JSON缺少 commandType 或 taskId 字段。");
publish_ack_message(taskId, false, "Command JSON invalid", nullptr); // 尝试ACK错误
return;
}
if (strcmp(cmdType, "MOVE_TO_SPOT") == 0) {
Serial.println("接收到 [移动到车位] 指令");
// const char* targetSpotUid = doc["target_spot_uid"]; // 可选: 从payload中获取目标车位ID (如果存在且需要进一步处理)
// if (targetSpotUid) {
// Serial.println("目标车位UID: " + String(targetSpotUid));
// }
// 模拟机器人移动到指定位置的动作
Serial.println("模拟: 机器人正在移动到目标车位...");
delay(1000); // 模拟移动耗时 (缩短演示时间)
Serial.println("模拟: 机器人已到达目标车位。");
publish_ack_message(taskId, true, "Robot arrived at spot (simulated)", nullptr);
// 注意:此时设备状态 currentDeviceStatus 可以保持不变,或根据业务逻辑更新
// 例如: currentDeviceStatus = "IDLE_AT_SPOT";
// 如果需要,可以立即上报一次状态: publish_regular_status_update();
} else if (strcmp(cmdType, "START_CHARGE") == 0) {
Serial.println("接收到 [启动充电] 指令");
currentDeviceStatus = "CHARGING";
if (doc.containsKey("sessionId")) {
currentSessionId = String(doc["sessionId"].as<const char*>());
} else {
currentSessionId = ""; //确保没有sessionId时清空
}
Serial.println("模拟: 充电已启动。会话ID: " + currentSessionId);
publish_ack_message(taskId, true, "Charging started successfully", currentSessionId.c_str());
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 立即更新状态
} else if (strcmp(cmdType, "STOP_CHARGE") == 0) {
Serial.println("接收到 [停止充电] 指令");
currentDeviceStatus = "COMPLETED";
Serial.println("模拟: 充电已停止。");
String previousSessionId = currentSessionId; // 保存一下以防ACK需要
currentSessionId = "";
publish_ack_message(taskId, true, "Charging stopped successfully", previousSessionId.c_str());
publish_status_update(false, nullptr, nullptr, nullptr, nullptr, nullptr); // 立即更新状态
}
// Add other commandType handling here if needed, e.g., "QUERY_STATUS"
// else if (strcmp(cmdType, "QUERY_STATUS") == 0) {
// Serial.println("接收到 [查询状态] 指令");
// publish_regular_status_update(); // 回复当前状态
// publish_ack_message(taskId, true, "Status reported", nullptr);
// }
else {
Serial.println("未知指令 commandType: " + String(cmdType));
publish_ack_message(taskId, false, ("Unknown commandType: " + String(cmdType)).c_str(), nullptr);
}
Serial.println("-----------------------");
}
void publish_message(const String& topic, const JsonDocument& doc, const char* message_type) {
String jsonBuffer;
serializeJson(doc, jsonBuffer);
Serial.print("发送 ");
Serial.print(message_type);
Serial.print(" 到主题 [");
Serial.print(topic);
Serial.print("]: ");
Serial.println(jsonBuffer);
if (client.publish(topic.c_str(), jsonBuffer.c_str())) {
Serial.println(String(message_type) + " 发送成功");
} else {
Serial.println(String(message_type) + " 发送失败");
}
}
// isAckOrTaskUpdate: true if this is an ACK or a task-specific update, false for general status/heartbeat
// ackTaskId: The taskId if this is an ACK for a command. Null otherwise.
void publish_status_update(bool isAckOrTaskUpdate, const char* ackTaskId, const char* ackStatus, const char* ackMessage, const char* ackErrorCode, const char* ackSessionId) {
StaticJsonDocument<512> doc;
doc["robotUid"] = spotUid;
if (isAckOrTaskUpdate) {
if (ackTaskId) doc["taskId"] = ackTaskId;
if (ackStatus) doc["status"] = ackStatus; // e.g., "SUCCESS", "FAILURE" or task-specific status
if (ackMessage) doc["message"] = ackMessage;
if (ackErrorCode) doc["errorCode"] = ackErrorCode;
// actualRobotStatus should still be sent to reflect current state after ACK
doc["actualRobotStatus"] = currentDeviceStatus;
if (ackSessionId && strlen(ackSessionId) > 0) doc["activeTaskId"] = ackSessionId; // Assuming activeTaskId can hold sessionId for context in ACKs
// Or, if RobotStatusMessage is extended for sessionId in future.
// For now, activeTaskId might be a way to correlate, or it might be ignored by backend for ACKs.
} else { // General status update / heartbeat
doc["actualRobotStatus"] = currentDeviceStatus;
doc["voltage"] = currentVoltage; // Example: Add these if backend expects them with general status
doc["current"] = currentCurrent;
doc["power"] = currentPower;
doc["energyConsumed"] = currentEnergyConsumed;
doc["errorCode"] = currentErrorCode; // General device error code
if (currentSessionId.length() > 0) {
// For general status, if a session is active, it might be relevant as activeTaskId
// This depends on how backend interprets activeTaskId outside of specific task ACKs.
doc["activeTaskId"] = currentSessionId; // Or a more generic field if RobotStatusMessage evolves
}
}
// Common fields (timestamp can be added by backend or here if NTP is used)
// doc["timestamp"] = String(millis()); // Already using millis()
publish_message(topic_uplink_to_backend, doc, isAckOrTaskUpdate ? "ACK/TaskUpdate" : "StatusUpdate");
if (!isAckOrTaskUpdate) { // Only update lastStatusUpdateTime for general status updates, not for ACKs triggered by commands
lastStatusUpdateTime = millis();
}
}
// Convenience wrapper: publishes a general (non-ACK) status report with no
// ACK-specific fields.
void publish_regular_status_update() {
  const char* const notApplicable = nullptr; // every ACK-only argument is unused here
  publish_status_update(false, notApplicable, notApplicable, notApplicable, notApplicable, notApplicable);
}
void publish_heartbeat() {
StaticJsonDocument<256> doc; // Heartbeat can be simpler
doc["robotUid"] = spotUid;
doc["actualRobotStatus"] = currentDeviceStatus; // Heartbeat includes current status
// Optionally, add a specific "messageType": "HEARTBEAT" if backend needs explicit differentiation
// beyond just a minimal status update. For now, relying on RobotStatusMessage structure.
publish_message(topic_uplink_to_backend, doc, "Heartbeat");
lastHeartbeatTime = millis();
}
// Sends an ACK for a received command via publish_status_update in ACK mode.
//
// taskId                 - id of the command being acknowledged; when null or
//                          empty nothing is published and a note is logged
// success                - true -> status "SUCCESS" with error code "0",
//                          false -> status "FAILURE" with "GENERAL_ERROR_ON_ACK"
// message                - human-readable detail included in the ACK
// sessionIdForAckContext - optional session id forwarded with the ACK (may be null)
void publish_ack_message(const char* taskId, bool success, const char* message, const char* sessionIdForAckContext) {
  const bool missingTaskId = (taskId == nullptr) || (taskId[0] == '\0');
  if (missingTaskId) {
    Serial.println("无法发送ACK: taskId 为空");
    return;
  }

  const char* outcome = "FAILURE";
  const char* errorCode = "GENERAL_ERROR_ON_ACK";
  if (success) {
    outcome = "SUCCESS";
    errorCode = "0";
  }

  publish_status_update(true, taskId, outcome, message, errorCode, sessionIdForAckContext);
}
// Arduino entry point, run once at boot: opens the serial console, builds the
// MQTT topic names, joins Wi-Fi (blocking) and configures the MQTT client
// (broker address, packet buffer size, message callback). The broker connection
// itself is established from loop().
void setup() {
  Serial.begin(115200);
  Serial.println("\nESP32 充电桩模拟客户端启动...");
  setup_mqtt_topics(); // initialise the MQTT topic names
  connect_wifi();      // join the Wi-Fi network
  client.setServer(mqtt_broker, mqtt_port);

  // PubSubClient's default packet buffer is 256 bytes and has to hold the topic
  // plus the payload. An ACK that carries a task id, a session id and a message
  // on the ~50 character uplink topic can exceed that, in which case publish()
  // fails. Enlarge the buffer (requires PubSubClient 2.8 or newer).
  const uint16_t mqttBufferSize = 512;
  if (!client.setBufferSize(mqttBufferSize)) {
    Serial.println("MQTT 缓冲区扩容失败");
  }

  client.setCallback(callback); // register the incoming-message handler
}
// Arduino main loop: keeps the MQTT session alive, services incoming messages,
// publishes the periodic status report and heartbeat, and advances the
// simulated charging measurements.
void loop() {
  if (!client.connected()) {
    reconnect_mqtt();
  }
  client.loop();

  const unsigned long currentTime = millis();

  // Unsigned subtraction keeps these comparisons correct across millis() rollover.
  if (currentTime - lastStatusUpdateTime > static_cast<unsigned long>(statusUpdateInterval)) {
    publish_regular_status_update();
  }
  if (currentTime - lastHeartbeatTime > static_cast<unsigned long>(heartbeatInterval)) {
    publish_heartbeat();
  }

  // Time elapsed since the previous pass. Tracked on every pass (not only while
  // charging) so the first charging pass is not credited with idle time.
  static unsigned long lastSimulationTick = currentTime;
  const unsigned long elapsedMs = currentTime - lastSimulationTick;
  lastSimulationTick = currentTime;

  // Simulated energy/power while charging (demo only; a real device would read
  // these values from sensors or the charge controller).
  // strcmp avoids building a temporary String on every pass.
  if (strcmp(currentDeviceStatus, "CHARGING") == 0) {
    // Assume 0.01 kWh per second, scaled by the real elapsed time so the rate
    // does not depend on how often loop() runs.
    currentEnergyConsumed += 0.01f * (static_cast<float>(elapsedMs) / 1000.0f);
    currentCurrent = 5.0; // assume a 5 A charging current
    currentPower = (currentVoltage * currentCurrent) / 1000.0; // kW
  } else {
    currentCurrent = 0.0;
    currentPower = 0.0;
  }

  delay(100); // short pause so the loop does not spin too fast (optional)
}