"""SMS verification-code service backed by Redis.

Codes are either generated locally and printed (mock provider) or issued by
Aliyun's Dypns ``SendSmsVerifyCode`` API, then stored in Redis under
``SMS:{mobile}`` so they can be checked later with ``SmsService.verify_code``.
"""
import hmac
import json
import random  # noqa: F401  (existing file-level import, kept deliberately)
import secrets
import string

from app.core.cache import redis_client
from app.core.config import settings

# Aliyun SDK imports
try:
    # Use dypnsapi (Number Auth) as requested
    from alibabacloud_dypnsapi20170525.client import Client as DypnsApiClient
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_dypnsapi20170525 import models as dypns_api_models
    from alibabacloud_tea_util import models as util_models
except ImportError:
    # Define every SDK alias so later references fail with a clear check
    # instead of a NameError.
    DypnsApiClient = None
    open_api_models = None
    dypns_api_models = None
    util_models = None


class SmsService:
    """Send and verify one-time SMS codes, with a per-number rate limit."""

    # Lazily created Aliyun client, shared by all callers.
    _client = None

    @classmethod
    def get_client(cls):
        """Return the cached Aliyun Dypns client, creating it on first use.

        Raises:
            ValueError: if the Aliyun access key id/secret is not configured.
            RuntimeError: if the Aliyun Dypns SDK could not be imported.
        """
        if cls._client:
            return cls._client

        if not settings.ALIYUN_ACCESS_KEY_ID or not settings.ALIYUN_ACCESS_KEY_SECRET:
            raise ValueError("Aliyun AccessKey config missing")

        if DypnsApiClient is None or open_api_models is None:
            raise RuntimeError("Aliyun Dypns SDK not installed")

        config = open_api_models.Config(
            access_key_id=settings.ALIYUN_ACCESS_KEY_ID,
            access_key_secret=settings.ALIYUN_ACCESS_KEY_SECRET,
        )
        # Endpoint for dypns (SMS Authentication Service)
        config.endpoint = 'dypnsapi.aliyuncs.com'
        cls._client = DypnsApiClient(config)
        return cls._client

    @staticmethod
    def send_code(mobile: str) -> str:
        """Send a verification code to ``mobile`` and return it.

        A ``SMS_LIMIT:{mobile}`` key in Redis enforces one send per 60
        seconds. The key is only written after the provider call succeeds,
        so a failed send does not block an immediate retry.

        Raises:
            HTTPException: status 400 when the rate-limit key still exists.
        """
        # 1. Check Rate Limit (1 minute)
        limit_key = f"SMS_LIMIT:{mobile}"
        if redis_client.exists(limit_key):
            # Using local import to avoid circular dependency if any,
            # though HTTPException is from fastapi which is fine.
            from fastapi import HTTPException
            raise HTTPException(status_code=400, detail="发送过于频繁,请1分钟后再试")

        if settings.SMS_PROVIDER == "aliyun":
            code = SmsService.send_aliyun_sms(mobile)
        else:
            code = SmsService.send_mock_sms(mobile)

        # 2. Set Rate Limit (60 seconds)
        redis_client.setex(limit_key, 60, "1")

        return code

    @staticmethod
    def send_mock_sms(mobile: str) -> str:
        """Generate a 6-digit code, store it for 5 minutes, and print it.

        Nothing is actually sent; the code is written to stdout.
        """
        # Use the CSPRNG: the code acts as a short-lived credential.
        code = ''.join(secrets.choice(string.digits) for _ in range(6))

        # Store in Redis: SMS:{mobile} -> code
        key = f"SMS:{mobile}"
        # Expire in 5 minutes
        redis_client.setex(key, 300, code)

        # In production, call actual SMS provider here.
        print("=======================================")
        print(f" [MOCK SMS] To: {mobile}, Code: {code}")
        print("=======================================")

        return code

    @staticmethod
    def send_aliyun_sms(mobile: str) -> str:
        """Send a code through Aliyun Dypns ``SendSmsVerifyCode`` and return it.

        Falls back to ``send_mock_sms`` when the SDK is not installed. On
        success the code returned by Aliyun is stored under ``SMS:{mobile}``
        for ``settings.CAPTCHA_EXPIRE_SECONDS`` seconds.

        Raises:
            Exception: when the API response code is not ``'OK'``; any
                exception raised by the client is logged and re-raised.
        """
        if DypnsApiClient is None:
            print("Aliyun Dypns SDK not installed, falling back to mock")
            return SmsService.send_mock_sms(mobile)

        try:
            client = SmsService.get_client()

            # Use SendSmsVerifyCode API from SMS Authentication Service
            # We request the code to be returned so we can store it in Redis

            # TemplateParam logic:
            # - code: mapped to ##code## (Dypns generates this)
            # - min: mapped to expiration time in minutes
            # Clamp to at least 1 so a sub-minute expiry is never rendered
            # as "0" minutes in the message.
            expire_minutes = str(max(1, int(settings.CAPTCHA_EXPIRE_SECONDS // 60)))
            template_param = json.dumps({"code": "##code##", "min": expire_minutes})

            request = dypns_api_models.SendSmsVerifyCodeRequest(
                phone_number=mobile,
                sign_name=settings.ALIYUN_SMS_SIGN_NAME,
                template_code=settings.ALIYUN_SMS_TEMPLATE_CODE,
                template_param=template_param,
                return_verify_code=True,
                valid_time=settings.CAPTCHA_EXPIRE_SECONDS,
            )

            runtime = util_models.RuntimeOptions()
            response = client.send_sms_verify_code_with_options(request, runtime)

            if response.body.code == 'OK':
                # For Dypns SendSmsVerifyCode, the code is in the Model
                verify_code = response.body.model.verify_code
                request_id = response.body.request_id
                biz_id = response.body.model.biz_id

                # Store in Redis
                key = f"SMS:{mobile}"
                redis_client.setex(key, settings.CAPTCHA_EXPIRE_SECONDS, verify_code)

                # NOTE(review): this prints the real verification code to
                # stdout; consider removing or masking it outside dev.
                print(f" [ALIYUN SMS AUTH] To: {mobile}, Code: {verify_code} (Sent via SendSmsVerifyCode)")
                print(f" [ALIYUN SMS AUTH DEBUG] RequestId: {request_id}, BizId: {biz_id}")
                return verify_code
            else:
                error_msg = response.body.message or "Unknown error"
                request_id = response.body.request_id
                print(f" [ALIYUN SMS ERROR] {error_msg} | RequestId: {request_id}")
                raise Exception(f"Aliyun SMS failed: {error_msg}")

        except Exception as error:
            print(f" [ALIYUN SMS EXCEPTION] {error}")
            # Bare raise re-raises the active exception unchanged.
            raise

    @staticmethod
    def verify_code(mobile: str, code: str) -> bool:
        """Check ``code`` against the stored code for ``mobile``.

        A matching code is deleted from Redis so it can only be used once.

        Returns:
            True when the codes match; False when no code is stored, the
            submitted code is empty, or the codes differ.
        """
        key = f"SMS:{mobile}"
        stored_code = redis_client.get(key)

        if not stored_code or not code:
            return False

        # The Redis client may return bytes (when responses are not decoded);
        # normalize so the comparison with the submitted str can succeed.
        if isinstance(stored_code, bytes):
            stored_code = stored_code.decode("utf-8")

        # Constant-time comparison on bytes avoids leaking match position.
        if hmac.compare_digest(str(stored_code).encode("utf-8"), str(code).encode("utf-8")):
            redis_client.delete(key)  # One-time use
            return True

        return False