| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- from minio import Minio
- from minio.error import S3Error
- from app.core.config import settings
- import io
- import uuid
- from datetime import datetime
- import logging
- logger = logging.getLogger(__name__)
- class MessageStorage:
- def __init__(self):
- try:
- # Handle endpoint protocol
- endpoint = settings.MINIO_ENDPOINT
- secure = settings.MINIO_SECURE
-
- if endpoint.startswith("http://"):
- endpoint = endpoint.replace("http://", "")
- secure = False
- elif endpoint.startswith("https://"):
- endpoint = endpoint.replace("https://", "")
- secure = True
-
- self.client = Minio(
- endpoint=endpoint,
- access_key=settings.MINIO_ACCESS_KEY,
- secret_key=settings.MINIO_SECRET_KEY,
- secure=secure
- )
- self.bucket_name = settings.MINIO_BUCKET_NAME
- self._ensure_bucket()
- except Exception as e:
- logger.error(f"MinIO init failed: {e}")
- self.client = None
- def _ensure_bucket(self):
- if not self.client: return
- try:
- if not self.client.bucket_exists(self.bucket_name):
- self.client.make_bucket(self.bucket_name)
- # 移除公开读策略,默认为私有
- # self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
- except S3Error as e:
- logger.error(f"MinIO bucket error: {e}")
- def get_presigned_url(self, object_name: str, expires=None):
- """生成预签名访问链接"""
- if not self.client: return None
- try:
- from datetime import timedelta
- if expires is None:
- expires = timedelta(hours=1)
-
- return self.client.get_presigned_url(
- "GET",
- self.bucket_name,
- object_name,
- expires=expires
- )
- except Exception as e:
- logger.error(f"Generate presigned url failed: {e}")
- return None
- def upload_message_file(self, file_data: bytes, filename: str, content_type: str, user_id: int) -> str:
- """
- 上传消息附件
- 路径格式: messages/{user_id}/{year}/{month}/{uuid}.ext
- 返回: object_name (用于存储和生成签名URL)
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- # 生成结构化路径
- ext = filename.split('.')[-1] if '.' in filename else 'bin'
- now = datetime.now()
- object_name = f"messages/{user_id}/{now.year}/{now.month:02d}/{uuid.uuid4()}.{ext}"
-
- try:
- self.client.put_object(
- bucket_name=self.bucket_name,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
-
- return object_name
-
- except S3Error as e:
- logger.error(f"Upload failed: {e}")
- raise Exception("File upload failed")
- def upload_db_backup(self, file_path: str, filename: str) -> str:
- """
- 上传数据库备份文件到 MinIO(使用独立的备份桶)
- 路径格式: {year}/{month}/{filename}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
-
- # 使用数据库备份专用的 bucket
- backup_bucket = settings.MINIO_DB_BACKUP_BUCKET_NAME
-
- # 确保备份桶存在
- try:
- if not self.client.bucket_exists(backup_bucket):
- self.client.make_bucket(backup_bucket)
- logger.info(f"Created backup bucket: {backup_bucket}")
- except S3Error as e:
- logger.error(f"MinIO backup bucket error: {e}")
- raise Exception("Backup bucket unavailable")
-
- # 读取文件
- with open(file_path, 'rb') as f:
- file_data = f.read()
-
- # 生成结构化路径
- now = datetime.now()
- object_name = f"{now.year}/{now.month:02d}/{filename}"
-
- try:
- self.client.put_object(
- bucket_name=backup_bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type="application/gzip"
- )
-
- logger.info(f"Successfully uploaded database backup to MinIO: {backup_bucket}/{object_name}")
- return object_name
-
- except S3Error as e:
- logger.error(f"Database backup upload failed: {e}")
- raise Exception("Database backup upload failed")
- def _ensure_distribution_bucket(self):
- """确保客户端分发桶存在"""
- if not self.client:
- return
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- try:
- if not self.client.bucket_exists(bucket):
- self.client.make_bucket(bucket)
- logger.info(f"Created distribution bucket: {bucket}")
- except S3Error as e:
- logger.error(f"MinIO distribution bucket error: {e}")
- def upload_distribution_file(
- self, file_data: bytes, filename: str, content_type: str,
- distribution_id: int
- ) -> str:
- """
- 上传客户端分发安装包到 MinIO
- 路径格式: versions/{distribution_id}/{uuid}.{ext}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- self._ensure_distribution_bucket()
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- ext = filename.split('.')[-1] if '.' in filename else 'bin'
- object_name = f"versions/{distribution_id}/{uuid.uuid4()}.{ext}"
- try:
- self.client.put_object(
- bucket_name=bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
- return object_name
- except S3Error as e:
- logger.error(f"Distribution file upload failed: {e}")
- raise Exception("File upload failed")
- def get_distribution_presigned_url(self, object_name: str, expires=None) -> str | None:
- """生成分发桶内对象的预签名下载链接"""
- if not self.client:
- return None
- try:
- from datetime import timedelta
- if expires is None:
- expires = timedelta(hours=1)
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- return self.client.get_presigned_url(
- "GET", bucket, object_name, expires=expires
- )
- except Exception as e:
- logger.error(f"Generate distribution presigned url failed: {e}")
- return None
- def upload_distribution_icon(
- self, file_data: bytes, filename: str, content_type: str,
- distribution_id: int, user_id: int
- ) -> str:
- """
- 上传分发图标到 MinIO
- - distribution_id > 0: icons/{distribution_id}/{uuid}.{ext}
- - distribution_id == 0 (创建时): icons/temp/{user_id}/{uuid}.{ext}
- 返回: object_name
- """
- if not self.client:
- raise Exception("Storage service unavailable")
- self._ensure_distribution_bucket()
- bucket = settings.MINIO_DISTRIBUTION_BUCKET_NAME
- ext = filename.split('.')[-1].lower() if '.' in filename else 'png'
- if ext not in ('png', 'jpg', 'jpeg', 'gif', 'webp'):
- ext = 'png'
- if distribution_id > 0:
- object_name = f"icons/{distribution_id}/{uuid.uuid4()}.{ext}"
- else:
- object_name = f"icons/temp/{user_id}/{uuid.uuid4()}.{ext}"
- try:
- self.client.put_object(
- bucket_name=bucket,
- object_name=object_name,
- data=io.BytesIO(file_data),
- length=len(file_data),
- content_type=content_type
- )
- return object_name
- except S3Error as e:
- logger.error(f"Distribution icon upload failed: {e}")
- raise Exception("Icon upload failed")
- # Singleton instance.
- # NOTE: constructed when this module is imported, so MessageStorage.__init__
- # runs as an import side effect.
- minio_storage = MessageStorage()
|