Changed dependency management to poetry (#1)
Co-authored-by: pptx704 <rafeedm.bhuiyan@gmail.com> Reviewed-on: #1
This commit is contained in:
@ -1,25 +1,26 @@
|
||||
import os
|
||||
from passlib.context import CryptContext
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import jwt
|
||||
from datetime import datetime, timedelta, UTC
|
||||
from bcrypt import checkpw, gensalt, hashpw
|
||||
from fastapi import HTTPException, Security
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
|
||||
from app import schemas
|
||||
from app.database import SessionLocal
|
||||
from app.models import User
|
||||
from app.settings import settings
|
||||
|
||||
from fastapi import Security, HTTPException
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from .database import get_db
|
||||
from .models import User
|
||||
from .settings import settings
|
||||
|
||||
from . import schemas
|
||||
|
||||
pwd_context = CryptContext(schemes=["bcrypt"])
|
||||
|
||||
def get_password_hash(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
return hashpw(password.encode("utf-8"), gensalt()).decode("utf-8")
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
return checkpw(
|
||||
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
|
||||
)
|
||||
|
||||
|
||||
def create_jwt_token(data: dict) -> str:
|
||||
_ed = timedelta(minutes=settings.JWT_EXPIRE_MINUTES)
|
||||
@ -28,25 +29,38 @@ def create_jwt_token(data: dict) -> str:
|
||||
token_payload = data
|
||||
token_payload.update({"iat": iat, "exp": exp})
|
||||
|
||||
token = jwt.encode(token_payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
|
||||
token = jwt.encode(
|
||||
token_payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM
|
||||
)
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def get_user_from_token(token: str) -> schemas.User:
|
||||
try:
|
||||
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
|
||||
payload = jwt.decode(
|
||||
token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
|
||||
)
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(status_code=401, detail="Token has expired")
|
||||
except jwt.JWTError:
|
||||
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
|
||||
|
||||
raise HTTPException(
|
||||
status_code=401, detail="Invalid authentication credentials"
|
||||
)
|
||||
|
||||
user_id = payload.get("user_id")
|
||||
|
||||
|
||||
# Return user from database
|
||||
...
|
||||
|
||||
def get_user(authorization: HTTPAuthorizationCredentials = Security(HTTPBearer())) -> schemas.User:
|
||||
|
||||
def get_user(
|
||||
authorization: HTTPAuthorizationCredentials = Security(HTTPBearer()),
|
||||
) -> schemas.User:
|
||||
if authorization.scheme.lower() != "bearer":
|
||||
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
|
||||
|
||||
raise HTTPException(
|
||||
status_code=401, detail="Invalid authentication scheme"
|
||||
)
|
||||
|
||||
token = authorization.credentials
|
||||
return get_user_from_token(token)
|
||||
|
||||
Reference in New Issue
Block a user