|
|
@@ -7,18 +7,44 @@ import org.eclipse.paho.client.mqttv3.*;
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.PreDestroy;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
/**
|
|
|
+ * MQTT 长连接服务实现
|
|
|
* @author: LaFantasque
|
|
|
* @date: 2025-07-23 11:46
|
|
|
*/
|
|
|
@Service("mqttService")
|
|
|
public class MQTTServiceImpl implements ProtocolService {
|
|
|
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
+ // 线程池用于处理MQTT消息发送和接收
|
|
|
+ private final ExecutorService executor = Executors.newCachedThreadPool();
|
|
|
+
|
|
|
+ // 存储MQTT客户端连接,key为broker地址+用户名
|
|
|
+ private final Map<String, MqttClient> clientCache = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 存储每个客户端的订阅主题,key为客户端ID,value为订阅的主题集合
|
|
|
+ private final Map<String, Set<String>> subscribedTopics = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 存储每个客户端的响应Future,key为客户端ID
|
|
|
+ private final Map<String, CompletableFuture<String>> responseFutures = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 连接超时时间(毫秒)
|
|
|
+ private static final int CONNECT_TIMEOUT = 10000;
|
|
|
+ // 默认等待响应时间(毫秒)
|
|
|
+ private static final int DEFAULT_RESPONSE_TIMEOUT = 30000;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ // 可以在这里初始化一些东西
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public List<Object> getValues(DeviceRequest request) {
|
|
|
return null;
|
|
|
@@ -33,100 +59,137 @@ public class MQTTServiceImpl implements ProtocolService {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- // 提交任务到线程池
|
|
|
- Future<Boolean> future = executor.submit(() -> {
|
|
|
+ try {
|
|
|
String broker = "tcp://" + request.getIpAddress() + ":" + request.getPort();
|
|
|
String topic = request.getResource();
|
|
|
- String clientId = "java_client_" + System.currentTimeMillis();
|
|
|
String username = request.getData().get(1).toString();
|
|
|
String password = request.getData().get(2).toString();
|
|
|
- Integer qos = request.getLengthOrQos();
|
|
|
- MqttClient client = null;
|
|
|
+ Integer qos = request.getLengthOrQos() != null ? request.getLengthOrQos() : 1;
|
|
|
+ String jsonPayload = request.getData().get(0).toString();
|
|
|
+
|
|
|
+ // 生成客户端缓存key
|
|
|
+ String clientKey = broker + "|" + username;
|
|
|
+ String clientId = "java_client_" + username + "_" + System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 获取或创建MQTT客户端
|
|
|
+ MqttClient client = clientCache.computeIfAbsent(clientKey, k -> {
|
|
|
+ try {
|
|
|
+ MqttClient newClient = new MqttClient(broker, clientId, new MemoryPersistence());
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setCleanSession(false); // 改为false以保持长连接
|
|
|
+ options.setUserName(username);
|
|
|
+ options.setPassword(password.toCharArray());
|
|
|
+ options.setConnectionTimeout(CONNECT_TIMEOUT);
|
|
|
+ options.setAutomaticReconnect(true); // 自动重连
|
|
|
+
|
|
|
+ newClient.setCallback(new MqttCallback() {
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ System.out.println("连接丢失: " + cause.getMessage());
|
|
|
+ // 可以在这里添加重连逻辑
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- client = new MqttClient(broker, clientId, new MemoryPersistence());
|
|
|
- MqttConnectOptions options = new MqttConnectOptions();
|
|
|
- options.setCleanSession(true);
|
|
|
- options.setUserName(username);
|
|
|
- options.setPassword(password.toCharArray());
|
|
|
-
|
|
|
- client.setCallback(new MqttCallback() {
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable cause) {
|
|
|
- System.out.println("连接丢失: " + cause.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageArrived(String topic, MqttMessage message) {
|
|
|
- System.out.println("接收到设备回复: " + new String(message.getPayload()));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
- System.out.println("消息已成功发送");
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- client.connect(options);
|
|
|
- System.out.println("连接成功");
|
|
|
- client.subscribe(topic, qos);
|
|
|
- System.out.println("已订阅 topic: " + topic);
|
|
|
-
|
|
|
- // 发布消息
|
|
|
- String jsonPayload = request.getData().get(0).toString();
|
|
|
- MqttMessage message = new MqttMessage(jsonPayload.getBytes());
|
|
|
- message.setQos(qos);
|
|
|
- client.publish(topic, message);
|
|
|
- System.out.println("已发送 JSON 消息: " + jsonPayload);
|
|
|
-
|
|
|
- // 模拟等待回复(实际应根据业务需求调整)
|
|
|
- Thread.sleep(30000);
|
|
|
- return true;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
- System.out.println("任务被中断,执行清理");
|
|
|
- if (client != null) {
|
|
|
- try {
|
|
|
- if (client.isConnected()) {
|
|
|
- client.disconnect();
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String receivedTopic, MqttMessage message) {
|
|
|
+ System.out.println("接收到设备回复: " + new String(message.getPayload()));
|
|
|
+ // 处理响应消息
|
|
|
+ CompletableFuture<String> future = responseFutures.get(newClient.getClientId());
|
|
|
+ if (future != null) {
|
|
|
+ future.complete(new String(message.getPayload()));
|
|
|
+ responseFutures.remove(newClient.getClientId());
|
|
|
+ }
|
|
|
}
|
|
|
- client.close();
|
|
|
- } catch (MqttException ex) {
|
|
|
- ex.printStackTrace();
|
|
|
- System.err.println("MQTT 客户端关闭失败: " + e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return false;
|
|
|
- } finally {
|
|
|
- if (client != null) {
|
|
|
- try {
|
|
|
- if (client.isConnected()) {
|
|
|
- client.disconnect();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ System.out.println("消息已成功发送");
|
|
|
}
|
|
|
- client.close(); // 即使未连接也调用 close()
|
|
|
- } catch (MqttException e) {
|
|
|
- // 记录日志,避免吞没异常
|
|
|
- System.err.println("MQTT 客户端关闭失败: " + e.getMessage());
|
|
|
- }
|
|
|
+ });
|
|
|
+
|
|
|
+ newClient.connect(options);
|
|
|
+ System.out.println("MQTT客户端连接成功: " + broker);
|
|
|
+ subscribedTopics.put(newClient.getClientId(), ConcurrentHashMap.newKeySet());
|
|
|
+ return newClient;
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.err.println("创建MQTT客户端失败: " + e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // 检查连接状态,如果断开则重新连接
|
|
|
+ if (!client.isConnected()) {
|
|
|
+ try {
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setCleanSession(false);
|
|
|
+ options.setUserName(username);
|
|
|
+ options.setPassword(password.toCharArray());
|
|
|
+ client.connect(options);
|
|
|
+ subscribedTopics.get(client.getClientId()).clear();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.err.println("重新连接MQTT失败: " + e.getMessage());
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
- });
|
|
|
|
|
|
- // 阻塞等待结果(或设置超时)
|
|
|
- try {
|
|
|
- return future.get(60, TimeUnit.SECONDS); // 总超时时间(连接+等待回复)
|
|
|
+ // 订阅响应主题(如果需要)
|
|
|
+ Set<String> topics = subscribedTopics.get(client.getClientId());
|
|
|
+ if (topics == null) {
|
|
|
+ topics = ConcurrentHashMap.newKeySet();
|
|
|
+ subscribedTopics.put(client.getClientId(), topics);
|
|
|
+ }
|
|
|
+ if (!topics.contains(topic)) {
|
|
|
+ try {
|
|
|
+ client.subscribe(topic, qos);
|
|
|
+ topics.add(topic);
|
|
|
+ System.out.println("已订阅 topic: " + topic);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.err.println("订阅主题失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 创建响应Future
|
|
|
+ CompletableFuture<String> responseFuture = new CompletableFuture<>();
|
|
|
+ responseFutures.put(client.getClientId(), responseFuture);
|
|
|
+
|
|
|
+ // 发布消息
|
|
|
+ MqttMessage message = new MqttMessage(jsonPayload.getBytes());
|
|
|
+ message.setQos(qos);
|
|
|
+ client.publish(topic, message);
|
|
|
+ System.out.println("已发送 JSON 消息: " + jsonPayload);
|
|
|
+
|
|
|
+ // 等待响应
|
|
|
+ String response = responseFuture.get(DEFAULT_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
|
|
|
+ System.out.println("收到响应: " + response);
|
|
|
+ return true;
|
|
|
+
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ System.out.println("任务被中断");
|
|
|
+ return false;
|
|
|
} catch (Exception e) {
|
|
|
- future.cancel(true); // 超时后取消任务
|
|
|
+ System.err.println("MQTT操作失败: " + e.getMessage());
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 关闭线程池(在应用终止时调用)
|
|
|
+ // 关闭线程池和MQTT客户端(在应用终止时调用)
|
|
|
@PreDestroy
|
|
|
public void shutdown() {
|
|
|
+ // 断开所有MQTT客户端
|
|
|
+ clientCache.values().forEach(client -> {
|
|
|
+ try {
|
|
|
+ if (client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ }
|
|
|
+ client.close();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.err.println("关闭MQTT客户端失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ clientCache.clear();
|
|
|
+ subscribedTopics.clear(); // 清空订阅记录
|
|
|
+
|
|
|
+ // 关闭线程池
|
|
|
executor.shutdown();
|
|
|
try {
|
|
|
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
|