Samuel_4.0 / auth.py
Lukeetah's picture
Upload 13 files
5b0bb4b verified
# auth.py
# Lógica de autenticación, hashing de contraseñas y gestión de tokens JWT.
import os
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db, User
# --- Configuración de Seguridad ---
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
if not SECRET_KEY:
raise ValueError("La variable de entorno JWT_SECRET_KEY no está configurada.")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # Expira en 7 días
# Contexto para el hashing de contraseñas
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Esquema de OAuth2 para que FastAPI lo use en la inyección de dependencias
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# --- Funciones de Hashing ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifica una contraseña plana contra su hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Genera el hash de una contraseña."""
return pwd_context.hash(password)
# --- Funciones de Token JWT ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Crea un nuevo token de acceso JWT."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# --- Dependencia de Usuario Actual ---
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
"""
Dependencia de FastAPI para obtener el usuario actual a partir de un token JWT.
Valida el token, decodifica el ID de usuario y lo recupera de la base de datos.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No se pudieron validar las credenciales",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.email == email).first()
if user is None:
raise credentials_exception
return user