# auth.py # Lógica de autenticación, hashing de contraseñas y gestión de tokens JWT. import os from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session from database import get_db, User # --- Configuración de Seguridad --- SECRET_KEY = os.getenv("JWT_SECRET_KEY") if not SECRET_KEY: raise ValueError("La variable de entorno JWT_SECRET_KEY no está configurada.") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # Expira en 7 días # Contexto para el hashing de contraseñas pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Esquema de OAuth2 para que FastAPI lo use en la inyección de dependencias oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # --- Funciones de Hashing --- def verify_password(plain_password: str, hashed_password: str) -> bool: """Verifica una contraseña plana contra su hash.""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Genera el hash de una contraseña.""" return pwd_context.hash(password) # --- Funciones de Token JWT --- def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Crea un nuevo token de acceso JWT.""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # --- Dependencia de Usuario Actual --- async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: """ Dependencia de FastAPI para obtener el usuario actual a partir de un token JWT. Valida el token, decodifica el ID de usuario y lo recupera de la base de datos. """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="No se pudieron validar las credenciales", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) email: str = payload.get("sub") if email is None: raise credentials_exception except JWTError: raise credentials_exception user = db.query(User).filter(User.email == email).first() if user is None: raise credentials_exception return user