| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import os
- import shutil
- import time
- import platform
- from typing import Annotated
- import psutil
- from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
- from app.api.v1 import deps
- from app.models.user import User, UserRole
# Router that the endpoints in this module register themselves on.
router = APIRouter()

# Directory the uploaded certificate and private key are written into.
# NOTE(review): the upload endpoint's docstring describes this as a volume
# shared with Nginx — confirm against the deployment configuration.
CERTS_DIR = "/app/certs"
# Fixed on-disk file names; uploads are stored under these names regardless
# of the file names supplied by the client.
CRT_FILENAME = "server.crt"
KEY_FILENAME = "server.key"
@router.post("/ssl/config", summary="更新SSL证书")
async def update_ssl_config(
    crt_file: Annotated[UploadFile, File(description="证书文件 (.crt/.pem)")],
    key_file: Annotated[UploadFile, File(description="私钥文件 (.key)")],
    current_user: User = Depends(deps.get_current_active_user),
):
    """Upload a certificate / private-key pair and store it under ``CERTS_DIR``.

    The files are saved as ``CRT_FILENAME`` and ``KEY_FILENAME`` inside
    ``CERTS_DIR``, overwriting any existing files. Reloading Nginx is not done
    here; the deployment is expected to pick up the changed files on its own.

    Args:
        crt_file: Uploaded certificate; name must end in .crt, .pem or .cer.
        key_file: Uploaded private key; name must end in .key or .pem.
        current_user: Authenticated user; must have the SUPER_ADMIN role.

    Returns:
        A dict with a single ``message`` key confirming the update.

    Raises:
        HTTPException: 403 if the caller is not a super admin, 400 if either
            file name has an unsupported extension (or is missing), 500 if the
            files cannot be written.
    """
    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=403, detail="权限不足")

    # Validate before touching the filesystem. An upload's filename may be
    # absent (None), and extensions are compared case-insensitively so that
    # e.g. "SERVER.CRT" is accepted.
    crt_name = (crt_file.filename or "").lower()
    if not crt_name.endswith((".crt", ".pem", ".cer")):
        raise HTTPException(
            status_code=400,
            detail="证书文件格式不正确,请上传 .crt, .pem 或 .cer 文件",
        )
    key_name = (key_file.filename or "").lower()
    if not key_name.endswith((".key", ".pem")):
        raise HTTPException(
            status_code=400,
            detail="私钥文件格式不正确,请上传 .key 或 .pem 文件",
        )

    crt_path = os.path.join(CERTS_DIR, CRT_FILENAME)
    key_path = os.path.join(CERTS_DIR, KEY_FILENAME)

    try:
        # Directory creation is inside the try so that a failure here is
        # reported the same way as a failed write.
        os.makedirs(CERTS_DIR, exist_ok=True)
        with open(crt_path, "wb") as crt_out:
            shutil.copyfileobj(crt_file.file, crt_out)
        with open(key_path, "wb") as key_out:
            shutil.copyfileobj(key_file.file, key_out)
    except OSError as e:
        # NOTE(review): the two files are written in place one after the
        # other, so a failure on the key leaves the new certificate next to
        # the old key. Consider write-to-temp + rename if that matters.
        raise HTTPException(
            status_code=500, detail=f"保存证书文件失败: {e}"
        ) from e

    return {"message": "SSL证书已更新,Nginx将自动重新加载配置。"}
@router.get("/status", summary="获取服务器资源状态")
async def get_system_status(
    current_user: User = Depends(deps.get_current_active_user),
):
    """Return a snapshot of CPU, memory, disk and host information.

    Args:
        current_user: Authenticated user; must have the SUPER_ADMIN role.

    Returns:
        A dict with four keys:
        ``cpu`` (``percent``, logical ``count``, and ``load`` — the 1/5/15
        load averages, or ``None`` where ``os.getloadavg`` is unavailable),
        ``memory`` (``total``/``used``/``available``/``percent``),
        ``disks`` (one entry per partition whose usage could be read), and
        ``system`` (hostname, OS details, boot time and uptime in seconds).

    Raises:
        HTTPException: 403 if the caller is not a super admin.
    """
    # Local import: the module's import block does not bring in asyncio.
    import asyncio

    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=403, detail="权限不足")

    # CPU. With a positive interval, cpu_percent sleeps for that long, so run
    # it in a worker thread instead of blocking the event loop.
    cpu_percent = await asyncio.to_thread(psutil.cpu_percent, interval=0.5)
    cpu_count = psutil.cpu_count(logical=True)
    try:
        load1, load5, load15 = os.getloadavg()
    except (AttributeError, OSError):
        # getloadavg is missing on some platforms or may fail to obtain data.
        cpu_load = None
    else:
        cpu_load = {"load1": load1, "load5": load5, "load15": load15}

    # Memory
    mem = psutil.virtual_memory()
    memory = {
        "total": mem.total,
        "used": mem.used,
        "available": mem.available,
        "percent": mem.percent,
    }

    # Disks (all=False asks psutil to leave out pseudo/duplicate filesystems)
    disks = []
    for part in psutil.disk_partitions(all=False):
        try:
            usage = psutil.disk_usage(part.mountpoint)
        except OSError:
            # Skip unreadable mount points (permission denied, stale or
            # not-ready mounts) rather than failing the whole request.
            continue
        disks.append(
            {
                "mountpoint": part.mountpoint,
                "fstype": part.fstype,
                "total": usage.total,
                "used": usage.used,
                "free": usage.free,
                "percent": usage.percent,
            }
        )

    # System information
    boot_time = psutil.boot_time()
    now = time.time()
    system_info = {
        "hostname": platform.node(),
        "system": platform.system(),
        "release": platform.release(),
        "version": platform.version(),
        "machine": platform.machine(),
        "boot_time": boot_time,
        "uptime_seconds": int(now - boot_time),
    }

    return {
        "cpu": {
            "percent": cpu_percent,
            "count": cpu_count,
            "load": cpu_load,
        },
        "memory": memory,
        "disks": disks,
        "system": system_info,
    }
|