|
|
@@ -1,11 +1,15 @@
|
|
|
package com.dcs.equipment.service.impl;
|
|
|
|
|
|
+import com.dcs.equipment.constants.ProtocolConstants;
|
|
|
import com.dcs.equipment.domain.DeviceRequest;
|
|
|
import com.dcs.equipment.service.ProtocolService;
|
|
|
import org.eclipse.paho.client.mqttv3.*;
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.util.concurrent.*;
|
|
|
+
|
|
|
import java.util.List;
|
|
|
|
|
|
/**
|
|
|
@@ -14,6 +18,7 @@ import java.util.List;
|
|
|
*/
|
|
|
@Service("mqttService")
|
|
|
public class MQTTServiceImpl implements ProtocolService {
|
|
|
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
@Override
|
|
|
public List<Object> getValues(DeviceRequest request) {
|
|
|
return null;
|
|
|
@@ -21,69 +26,116 @@ public class MQTTServiceImpl implements ProtocolService {
|
|
|
|
|
|
@Override
|
|
|
public boolean setValue(DeviceRequest request) {
|
|
|
- String broker = "tcp://" + request.getIpAddress() + ":" + request.getPort();
|
|
|
- String topic = request.getResource();
|
|
|
- String clientId = "java_client_" + System.currentTimeMillis();
|
|
|
- String username = request.getData().get(1).toString();
|
|
|
- String password = request.getData().get(2).toString();
|
|
|
- Integer qos = request.getLengthOrQos();
|
|
|
-
|
|
|
- try {
|
|
|
- MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
|
|
|
-
|
|
|
- MqttConnectOptions options = new MqttConnectOptions();
|
|
|
- options.setCleanSession(true);
|
|
|
- options.setUserName(username);
|
|
|
- options.setPassword(password.toCharArray());
|
|
|
-
|
|
|
- // 设置回调,用于接收消息
|
|
|
- client.setCallback(new MqttCallback() {
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable cause) {
|
|
|
- System.out.println("连接丢失: " + cause.getMessage());
|
|
|
- }
|
|
|
+ if (request == null || request.getProtocol() == null || !request.getProtocol().equals(ProtocolConstants.MQTT)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (request.getData() == null || request.getData().size() < 3) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void messageArrived(String topic, MqttMessage message) {
|
|
|
- System.out.println("接收到设备回复: " + new String(message.getPayload()));
|
|
|
+ // 提交任务到线程池
|
|
|
+ Future<Boolean> future = executor.submit(() -> {
|
|
|
+ String broker = "tcp://" + request.getIpAddress() + ":" + request.getPort();
|
|
|
+ String topic = request.getResource();
|
|
|
+ String clientId = "java_client_" + System.currentTimeMillis();
|
|
|
+ String username = request.getData().get(1).toString();
|
|
|
+ String password = request.getData().get(2).toString();
|
|
|
+ Integer qos = request.getLengthOrQos();
|
|
|
+ MqttClient client = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ client = new MqttClient(broker, clientId, new MemoryPersistence());
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setCleanSession(true);
|
|
|
+ options.setUserName(username);
|
|
|
+ options.setPassword(password.toCharArray());
|
|
|
+
|
|
|
+ client.setCallback(new MqttCallback() {
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ System.out.println("连接丢失: " + cause.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) {
|
|
|
+ System.out.println("接收到设备回复: " + new String(message.getPayload()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ System.out.println("消息已成功发送");
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ client.connect(options);
|
|
|
+ System.out.println("连接成功");
|
|
|
+ client.subscribe(topic, qos);
|
|
|
+ System.out.println("已订阅 topic: " + topic);
|
|
|
+
|
|
|
+ // 发布消息
|
|
|
+ String jsonPayload = request.getData().get(0).toString();
|
|
|
+ MqttMessage message = new MqttMessage(jsonPayload.getBytes());
|
|
|
+ message.setQos(qos);
|
|
|
+ client.publish(topic, message);
|
|
|
+ System.out.println("已发送 JSON 消息: " + jsonPayload);
|
|
|
+
|
|
|
+ // 模拟等待回复(实际应根据业务需求调整)
|
|
|
+ Thread.sleep(30000);
|
|
|
+ return true;
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
+ System.out.println("任务被中断,执行清理");
|
|
|
+ if (client != null) {
|
|
|
+ try {
|
|
|
+ if (client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ }
|
|
|
+ client.close();
|
|
|
+ } catch (MqttException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ System.err.println("MQTT 客户端关闭失败: " + e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
- // 可选:消息确认送达后触发
|
|
|
- System.out.println("消息已成功发送");
|
|
|
+ return false;
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return false;
|
|
|
+ } finally {
|
|
|
+ if (client != null) {
|
|
|
+ try {
|
|
|
+ if (client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ }
|
|
|
+ client.close(); // 即使未连接也调用 close()
|
|
|
+ } catch (MqttException e) {
|
|
|
+ // 记录日志,避免吞没异常
|
|
|
+ System.err.println("MQTT 客户端关闭失败: " + e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
-
|
|
|
- client.connect(options);
|
|
|
- System.out.println("连接成功");
|
|
|
-
|
|
|
- // 订阅 topic 等待设备响应
|
|
|
- client.subscribe(topic, qos);
|
|
|
- System.out.println("已订阅 topic: " + topic);
|
|
|
-
|
|
|
- // 构建 JSON 消息并发布
|
|
|
- Object o = request.getData().get(0);
|
|
|
- String jsonPayload = o.toString();
|
|
|
- MqttMessage message = new MqttMessage(jsonPayload.getBytes());
|
|
|
- message.setQos(qos);
|
|
|
- client.publish(topic, message);
|
|
|
- System.out.println("已发送 JSON 消息: " + jsonPayload);
|
|
|
-
|
|
|
- // 保持连接等待回复(例如 30 秒)
|
|
|
-// System.out.println("等待设备回复中...");
|
|
|
-// Thread.sleep(30000); // 可根据需要调整等待时间
|
|
|
-
|
|
|
- // 关闭连接
|
|
|
- client.disconnect();
|
|
|
- client.close();
|
|
|
- System.out.println("连接已断开");
|
|
|
- return true;
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
+ // 阻塞等待结果(或设置超时)
|
|
|
+ try {
|
|
|
+ return future.get(60, TimeUnit.SECONDS); // 总超时时间(连接+等待回复)
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ future.cancel(true); // 超时后取消任务
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 关闭线程池(在应用终止时调用)
|
|
|
+ @PreDestroy
|
|
|
+ public void shutdown() {
|
|
|
+ executor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
|
+ executor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ executor.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
}
|
|
|
- return false;
|
|
|
}
|
|
|
|
|
|
@Override
|