# minio.py (8.3 KB)
import io
import logging
import os
import uuid
from datetime import datetime
from datetime import timedelta

from minio import Minio
from minio.error import S3Error

from app.core.config import settings
# Module-level logger named after this module; every log call in this file goes through it.
logger = logging.getLogger(__name__)
  9. class MessageStorage:
  10. def __init__(self):
  11. try:
  12. # Handle endpoint protocol
  13. endpoint = settings.MINIO_ENDPOINT
  14. secure = settings.MINIO_SECURE
  15. if endpoint.startswith("http://"):
  16. endpoint = endpoint.replace("http://", "")
  17. secure = False
  18. elif endpoint.startswith("https://"):
  19. endpoint = endpoint.replace("https://", "")
  20. secure = True
  21. self.client = Minio(
  22. endpoint=endpoint,
  23. access_key=settings.MINIO_ACCESS_KEY,
  24. secret_key=settings.MINIO_SECRET_KEY,
  25. secure=secure
  26. )
  27. self.bucket_name = settings.MINIO_BUCKET_NAME
  28. self._ensure_bucket()
  29. except Exception as e:
  30. logger.error(f"MinIO init failed: {e}")
  31. self.client = None
  32. def _ensure_bucket(self):
  33. if not self.client: return
  34. try:
  35. if not self.client.bucket_exists(self.bucket_name):
  36. self.client.make_bucket(self.bucket_name)
  37. # 移除公开读策略,默认为私有
  38. # self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
  39. except S3Error as e:
  40. logger.error(f"MinIO bucket error: {e}")
  41. def get_presigned_url(self, object_name: str, expires=None):
  42. """生成预签名访问链接"""
  43. if not self.client: return None
  44. try:
  45. from datetime import timedelta
  46. if expires is None:
  47. expires = timedelta(hours=1)
  48. return self.client.get_presigned_url(
  49. "GET",
  50. self.bucket_name,
  51. object_name,
  52. expires=expires
  53. )
  54. except Exception as e:
  55. logger.error(f"Generate presigned url failed: {e}")
  56. return None
  57. def upload_message_file(self, file_data: bytes, filename: str, content_type: str, user_id: int) -> str:
  58. """
  59. 上传消息附件
  60. 路径格式: messages/{user_id}/{year}/{month}/{uuid}.ext
  61. 返回: object_name (用于存储和生成签名URL)
  62. """
  63. if not self.client:
  64. raise Exception("Storage service unavailable")
  65. # 生成结构化路径
  66. ext = filename.split('.')[-1] if '.' in filename else 'bin'
  67. now = datetime.now()
  68. object_name = f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
  69. try:
  70. self.client.put_object(
  71. bucket_name=self.bucket_name,
  72. object_name=object_name,
  73. data=io.BytesIO(file_data),
  74. length=len(file_data),
  75. content_type=content_type
  76. )
  77. return object_name
  78. except S3Error as e:
  79. logger.error(f"Upload failed: {e}")
  80. raise Exception("File upload failed")
  81. def upload_db_backup(self, file_path: str, filename: str) -> str:
  82. """
  83. 上传数据库备份文件到 MinIO(使用独立的备份桶)
  84. 路径格式: {year}/{month}/{filename}
  85. 返回: object_name
  86. """
  87. if not self.client:
  88. raise Exception("Storage service unavailable")
  89. # 使用数据库备份专用的 bucket
  90. backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME
  91. # 确保备份桶存在
  92. try:
  93. if not self.client.bucket_exists(backup_bucket):
  94. self.client.make_bucket(backup_bucket)
  95. logger.info(f"Created backup bucket: {backup_bucket}")
  96. except S3Error as e:
  97. logger.error(f"MinIO backup bucket error: {e}")
  98. raise Exception("Backup bucket unavailable")
  99. # 读取文件
  100. with open(file_path, 'rb') as f:
  101. file_data = f.read()
  102. # 生成结构化路径
  103. now = datetime.now()
  104. object_name = f"{now.year}/{now.month:02d}/{filename}"
  105. try:
  106. self.client.put_object(
  107. bucket_name=backup_bucket,
  108. object_name=object_name,
  109. data=io.BytesIO(file_data),
  110. length=len(file_data),
  111. content_type="application/gzip"
  112. )
  113. logger.info(f"Successfully uploaded database backup to MinIO: {backup_bucket}/{object_name}")
  114. return object_name
  115. except S3Error as e:
  116. logger.error(f"Database backup upload failed: {e}")
  117. raise Exception("Database backup upload failed")
  118. def _ensure_distribution_bucket(self):
  119. """确保客户端分发桶存在"""
  120. if not self.client:
  121. return
  122. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  123. try:
  124. if not self.client.bucket_exists(bucket):
  125. self.client.make_bucket(bucket)
  126. logger.info(f"Created distribution bucket: {bucket}")
  127. except S3Error as e:
  128. logger.error(f"MinIO distribution bucket error: {e}")
  129. def upload_distribution_file(
  130. self, file_data: bytes, filename: str, content_type: str,
  131. distribution_id: int
  132. ) -> str:
  133. """
  134. 上传客户端分发安装包到 MinIO
  135. 路径格式: versions/{distribution_id}/{uuid}.{ext}
  136. 返回: object_name
  137. """
  138. if not self.client:
  139. raise Exception("Storage service unavailable")
  140. self._ensure_distribution_bucket()
  141. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  142. ext = filename.split('.')[-1] if '.' in filename else 'bin'
  143. object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"
  144. try:
  145. self.client.put_object(
  146. bucket_name=bucket,
  147. object_name=object_name,
  148. data=io.BytesIO(file_data),
  149. length=len(file_data),
  150. content_type=content_type
  151. )
  152. return object_name
  153. except S3Error as e:
  154. logger.error(f"Distribution file upload failed: {e}")
  155. raise Exception("File upload failed")
  156. def get_distribution_presigned_url(self, object_name: str, expires=None) -> str | None:
  157. """生成分发桶内对象的预签名下载链接"""
  158. if not self.client:
  159. return None
  160. try:
  161. from datetime import timedelta
  162. if expires is None:
  163. expires = timedelta(hours=1)
  164. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  165. return self.client.get_presigned_url(
  166. "GET", bucket, object_name, expires=expires
  167. )
  168. except Exception as e:
  169. logger.error(f"Generate distribution presigned url failed: {e}")
  170. return None
  171. def upload_distribution_icon(
  172. self, file_data: bytes, filename: str, content_type: str,
  173. distribution_id: int, user_id: int
  174. ) -> str:
  175. """
  176. 上传分发图标到 MinIO
  177. - distribution_id > 0: icons/{distribution_id}/{uuid}.{ext}
  178. - distribution_id == 0 (创建时): icons/temp/{user_id}/{uuid}.{ext}
  179. 返回: object_name
  180. """
  181. if not self.client:
  182. raise Exception("Storage service unavailable")
  183. self._ensure_distribution_bucket()
  184. bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
  185. ext = filename.split('.')[-1].lower() if '.' in filename else 'png'
  186. if ext not in ('png', 'jpg', 'jpeg', 'gif', 'webp'):
  187. ext = 'png'
  188. if distribution_id > 0:
  189. object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
  190. else:
  191. object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"
  192. try:
  193. self.client.put_object(
  194. bucket_name=bucket,
  195. object_name=object_name,
  196. data=io.BytesIO(file_data),
  197. length=len(file_data),
  198. content_type=content_type
  199. )
  200. return object_name
  201. except S3Error as e:
  202. logger.error(f"Distribution icon upload failed: {e}")
  203. raise Exception("Icon upload failed")
# Singleton instance: built once when this module is first imported.
minio_storage = MessageStorage()