"""MinIO object-storage helpers: message attachments, DB backups,
client distribution packages/icons and self-hosted update files."""

import io
import logging
import os
import uuid
from datetime import datetime, timedelta

from minio import Minio
from minio.error import S3Error

from app.core.config import settings

logger = logging.getLogger(__name__)

# Icon uploads are restricted to these extensions; anything else becomes "png".
_ICON_EXTENSIONS = ("png", "jpg", "jpeg", "gif", "webp")


def _file_extension(filename: str, default: str) -> str:
    """Return the text after the last '.' in *filename*, or *default*.

    The extension is only accepted when it is non-empty ASCII alphanumeric,
    so a crafted name such as ``"a.b/c"`` cannot inject path separators (or
    an empty suffix) into the generated object key.
    """
    _, dot, ext = filename.rpartition(".")
    if dot and ext.isascii() and ext.isalnum():
        return ext
    return default


class MessageStorage:
    """Thin wrapper around a MinIO client.

    If the client cannot be initialised, ``self.client`` is ``None``; the
    upload/delete methods then raise ``Exception("Storage service
    unavailable")`` and the presign/list methods return ``None`` / ``[]``.
    """

    def __init__(self):
        # Defined up front so both attributes exist even if init fails early.
        self.client = None
        self.bucket_name = None
        try:
            # The endpoint may be configured with or without a scheme; the
            # MinIO client wants a bare host[:port], and an explicit scheme
            # overrides MINIO_SECURE.
            endpoint = settings.MINIO_ENDPOINT
            secure = settings.MINIO_SECURE
            if endpoint.startswith("http://"):
                endpoint = endpoint.removeprefix("http://")
                secure = False
            elif endpoint.startswith("https://"):
                endpoint = endpoint.removeprefix("https://")
                secure = True
            # Same trailing-slash normalisation the public-URL helper applies.
            endpoint = endpoint.rstrip("/")

            self.client = Minio(
                endpoint=endpoint,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=secure,
            )
            self.bucket_name = settings.MINIO_BUCKET_NAME
            self._ensure_bucket()
        except Exception as e:
            # Best effort: the module-level singleton must not break import.
            logger.error("MinIO init failed: %s", e)
            self.client = None

    # ---------- internal helpers ----------

    def _create_bucket_if_missing(self, bucket: str) -> bool:
        """Create *bucket* if it does not exist; return True when created.

        Raises whatever the MinIO client raises (callers decide how to react).
        """
        if self.client.bucket_exists(bucket):
            return False
        self.client.make_bucket(bucket)
        return True

    def _put_bytes(
        self, bucket: str, object_name: str, file_data: bytes, content_type: str
    ) -> None:
        """Upload an in-memory payload to *bucket* under *object_name*."""
        self.client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=io.BytesIO(file_data),
            length=len(file_data),
            content_type=content_type,
        )

    # ---------- message attachments ----------

    def _ensure_bucket(self):
        """Make sure the message bucket exists; S3 errors are only logged."""
        if not self.client:
            return
        try:
            # No bucket policy is set here (the public-read policy was removed).
            self._create_bucket_if_missing(self.bucket_name)
        except S3Error as e:
            logger.error("MinIO bucket error: %s", e)

    def get_presigned_url(self, object_name: str, expires=None):
        """Return a presigned GET URL for an object in the message bucket.

        *expires* is passed to the MinIO client and defaults to one hour.
        Returns ``None`` if the client is unavailable or signing fails.
        """
        if not self.client:
            return None
        try:
            if expires is None:
                expires = timedelta(hours=1)
            return self.client.get_presigned_url(
                "GET",
                self.bucket_name,
                object_name,
                expires=expires,
            )
        except Exception as e:
            logger.error("Generate presigned url failed: %s", e)
            return None

    def upload_message_file(
        self, file_data: bytes, filename: str, content_type: str, user_id: int
    ) -> str:
        """Upload a message attachment.

        Key format: ``messages/{user_id}/{year}/{month}/{uuid}.{ext}``; the
        extension falls back to ``bin`` when *filename* has no usable one.

        Returns the object name (to be stored and later used for presigning).
        Raises ``Exception`` if storage is unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        ext = _file_extension(filename, "bin")
        now = datetime.now()
        object_name = (
            f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
        )

        try:
            self._put_bytes(self.bucket_name, object_name, file_data, content_type)
            return object_name
        except S3Error as e:
            logger.error("Upload failed: %s", e)
            raise Exception("File upload failed") from e

    # ---------- database backups ----------

    def upload_db_backup(self, file_path: str, filename: str) -> str:
        """Upload a database backup file to the dedicated backup bucket.

        Key format: ``{year}/{month}/{filename}``. The file is streamed from
        disk instead of being read into memory first.

        Returns the object name. Raises ``Exception`` if storage or the
        backup bucket is unavailable or the upload fails; errors opening
        *file_path* propagate unchanged.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME

        try:
            if self._create_bucket_if_missing(backup_bucket):
                logger.info("Created backup bucket: %s", backup_bucket)
        except S3Error as e:
            logger.error("MinIO backup bucket error: %s", e)
            raise Exception("Backup bucket unavailable") from e

        # NOTE(review): filename is placed into the key verbatim — confirm
        # callers only pass trusted, separator-free names.
        now = datetime.now()
        object_name = f"{now.year}/{now.month:02d}/{filename}"

        try:
            with open(file_path, "rb") as backup_file:
                size = os.fstat(backup_file.fileno()).st_size
                self.client.put_object(
                    bucket_name=backup_bucket,
                    object_name=object_name,
                    data=backup_file,
                    length=size,
                    content_type="application/gzip",
                )
            logger.info(
                "Successfully uploaded database backup to MinIO: %s/%s",
                backup_bucket,
                object_name,
            )
            return object_name
        except S3Error as e:
            logger.error("Database backup upload failed: %s", e)
            raise Exception("Database backup upload failed") from e

    # ---------- client distribution ----------

    def _ensure_distribution_bucket(self):
        """Make sure the distribution bucket exists; S3 errors are only logged."""
        if not self.client:
            return
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
        try:
            if self._create_bucket_if_missing(bucket):
                logger.info("Created distribution bucket: %s", bucket)
        except S3Error as e:
            logger.error("MinIO distribution bucket error: %s", e)

    def upload_distribution_file(
        self,
        file_data: bytes,
        filename: str,
        content_type: str,
        distribution_id: int,
    ) -> str:
        """Upload a client distribution package.

        Key format: ``versions/{distribution_id}/{uuid}.{ext}``; the
        extension falls back to ``bin`` when *filename* has no usable one.

        Returns the object name. Raises ``Exception`` if storage is
        unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        self._ensure_distribution_bucket()
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME

        ext = _file_extension(filename, "bin")
        object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"

        try:
            self._put_bytes(bucket, object_name, file_data, content_type)
            return object_name
        except S3Error as e:
            logger.error("Distribution file upload failed: %s", e)
            raise Exception("File upload failed") from e

    def get_distribution_presigned_url(
        self, object_name: str, expires=None
    ) -> str | None:
        """Return a presigned GET URL for an object in the distribution bucket.

        *expires* defaults to one hour. Returns ``None`` if the client is
        unavailable or signing fails.
        """
        if not self.client:
            return None
        try:
            if expires is None:
                expires = timedelta(hours=1)
            bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
            return self.client.get_presigned_url(
                "GET",
                bucket,
                object_name,
                expires=expires,
            )
        except Exception as e:
            logger.error("Generate distribution presigned url failed: %s", e)
            return None

    def upload_distribution_icon(
        self,
        file_data: bytes,
        filename: str,
        content_type: str,
        distribution_id: int,
        user_id: int,
    ) -> str:
        """Upload a distribution icon.

        Key format:
        - ``distribution_id > 0``: ``icons/{distribution_id}/{uuid}.{ext}``
        - otherwise (e.g. 0 while creating): ``icons/temp/{user_id}/{uuid}.{ext}``

        The extension is lower-cased and forced to ``png`` unless it is one
        of png/jpg/jpeg/gif/webp.

        Returns the object name. Raises ``Exception`` if storage is
        unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        self._ensure_distribution_bucket()
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME

        ext = _file_extension(filename, "png").lower()
        if ext not in _ICON_EXTENSIONS:
            ext = "png"

        if distribution_id > 0:
            object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
        else:
            object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"

        try:
            self._put_bytes(bucket, object_name, file_data, content_type)
            return object_name
        except S3Error as e:
            logger.error("Distribution icon upload failed: %s", e)
            raise Exception("Icon upload failed") from e

    # ---------- self-hosted update bucket (public read) ----------
    # Key layout: {timestamp}/{platform}/update/{filename}

    def _ensure_update_bucket(self):
        """Make sure the update bucket exists.

        Unlike the other ``_ensure_*`` helpers this one raises
        ``Exception("Update bucket unavailable")`` on an S3 error.
        """
        if not self.client:
            return
        bucket = settings.MINIO_UPDATE_BUCKET_NAME
        try:
            if self._create_bucket_if_missing(bucket):
                logger.info("Created update bucket: %s", bucket)
        except S3Error as e:
            logger.error("MinIO update bucket error: %s", e)
            raise Exception("Update bucket unavailable") from e

    def _update_bucket_public_base_url(self) -> str:
        """Return the public base URL of the update bucket (with trailing '/').

        ``MINIO_ENDPOINT`` may be configured without a scheme (``__init__``
        accepts both forms); in that case the scheme is derived from
        ``MINIO_SECURE`` so the result is an absolute URL.
        """
        endpoint = (settings.MINIO_ENDPOINT or "").rstrip("/")
        if endpoint and not endpoint.startswith(("http://", "https://")):
            scheme = "https" if settings.MINIO_SECURE else "http"
            endpoint = f"{scheme}://{endpoint}"
        bucket = settings.MINIO_UPDATE_BUCKET_NAME
        return f"{endpoint}/{bucket}/"

    def list_update_files(self, created_at_timestamp: int, platform: str) -> list:
        """List the objects under ``{timestamp}/{platform}/update/``.

        Returns ``[{"key": str, "name": str, "size": int}, ...]`` where
        ``name`` is the last path segment of the key. Returns ``[]`` if the
        client is unavailable or listing fails; raises ``Exception`` if the
        update bucket cannot be ensured.
        """
        if not self.client:
            return []
        self._ensure_update_bucket()
        bucket = settings.MINIO_UPDATE_BUCKET_NAME
        prefix = f"{created_at_timestamp}/{platform}/update/"
        try:
            result = []
            # Iterate inside the try: errors may only surface while consuming
            # the listing.
            for obj in self.client.list_objects(bucket, prefix=prefix, recursive=True):
                key = obj.object_name
                raw_size = getattr(obj, "size", None)
                if raw_size is None:
                    size = 0
                elif hasattr(raw_size, "size"):
                    # Tolerate a wrapper object that exposes its own ``.size``.
                    size = raw_size.size
                else:
                    size = int(raw_size)
                result.append(
                    {"key": key, "name": key.rsplit("/", 1)[-1], "size": size}
                )
            return result
        except S3Error as e:
            logger.error("List update files failed: %s", e)
            return []

    def upload_update_file(
        self,
        created_at_timestamp: int,
        platform: str,
        file_data: bytes,
        filename: str,
        content_type: str = "application/octet-stream",
    ) -> str:
        """Upload a file to the update bucket without renaming it.

        Key format: ``{timestamp}/{platform}/update/{filename}``.

        Returns the object key. Raises ``Exception`` if storage or the
        update bucket is unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")
        self._ensure_update_bucket()
        bucket = settings.MINIO_UPDATE_BUCKET_NAME
        # NOTE(review): filename is passed through verbatim by design; a name
        # containing '/' lands under a different prefix — confirm callers
        # validate it.
        object_key = f"{created_at_timestamp}/{platform}/update/{filename}"
        try:
            self._put_bytes(bucket, object_key, file_data, content_type)
            return object_key
        except S3Error as e:
            logger.error("Update file upload failed: %s", e)
            raise Exception("Update file upload failed") from e

    def delete_update_file(self, object_key: str) -> None:
        """Delete an object from the update bucket.

        Raises ``Exception`` if storage is unavailable or the delete fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")
        bucket = settings.MINIO_UPDATE_BUCKET_NAME
        try:
            self.client.remove_object(bucket, object_key)
        except S3Error as e:
            logger.error("Update file delete failed: %s", e)
            raise Exception("Update file delete failed") from e

    def get_update_file_public_url(self, object_key: str) -> str:
        """Return the fixed, unsigned URL of an object in the update bucket."""
        base = self._update_bucket_public_base_url()
        return f"{base}{object_key}"


# Module-level singleton instance.
minio_storage = MessageStorage()