system.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import os
  2. import shutil
  3. import time
  4. import platform
  5. from typing import Annotated
  6. import psutil
  7. from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
  8. from app.api.v1 import deps
  9. from app.models.user import User, UserRole
  10. router = APIRouter()
  11. CERTS_DIR = "/app/certs"
  12. CRT_FILENAME = "server.crt"
  13. KEY_FILENAME = "server.key"
  14. @router.post("/ssl/config", summary="更新SSL证书")
  15. async def update_ssl_config(
  16. crt_file: Annotated[UploadFile, File(description="证书文件 (.crt/.pem)")],
  17. key_file: Annotated[UploadFile, File(description="私钥文件 (.key)")],
  18. current_user: User = Depends(deps.get_current_active_user),
  19. ):
  20. """
  21. 上传并更新SSL证书。
  22. 证书文件将保存到共享卷中,Nginx会自动检测变化并重载。
  23. 需要超级管理员权限。
  24. """
  25. if current_user.role != UserRole.SUPER_ADMIN:
  26. raise HTTPException(status_code=403, detail="权限不足")
  27. # 确保目录存在
  28. os.makedirs(CERTS_DIR, exist_ok=True)
  29. crt_path = os.path.join(CERTS_DIR, CRT_FILENAME)
  30. key_path = os.path.join(CERTS_DIR, KEY_FILENAME)
  31. # 简单的文件扩展名检查 (可以根据需要增强)
  32. if not crt_file.filename.endswith((".crt", ".pem", ".cer")):
  33. raise HTTPException(
  34. status_code=400,
  35. detail="证书文件格式不正确,请上传 .crt, .pem 或 .cer 文件",
  36. )
  37. if not key_file.filename.endswith((".key", ".pem")):
  38. raise HTTPException(
  39. status_code=400,
  40. detail="私钥文件格式不正确,请上传 .key 或 .pem 文件",
  41. )
  42. try:
  43. # 保存证书文件
  44. with open(crt_path, "wb") as buffer:
  45. shutil.copyfileobj(crt_file.file, buffer)
  46. # 保存私钥文件
  47. with open(key_path, "wb") as buffer:
  48. shutil.copyfileobj(key_file.file, buffer)
  49. except Exception as e:
  50. raise HTTPException(status_code=500, detail=f"保存证书文件失败: {str(e)}")
  51. return {"message": "SSL证书已更新,Nginx将自动重新加载配置。"}
  52. @router.get("/status", summary="获取服务器资源状态")
  53. async def get_system_status(
  54. current_user: User = Depends(deps.get_current_active_user),
  55. ):
  56. """
  57. 返回当前服务器的 CPU / 内存 / 磁盘 / 系统信息。
  58. 仅超级管理员可用。
  59. """
  60. if current_user.role != UserRole.SUPER_ADMIN:
  61. raise HTTPException(status_code=403, detail="权限不足")
  62. # CPU
  63. cpu_percent = psutil.cpu_percent(interval=0.5)
  64. cpu_count = psutil.cpu_count(logical=True)
  65. try:
  66. load1, load5, load15 = os.getloadavg()
  67. cpu_load = {"load1": load1, "load5": load5, "load15": load15}
  68. except (AttributeError, OSError):
  69. cpu_load = None
  70. # 内存
  71. mem = psutil.virtual_memory()
  72. memory = {
  73. "total": mem.total,
  74. "used": mem.used,
  75. "available": mem.available,
  76. "percent": mem.percent,
  77. }
  78. # 磁盘(只列出主要分区)
  79. disk_partitions = psutil.disk_partitions(all=False)
  80. disks = []
  81. for part in disk_partitions:
  82. try:
  83. usage = psutil.disk_usage(part.mountpoint)
  84. except PermissionError:
  85. continue
  86. disks.append(
  87. {
  88. "mountpoint": part.mountpoint,
  89. "fstype": part.fstype,
  90. "total": usage.total,
  91. "used": usage.used,
  92. "free": usage.free,
  93. "percent": usage.percent,
  94. }
  95. )
  96. # 系统信息
  97. boot_time = psutil.boot_time()
  98. now = time.time()
  99. system_info = {
  100. "hostname": platform.node(),
  101. "system": platform.system(),
  102. "release": platform.release(),
  103. "version": platform.version(),
  104. "machine": platform.machine(),
  105. "boot_time": boot_time,
  106. "uptime_seconds": int(now - boot_time),
  107. }
  108. return {
  109. "cpu": {
  110. "percent": cpu_percent,
  111. "count": cpu_count,
  112. "load": cpu_load,
  113. },
  114. "memory": memory,
  115. "disks": disks,
  116. "system": system_info,
  117. }