mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #52 from Hestia-Homes/main
Added placeholder code for user validation and implemented jwt expiry…
This commit is contained in:
commit
5ff317f7bb
1 changed files with 20 additions and 12 deletions
|
|
@ -1,13 +1,17 @@
|
|||
from fastapi import Depends, HTTPException, status, Request
|
||||
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
|
||||
from jose import JWTError, jwe
|
||||
from jose import JWTError, jwe, jwt
|
||||
from Crypto.Protocol.KDF import HKDF
|
||||
from Crypto.Hash import SHA256
|
||||
from typing import Any
|
||||
import json
|
||||
import logging
|
||||
from app.config import get_settings
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
api_key_header = APIKeyHeader(name=get_settings().API_KEY_NAME, auto_error=False)
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
|
|
@ -30,6 +34,10 @@ def get_user(user_id: str):
|
|||
user = None
|
||||
if user_id == "known_id":
|
||||
user = {"id": user_id, "name": "Known User"}
|
||||
else:
|
||||
print("IMPLEMENT ME! - fetch user from database")
|
||||
user = {"id": user_id, "name": "Dummy User"}
|
||||
|
||||
return user
|
||||
|
||||
|
||||
|
|
@ -65,24 +73,24 @@ def validate_jwt_token(token: str = Depends(oauth2_scheme)):
|
|||
# The SECRET_KEY should match the NEXTAUTH_SECRET in the front end
|
||||
try:
|
||||
payload = get_token_payload(token, get_settings().SECRET_KEY)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
except jwt.ExpiredSignatureError:
|
||||
logger.error("JWT token has expired.")
|
||||
raise credentials_exception
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while validating the token: {e}")
|
||||
raise credentials_exception
|
||||
|
||||
user_id: str = payload.get("dbId")
|
||||
if user_id is None:
|
||||
logger.error("No user ID found in the JWT token.")
|
||||
raise credentials_exception
|
||||
|
||||
user = get_user(user_id=user_id)
|
||||
if user is None:
|
||||
logger.error(f"No user found for user ID: {user_id}.")
|
||||
raise credentials_exception
|
||||
|
||||
return user
|
||||
except JWTError:
|
||||
logger.error("An error occurred while decoding the JWT token.")
|
||||
raise credentials_exception
|
||||
|
||||
|
||||
async def validate_token(token: str = Depends(oauth2_scheme), request: Request = None):
|
||||
token_data = validate_jwt_token(token)
|
||||
if not token_data:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials"
|
||||
)
|
||||
return token
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue