|
|
@@ -0,0 +1,145 @@
|
|
|
+package cn.com.mes.server;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class HumitureServer {
|
|
|
+ private static final String BROKER = "tcp://222.243.138.146:1883";
|
|
|
+ private static final String TOPIC = "yzkj/zkshi/esp32/DH11";
|
|
|
+ private static final String USERNAME = "ygtx";
|
|
|
+ private static final String PASSWORD = "T!8dFz$kW3#pRgM2";
|
|
|
+
|
|
|
+ private MqttClient client;
|
|
|
+ private final Map<String, Object> latestData = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 程序启动时自动执行,建立 MQTT 长连接
|
|
|
+ */
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ connectAndSubscribe();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 建立连接并订阅主题
|
|
|
+ */
|
|
|
+ private void connectAndSubscribe() {
|
|
|
+ try {
|
|
|
+ String clientId = "java_client_" + System.currentTimeMillis();
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ options.setUserName(USERNAME);
|
|
|
+ options.setPassword(PASSWORD.toCharArray());
|
|
|
+ options.setConnectionTimeout(10);
|
|
|
+ options.setKeepAliveInterval(60);
|
|
|
+ options.setCleanSession(true);
|
|
|
+
|
|
|
+ client = new MqttClient(BROKER, clientId, new MemoryPersistence());
|
|
|
+
|
|
|
+ client.setCallback(new MqttCallback() {
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ System.err.println("[MQTT] 连接丢失:" + cause.getMessage());
|
|
|
+ reconnect();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) {
|
|
|
+ try {
|
|
|
+ String payload = new String(message.getPayload());
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ Map<String, Object> data = mapper.readValue(payload, Map.class);
|
|
|
+ data.put("statisticsTime", new Date());
|
|
|
+ latestData.clear();
|
|
|
+ latestData.putAll(data);
|
|
|
+ System.out.println("[MQTT] 收到温湿度数据:" + data);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {}
|
|
|
+ });
|
|
|
+
|
|
|
+ client.connect(options);
|
|
|
+ client.subscribe(TOPIC, 0);
|
|
|
+ System.out.println("[MQTT] 已连接并订阅:" + TOPIC);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("[MQTT] 连接失败:" + e.getMessage());
|
|
|
+ scheduleReconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 自动重连逻辑(延迟5秒)
|
|
|
+ */
|
|
|
+ private void reconnect() {
|
|
|
+ try {
|
|
|
+ if (client != null && client.isConnected()) return;
|
|
|
+ Thread.sleep(5000);
|
|
|
+ System.out.println("[MQTT] 尝试重连...");
|
|
|
+ connectAndSubscribe();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ scheduleReconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 当连接失败时,延迟重新连接
|
|
|
+ */
|
|
|
+ private void scheduleReconnect() {
|
|
|
+ new Thread(() -> {
|
|
|
+ try {
|
|
|
+ Thread.sleep(10000);
|
|
|
+ reconnect();
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对外提供获取最新温湿度的方法
|
|
|
+ */
|
|
|
+ public Map<String, Object> getHumiture() {
|
|
|
+ if (latestData.isEmpty()) {
|
|
|
+ return new HashMap<>();
|
|
|
+ }
|
|
|
+ return new HashMap<>(latestData);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时任务示例:每5秒打印一次最新数据
|
|
|
+ */
|
|
|
+// @Scheduled(fixedRate = 5000)
|
|
|
+// public void printLatestHumiture() {
|
|
|
+// System.out.println("[定时任务] 最新温湿度数据:" + getHumiture());
|
|
|
+// }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 程序关闭时断开连接
|
|
|
+ */
|
|
|
+ @PreDestroy
|
|
|
+ public void shutdown() {
|
|
|
+ try {
|
|
|
+ if (client != null && client.isConnected()) {
|
|
|
+ client.disconnect();
|
|
|
+ client.close();
|
|
|
+ System.out.println("[MQTT] 已断开连接。");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|