Jelajahi Sumber

InfluxDB获取宿舍耗电量

wangpx 9 bulan lalu
induk
melakukan
57bf28197d

+ 6 - 0
pom.xml

@@ -42,6 +42,12 @@
 		</dependency>
 
 
+		<!-- InfluxDB 官方 Java 客户端  -->
+		<dependency>
+			<groupId>com.influxdb</groupId>
+			<artifactId>influxdb-client-java</artifactId>
+			<version>6.9.0</version>
+		</dependency>
 		<!--io常用工具类 -->
 		<dependency>
 			<groupId>commons-io</groupId>

+ 55 - 0
src/main/java/cn/com/energy/config/InfluxDBConfig.java

@@ -0,0 +1,55 @@
+package cn.com.energy.config;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author: wangpx
+ * @date: 2025-07-03 10:50
+ */
+@Component
+public class InfluxDBConfig {
+    @Value("${influxdb.url}")
+    private String url;
+
+    @Value("${influxdb.token}")
+    private String token;
+
+    @Value("${influxdb.org}")
+    private String org;
+
+    // 使用 Map 存储不同 bucket 的 InfluxDBClient 实例,确保一个 bucket 只创建一个实例
+    private static final Map<String, InfluxDBClient> clientPool = new ConcurrentHashMap<>();
+
+    /**
+     * 获取指定 bucket 的 InfluxDBClient,如果不存在则创建
+     */
+    public InfluxDBClient getClient(String bucket) {
+        return clientPool.computeIfAbsent(bucket, b -> InfluxDBClientFactory.create(url, token.toCharArray(), org, b));
+    }
+
+    /**
+     * 关闭指定 bucket 的客户端
+     */
+    public void closeClient(String bucket) {
+        InfluxDBClient client = clientPool.remove(bucket);
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    /**
+     * 关闭所有客户端
+     */
+    @PreDestroy
+    public void closeAllClients() {
+        clientPool.values().forEach(InfluxDBClient::close);
+        clientPool.clear();
+    }
+}

+ 76 - 3
src/main/java/cn/com/energy/controller/EnergyApiController.java

@@ -1,23 +1,37 @@
 package cn.com.energy.controller;
 
+import cn.com.energy.config.InfluxDBConfig;
 import cn.com.energy.model.DvMachinery;
 import cn.com.energy.model.DvMachineryData;
+import cn.com.energy.model.InfluxDBQuery;
 import cn.com.energy.service.IDvMachineryDataService;
 import cn.com.energy.service.IDvMachineryService;
 import cn.com.v2.common.base.BaseController;
 import cn.com.v2.common.domain.AjaxResult;
 import cn.hutool.core.date.DateUtil;
+import com.influxdb.client.QueryApi;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.*;
 
 @RestController
 @RequestMapping("/api/energy")
 public class EnergyApiController extends BaseController {
+
+	@Autowired
+	private InfluxDBConfig influxDBConfig;
 	@Autowired
 	private IDvMachineryDataService iDvMachineryDataService;
 	@Autowired
@@ -65,11 +79,70 @@ public class EnergyApiController extends BaseController {
 		return success().put("data", val);
 	}
 
+//	@ApiOperation(value = "获取周能耗数据", notes = "获取数据")
+//	@GetMapping("/getEnergyWeekData")
+//	@ResponseBody
+//	public AjaxResult getEnergyWeekData() {
+//		List<Map<String,Object>> dvMachineryWeekDataList=iDvMachineryDataService.selectElectricByWeek();
+//		return success().put("data", dvMachineryWeekDataList);
+//	}
+
 	@ApiOperation(value = "获取周能耗数据", notes = "获取数据")
 	@GetMapping("/getEnergyWeekData")
 	@ResponseBody
 	public AjaxResult getEnergyWeekData() {
-		List<Map<String,Object>> dvMachineryWeekDataList=iDvMachineryDataService.selectElectricByWeek();
-		return success().put("data", dvMachineryWeekDataList);
+		String bucket = "home_assistant";
+		QueryApi queryApi = influxDBConfig.getClient(bucket).getQueryApi();
+		InfluxDBQuery query = new InfluxDBQuery(); // 构建查询语句
+		query.setBucket(bucket);
+		query.setMeasurement("Wh");
+
+		// 获取当前 UTC 时间的0点
+		ZonedDateTime utcMidnight = ZonedDateTime.now(ZoneOffset.UTC)
+				.minusDays(7)         // 7天前
+				.withHour(0)
+				.withMinute(0)
+				.withSecond(0)
+				.withNano(0);
+		// 格式化为 ISO-8601
+		String formatted = utcMidnight.format(DateTimeFormatter.ISO_INSTANT);
+
+		query.setTimeRangeStart(formatted);
+		query.setWindowPeriod("1d");
+		query.setAggregateFunction("last");
+		query.setFields(new String[]{"value"});
+		query.setSources(new String[]{"HA"});
+		String fluxQuery = query.buildFlux();
+		// 查询数据
+		List<FluxTable> tables = queryApi.query(fluxQuery);
+		List<FluxRecord> records = tables.get(0).getRecords();
+		List<Double> values = new ArrayList<>(); // 创建空列表存储数据
+		for (int i = 0; i < records.size(); i++) {
+			values.add(0d);
+		}
+		for (FluxTable table : tables) { // 遍历表格获取数据
+			int i = 0;
+			for (FluxRecord record : table.getRecords()) {
+				if (record.getValue() != null) {
+					double v = Double.parseDouble(record.getValue().toString());
+					Double d = values.get(i);
+					d += v;
+					values.set(i, d);
+				}
+				i++;
+			}
+		}
+		List<Map<String,Object>> data = new ArrayList<>();
+		for (int i = 0; i < records.size(); i++) { // 装填数据
+			FluxRecord record = records.get(i);
+			if (i == values.size() - 1) break;
+			Map<String,Object> map = new HashMap<>();
+			map.put("day", record.getTime().atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("MM-dd")));
+			Double newValue = values.get(i + 1);
+			map.put("value", (newValue - values.get(i)) / 1000d);
+			map.put("sum", (newValue - values.get(0)) / 1000d);
+			data.add(map);
+		}
+		return success().put("data", data);
 	}
 }

+ 115 - 0
src/main/java/cn/com/energy/model/InfluxDBQuery.java

@@ -0,0 +1,115 @@
+package cn.com.energy.model;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+/**
+ * @author: wangpx
+ * @date: 2025-03-08 12:00
+ */
+@Data
+public class InfluxDBQuery {
+    @ApiModelProperty(value = "库名")
+    private String bucket;
+    @ApiModelProperty(value = "表名")
+    private String measurement;
+    @ApiModelProperty(value = "查询字段")
+    private String[] fields;
+    @ApiModelProperty(value = "数据源")
+    private String[] sources;
+    @ApiModelProperty(value = "按值查询")
+    private String equipmentName;
+    @ApiModelProperty(value = "开始时间")
+    private String timeRangeStart;
+    @ApiModelProperty(value = "结束时间")
+    private String timeRangeStop;
+    @ApiModelProperty(value = "窗口期")
+    private String windowPeriod;
+    @ApiModelProperty(value = "聚合函数")
+    private String aggregateFunction;
+    @ApiModelProperty(value = "填充值")
+    private Double fill;
+
+    /**
+     * from(bucket: "shanghai_dcs_bucket")
+     *   |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
+     *   |> filter(fn: (r) => r["_measurement"] == "data")
+     *   |> filter(fn: (r) => r["_field"] == "LB6bpqdzd" or r["_field"] == "LB6bpsz" or r["_field"] == "LB6bpztz" or r["_field"] == "LB6yxfkpl")
+     *   |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
+     *   |> yield(name: "mean")
+     */
+    // 构造Flux语句
+    public String buildFlux() {
+        StringBuilder query = new StringBuilder();
+        
+        // from bucket
+        query.append("from(bucket: \"").append(bucket).append("\")\n");
+        
+        // range
+        if (timeRangeStart != null) {
+            query.append("  |> range(start: ").append(timeRangeStart);
+            if (timeRangeStop != null) {
+                query.append(", stop: ").append(timeRangeStop);
+            }
+            query.append(")\n");
+        }
+        
+        // measurement filter
+        if (measurement != null) {
+            query.append("  |> filter(fn: (r) => r[\"_measurement\"] == \"")
+                 .append(measurement).append("\")\n");
+        }
+        
+        // fields filter
+        if (fields != null && fields.length > 0) {
+            query.append("  |> filter(fn: (r) => ");
+            for (int i = 0; i < fields.length; i++) {
+                if (i > 0) {
+                    query.append(" or ");
+                }
+                query.append("r[\"_field\"] == \"").append(fields[i]).append("\"");
+            }
+            query.append(")\n");
+        }
+
+        // sources filter
+        if (sources != null && sources.length > 0) {
+            query.append("  |> filter(fn: (r) => ");
+            for (int i = 0; i < sources.length; i++) {
+                if (i > 0) {
+                    query.append(" or ");
+                }
+                query.append("r[\"source\"] == \"").append(sources[i]).append("\"");
+            }
+            query.append(")\n");
+        }
+        
+        // equipment name filter
+        if (equipmentName != null) {
+            query.append("  |> filter(fn: (r) => r[\"equipment_name\"] == \"")
+                 .append(equipmentName).append("\")\n");
+        }
+        
+        // aggregate window
+        if (windowPeriod != null && aggregateFunction != null) {
+            query.append("  |> aggregateWindow(every: ").append(windowPeriod)
+                 .append(", fn: ").append(aggregateFunction)
+                 .append(", createEmpty: true)\n");
+        }
+        query.append("  |> fill(");
+        if (fill == null) { // 无填充值
+            // 默认使用上一个值填充
+            query.append("usePrevious: true");
+        } else {
+            query.append("value: ").append(fill);
+        }
+        query.append(")\n");
+        
+        // yield
+        if (aggregateFunction != null) {
+            query.append("  |> yield(name: \"").append(aggregateFunction).append("\")");
+        }
+        
+        return query.toString();
+    }
+}

+ 5 - 1
src/main/resources/application.yml

@@ -109,4 +109,8 @@ sa-token:
     # 是否输出操作日志 
     is-log: false
 oa:
-  domain: yz
+  domain: yz
+influxdb:
+  url: http://127.0.0.1:8086
+  token: qt6GbeXF09wFwz9jbueRzhL_CtxebLaYOuU6TRjVL2ih_ePaHNvlP7A88Suer7cA5p4zzGIw511hgYkBlIkMPA==
+  org: ygtx