import io
import logging
import uuid
from datetime import datetime, timedelta

from minio import Minio
from minio.error import S3Error

from app.core.config import settings

logger = logging.getLogger(__name__)

# Lifetime of a presigned download link when the caller does not supply one.
_DEFAULT_URL_EXPIRY = timedelta(hours=1)

# Punctuation tolerated in a file extension in addition to letters and digits.
_EXTRA_EXTENSION_CHARS = "_-+"


def _safe_extension(filename: str, default: str) -> str:
    """Return the text after the last dot of *filename*, or *default*.

    The extension is embedded in an object key, so it must not be empty and
    must not carry characters such as ``/`` that would add path segments to
    the key. Anything that is not purely letters, digits, ``_``, ``-`` or
    ``+`` falls back to *default*. Letter case is preserved.
    """
    _, dot, ext = filename.rpartition(".")
    if not dot or not ext:
        return default
    if not all(ch.isalnum() or ch in _EXTRA_EXTENSION_CHARS for ch in ext):
        return default
    return ext


class MessageStorage:
    """Thin wrapper around a MinIO client for the application's buckets.

    Handles message attachments (``settings.MINIO_BUCKET_NAME``), database
    backups (``settings.MINIO_DB_BACKUP_BUCKET_NAME``) and client distribution
    packages/icons (``settings.MINIO_DISTRIBUTION_BUCKET_NAME``).

    If the client cannot be initialised, ``self.client`` is ``None``: upload
    methods then raise ``Exception("Storage service unavailable")`` and the
    presign methods return ``None``.
    """

    def __init__(self):
        # Defined up front so the attribute exists even if setup fails early.
        self.client = None
        try:
            # An explicit scheme in the endpoint overrides MINIO_SECURE; the
            # Minio client is given the endpoint without the scheme.
            endpoint = settings.MINIO_ENDPOINT
            secure = settings.MINIO_SECURE
            if endpoint.startswith("http://"):
                endpoint = endpoint.removeprefix("http://")
                secure = False
            elif endpoint.startswith("https://"):
                endpoint = endpoint.removeprefix("https://")
                secure = True

            self.client = Minio(
                endpoint=endpoint,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=secure,
            )
            self.bucket_name = settings.MINIO_BUCKET_NAME
            self._ensure_bucket()
        except Exception as e:
            # Best effort: the module-level singleton is built at import time,
            # so a storage outage is logged instead of propagated.
            logger.error(f"MinIO init failed: {e}")
            self.client = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _create_bucket_if_missing(self, bucket: str) -> bool:
        """Create *bucket* when it does not exist; return True if created.

        ``S3Error`` from the client is left to the caller, because each
        caller logs and reacts to it differently.
        """
        if self.client.bucket_exists(bucket):
            return False
        self.client.make_bucket(bucket)
        return True

    def _presign_get(self, bucket: str, object_name: str, expires):
        """Return a presigned GET URL for *object_name* in *bucket*."""
        if expires is None:
            expires = _DEFAULT_URL_EXPIRY
        return self.client.get_presigned_url(
            "GET",
            bucket,
            object_name,
            expires=expires,
        )

    # ------------------------------------------------------------------
    # Message attachments
    # ------------------------------------------------------------------

    def _ensure_bucket(self):
        """Make sure the message bucket exists; S3 errors are only logged."""
        if not self.client:
            return
        try:
            # No bucket policy is applied here, so no public-read access is
            # granted by this code.
            self._create_bucket_if_missing(self.bucket_name)
        except S3Error as e:
            logger.error(f"MinIO bucket error: {e}")

    def get_presigned_url(self, object_name: str, expires=None) -> str | None:
        """Generate a presigned download URL for a message attachment.

        Args:
            object_name: Object key inside the message bucket.
            expires: ``timedelta`` lifetime of the link; one hour when omitted.

        Returns:
            The URL, or ``None`` when storage is unavailable or signing fails.
        """
        if not self.client:
            return None
        try:
            return self._presign_get(self.bucket_name, object_name, expires)
        except Exception as e:
            logger.error(f"Generate presigned url failed: {e}")
            return None

    def upload_message_file(
        self,
        file_data: bytes,
        filename: str,
        content_type: str,
        user_id: int,
    ) -> str:
        """Upload a message attachment.

        Object key format: ``messages/{user_id}/{year}/{month}/{uuid}.{ext}``.
        The extension comes from *filename*; it falls back to ``bin`` when it
        is missing, empty or contains characters unsafe for an object key.

        Returns:
            The object name, used for storage and for presigned URLs.

        Raises:
            Exception: If storage is unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        ext = _safe_extension(filename, "bin")
        now = datetime.now()
        object_name = (
            f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
        )

        try:
            self.client.put_object(
                bucket_name=self.bucket_name,
                object_name=object_name,
                data=io.BytesIO(file_data),
                length=len(file_data),
                content_type=content_type,
            )
        except S3Error as e:
            logger.error(f"Upload failed: {e}")
            raise Exception("File upload failed") from e
        return object_name

    # ------------------------------------------------------------------
    # Database backups
    # ------------------------------------------------------------------

    def upload_db_backup(self, file_path: str, filename: str) -> str:
        """Upload a database backup file to the dedicated backup bucket.

        Object key format: ``{year}/{month}/{filename}``. The file is handed
        to the client by path instead of being read into memory first, since
        backups can be large.

        Returns:
            The object name inside the backup bucket.

        Raises:
            Exception: If storage or the backup bucket is unavailable, or the
                upload fails with an S3 error.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME

        try:
            if self._create_bucket_if_missing(backup_bucket):
                logger.info(f"Created backup bucket: {backup_bucket}")
        except S3Error as e:
            logger.error(f"MinIO backup bucket error: {e}")
            raise Exception("Backup bucket unavailable") from e

        now = datetime.now()
        object_name = f"{now.year}/{now.month:02d}/{filename}"

        try:
            # NOTE(review): a missing or unreadable file is expected to
            # surface as OSError from the client, as open() did before.
            self.client.fput_object(
                bucket_name=backup_bucket,
                object_name=object_name,
                file_path=file_path,
                content_type="application/gzip",
            )
        except S3Error as e:
            logger.error(f"Database backup upload failed: {e}")
            raise Exception("Database backup upload failed") from e

        logger.info(
            f"Successfully uploaded database backup to MinIO: {backup_bucket}/{object_name}"
        )
        return object_name

    # ------------------------------------------------------------------
    # Client distribution packages and icons
    # ------------------------------------------------------------------

    def _ensure_distribution_bucket(self):
        """Make sure the distribution bucket exists; S3 errors are logged."""
        if not self.client:
            return
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
        try:
            if self._create_bucket_if_missing(bucket):
                logger.info(f"Created distribution bucket: {bucket}")
        except S3Error as e:
            logger.error(f"MinIO distribution bucket error: {e}")

    def upload_distribution_file(
        self,
        file_data: bytes,
        filename: str,
        content_type: str,
        distribution_id: int
    ) -> str:
        """Upload a client distribution package.

        Object key format: ``versions/{distribution_id}/{uuid}.{ext}``. The
        extension falls back to ``bin`` when it is missing, empty or contains
        characters unsafe for an object key.

        Returns:
            The object name inside the distribution bucket.

        Raises:
            Exception: If storage is unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        self._ensure_distribution_bucket()
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME

        ext = _safe_extension(filename, "bin")
        object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"

        try:
            self.client.put_object(
                bucket_name=bucket,
                object_name=object_name,
                data=io.BytesIO(file_data),
                length=len(file_data),
                content_type=content_type,
            )
        except S3Error as e:
            logger.error(f"Distribution file upload failed: {e}")
            raise Exception("File upload failed") from e
        return object_name

    def get_distribution_presigned_url(self, object_name: str, expires=None) -> str | None:
        """Generate a presigned download URL for a distribution-bucket object.

        Args:
            object_name: Object key inside the distribution bucket.
            expires: ``timedelta`` lifetime of the link; one hour when omitted.

        Returns:
            The URL, or ``None`` when storage is unavailable or signing fails.
        """
        if not self.client:
            return None
        try:
            bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
            return self._presign_get(bucket, object_name, expires)
        except Exception as e:
            logger.error(f"Generate distribution presigned url failed: {e}")
            return None

    def upload_distribution_icon(
        self,
        file_data: bytes,
        filename: str,
        content_type: str,
        distribution_id: int,
        user_id: int
    ) -> str:
        """Upload a distribution icon.

        Object key format:
            - ``distribution_id > 0``: ``icons/{distribution_id}/{uuid}.{ext}``
            - otherwise (icon uploaded while the distribution is being
              created): ``icons/temp/{user_id}/{uuid}.{ext}``

        The extension is lower-cased and limited to png/jpg/jpeg/gif/webp;
        anything else is stored with a ``png`` extension.

        Returns:
            The object name inside the distribution bucket.

        Raises:
            Exception: If storage is unavailable or the upload fails.
        """
        if not self.client:
            raise Exception("Storage service unavailable")

        self._ensure_distribution_bucket()
        bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME

        ext = _safe_extension(filename, "png").lower()
        if ext not in ("png", "jpg", "jpeg", "gif", "webp"):
            ext = "png"

        if distribution_id > 0:
            object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
        else:
            object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"

        try:
            self.client.put_object(
                bucket_name=bucket,
                object_name=object_name,
                data=io.BytesIO(file_data),
                length=len(file_data),
                content_type=content_type,
            )
        except S3Error as e:
            logger.error(f"Distribution icon upload failed: {e}")
            raise Exception("Icon upload failed") from e
        return object_name


# Module-level singleton shared by the application.
minio_storage = MessageStorage()