|
|
@@ -3,7 +3,7 @@ from fastapi import Depends, HTTPException, status, Response
|
|
|
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
|
|
|
from jose import jwt, JWTError
|
|
|
from sqlalchemy.orm import Session
|
|
|
-from datetime import datetime
|
|
|
+from datetime import datetime, timedelta
|
|
|
from app.core import security
|
|
|
from app.core.config import settings
|
|
|
from app.core.database import SessionLocal
|
|
|
@@ -44,13 +44,28 @@ def get_current_user(
|
|
|
# Sliding Expiration Check
|
|
|
# If token is valid but expires soon (e.g. less than half of total lifetime), renew it
|
|
|
exp = payload.get("exp")
|
|
|
+ is_long_term = payload.get("long_term", False)
|
|
|
+
|
|
|
if exp:
|
|
|
now = datetime.now().timestamp()
|
|
|
remaining_seconds = exp - now
|
|
|
+
|
|
|
+ threshold = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2
|
|
|
+ if is_long_term:
|
|
|
+ threshold = settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG * 60 / 2
|
|
|
+
|
|
|
# If remaining time is less than half of the configured expiration time
|
|
|
- if remaining_seconds < (settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2):
|
|
|
+ if remaining_seconds < threshold:
|
|
|
+ expires_delta = None
|
|
|
+ if is_long_term:
|
|
|
+ expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG)
|
|
|
+
|
|
|
# Issue new token
|
|
|
- new_token = security.create_access_token(subject=token_data.sub)
|
|
|
+ new_token = security.create_access_token(
|
|
|
+ subject=token_data.sub,
|
|
|
+ expires_delta=expires_delta,
|
|
|
+ is_long_term=is_long_term
|
|
|
+ )
|
|
|
# Set in response header
|
|
|
response.headers["X-New-Token"] = new_token
|
|
|
|
|
|
@@ -98,11 +113,26 @@ def get_current_user_optional(
|
|
|
|
|
|
# Sliding Expiration Check for Optional Auth
|
|
|
exp = payload.get("exp")
|
|
|
+ is_long_term = payload.get("long_term", False)
|
|
|
+
|
|
|
if exp:
|
|
|
now = datetime.now().timestamp()
|
|
|
remaining_seconds = exp - now
|
|
|
- if remaining_seconds < (settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2):
|
|
|
- new_token = security.create_access_token(subject=token_data.sub)
|
|
|
+
|
|
|
+ threshold = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 / 2
|
|
|
+ if is_long_term:
|
|
|
+ threshold = settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG * 60 / 2
|
|
|
+
|
|
|
+ if remaining_seconds < threshold:
|
|
|
+ expires_delta = None
|
|
|
+ if is_long_term:
|
|
|
+ expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES_LONG)
|
|
|
+
|
|
|
+ new_token = security.create_access_token(
|
|
|
+ subject=token_data.sub,
|
|
|
+ expires_delta=expires_delta,
|
|
|
+ is_long_term=is_long_term
|
|
|
+ )
|
|
|
response.headers["X-New-Token"] = new_token
|
|
|
|
|
|
except (JWTError, Exception):
|