67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
import os
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
import jwt
|
|
from bcrypt import checkpw, gensalt, hashpw
|
|
from fastapi import HTTPException, Security
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from app import schemas
|
|
from app.database import SessionLocal
|
|
from app.models import User
|
|
from app.settings import settings
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
    """Return a bcrypt hash of *password* as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt,
    and the resulting bytes are decoded back to ``str``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The bcrypt hash, decoded as UTF-8.
    """
    raw_password = password.encode("utf-8")
    digest = hashpw(raw_password, gensalt())
    return digest.decode("utf-8")
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        plain_password: The candidate password supplied by the user.
        hashed_password: The previously stored hash, as text.

    Returns:
        The result of ``checkpw`` on the UTF-8 encoded arguments.
    """
    candidate = plain_password.encode("utf-8")
    stored_hash = hashed_password.encode("utf-8")
    return checkpw(candidate, stored_hash)
|
|
|
|
|
|
def create_jwt_token(data: dict) -> str:
    """Encode *data* as a signed JWT with ``iat`` and ``exp`` claims.

    The token lifetime is ``settings.JWT_EXPIRE_MINUTES`` minutes and it is
    signed with ``settings.JWT_SECRET`` using ``settings.JWT_ALGORITHM``.

    Args:
        data: Claims to embed in the token. The mapping is NOT modified;
            ``iat`` and ``exp`` are added to a copy (and override any
            values of the same name in *data*).

    Returns:
        The encoded token.
    """
    # Take a single timestamp so that exp - iat is exactly the configured
    # lifetime rather than drifting by the time between two now() calls.
    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(minutes=settings.JWT_EXPIRE_MINUTES)

    # Build a new dict instead of updating `data` in place: the caller's
    # mapping must not gain datetime-valued "iat"/"exp" keys as a side effect.
    token_payload = {**data, "iat": issued_at, "exp": expires_at}

    return jwt.encode(
        token_payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM
    )
|
|
|
|
|
|
def get_user_from_token(token: str) -> schemas.User:
    """Decode *token* and resolve the user it identifies.

    Args:
        token: An encoded JWT, verified against ``settings.JWT_SECRET`` with
            ``settings.JWT_ALGORITHM``.

    Returns:
        The user for the ``user_id`` claim. NOTE(review): the database lookup
        is only a ``...`` placeholder in this code, so nothing is returned yet.

    Raises:
        HTTPException: 401 "Token has expired" when the token's expiry has
            passed; 401 "Invalid authentication credentials" for any other
            PyJWT decoding/validation failure.
    """
    try:
        payload = jwt.decode(
            token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
        )
    except jwt.ExpiredSignatureError as exc:
        # Must come before the base-class handler below: ExpiredSignatureError
        # is a subclass of PyJWTError.
        raise HTTPException(status_code=401, detail="Token has expired") from exc
    except jwt.PyJWTError as exc:
        # PyJWT has no `jwt.JWTError` (that name is python-jose's); referencing
        # it raised AttributeError and turned bad tokens into 500 responses.
        raise HTTPException(
            status_code=401, detail="Invalid authentication credentials"
        ) from exc

    user_id = payload.get("user_id")

    # Return user from database
    ...
|
|
|
|
|
|
def get_user(
    authorization: HTTPAuthorizationCredentials = Security(HTTPBearer()),
) -> schemas.User:
    """Resolve the current user from the request's bearer credentials.

    Intended for use as a FastAPI dependency: the credentials are supplied
    through ``Security(HTTPBearer())``.

    Args:
        authorization: The parsed authorization credentials (scheme + token).

    Returns:
        Whatever ``get_user_from_token`` returns for the supplied token.

    Raises:
        HTTPException: 401 when the scheme is not "bearer" (compared
            case-insensitively), plus anything ``get_user_from_token`` raises.
    """
    scheme = authorization.scheme.lower()
    if scheme == "bearer":
        return get_user_from_token(authorization.credentials)

    raise HTTPException(
        status_code=401, detail="Invalid authentication scheme"
    )
|