| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- from minio import Minio
- from minio.error import S3Error
- from app.core.config import settings
- import io
- import uuid
- from datetime import datetime
- import logging
- logger = logging.getLogger(__name__)
- class MessageStorage:
- def __init__(self):
- try:
- # Handle endpoint protocol
- endpoint = settings.MINIO_ENDPOINT
- secure = settings.MINIO_SECURE
-
- if endpoint.startswith("http://"):
- endpoint = endpoint.replace("http://", "")
- secure = False
- elif endpoint.startswith("https://"):
- endpoint = endpoint.replace("https://", "")
- secure = True
-
- self.client = Minio(
- endpoint=endpoint,
- access_key=settings.MINIO_ACCESS_KEY,
- secret_key=settings.MINIO_SECRET_KEY,
- secure=secure
- )
- self.bucket_name = settings.MINIO_BUCKET_NAME
- self._ensure_bucket()
- except Exception as e:
- logger.error(f"MinIO init failed: {e}")
- self.client = None
- def _ensure_bucket(self):
- if not self.client: return
- try:
- if not self.client.bucket_exists(self.bucket_name):
- self.client.make_bucket(self.bucket_name)
- # 移除公开读策略,默认为私有
- # self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
- except S3Error as e:
- logger.error(f"MinIO bucket error: {e}")
- def get_presigned_url(self, object_name: str, expires=None):
- """生成预签名访问链接"""
- if not self.client: return None
- try:
- from datetime import timedelta
- if expires is None:
- expires = timedelta(hours=1)
-
- return self.client.get_presigned_url(
- "GET",
- self.bucket_name,
- object_name,
- expires=expires
- )
- except Exception as e:
- logger.error(f"Generate presigned url failed: {e}")
- return None
- def upload_message_file(self, file_data: bytes, filename: str, content_type: str, user_id: int) -> str:
- """
- 上传消息附件
- 路径格式: messages/{user_id}/{year}/{month}/{uuid}.ext
- 返回: object_name (用于存储和生成签名URL)
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- # 生成结构化路径
- ext = filename.split('.')[-1] if '.' in filename else 'bin'
- now = datetime.now()
- object_name = f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
-
- try:
- self.client.put_object(
- bucket_name=self.bucket_name,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
-
- return object_name
-
- except S3Error as e:
- logger.error(f"Upload failed: {e}")
- raise Exception("File upload failed")
- def upload_db_backup(self, file_path: str, filename: str) -> str:
- """
- 上传数据库备份文件到 MinIO(使用独立的备份桶)
- 路径格式: {year}/{month}/{filename}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
-
- # 使用数据库备份专用的 bucket
- backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME
-
- # 确保备份桶存在
- try:
- if not self.client.bucket_exists(backup_bucket):
- self.client.make_bucket(backup_bucket)
- logger.info(f"Created backup bucket: {backup_bucket}")
- except S3Error as e:
- logger.error(f"MinIO backup bucket error: {e}")
- raise Exception("Backup bucket unavailable")
-
- # 读取文件
- with open(file_path, 'rb') as f:
- file_data = f.read()
-
- # 生成结构化路径
- now = datetime.now()
- object_name = f"{now.year}/{now.month:02d}/{filename}"
-
- try:
- self.client.put_object(
- bucket_name=backup_bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type="application/gzip"
- )
-
- logger.info(f"Successfully uploaded database backup to MinIO: {backup_bucket}/{object_name}")
- return object_name
-
- except S3Error as e:
- logger.error(f"Database backup upload failed: {e}")
- raise Exception("Database backup upload failed")
- def _ensure_distribution_bucket(self):
- """确保客户端分发桶存在"""
- if not self.client:
- return
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- try:
- if not self.client.bucket_exists(bucket):
- self.client.make_bucket(bucket)
- logger.info(f"Created distribution bucket: {bucket}")
- except S3Error as e:
- logger.error(f"MinIO distribution bucket error: {e}")
- def upload_distribution_file(
- self, file_data: bytes, filename: str, content_type: str,
- distribution_id: int
- ) -> str:
- """
- 上传客户端分发安装包到 MinIO
- 路径格式: versions/{distribution_id}/{uuid}.{ext}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- self._ensure_distribution_bucket()
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- ext = filename.split('.')[-1] if '.' in filename else 'bin'
- object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"
- try:
- self.client.put_object(
- bucket_name=bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
- return object_name
- except S3Error as e:
- logger.error(f"Distribution file upload failed: {e}")
- raise Exception("File upload failed")
- def get_distribution_presigned_url(self, object_name: str, expires=None) -> str | None:
- """生成分发桶内对象的预签名下载链接"""
- if not self.client:
- return None
- try:
- from datetime import timedelta
- if expires is None:
- expires = timedelta(hours=1)
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- return self.client.get_presigned_url(
- "GET", bucket, object_name, expires=expires
- )
- except Exception as e:
- logger.error(f"Generate distribution presigned url failed: {e}")
- return None
- def upload_distribution_icon(
- self, file_data: bytes, filename: str, content_type: str,
- distribution_id: int, user_id: int
- ) -> str:
- """
- 上传分发图标到 MinIO
- - distribution_id > 0: icons/{distribution_id}/{uuid}.{ext}
- - distribution_id == 0 (创建时): icons/temp/{user_id}/{uuid}.{ext}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- self._ensure_distribution_bucket()
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- ext = filename.split('.')[-1].lower() if '.' in filename else 'png'
- if ext not in ('png', 'jpg', 'jpeg', 'gif', 'webp'):
- ext = 'png'
- if distribution_id > 0:
- object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
- else:
- object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"
- try:
- self.client.put_object(
- bucket_name=bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
- return object_name
- except S3Error as e:
- logger.error(f"Distribution icon upload failed: {e}")
- raise Exception("Icon upload failed")
- # ---------- 自建更新桶(公开读):路径 {时间戳}/{platform}/update/{filename} ----------
- def _ensure_update_bucket(self):
- """确保更新桶存在"""
- if not self.client:
- return
- bucket = settings.MINIO_UPDATE_BUCKET_NAME
- try:
- if not self.client.bucket_exists(bucket):
- self.client.make_bucket(bucket)
- logger.info(f"Created update bucket: {bucket}")
- except S3Error as e:
- logger.error(f"MinIO update bucket error: {e}")
- raise Exception("Update bucket unavailable")
- def _update_bucket_public_base_url(self) -> str:
- """更新桶公开访问的 base URL(末尾带 /)"""
- endpoint = (settings.MINIO_ENDPOINT or "").rstrip("/")
- bucket = settings.MINIO_UPDATE_BUCKET_NAME
- return f"{endpoint}/{bucket}/"
- def list_update_files(self, created_at_timestamp: int, platform: str) -> list:
- """
- 列出某分发、某平台下 update/ 目录中的文件。
- 返回: [{"key": str, "name": str, "size": int}, ...]
- """
- if not self.client:
- return []
- self._ensure_update_bucket()
- bucket = settings.MINIO_UPDATE_BUCKET_NAME
- prefix = f"{created_at_timestamp}/{platform}/update/"
- try:
- objects = self.client.list_objects(bucket, prefix=prefix, recursive=True)
- result = []
- for obj in objects:
- name = obj.object_name.split("/")[-1] if "/" in obj.object_name else obj.object_name
- size = 0
- if hasattr(obj, "size") and obj.size is not None:
- size = getattr(obj.size, "size", obj.size) if hasattr(obj.size, "size") else int(obj.size)
- result.append({"key": obj.object_name, "name": name, "size": size})
- return result
- except S3Error as e:
- logger.error(f"List update files failed: {e}")
- return []
- def upload_update_file(
- self,
- created_at_timestamp: int,
- platform: str,
- file_data: bytes,
- filename: str,
- content_type: str = "application/octet-stream",
- ) -> str:
- """
- 上传文件到更新桶,路径 {timestamp}/{platform}/update/{filename},透传不改名。
- 返回 object_key。
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- self._ensure_update_bucket()
- bucket = settings.MINIO_UPDATE_BUCKET_NAME
- object_key = f"{created_at_timestamp}/{platform}/update/{filename}"
- try:
- self.client.put_object(
- bucket_name=bucket,
- object_name=object_key,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type,
- )
- return object_key
- except S3Error as e:
- logger.error(f"Update file upload failed: {e}")
- raise Exception("Update file upload failed")
- def delete_update_file(self, object_key: str) -> None:
- """删除更新桶中的对象"""
- if not self.client:
- raise Exception("Storage service unavailable")
- bucket = settings.MINIO_UPDATE_BUCKET_NAME
- try:
- self.client.remove_object(bucket, object_key)
- except S3Error as e:
- logger.error(f"Update file delete failed: {e}")
- raise Exception("Update file delete failed")
- def get_update_file_public_url(self, object_key: str) -> str:
- """更新桶公开读,返回无需签名的固定 URL"""
- base = self._update_bucket_public_base_url()
- return f"{base}{object_key}"
- # 单例实例
- minio_storage = MessageStorage()
|