Prechádzať zdrojové kódy

feat(protocol): opcua通讯

wangpx 7 mesiacov pred
rodič
commit
997b199e1b

+ 1 - 0
admin/src/main/java/com/dcs/equipment/service/impl/ModbusTcpServiceImpl.java

@@ -156,6 +156,7 @@ public class ModbusTcpServiceImpl implements ProtocolService {
             values = ModbusUtil.getValues(request);
         } catch (ModbusProtocolException | ModbusNumberException | ModbusIOException | DeviceIOException e) {
             logger.error("获取设备数据失败:{}", e.getMessage());
+            logger.error("设备请求:{}", request);
             throw new RuntimeException(e.getMessage());
         }
         return values;

+ 5 - 1
admin/src/main/java/com/dcs/equipment/task/ModbusTcpTask.java

@@ -14,6 +14,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
@@ -51,6 +52,9 @@ public class ModbusTcpTask {
     // 保存设备寄存器信息和数据地址
     public static volatile List<EquipmentParam> equipmentParamList;
 
+    @Value("${dcs.influxdb.bucket}")
+    private String influxDBBucket;
+
     @PostConstruct
     private void init() {
         deviceService.updateEquipmentCodes();
@@ -100,7 +104,7 @@ public class ModbusTcpTask {
         CacheCenter.registerNowDataMap= EquipmentParamUtils.paramListToMap(broadCastEquipmentParamFormVOList);
         // 保存数据到influxdb
         CompletableFuture.runAsync(() ->
-                influxDBService.saveDataToInfluxDB("shanghai_dcs_bucket", deepCopyEquipmentParamFormVOList)
+                influxDBService.saveDataToInfluxDB(influxDBBucket, deepCopyEquipmentParamFormVOList)
         , scheduledExecutorService);
     }
 

+ 74 - 45
admin/src/main/java/com/dcs/equipment/utils/ModbusUtil.java

@@ -23,7 +23,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static com.dcs.equipment.constants.ModbusConstants.*;
 import static com.dcs.equipment.domain.ModbusTcpEnum.DATA_ADDRESS_LENGTH_MAP;
@@ -39,63 +41,84 @@ public class ModbusUtil {
 
     private static final ExecutorService ModbusThreadPoolTaskExecutor = Executors.newFixedThreadPool(10);
 
-    @Value("${dcs.device.reconnectLimit}")
-    private static int reconnectLimit;
+    private static volatile int reconnectLimit = 3;
 
-    private static Map<String, ModbusMaster> masters = new HashMap<>();
-    private static Map<String, AtomicBoolean> reconnectingStatus = new HashMap<>();
+    @Value("${dcs.device.reconnectLimit:3}")
+    public void setReconnectLimit(int limit) { ModbusUtil.reconnectLimit = limit; }
+
+    private static final ConcurrentHashMap<String, ModbusMaster> masters = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, AtomicBoolean> reconnectingStatus = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, ReentrantLock> urlLocks = new ConcurrentHashMap<>();
+
+    private static ReentrantLock getUrlLock(String url) {
+        return urlLocks.computeIfAbsent(url, k -> new ReentrantLock());
+    }
 
     public static ModbusMaster connect(String host, Integer port) throws ModbusIOException, UnknownHostException {
         String url = host + ":" + port;
-        logger.info("正在连接到 ModbusTCP {} 服务端", url);
-        TcpParameters tcpParameters = new TcpParameters();
-        InetAddress address = null;
-        try {
-            address = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
-            logger.error("ModbusTCP {} 地址解析失败: {}", url, e.getMessage());
-            throw new UnknownHostException("ModbusTCP " + url + " 地址解析失败");
+        // fast path: already connected
+        ModbusMaster fastExisting = masters.get(url);
+        if (fastExisting != null && fastExisting.isConnected()) {
+            return fastExisting;
         }
-        tcpParameters.setHost(address);
-        tcpParameters.setPort(port);
-        tcpParameters.setKeepAlive(true);
-        Modbus.setAutoIncrementTransactionId(true);
-        ModbusMaster master = ModbusMasterFactory.createModbusMasterTCP(tcpParameters);
+        ReentrantLock lock = getUrlLock(url);
+        lock.lock();
         try {
-            master.connect();
-            masters.put(url, master);
-            reconnectingStatus.put(url, new AtomicBoolean(false));
-            logger.info("与 ModbusTCP {} 连接成功", url);
-        } catch (ModbusIOException e) {
-            logger.error("与 ModbusTCP {} 连接失败: {}", url, e.getMessage());
-            throw new ModbusIOException("ModbusTCP " + url + " 连接失败");
+            ModbusMaster existing = masters.get(url);
+            if (existing != null && existing.isConnected()) {
+                return existing;
+            }
+
+            logger.info("正在连接到 ModbusTCP {} 服务端", url);
+            TcpParameters tcpParameters = new TcpParameters();
+            InetAddress address;
+            try {
+                address = InetAddress.getByName(host);
+            } catch (UnknownHostException e) {
+                logger.error("ModbusTCP {} 地址解析失败: {}", url, e.getMessage());
+                throw new UnknownHostException("ModbusTCP " + url + " 地址解析失败");
+            }
+            tcpParameters.setHost(address);
+            tcpParameters.setPort(port);
+            tcpParameters.setKeepAlive(true);
+            Modbus.setAutoIncrementTransactionId(true);
+            ModbusMaster master = ModbusMasterFactory.createModbusMasterTCP(tcpParameters);
+            try {
+                master.connect();
+                masters.put(url, master);
+                reconnectingStatus.computeIfAbsent(url, k -> new AtomicBoolean(false));
+                logger.info("与 ModbusTCP {} 连接成功", url);
+            } catch (ModbusIOException e) {
+                logger.error("与 ModbusTCP {} 连接失败: {}", url, e.getMessage());
+                throw new ModbusIOException("ModbusTCP " + url + " 连接失败");
+            }
+            return masters.get(url);
+        } finally {
+            lock.unlock();
         }
-        return master;
     }
 
     private static void reconnect(String host, Integer port) throws DeviceIOException {
         String url = host + ":" + port;
-        AtomicBoolean reconnecting = reconnectingStatus.get(url);
+        AtomicBoolean reconnecting = reconnectingStatus.computeIfAbsent(url, k -> new AtomicBoolean(false));
         if (reconnecting.get()) return;
 
         if (reconnecting.compareAndSet(false, true)) {
             CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                 try {
                     for (int i = 0; i < reconnectLimit; i++) { // 单次重连 最多尝试次数
-                        ModbusMaster master = masters.get(url);
                         logger.warn("ModbusTCP {} 连接断开,开始重连...", url);
                         try {
-                            connect(host, port);
+                            ModbusMaster newMaster = connect(host, port);
+                            if (newMaster != null && newMaster.isConnected()) {
+                                logger.info("ModbusTCP {} 重连成功!", url);
+                                break;
+                            }
                         } catch (UnknownHostException e) {
                             throw new RuntimeException(e);
                         } catch (ModbusIOException e) {
                             if (i == reconnectLimit - 1) throw new RuntimeException(e);
                         }
-                        if (master.isConnected()) {
-                            logger.info("ModbusTCP {} 重连成功!", url);
-                            break;
-                        }
-
                         try {
                             Thread.sleep(1000); // 每次重试间隔 1 秒
                         } catch (InterruptedException e) {
@@ -125,19 +148,23 @@ public class ModbusUtil {
     }
 
     public static void disconnect(String host, Integer port) {
-        String url = host+":"+port;
-        ModbusMaster master= masters.get(url);
+        String url = host + ":" + port;
+        ReentrantLock lock = getUrlLock(url);
+        lock.lock();
         try {
-            master.disconnect();
-            if (masters.containsKey(url)) {
-                masters.remove(url);
-            }
-            if (reconnectingStatus.containsKey(url)) {
-                reconnectingStatus.remove(url);
+            ModbusMaster master = masters.get(url);
+            if (master != null) {
+                try {
+                    master.disconnect();
+                } catch (ModbusIOException e) {
+                    logger.error("与 ModbusTCP {} 断开连接失败: {}", url, e.getMessage());
+                }
             }
+            masters.remove(url);
+            reconnectingStatus.remove(url);
             logger.info("ModbusTCP {} 断开连接", url);
-        } catch (ModbusIOException e) {
-            logger.error("与 ModbusTCP {} 断开连接失败: {}", url, e.getMessage());
+        } finally {
+            lock.unlock();
         }
     }
     @PreDestroy
@@ -311,8 +338,9 @@ public class ModbusUtil {
     }
 
     private static ModbusMaster getMaster(String host, Integer port) throws DeviceIOException {
-        ModbusMaster master = masters.get(host+":"+port);
-        if (master == null){
+        String url = host + ":" + port;
+        ModbusMaster master = masters.get(url);
+        if (master == null) {
             try {
                 return connect(host, port);
             } catch (ModbusIOException | UnknownHostException e) {
@@ -321,6 +349,7 @@ public class ModbusUtil {
         }
         if (!master.isConnected()) {
             reconnect(host, port);
+            master = masters.get(url);
         }
         return master;
     }

+ 190 - 0
admin/src/main/java/com/dcs/equipment/utils/OpcuaUtil.java

@@ -0,0 +1,190 @@
+package com.dcs.equipment.utils;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import javax.annotation.PreDestroy;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author: wangpx
+ * @date: 2025-05-26 14:53
+ */
+public class OpcuaUtil {
+    private static final Logger logger = LoggerFactory.getLogger("OPCUA");
+
+    private static final ExecutorService ModbusThreadPoolTaskExecutor = Executors.newFixedThreadPool(10);
+
+    @Value("${dcs.device.reconnectLimit}")
+    private static int reconnectLimit;
+
+    private static Map<String, OpcUaClient> clients = new HashMap<>();
+    // 检查是否连接并自动重连
+    private static Map<String, AtomicBoolean> reconnectingStatus = new HashMap<>();
+
+    // 创建并连接客户端
+    public static void connect(String host, Integer port) {
+        String URL = "opc.tcp://" + host + ":" + port;
+        try {
+            logger.info("正在连接到 OPC UA {} 服务端", host+":"+port);
+            OpcUaClient client = OpcUaClient.create(URL);
+            client.connect().get();
+            clients.put(host+":"+port, client);
+            reconnectingStatus.put(host+":"+port, new AtomicBoolean(false));
+            logger.info("OPC UA {} 客户端连接成功", host+":"+port);
+        } catch (UaException e) {
+            logger.error("OPC UA 客户端连接失败: {}", e.getMessage());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("连接线程中断: {}", e.getMessage());
+        } catch (ExecutionException e) {
+            logger.error("OPC UA {} 客户端连接失败: {}", host+":"+port, e.getMessage());
+        }
+    }
+
+    private static void reconnect(String host, Integer port) {
+        OpcUaClient client = clients.get(host+":"+port);
+        if (client == null || client.getSession() == null || !client.getSession().isDone()) {
+            logger.warn("检测到 OPC UA 客户端连接中断。");
+            AtomicBoolean reconnecting = reconnectingStatus.get(host+":"+port);
+            if (reconnecting.compareAndSet(false, true)) {
+                try {
+                    logger.info("尝试重新连接 OPC UA 服务器: {}", host);
+                    connect(host, port);
+                    logger.info("重连成功。");
+                } catch (Exception e) {
+                    logger.error("重连 OPC UA 失败: {}", e.getMessage());
+                } finally {
+                    reconnecting.set(false);
+                }
+            } else {
+                logger.debug("已有线程正在重连,当前线程跳过连接尝试。");
+            }
+        }
+    }
+
+    public static void disconnect(String host, Integer port) {
+        OpcUaClient client = clients.get(host+":"+port);
+        try {
+            if (client != null && client.getSession() != null && client.getSession().isDone()) {
+                client.disconnect().get();
+                logger.info("OPC UA 客户端已断开连接");
+            }
+        } catch (Exception e) {
+            logger.error("客户端断开连接失败: {}", e.getMessage());
+        }
+    }
+
+    // 关闭系统时,释放资源
+    @PreDestroy
+    private void destroy() {
+        Set<String> hostList = clients.keySet();
+        for (String key : hostList) {
+            String[] split = key.split(":");
+            String host = split[0];
+            Integer port = Integer.parseInt(split[1]);
+            disconnect(host, port);
+        }
+    }
+
+    private static OpcUaClient getClient(String host, Integer port) {
+        OpcUaClient client = clients.get(host+":"+port);
+
+        if (client == null) {
+            reconnect(host, port);
+        }
+        return client;
+    }
+
+
+    // 批量节点值获取
+    public List<Object> getValues(String host, Integer port, List<NodeId> nodeIds) {
+        OpcUaClient client = getClient(host, port);
+        List<DataValue> dataValueList = new ArrayList<>();
+        try {
+            dataValueList = client.readValues(0, TimestampsToReturn.Both, nodeIds).get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("读取节点 {} 值失败", nodeIds);
+            logger.error("错误信息: {}", e.getMessage());
+        }
+
+        List<Object> values = new ArrayList<>();
+        for (DataValue value : dataValueList) {
+            values.add(value.getValue() != null ? value.getValue().getValue() : null);
+        }
+        return values;
+    }
+
+    // 批量节点值更新
+    public boolean setValues(String host, Integer port, List<NodeId> nodeIds, List<Object> values) {
+        OpcUaClient client = getClient(host, port);
+        List<DataValue> dataValues = new ArrayList<>();
+        for (Object value : values) {
+            dataValues.add(new DataValue(new Variant(value), StatusCode.GOOD, null));
+        }
+        try {
+            List<StatusCode> statusCodes = client.writeValues(nodeIds, dataValues).get();
+            for (StatusCode code : statusCodes) {
+                if (code.isBad()) {
+                    logger.warn("节点写入失败: {}", code);
+                    return false;
+                }
+            }
+            return true;
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("写入节点值失败: {}", nodeIds, e);
+            return false;
+        }
+    }
+
+    // 单个节点值获取
+    public Object getValue(String host, Integer port, NodeId nodeId) {
+        List<Object> values = getValues(host, port, Collections.singletonList(nodeId));
+        return values.isEmpty() ? null : values.get(0);
+    }
+
+    // 单个节点值更新
+    public boolean setValue(String host, Integer port, NodeId nodeId, Object value) {
+        return setValues(host, port, Collections.singletonList(nodeId), Collections.singletonList(value));
+    }
+
+
+    @Value("${dcs.opcua.url}")
+    private String HOST;
+    @Value("${dcs.opcua.port}")
+    private int PORT;
+
+    private final String URL = "opc.tcp://" + HOST + ":" + PORT;
+    private static String ip = "192.168.0.11";
+    private static Integer port = 4840;
+
+    public static void main(String[] args) {
+        String URL = "opc.tcp://" + ip + ":" + port;
+
+        try {
+            clients.put(ip + ":" + port, OpcUaClient.create(URL));
+        } catch (UaException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            clients.get(ip + ":" + port).connect().get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}