mqtt初始优化
This commit is contained in:
@@ -8,7 +8,6 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
|
|||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
@@ -25,7 +24,7 @@ public class MqttConfig {
|
|||||||
private final MqttProperties mqttProperties;
|
private final MqttProperties mqttProperties;
|
||||||
private final MqttCallbackHandler mqttCallbackHandler;
|
private final MqttCallbackHandler mqttCallbackHandler;
|
||||||
|
|
||||||
@Autowired // Autowire the MqttClient bean defined below
|
// No @Autowired here. This field will be set by the mqttClientBean() method.
|
||||||
private MqttClient mqttClient;
|
private MqttClient mqttClient;
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@@ -37,55 +36,55 @@ public class MqttConfig {
|
|||||||
if (StringUtils.hasText(mqttProperties.getPassword())) {
|
if (StringUtils.hasText(mqttProperties.getPassword())) {
|
||||||
options.setPassword(mqttProperties.getPassword().toCharArray());
|
options.setPassword(mqttProperties.getPassword().toCharArray());
|
||||||
}
|
}
|
||||||
options.setAutomaticReconnect(true); // Enable automatic reconnect
|
options.setAutomaticReconnect(true);
|
||||||
options.setCleanSession(true); // Start with a clean session
|
options.setCleanSession(true);
|
||||||
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
|
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
|
||||||
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
|
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
|
||||||
// options.setWill(...) // Configure Last Will and Testament if needed
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean // Bean method name will be the bean name by default: "mqttClientBean"
|
@Bean
|
||||||
public MqttClient mqttClientBean(MqttConnectOptions mqttConnectOptions) throws MqttException {
|
public MqttClient mqttClientBean(MqttConnectOptions mqttConnectOptions) throws MqttException {
|
||||||
String clientId = mqttProperties.getClientIdPrefix() + UUID.randomUUID().toString().replace("-", "");
|
String clientId = mqttProperties.getClientIdPrefix() + UUID.randomUUID().toString().replace("-", "");
|
||||||
|
// Create the client instance
|
||||||
MqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
|
MqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
|
||||||
client.setCallback(mqttCallbackHandler);
|
client.setCallback(mqttCallbackHandler);
|
||||||
// Pass the client instance to the handler so it can subscribe on connectComplete
|
mqttCallbackHandler.setMqttClient(client);
|
||||||
mqttCallbackHandler.setMqttClient(client);
|
|
||||||
return client;
|
// Assign the created client to the instance field for @PostConstruct/@PreDestroy usage
|
||||||
|
this.mqttClient = client;
|
||||||
|
return client; // Return it to be managed by Spring as a bean
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void connect() {
|
public void connect() {
|
||||||
try {
|
try {
|
||||||
// Use the autowired mqttClient field
|
|
||||||
if (this.mqttClient != null && !this.mqttClient.isConnected()) {
|
if (this.mqttClient != null && !this.mqttClient.isConnected()) {
|
||||||
log.info("Attempting to connect to MQTT broker: {} with client ID: {}", mqttProperties.getBrokerUrl(), this.mqttClient.getClientId());
|
log.info("Attempting to connect to MQTT broker: {} with client ID: {}", mqttProperties.getBrokerUrl(), this.mqttClient.getClientId());
|
||||||
this.mqttClient.connect(mqttConnectOptions()); // mqttConnectOptions() provides the bean
|
// Pass the MqttConnectOptions bean directly to the connect method
|
||||||
// Subscription logic is now in MqttCallbackHandler.connectComplete
|
this.mqttClient.connect(mqttConnectOptions());
|
||||||
} else if (this.mqttClient == null) {
|
} else if (this.mqttClient == null) {
|
||||||
log.error("MqttClient (autowired) is null, cannot connect.");
|
log.error("MqttClient instance is null (was not set by mqttClientBean), cannot connect.");
|
||||||
}
|
}
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
log.error("Error connecting to MQTT broker: ", e);
|
log.error("Error connecting to MQTT broker: ", e);
|
||||||
// Consider retry logic or application failure based on requirements
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void disconnect() {
|
public void disconnect() {
|
||||||
try {
|
try {
|
||||||
// Use the autowired mqttClient field
|
|
||||||
if (this.mqttClient != null && this.mqttClient.isConnected()) {
|
if (this.mqttClient != null && this.mqttClient.isConnected()) {
|
||||||
log.info("Disconnecting from MQTT broker: {}", this.mqttClient.getServerURI());
|
log.info("Disconnecting from MQTT broker: {}", this.mqttClient.getServerURI());
|
||||||
this.mqttClient.disconnect();
|
this.mqttClient.disconnect();
|
||||||
log.info("Successfully disconnected from MQTT broker.");
|
log.info("Successfully disconnected from MQTT broker.");
|
||||||
}
|
}
|
||||||
if (this.mqttClient != null) {
|
if (this.mqttClient != null) {
|
||||||
this.mqttClient.close();
|
// Ensure close is called even if disconnect fails or was not connected
|
||||||
|
this.mqttClient.close();
|
||||||
}
|
}
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
log.error("Error disconnecting from MQTT broker: ", e);
|
log.error("Error disconnecting/closing MQTT client: ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user