mqtt初始优化

This commit is contained in:
2025-05-13 21:37:56 +08:00
parent 342284d12c
commit 9e32234ca4
36 changed files with 74 additions and 194 deletions

View File

@@ -12,8 +12,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
@Slf4j
@@ -24,9 +22,6 @@ public class MqttConfig {
private final MqttProperties mqttProperties;
private final MqttCallbackHandler mqttCallbackHandler;
// No @Autowired here. This field will be set by the mqttClientBean() method.
private MqttClient mqttClient;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
@@ -46,45 +41,9 @@ public class MqttConfig {
@Bean
public MqttClient mqttClientBean(MqttConnectOptions mqttConnectOptions) throws MqttException {
String clientId = mqttProperties.getClientIdPrefix() + UUID.randomUUID().toString().replace("-", "");
// Create the client instance
MqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
client.setCallback(mqttCallbackHandler);
mqttCallbackHandler.setMqttClient(client);
// Assign the created client to the instance field for @PostConstruct/@PreDestroy usage
this.mqttClient = client;
return client; // Return it to be managed by Spring as a bean
}
@PostConstruct
public void connect() {
try {
if (this.mqttClient != null && !this.mqttClient.isConnected()) {
log.info("Attempting to connect to MQTT broker: {} with client ID: {}", mqttProperties.getBrokerUrl(), this.mqttClient.getClientId());
// Pass the MqttConnectOptions bean directly to the connect method
this.mqttClient.connect(mqttConnectOptions());
} else if (this.mqttClient == null) {
log.error("MqttClient instance is null (was not set by mqttClientBean), cannot connect.");
}
} catch (MqttException e) {
log.error("Error connecting to MQTT broker: ", e);
}
}
@PreDestroy
public void disconnect() {
try {
if (this.mqttClient != null && this.mqttClient.isConnected()) {
log.info("Disconnecting from MQTT broker: {}", this.mqttClient.getServerURI());
this.mqttClient.disconnect();
log.info("Successfully disconnected from MQTT broker.");
}
if (this.mqttClient != null) {
// Ensure close is called even if disconnect fails or was not connected
this.mqttClient.close();
}
} catch (MqttException e) {
log.error("Error disconnecting/closing MQTT client: ", e);
}
return client;
}
}

View File

@@ -0,0 +1,73 @@
package com.yupi.project.mqtt;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttConnectionManager implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
private final MqttClient mqttClient;
private final MqttConnectOptions mqttConnectOptions;
// private final MqttProperties mqttProperties; // Injected if needed for logging brokerUrl, or get from mqttClient.getServerURI()
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// Ensure this logic runs only once, for the root application context
if (event.getApplicationContext().getParent() == null) {
connectToMqtt();
}
}
private void connectToMqtt() {
try {
if (mqttClient != null && !mqttClient.isConnected()) {
log.info("Attempting to connect to MQTT broker: {} with client ID: {}", mqttClient.getServerURI(), mqttClient.getClientId());
mqttClient.connect(mqttConnectOptions);
// Subscription logic is handled by MqttCallbackHandler.connectComplete
// which is triggered by the Paho client library upon successful connection.
} else if (mqttClient == null) {
log.error("MqttClient bean is null, cannot connect.");
}
} catch (MqttException e) {
log.error("Error connecting to MQTT broker: ", e);
// Consider retry logic or specific actions based on requirements
}
}
@Override
public void destroy() throws Exception {
disconnectFromMqtt();
}
private void disconnectFromMqtt() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
log.info("Disconnecting from MQTT broker: {}", mqttClient.getServerURI());
mqttClient.disconnect();
log.info("Successfully disconnected from MQTT broker.");
}
} catch (MqttException e) {
log.error("Error disconnecting from MQTT broker: ", e);
} finally {
try {
if (mqttClient != null) { // No need to check isOpen(), close() handles its state.
log.info("Closing MQTT client for client ID: {}", mqttClient.getClientId());
mqttClient.close();
log.info("MQTT client closed successfully for client ID: {}", mqttClient.getClientId());
}
} catch (MqttException e) {
// Log if close() itself throws an exception, though it's less common if called after disconnect or on an unopen client.
log.error("Error closing MQTT client: ", e);
}
}
}
}