| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import random
- import string
- import json
- from app.core.cache import redis_client
- from app.core.config import settings
- # Aliyun SDK imports
- try:
- # Use dypnsapi (Number Auth) as requested
- from alibabacloud_dypnsapi20170525.client import Client as DypnsApiClient
- from alibabacloud_tea_openapi import models as open_api_models
- from alibabacloud_dypnsapi20170525 import models as dypns_api_models
- from alibabacloud_tea_util import models as util_models
- except ImportError:
- DypnsApiClient = None
class SmsService:
    """Send and verify SMS verification codes.

    Codes are stored in Redis under ``SMS:{mobile}`` and a per-number rate
    limit marker is stored under ``SMS_LIMIT:{mobile}``. Delivery goes through
    the Aliyun Dypns ``SendSmsVerifyCode`` API when
    ``settings.SMS_PROVIDER == "aliyun"``; otherwise a mock sender prints the
    code to stdout.
    """

    # Lazily created Aliyun client, shared by every caller of get_client().
    _client = None

    @classmethod
    def get_client(cls):
        """Return the shared Aliyun Dypns client, creating it on first use.

        Raises:
            RuntimeError: If the Aliyun SDK could not be imported.
            ValueError: If the AccessKey id or secret is not configured.
        """
        if cls._client is not None:
            return cls._client

        # Without this guard a missing SDK surfaces as a NameError on
        # ``open_api_models`` below, which hides the real cause.
        if DypnsApiClient is None:
            raise RuntimeError("Aliyun Dypns SDK not installed")

        if not settings.ALIYUN_ACCESS_KEY_ID or not settings.ALIYUN_ACCESS_KEY_SECRET:
            raise ValueError("Aliyun AccessKey config missing")

        config = open_api_models.Config(
            access_key_id=settings.ALIYUN_ACCESS_KEY_ID,
            access_key_secret=settings.ALIYUN_ACCESS_KEY_SECRET,
        )
        # Endpoint for dypns (SMS Authentication Service)
        config.endpoint = 'dypnsapi.aliyuncs.com'
        cls._client = DypnsApiClient(config)
        return cls._client

    @staticmethod
    def _generate_code(length: int = 6) -> str:
        """Return a random numeric code of ``length`` digits.

        Uses ``secrets`` rather than ``random`` because the code acts as a
        short-lived credential and must not be predictable.
        """
        import secrets

        return "".join(secrets.choice(string.digits) for _ in range(length))

    @staticmethod
    def _decode_stored_code(raw):
        """Return a value read from Redis as ``str``.

        Redis clients that are not configured to decode responses return
        ``bytes``; comparing those directly with a ``str`` is always unequal.
        Values that are already ``str`` are returned unchanged.
        """
        if isinstance(raw, (bytes, bytearray)):
            return bytes(raw).decode("utf-8")
        return raw

    @staticmethod
    def send_code(mobile: str) -> str:
        """Send a verification code to ``mobile`` and return it.

        The code is returned so that dev/mock flows can use it directly.

        Raises:
            HTTPException: Status 400 when a code was already sent to this
                number within the last 60 seconds.
        """
        # 1. Check rate limit (1 minute).
        # NOTE(review): the check and the later setex are not atomic, so
        # concurrent requests for the same number can both pass this check.
        limit_key = f"SMS_LIMIT:{mobile}"
        if redis_client.exists(limit_key):
            # Imported locally, as in the original, to keep fastapi out of
            # the module import path.
            from fastapi import HTTPException
            raise HTTPException(status_code=400, detail="发送过于频繁,请1分钟后再试")

        if settings.SMS_PROVIDER == "aliyun":
            code = SmsService.send_aliyun_sms(mobile)
        else:
            code = SmsService.send_mock_sms(mobile)

        # 2. Set rate limit (60 seconds) only after a successful send, so a
        # failed send does not lock the user out.
        redis_client.setex(limit_key, 60, "1")
        return code

    @staticmethod
    def send_mock_sms(mobile: str) -> str:
        """Generate a 6-digit code, store it for 5 minutes and print it.

        No SMS is actually sent; the code is written to stdout.
        """
        code = SmsService._generate_code(6)

        # Store in Redis: SMS:{mobile} -> code, expiring in 5 minutes.
        redis_client.setex(f"SMS:{mobile}", 300, code)

        # In production, call actual SMS provider here.
        print("=======================================")
        print(f" [MOCK SMS] To: {mobile}, Code: {code}")
        print("=======================================")

        return code

    @staticmethod
    def send_aliyun_sms(mobile: str) -> str:
        """Send a code through Aliyun Dypns ``SendSmsVerifyCode`` and store it.

        Falls back to ``send_mock_sms`` when the Aliyun SDK is not installed.
        The provider generates the code; it is requested back in the response
        so it can be stored in Redis under ``SMS:{mobile}``.

        Raises:
            RuntimeError: If the API responds with a code other than ``OK``.
            Exception: Any error raised by the SDK or Redis is logged and
                re-raised unchanged.
        """
        if DypnsApiClient is None:
            print("Aliyun Dypns SDK not installed, falling back to mock")
            return SmsService.send_mock_sms(mobile)

        try:
            client = SmsService.get_client()

            # TemplateParam:
            # - code: mapped to ##code## (Dypns generates this)
            # - min: expiration time in minutes; clamped to at least 1 so an
            #   expiry shorter than a minute is not announced as "0".
            expire_minutes = str(max(1, int(settings.CAPTCHA_EXPIRE_SECONDS / 60)))
            template_param = json.dumps({"code": "##code##", "min": expire_minutes})

            request = dypns_api_models.SendSmsVerifyCodeRequest(
                phone_number=mobile,
                sign_name=settings.ALIYUN_SMS_SIGN_NAME,
                template_code=settings.ALIYUN_SMS_TEMPLATE_CODE,
                template_param=template_param,
                return_verify_code=True,
                valid_time=settings.CAPTCHA_EXPIRE_SECONDS,
            )
            runtime = util_models.RuntimeOptions()
            response = client.send_sms_verify_code_with_options(request, runtime)
            body = response.body

            if body.code != 'OK':
                error_msg = body.message or "Unknown error"
                print(f" [ALIYUN SMS ERROR] {error_msg} | RequestId: {body.request_id}")
                raise RuntimeError(f"Aliyun SMS failed: {error_msg}")

            # For Dypns SendSmsVerifyCode, the code is in the Model.
            verify_code = body.model.verify_code
            request_id = body.request_id
            biz_id = body.model.biz_id

            # Store in Redis with the same lifetime passed to the provider.
            redis_client.setex(f"SMS:{mobile}", settings.CAPTCHA_EXPIRE_SECONDS, verify_code)

            # NOTE(review): this writes the real verification code to stdout;
            # consider dropping the code from this line in production.
            print(f" [ALIYUN SMS AUTH] To: {mobile}, Code: {verify_code} (Sent via SendSmsVerifyCode)")
            print(f" [ALIYUN SMS AUTH DEBUG] RequestId: {request_id}, BizId: {biz_id}")
            return verify_code

        except Exception as error:
            print(f" [ALIYUN SMS EXCEPTION] {error}")
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def verify_code(mobile: str, code: str) -> bool:
        """Check ``code`` against the stored code for ``mobile``.

        Returns:
            True when the codes match; the stored code is then deleted so it
            can only be used once. False when no code is stored or the codes
            differ (the stored code is kept in that case).
        """
        key = f"SMS:{mobile}"
        stored_code = redis_client.get(key)

        if not stored_code:
            return False

        # Normalise bytes replies so the comparison works regardless of the
        # Redis client's decode_responses setting.
        if SmsService._decode_stored_code(stored_code) != code:
            return False

        redis_client.delete(key)  # One-time use
        return True
|