from typing import Generator, Optional from fastapi import Depends, HTTPException, status, Response from fastapi.security import OAuth2PasswordBearer, APIKeyHeader from jose import jwt, JWTError from sqlalchemy.orm import Session from datetime import datetime from app.core import security from app.core.config import settings from app.core.database import SessionLocal from app.models.user import User from app.models.application import Application from app.schemas.token import TokenPayload reusable_oauth2 = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login", auto_error=False # Allow optional token ) token_header_scheme = APIKeyHeader(name="X-App-Access-Token", auto_error=False) def get_db() -> Generator: try: db = SessionLocal() yield db finally: db.close() def get_current_user( response: Response, db: Session = Depends(get_db), token: str = Depends(reusable_oauth2) ) -> User: if not token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not authenticated", ) try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) token_data = TokenPayload(**payload) # Sliding Expiration Check # If token is valid but expires soon (e.g. less than half of total lifetime), renew it exp = payload.get("exp") if exp: now = datetime.now().timestamp() remaining_seconds = exp - now # If remaining time is less than half of the configured expiration time if remaining_seconds < (settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2): # Issue new token new_token = security.create_access_token(subject=token_data.sub) # Set in response header response.headers["X-New-Token"] = new_token except (JWTError, Exception): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", ) # Ensure it's a user token (numeric ID) if not token_data.sub or not token_data.sub.isdigit(): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token type", ) user = db.query(User).filter(User.id == int(token_data.sub)).first() if not user: raise HTTPException(status_code=404, detail="User not found") return user def get_current_active_user( current_user: User = Depends(get_current_user), ) -> User: if current_user.status != "ACTIVE": raise HTTPException(status_code=400, detail="Inactive user") return current_user def get_current_user_optional( response: Response, db: Session = Depends(get_db), token: Optional[str] = Depends(reusable_oauth2) ) -> Optional[User]: """ Returns the user if the token is valid, otherwise None. Does NOT raise 403. """ if not token: return None try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) token_data = TokenPayload(**payload) # Sliding Expiration Check for Optional Auth exp = payload.get("exp") if exp: now = datetime.now().timestamp() remaining_seconds = exp - now if remaining_seconds < (settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2): new_token = security.create_access_token(subject=token_data.sub) response.headers["X-New-Token"] = new_token except (JWTError, Exception): return None if not token_data.sub or not token_data.sub.isdigit(): return None user = db.query(User).filter(User.id == int(token_data.sub)).first() return user def get_current_active_user_optional( current_user: Optional[User] = Depends(get_current_user_optional), ) -> Optional[User]: if current_user and current_user.status == "ACTIVE": return current_user return None def get_current_app( db: Session = Depends(get_db), token: Optional[str] = Depends(reusable_oauth2), access_token: Optional[str] = Depends(token_header_scheme) ) -> Application: """ Get application from token (Machine-to-Machine auth). Supports: 1. JWT Bearer Token (Subject: "app:{id}") 2. Permanent Access Token (Header: X-App-Access-Token) """ # 1. Try Access Token first if present if access_token: # Use simple auth with permanent token app = db.query(Application).filter(Application.access_token == access_token).first() if not app: raise HTTPException(status_code=403, detail="Invalid access token") return app # 2. Try JWT Bearer Token if not token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not authenticated", ) try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) token_data = TokenPayload(**payload) except (JWTError, Exception): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", ) sub = token_data.sub if not sub or not sub.startswith("app:"): raise HTTPException(status_code=403, detail="Not an app token") try: app_id = int(sub.split(":")[1]) except (ValueError, IndexError): raise HTTPException(status_code=403, detail="Invalid app token format") app = db.query(Application).filter(Application.id == app_id).first() if not app: raise HTTPException(status_code=404, detail="App not found") return app