minio.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. from minio import Minio
  2. from minio.error import S3Error
  3. from app.core.config import settings
  4. import io
  5. import uuid
  6. from datetime import datetime
  7. import logging
  8. logger = logging.getLogger(__name__)
  9. class MessageStorage:
  10. def __init__(self):
  11. try:
  12. # Handle endpoint protocol
  13. endpoint = settings.MINIO_ENDPOINT
  14. secure = settings.MINIO_SECURE
  15. if endpoint.startswith("http://"):
  16. endpoint = endpoint.replace("http://", "")
  17. secure = False
  18. elif endpoint.startswith("https://"):
  19. endpoint = endpoint.replace("https://", "")
  20. secure = True
  21. self.client = Minio(
  22. endpoint=endpoint,
  23. access_key=settings.MINIO_ACCESS_KEY,
  24. secret_key=settings.MINIO_SECRET_KEY,
  25. secure=secure
  26. )
  27. self.bucket_name = settings.MINIO_BUCKET_NAME
  28. self._ensure_bucket()
  29. except Exception as e:
  30. logger.error(f"MinIO init failed: {e}")
  31. self.client = None
  32. def _ensure_bucket(self):
  33. if not self.client: return
  34. try:
  35. if not self.client.bucket_exists(self.bucket_name):
  36. self.client.make_bucket(self.bucket_name)
  37. # 移除公开读策略,默认为私有
  38. # self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
  39. except S3Error as e:
  40. logger.error(f"MinIO bucket error: {e}")
  41. def get_presigned_url(self, object_name: str, expires=None):
  42. """生成预签名访问链接"""
  43. if not self.client: return None
  44. try:
  45. from datetime import timedelta
  46. if expires is None:
  47. expires = timedelta(hours=1)
  48. return self.client.get_presigned_url(
  49. "GET",
  50. self.bucket_name,
  51. object_name,
  52. expires=expires
  53. )
  54. except Exception as e:
  55. logger.error(f"Generate presigned url failed: {e}")
  56. return None
  57. def upload_message_file(self, file_data: bytes, filename: str, content_type: str, user_id: int) -> str:
  58. """
  59. 上传消息附件
  60. 路径格式: messages/{user_id}/{year}/{month}/{uuid}.ext
  61. 返回: object_name (用于存储和生成签名URL)
  62. """
  63. if not self.client:
  64. raise Exception("Storage service unavailable")
  65. # 生成结构化路径
  66. ext = filename.split('.')[-1] if '.' in filename else 'bin'
  67. now = datetime.now()
  68. object_name = f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
  69. try:
  70. self.client.put_object(
  71. bucket_name=self.bucket_name,
  72. object_name=object_name,
  73. data=io.BytesIO(file_data),
  74. length=len(file_data),
  75. content_type=content_type
  76. )
  77. return object_name
  78. except S3Error as e:
  79. logger.error(f"Upload failed: {e}")
  80. raise Exception("File upload failed")
  81. def upload_db_backup(self, file_path: str, filename: str) -> str:
  82. """
  83. 上传数据库备份文件到 MinIO(使用独立的备份桶)
  84. 路径格式: {year}/{month}/{filename}
  85. 返回: object_name
  86. """
  87. if not self.client:
  88. raise Exception("Storage service unavailable")
  89. # 使用数据库备份专用的 bucket
  90. backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME
  91. # 确保备份桶存在
  92. try:
  93. if not self.client.bucket_exists(backup_bucket):
  94. self.client.make_bucket(backup_bucket)
  95. logger.info(f"Created backup bucket: {backup_bucket}")
  96. except S3Error as e:
  97. logger.error(f"MinIO backup bucket error: {e}")
  98. raise Exception("Backup bucket unavailable")
  99. # 读取文件
  100. with open(file_path, 'rb') as f:
  101. file_data = f.read()
  102. # 生成结构化路径
  103. now = datetime.now()
  104. object_name = f"{now.year}/{now.month:02d}/{filename}"
  105. try:
  106. self.client.put_object(
  107. bucket_name=backup_bucket,
  108. object_name=object_name,
  109. data=io.BytesIO(file_data),
  110. length=len(file_data),
  111. content_type="application/gzip"
  112. )
  113. logger.info(f"Successfully uploaded database backup to MinIO: {backup_bucket}/{object_name}")
  114. return object_name
  115. except S3Error as e:
  116. logger.error(f"Database backup upload failed: {e}")
  117. raise Exception("Database backup upload failed")
  118. def _ensure_distribution_bucket(self):
  119. """确保客户端分发桶存在"""
  120. if not self.client:
  121. return
  122. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  123. try:
  124. if not self.client.bucket_exists(bucket):
  125. self.client.make_bucket(bucket)
  126. logger.info(f"Created distribution bucket: {bucket}")
  127. except S3Error as e:
  128. logger.error(f"MinIO distribution bucket error: {e}")
  129. def upload_distribution_file(
  130. self, file_data: bytes, filename: str, content_type: str,
  131. distribution_id: int
  132. ) -> str:
  133. """
  134. 上传客户端分发安装包到 MinIO
  135. 路径格式: versions/{distribution_id}/{uuid}.{ext}
  136. 返回: object_name
  137. """
  138. if not self.client:
  139. raise Exception("Storage service unavailable")
  140. self._ensure_distribution_bucket()
  141. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  142. ext = filename.split('.')[-1] if '.' in filename else 'bin'
  143. object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"
  144. try:
  145. self.client.put_object(
  146. bucket_name=bucket,
  147. object_name=object_name,
  148. data=io.BytesIO(file_data),
  149. length=len(file_data),
  150. content_type=content_type
  151. )
  152. return object_name
  153. except S3Error as e:
  154. logger.error(f"Distribution file upload failed: {e}")
  155. raise Exception("File upload failed")
  156. def get_distribution_presigned_url(self, object_name: str, expires=None) -> str | None:
  157. """生成分发桶内对象的预签名下载链接"""
  158. if not self.client:
  159. return None
  160. try:
  161. from datetime import timedelta
  162. if expires is None:
  163. expires = timedelta(hours=1)
  164. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  165. return self.client.get_presigned_url(
  166. "GET", bucket, object_name, expires=expires
  167. )
  168. except Exception as e:
  169. logger.error(f"Generate distribution presigned url failed: {e}")
  170. return None
  171. def upload_distribution_icon(
  172. self, file_data: bytes, filename: str, content_type: str,
  173. distribution_id: int, user_id: int
  174. ) -> str:
  175. """
  176. 上传分发图标到 MinIO
  177. - distribution_id > 0: icons/{distribution_id}/{uuid}.{ext}
  178. - distribution_id == 0 (创建时): icons/temp/{user_id}/{uuid}.{ext}
  179. 返回: object_name
  180. """
  181. if not self.client:
  182. raise Exception("Storage service unavailable")
  183. self._ensure_distribution_bucket()
  184. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  185. ext = filename.split('.')[-1].lower() if '.' in filename else 'png'
  186. if ext not in ('png', 'jpg', 'jpeg', 'gif', 'webp'):
  187. ext = 'png'
  188. if distribution_id > 0:
  189. object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
  190. else:
  191. object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"
  192. try:
  193. self.client.put_object(
  194. bucket_name=bucket,
  195. object_name=object_name,
  196. data=io.BytesIO(file_data),
  197. length=len(file_data),
  198. content_type=content_type
  199. )
  200. return object_name
  201. except S3Error as e:
  202. logger.error(f"Distribution icon upload failed: {e}")
  203. raise Exception("Icon upload failed")
  204. # ---------- 自建更新桶(公开读):路径 {时间戳}/{platform}/update/{filename} ----------
  205. def _ensure_update_bucket(self):
  206. """确保更新桶存在"""
  207. if not self.client:
  208. return
  209. bucket = settings.MINIO_UPDATE_BUCKET_NAME
  210. try:
  211. if not self.client.bucket_exists(bucket):
  212. self.client.make_bucket(bucket)
  213. logger.info(f"Created update bucket: {bucket}")
  214. except S3Error as e:
  215. logger.error(f"MinIO update bucket error: {e}")
  216. raise Exception("Update bucket unavailable")
  217. def _update_bucket_public_base_url(self) -> str:
  218. """更新桶公开访问的 base URL(末尾带 /)"""
  219. endpoint = (settings.MINIO_ENDPOINT or "").rstrip("/")
  220. bucket = settings.MINIO_UPDATE_BUCKET_NAME
  221. return f"{endpoint}/{bucket}/"
  222. def list_update_files(self, created_at_timestamp: int, platform: str) -> list:
  223. """
  224. 列出某分发、某平台下 update/ 目录中的文件。
  225. 返回: [{"key": str, "name": str, "size": int}, ...]
  226. """
  227. if not self.client:
  228. return []
  229. self._ensure_update_bucket()
  230. bucket = settings.MINIO_UPDATE_BUCKET_NAME
  231. prefix = f"{created_at_timestamp}/{platform}/update/"
  232. try:
  233. objects = self.client.list_objects(bucket, prefix=prefix, recursive=True)
  234. result = []
  235. for obj in objects:
  236. name = obj.object_name.split("/")[-1] if "/" in obj.object_name else obj.object_name
  237. size = 0
  238. if hasattr(obj, "size") and obj.size is not None:
  239. size = getattr(obj.size, "size", obj.size) if hasattr(obj.size, "size") else int(obj.size)
  240. result.append({"key": obj.object_name, "name": name, "size": size})
  241. return result
  242. except S3Error as e:
  243. logger.error(f"List update files failed: {e}")
  244. return []
  245. def upload_update_file(
  246. self,
  247. created_at_timestamp: int,
  248. platform: str,
  249. file_data: bytes,
  250. filename: str,
  251. content_type: str = "application/octet-stream",
  252. ) -> str:
  253. """
  254. 上传文件到更新桶,路径 {timestamp}/{platform}/update/{filename},透传不改名。
  255. 返回 object_key。
  256. """
  257. if not self.client:
  258. raise Exception("Storage service unavailable")
  259. self._ensure_update_bucket()
  260. bucket = settings.MINIO_UPDATE_BUCKET_NAME
  261. object_key = f"{created_at_timestamp}/{platform}/update/{filename}"
  262. try:
  263. self.client.put_object(
  264. bucket_name=bucket,
  265. object_name=object_key,
  266. data=io.BytesIO(file_data),
  267. length=len(file_data),
  268. content_type=content_type,
  269. )
  270. return object_key
  271. except S3Error as e:
  272. logger.error(f"Update file upload failed: {e}")
  273. raise Exception("Update file upload failed")
  274. def delete_update_file(self, object_key: str) -> None:
  275. """删除更新桶中的对象"""
  276. if not self.client:
  277. raise Exception("Storage service unavailable")
  278. bucket = settings.MINIO_UPDATE_BUCKET_NAME
  279. try:
  280. self.client.remove_object(bucket, object_key)
  281. except S3Error as e:
  282. logger.error(f"Update file delete failed: {e}")
  283. raise Exception("Update file delete failed")
  284. def get_update_file_public_url(self, object_key: str) -> str:
  285. """更新桶公开读,返回无需签名的固定 URL"""
  286. base = self._update_bucket_public_base_url()
  287. return f"{base}{object_key}"
  288. # 单例实例
  289. minio_storage = MessageStorage()