| 1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- from typing import Generator, Optional
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from jose import jwt, JWTError
- from sqlalchemy.orm import Session
- from backend.app.core.database import get_db
- from backend.app.core.config import settings
- from backend.app.models import sql_models
- from backend.app.schemas import schemas
# Bearer-token scheme injected into get_current_user via Depends(); its
# tokenUrl is the "/login" route under the configured API prefix.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/login",
)
def get_current_user(
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme),
) -> sql_models.User:
    """Resolve a bearer token into the matching ``User`` row.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``; its ``sub`` claim is used as the username to
    look up.

    Args:
        db: SQLAlchemy session supplied by ``get_db``.
        token: Token string supplied by ``oauth2_scheme``.

    Returns:
        The first ``User`` whose ``username`` equals the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``JWTError``, the ``sub`` claim is absent, or
            no user matches.
    """
    # One shared exception so every failure path looks identical to the client.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = schemas.TokenData(username=subject)

    matching_users = db.query(sql_models.User).filter(
        sql_models.User.username == token_data.username
    )
    account = matching_users.first()
    if account is None:
        raise unauthorized
    return account
def get_current_active_superuser(
    current_user: sql_models.User = Depends(get_current_user),
) -> sql_models.User:
    """Return the authenticated user only if they are a superuser.

    NOTE(review): despite the name, this function performs no "active"
    check; it only inspects ``current_user.is_superuser``.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same ``current_user`` object, unchanged.

    Raises:
        HTTPException: 403 Forbidden when ``current_user.is_superuser`` is
            falsy.
    """
    if not current_user.is_superuser:
        # The request itself is well-formed and the caller is authenticated;
        # they simply lack permission, which is 403 rather than 400.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="The user doesn't have enough privileges",
        )
    return current_user
|