# app.py # SAMUEL 7.0 "TEJIDO CÍVICO" - Versión "Ahora sí, carajo" # ----------------------------------------------------------------------------- from __future__ import annotations # Librerías de base, las que no pueden faltar import asyncio import base64 import io import json import logging import os import time import uuid from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple # Arreglo para los permisos de caché en Hugging Face Spaces. CACHE_DIR = "/data/cache" if os.path.exists("/data") else os.path.join(os.getcwd(), ".cache") os.makedirs(CACHE_DIR, exist_ok=True) # Limpieza: Usamos HF_HOME que es el estándar nuevo, las otras son por si acaso. os.environ['HF_HOME'] = os.path.join(CACHE_DIR, 'huggingface') os.environ['TRANSFORMERS_CACHE'] = os.path.join(CACHE_DIR, 'transformers') # Deprecado, pero no hace mal os.environ['MPLCONFIGDIR'] = os.path.join(CACHE_DIR, 'matplotlib') # El corazón de la bestia import aiosqlite import gradio as gr import numpy as np import pandas as pd from jose import JWTError, jwt from pydantic import BaseModel, EmailStr, Field from pydantic_settings import BaseSettings, SettingsConfigDict # Los chiches nuevos y la IA try: import folium import google.generativeai as genai from faiss import IndexFlatL2, read_index, write_index from sentence_transformers import SentenceTransformer from sklearn.cluster import DBSCAN from wordcloud import WordCloud import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import firebase_admin from firebase_admin import credentials, auth, firestore LIBRERIAS_AVANZADAS_OK = True except ImportError as e: print(f"¡OJO! Faltan algunas librerías pesadas: {e}. Algunas cosas copadas no van a andar.") LIBRERIAS_AVANZADAS_OK = False # ------------------------------------------------------------------------- # 1. 
# CENTRAL CONFIGURATION - the app's kitchen
# -------------------------------------------------------------------------
class Settings(BaseSettings):
    """Application settings, loaded by pydantic-settings.

    Values come from the defaults below and from a UTF-8 `.env` file;
    unknown entries are ignored (`extra='ignore'`).
    """

    model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8', extra='ignore')

    APP_NAME: str = "Samuel"
    APP_VERSION: str = "7.0 TEJIDO CÍVICO"
    PORT: int = 7860
    # Placeholder signing key; it must be overridden for any real deployment.
    JWT_SECRET_KEY: str = "una-clave-mas-secreta-que-la-formula-de-la-coca-cola"
    GEMINI_API_KEY: Optional[str] = None
    # JSON text of the Firebase service-account credentials (parsed with json.loads below).
    FIREBASE_SERVICE_ACCOUNT: Optional[str] = None
    # Persistent files live under /data when that directory exists, otherwise
    # next to the process working directory.
    DB_PATH: str = "/data/samuel_tejido.sqlite3" if os.path.exists("/data") else "samuel_tejido.sqlite3"
    FAISS_INDEX_PATH: str = "/data/reports.index" if os.path.exists("/data") else "reports.index"
    EMBEDDING_MODEL: str = 'sentence-transformers/all-MiniLM-L6-v2'
    CLUSTER_EPS: float = 0.5
    CLUSTER_MIN_SAMPLES: int = 3


settings = Settings()

# Re-apply the secrets straight from the process environment.
# NOTE(review): BaseSettings already reads environment variables, so this loop
# looks redundant - confirm before removing it.
for key in ['GEMINI_API_KEY', 'FIREBASE_SERVICE_ACCOUNT', 'JWT_SECRET_KEY']:
    secret_value = os.getenv(key)
    if secret_value:
        setattr(settings, key, secret_value)

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("tejido_civico")
logger.info(f"¡Arrancando {settings.APP_NAME} {settings.APP_VERSION}, carajo! A tejer redes cívicas.")

# Tokens signed with the built-in default key can be forged by anyone who has
# read this file, so make that situation visible in the logs.
if settings.JWT_SECRET_KEY == Settings.model_fields["JWT_SECRET_KEY"].default:
    logger.warning("JWT_SECRET_KEY sigue con el valor por defecto. Definí una clave propia antes de exponer la app.")

# Firebase is optional: both handles stay None when credentials or libraries
# are missing, and the rest of the module checks them before use.
firebase_app = None
db_firestore = None
if settings.FIREBASE_SERVICE_ACCOUNT and LIBRERIAS_AVANZADAS_OK:
    try:
        cred_dict = json.loads(settings.FIREBASE_SERVICE_ACCOUNT)
        cred = credentials.Certificate(cred_dict)
        firebase_app = firebase_admin.initialize_app(cred)
        db_firestore = firestore.client()
        logger.info("¡Joya! Conectamos con Firebase sin problemas.")
    except Exception as e:
        # Best effort: a bad credential must not prevent the app from starting.
        logger.error(f"Uh, no pudimos conectar con Firebase. Alto garrón: {e}")
else:
    logger.warning("No está la data de Firebase o faltan librerías. La app va a andar, pero sin guardar en la nube.")

# --- (The remaining classes User, Report, Mission, Database and the services are UNCHANGED) ---
# ... (To avoid pasting 400 lines of identical code, this part is skipped.
# The code is the same as the previous version up to the build_ui function)
class User(BaseModel):
    """User profile, stored in Firestore (when available) and in the local `users` table."""

    id: str
    email: EmailStr
    created_at: datetime = Field(default_factory=datetime.utcnow)
    quiz_completed: bool = False
    archetype_name: Optional[str] = "Pibe/a de Barrio"
    xp: int = 0
    level: int = 1
    # String-keyed integer scores; a fresh defaultdict(int) per instance.
    reputation: Dict[str, int] = Field(default_factory=lambda: defaultdict(int))


class Report(BaseModel):
    """A citizen report; `type` is "problema" or "fortaleza" as set by submit_report."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str
    content: str
    type: str
    lat: Optional[float] = None
    lon: Optional[float] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    # float32 sentence embedding serialized with ndarray.tobytes()
    # (written by CivicIntelligenceService.add_report).
    embedding: Optional[bytes] = None


class Mission(BaseModel):
    """A mission assigned to a user, mirroring a row of the local `missions` table."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: str
    mission_template_id: str
    title: str
    task: str
    type: str = "standard"
    status: str = "pendiente"
    assigned_at: datetime = Field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None


class Database:
    """Persistence layer.

    Firestore (when `db_firestore` is set) holds users and reports; the local
    SQLite file at `settings.DB_PATH` holds users, missions and the report
    embeddings.
    """

    async def initialize(self):
        """Create the SQLite file's parent directory and tables if they do not exist."""
        # DB_PATH may be a bare file name, in which case dirname() is "" and
        # os.makedirs("") would raise FileNotFoundError.
        db_dir = os.path.dirname(settings.DB_PATH)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        async with aiosqlite.connect(settings.DB_PATH) as conn:
            await conn.execute("CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, email TEXT UNIQUE, created_at TEXT, quiz_completed INTEGER, archetype_name TEXT, xp INTEGER, level INTEGER, reputation TEXT)")
            await conn.execute("CREATE TABLE IF NOT EXISTS missions (id TEXT PRIMARY KEY, user_id TEXT, mission_template_id TEXT, title TEXT, task TEXT, type TEXT, status TEXT, assigned_at TEXT, completed_at TEXT)")
            await conn.execute("CREATE TABLE IF NOT EXISTS reports_local (id TEXT PRIMARY KEY, embedding BLOB)")
            await conn.commit()
        logger.info("Base de datos local (SQLite) lista y esperando órdenes.")

    async def save_user_profile(self, user: User):
        """Write `user` to Firestore (if configured) and upsert it into SQLite."""
        if db_firestore:
            user_dict = user.model_dump()
            user_dict["created_at"] = user_dict["created_at"].isoformat()
            # The Firestore client call is blocking, so run it off the event loop.
            await asyncio.to_thread(db_firestore.collection("users").document(user.id).set, user_dict)
        async with aiosqlite.connect(settings.DB_PATH) as conn:
            await conn.execute(
                "INSERT OR REPLACE INTO users (id, email, created_at, quiz_completed, archetype_name, xp, level, reputation) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                (user.id, user.email, user.created_at.isoformat(), user.quiz_completed, user.archetype_name, user.xp, user.level, json.dumps(user.reputation)),
            )
            await conn.commit()

    async def get_user_profile(self, uid: str) -> Optional[User]:
        """Return the profile for `uid`, or None when it is not stored anywhere.

        Firestore is consulted first; when it is not configured or has no
        such document, the local SQLite copy is used.
        """
        if db_firestore:
            doc = await asyncio.to_thread(db_firestore.collection("users").document(uid).get)
            if doc.exists:
                data = doc.to_dict()
                data['created_at'] = datetime.fromisoformat(data['created_at'])
                return User(**data)
        async with aiosqlite.connect(settings.DB_PATH) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute("SELECT * FROM users WHERE id = ?", (uid,)) as cursor:
                row = await cursor.fetchone()
        if row:
            data = dict(row)
            # The column always exists in the row, so a dict default never
            # applies; guard against a NULL value instead.
            data["reputation"] = json.loads(data.get("reputation") or "{}")
            return User(**data)
        return None

    async def save_report(self, report: Report):
        """Store `report` in Firestore and its embedding in SQLite.

        Does nothing except log an error when Firestore is not configured.
        On success `report.id` is replaced by the Firestore document id.
        """
        if not db_firestore:
            logger.error("No anda Firestore. El reporte no se va a guardar en la nube.")
            return
        report_dict = report.model_dump(exclude={"embedding"})
        report_dict["created_at"] = report.created_at.isoformat()
        add_result = await asyncio.to_thread(db_firestore.collection("reports").add, report_dict)
        # The code relies on element 1 of the add() result being the document reference.
        report.id = add_result[1].id
        logger.info(f"Reporte {report.id} guardado en la nube. ¡Bien ahí!")
        if report.embedding:
            async with aiosqlite.connect(settings.DB_PATH) as db_local:
                await db_local.execute("INSERT INTO reports_local (id, embedding) VALUES (?, ?)", (report.id, report.embedding))
                await db_local.commit()

    async def get_all_reports(self) -> List[Report]:
        """Return every Firestore report, newest first, with local embeddings attached.

        Returns an empty list when Firestore is not configured. Reports with
        no row in `reports_local` get `embedding=None`.
        """
        if not db_firestore:
            return []
        reports_from_firestore = []
        docs = await asyncio.to_thread(db_firestore.collection("reports").order_by("created_at", direction=firestore.Query.DESCENDING).get)
        for doc in docs:
            data = doc.to_dict()
            data["id"] = doc.id  # the document id wins over any stored "id" field
            data["created_at"] = datetime.fromisoformat(data["created_at"])
            reports_from_firestore.append(Report(**data))
        async with aiosqlite.connect(settings.DB_PATH) as db_local:
            db_local.row_factory = aiosqlite.Row
            async with db_local.execute("SELECT id, embedding FROM reports_local") as cursor:
                local_embeddings = {row['id']: row['embedding'] for row in await cursor.fetchall()}
        for report in reports_from_firestore:
            report.embedding = local_embeddings.get(report.id)
        return reports_from_firestore

    async def get_user_missions(self, uid: str) -> List[Mission]:
        """Return the missions of user `uid` from SQLite, most recently assigned first."""
        async with aiosqlite.connect(settings.DB_PATH) as db_local:
            db_local.row_factory = aiosqlite.Row
            async with db_local.execute("SELECT * FROM missions WHERE user_id = ? ORDER BY assigned_at DESC", (uid,)) as cursor:
                rows = await cursor.fetchall()
        return [Mission(**dict(r)) for r in rows]


db = Database()


class CivicIntelligenceService:
    """Sentence-embedding model plus a FAISS index over report contents.

    `report_map` maps a vector's position in the index to the report id.
    When the optional libraries are missing or the model fails to load,
    `model` stays None and the service is effectively disabled.
    """

    def __init__(self):
        self.model = None
        self.index = None
        self.dimension = None
        self.report_map = {}
        if LIBRERIAS_AVANZADAS_OK:
            logger.info("Cargando el cerebro vectorial... Dame un toque que esto pesa.")
            try:
                self.model = SentenceTransformer(settings.EMBEDDING_MODEL, cache_folder=os.environ['HF_HOME'])
                self.dimension = self.model.get_sentence_embedding_dimension()
                self._load_faiss_index()
            except Exception as e:
                logger.error(f"¡Uh! No se pudo cargar el modelo de IA. El análisis inteligente no va a funcar: {e}")
                self.model = None
        else:
            logger.warning("El servicio de Inteligencia Cívica está desactivado. Faltan librerías.")

    def _load_faiss_index(self):
        """Load the saved FAISS index and rebuild `report_map`, or start empty.

        Any failure (missing index file, unreadable file, failed map rebuild)
        falls back to a fresh empty index, as a deliberate best effort.
        """
        if not self.model:
            return
        try:
            self.index = read_index(settings.FAISS_INDEX_PATH)
            logger.info(f"Índice FAISS cargado. Tenemos {self.index.ntotal} reportes en la memoria.")

            async def _rebuild_map():
                reports_with_embeddings = [r for r in await db.get_all_reports() if r.embedding is not None]
                # add_report() numbers vectors in insertion order (oldest = 0),
                # while get_all_reports() returns newest first; sort ascending
                # so positions line up with the vectors again.
                # NOTE(review): assumes creation order equals indexing order - confirm.
                reports_with_embeddings.sort(key=lambda r: r.created_at)
                self.report_map = {i: report.id for i, report in enumerate(reports_with_embeddings)}

            # NOTE(review): asyncio.run() raises RuntimeError when an event
            # loop is already running in this thread; that case lands in the
            # fallback below.
            asyncio.run(_rebuild_map())
        except Exception as exc:
            logger.warning("No había un índice FAISS guardado. Empezamos de cero, no pasa nada.")
            logger.debug("Detalle del fallo al cargar/reconstruir el índice FAISS: %r", exc)
            self.index = IndexFlatL2(self.dimension)
            self.report_map = {}

    async def add_report(self, report: Report):
        """Embed `report`, persist it, and append its vector to the FAISS index.

        When the embedding model is unavailable the report is still persisted
        (without an embedding) instead of being silently dropped.
        """
        if not self.model:
            await db.save_report(report)
            return
        embedding = self.model.encode([report.content])[0]
        report.embedding = embedding.astype(np.float32).tobytes()
        await db.save_report(report)
        if self.index is not None:
            self.index.add(np.array([embedding], dtype=np.float32))
            # The new vector occupies the last position of the index.
            self.report_map[self.index.ntotal - 1] = report.id
            write_index(self.index, settings.FAISS_INDEX_PATH)
            logger.info(f"Reporte {report.id} sumado al análisis de la IA. Total: {self.index.ntotal}")


class LLMService:
    """Thin wrapper around a Gemini model; disabled when no API key or libraries."""

    def __init__(self):
        self.gemini_model = None
        if settings.GEMINI_API_KEY and LIBRERIAS_AVANZADAS_OK:
            try:
                genai.configure(api_key=settings.GEMINI_API_KEY)
                self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
                logger.info("Despertando a Gemini... a ver si hoy está inspirado.")
            except Exception as e:
                logger.warning(f"Uh, Gemini no arranca. Capaz se quedó sin yerba o la API key está mal. Error: {e}")
        else:
            logger.warning("No hay API key de Gemini. El chat va a estar mudo.")

    @staticmethod
    def _to_gemini_contents(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert chat messages into Gemini `contents` entries.

        Accepts both `{"role", "content"}` messages (the shape handle_chat
        builds) and `{"role", "parts"}` messages. "assistant"/"model" become
        role "model", every other role becomes "user". Empty messages are
        dropped and consecutive messages with the same role are merged into
        one turn, so roles alternate.
        """
        contents: List[Dict[str, Any]] = []
        for msg in messages:
            role = "model" if msg.get("role") in ("assistant", "model") else "user"
            if "parts" in msg:
                text = "".join(
                    part.get("text", "") if isinstance(part, dict) else str(part)
                    for part in msg["parts"]
                )
            else:
                raw = msg.get("content", "")
                text = raw if isinstance(raw, str) else str(raw)
            text = text.strip()
            if not text:
                continue
            if contents and contents[-1]["role"] == role:
                contents[-1]["parts"][0]["text"] += "\n\n" + text
            else:
                contents.append({"role": role, "parts": [{"text": text}]})
        return contents

    async def generate_response(self, history: List[Dict[str, str]], context: Optional[str] = None) -> str:
        """Return the model's reply to the conversation in `history`.

        Args:
            history: chat messages, oldest first; the last one is the user's
                latest message.
            context: optional text with related reports, inserted just before
                the latest message.

        Returns:
            The reply text, or a fixed fallback sentence when the model is
            unavailable, returns nothing, or the API call fails.
        """
        if not self.gemini_model:
            return "Che, el bocho de la IA no está andando ahora. Probá en un rato."

        # The persona prompt is sent as a leading "user" turn: a "system"
        # role is not accepted inside the contents list.
        system_prompt = {
            "role": "user",
            "parts": [{
                "text": (
                    " Sos Samuel, una IA bien argenta, gauchita y con la posta."
                    " Tu onda es la de un amigo del barrio que sabe de todo un poco y siempre tira la buena para que la gente se organice."
                    " Cero lenguaje corporativo, usá 'vos', 'che', y hablá claro."
                    " La gente confía en vos para resolver los bardos de la comunidad. "
                )
            }],
        }
        messages = list(history or [])
        if context and messages:
            context_prompt = {
                "role": "user",
                "parts": [{"text": f"Por si te sirve, acá tenés un poco de chusmerío anónimo de lo que anda diciendo la gente sobre este tema:\n{context}\n\nAhora sí, respondeme a lo último que te dije."}],
            }
            # Insert the context right before the user's last message.
            messages = messages[:-1] + [context_prompt, messages[-1]]
        full_prompt = self._to_gemini_contents([system_prompt] + messages)

        try:
            # generate_content is blocking, so run it in a worker thread.
            response = await asyncio.to_thread(self.gemini_model.generate_content, full_prompt)
            if response and response.parts:
                return "".join([part.text for part in response.parts]).strip()
            return "Mirá, sobre eso no se me ocurre qué decirte. ¿Probamos con otra cosa?"
        except Exception as e:
            logger.error(f"Explotó la API de Gemini: {e}")
            return "La IA se hizo un matete, che. Preguntame de nuevo, pero más simple."


class AuthService:
    """Session tokens (HS256 JWTs) plus Firebase-backed registration and login."""

    def create_jwt(self, user_id: str) -> str:
        """Return a token for `user_id` that expires in 7 days."""
        payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(days=7)}
        return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm="HS256")

    def verify_jwt(self, token: str) -> Optional[str]:
        """Return the user id carried by `token`, or None if it is missing or invalid."""
        if not token:
            # Empty string / None: nothing to decode.
            return None
        try:
            return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=["HS256"]).get("sub")
        except JWTError:
            return None

    async def register_with_firebase(self, email: str, password: str) -> Tuple[Optional[str], str]:
        """Create a Firebase user and its profile.

        Returns:
            `(uid, message)` on success, `(None, message)` on failure or when
            Firebase is not configured.
        """
        if not firebase_app:
            return None, "El sistema de registro no funciona ahora. Un garrón."
        try:
            user_record = await asyncio.to_thread(auth.create_user, email=email, password=password)
            new_user = User(id=user_record.uid, email=email)
            await db.save_user_profile(new_user)
            logger.info(f"¡Uno más en la banda! Usuario nuevo: {user_record.uid}")
            return user_record.uid, "¡Listo el pollo! Ya sos parte. Ahora a meterle para adelante."
        except Exception as e:
            logger.error(f"Falló el registro en Firebase: {e}")
            return None, "No se pudo crear la cuenta. Fijate si el mail ya está usado o si la clave es muy corta."

    async def login_with_firebase_simulation(self, email: str) -> Tuple[Optional[str], str]:
        """Look up a Firebase user by e-mail and return `(uid, message)`.

        SECURITY NOTE(review): no password is checked here - anyone who knows
        a registered e-mail address obtains that user's uid. Real credential
        verification must replace this before the app is exposed.
        """
        if not firebase_app:
            return None, "El sistema de login no funciona ahora."
        try:
            user_record = await asyncio.to_thread(auth.get_user_by_email, email)
            return user_record.uid, "¡Adentro! Qué bueno tenerte de vuelta."
        except Exception:
            return None, "Mmm, no funca. Revisá el mail o la contraseña, che."


auth_service = AuthService()
llm_service = LLMService()
civic_intel_service = CivicIntelligenceService()
# Holds "data" (map_html, wc_html) and "timestamp" for generate_dashboard_data.
dashboard_cache = {}


# -------------------------------------------------------------------------
# 5. UI LOGIC (GRADIO) - the visible face
# -------------------------------------------------------------------------
async def handle_register(email, password):
    """Register a user; return `(status_message, jwt_or_empty_string)`."""
    if not email or not password:
        return "Pará, pará. Necesito un mail y una clave.", ""
    uid, msg = await auth_service.register_with_firebase(email, password)
    token = auth_service.create_jwt(uid) if uid else ""
    return msg, token


async def handle_login(email, password):
    """Log a user in; return `(status_message, jwt_or_empty_string)`.

    SECURITY NOTE(review): `password` is only checked for being non-empty;
    it is never verified (see AuthService.login_with_firebase_simulation).
    """
    if not email or not password:
        return "Faltan datos, che.", ""
    uid, msg = await auth_service.login_with_firebase_simulation(email)
    token = auth_service.create_jwt(uid) if uid else ""
    return msg, token


async def handle_chat(token: str, message: str, history: List[Dict[str, str]]):
    """Append the user's message and the assistant's reply to `history`.

    Requires a valid session token. When the vector index has entries, up to
    three nearest reports are passed to the model as context.

    Returns:
        The updated history list (a new list when `history` is None).
    """
    if history is None:
        history = []
    uid = auth_service.verify_jwt(token)
    if not uid:
        history.append({"role": "assistant", "content": "Che, para charlar tenés que estar adentro. ¡Iniciá sesión!"})
        return history
    if not message or not message.strip():
        return history

    history.append({"role": "user", "content": message})

    context = ""
    index = civic_intel_service.index
    if civic_intel_service.model and index is not None and index.ntotal > 0:
        query_embedding = civic_intel_service.model.encode([message])
        _, neighbours = index.search(query_embedding, k=3)
        # Positions without a known report id are skipped.
        report_ids = [
            civic_intel_service.report_map[int(i)]
            for i in neighbours[0]
            if int(i) in civic_intel_service.report_map
        ]
        if report_ids:
            all_reports = await db.get_all_reports()
            report_dict = {r.id: r.content for r in all_reports}
            context_items = [report_dict[rid] for rid in report_ids if rid in report_dict]
            if context_items:
                context = "\n".join(f"- \"{item}\"" for item in context_items)

    reply = await llm_service.generate_response(history, context)
    history.append({"role": "assistant", "content": reply})
    return history


async def submit_report(token: str, content: str, type_val: str, location: dict):
    """Validate and store a report; return updates for (status, content box).

    `location` is expected to carry 'latitude' and 'longitude'. The page
    script in build_ui writes a JSON string into the location field, so a
    string is parsed as JSON first; anything unusable means "no coordinates".
    """
    # ... (this function is unchanged)
    uid = auth_service.verify_jwt(token)
    if not uid:
        return gr.update(value="Tenés que estar adentro para reportar."), gr.update(value="")
    if not content or not content.strip():
        return gr.update(value="¡Che, el reporte no puede estar vacío!"), gr.update(value=content)

    if isinstance(location, str):
        try:
            location = json.loads(location)
        except ValueError:
            location = None
    if not isinstance(location, dict):
        location = None
    lat = location.get('latitude') if location else None
    lon = location.get('longitude') if location else None

    report_type = "problema" if type_val == "Algo que mejorar (Problema)" else "fortaleza"
    report = Report(user_id=uid, content=content, type=report_type, lat=lat, lon=lon)
    await civic_intel_service.add_report(report)
    return gr.update(value="¡Reporte enviado! Sos un crack. Gracias por dar una mano."), gr.update(value="")


async def generate_dashboard_data(force_refresh=False):
    """Return `(map_html, wc_html)` for the dashboard.

    Results are cached in `dashboard_cache` for 300 seconds unless
    `force_refresh` is true.

    NOTE(review): in SOURCE every HTML tag inside this function's string
    literals - and the code that sat between some of them - was stripped.
    The `<p>`/`<img>` markup, `.add_to(m)`, `m._repr_html_()` and the two
    `else:` branches below are reconstructions; only the visible message
    texts and the surrounding statements are original. Compare against
    version control before relying on the exact markup.
    """
    # ... (this function is unchanged)
    if not force_refresh and "data" in dashboard_cache and time.time() - dashboard_cache.get("timestamp", 0) < 300:
        return dashboard_cache["data"]

    reports = await db.get_all_reports()
    if not reports:
        return (
            "<p>Todavía no hay data para mostrar. ¡Sé el primero en reportar algo!</p>",
            "<p>Sin problemas no hay nube de palabras. ¡Qué buena noticia!</p>",
        )

    # Only geolocated reports go on the map.
    df = pd.DataFrame([r.model_dump() for r in reports if r.lat is not None and r.lon is not None])
    if not df.empty:
        map_center = [df['lat'].mean(), df['lon'].mean()]
        m = folium.Map(location=map_center, zoom_start=13, tiles="CartoDB positron")
        for _, row in df.iterrows():
            color = 'green' if row['type'] == 'fortaleza' else 'red'
            folium.CircleMarker(
                location=[row['lat'], row['lon']],
                radius=5,
                color=color,
                fill=True,
                fill_color=color,
                # TODO(review): the rest of the popup template was lost; only
                # the capitalized type survived.
                popup=f"{row['type'].capitalize()}",
            ).add_to(m)
        map_html = m._repr_html_()
    else:
        map_html = "<p>Nadie reportó con ubicación todavía. ¡Activá el GPS y marcá la cancha!</p>"

    # The word cloud is built from problem reports only.
    text = " ".join([r.content for r in reports if r.type == 'problema'])
    if text:
        wordcloud = WordCloud(width=800, height=300, background_color='white', collocations=False).generate(text)
        img_buffer = io.BytesIO()
        wordcloud.to_image().save(img_buffer, format='PNG')
        img_str = base64.b64encode(img_buffer.getvalue()).decode()
        # TODO(review): original tag attributes were lost; this embeds the PNG as a data URI.
        wc_html = f'<img src="data:image/png;base64,{img_str}"/>'
    else:
        wc_html = "<p>¡Parece que no hay problemas! O al menos, nadie los reportó.</p>"

    dashboard_cache["data"] = (map_html, wc_html)
    dashboard_cache["timestamp"] = time.time()
    return map_html, wc_html


def build_ui():
    """Build and return the Gradio Blocks app.

    FIXME(review): this function is incomplete in SOURCE and cannot run as
    is. See the comment inside `with login_view:` for what is missing.
    """
    css = """
    #login_col { max-width: 400px; margin: auto; padding-top: 50px; }
    #main_view { max-width: 900px; margin: auto; }
    """
    theme = gr.themes.Soft(primary_hue="blue", secondary_hue="sky")
    with gr.Blocks(css=css, theme=theme, title=settings.APP_NAME) as demo:
        auth_token = gr.State("")
        login_view = gr.Column(elem_id="login_col")
        main_view = gr.Column(visible=False, elem_id="main_view")

        with login_view:
            # ... (no changes)
            # FIXME(review): SOURCE is truncated here. The text jumps from the
            # opening of a `gr.Markdown(f"` call straight to the tail of a
            # statement ending in `Token inválido. Algo raro pasó.")}`.
            # Everything in between is missing and has NOT been reconstructed.
            # The wiring below references these names, none of which is
            # defined in the surviving text: login_email, login_pass,
            # login_status, register_btn, login_btn, on_auth_success,
            # map_display, wc_display, submit_report_btn, report_content,
            # report_type, report_location, report_status and
            # refresh_dashboard_btn. No wiring for handle_chat survived
            # either. Restore this section from version control.
            pass

        register_btn.click(
            handle_register, [login_email, login_pass], [login_status, auth_token]
        ).then(
            on_auth_success, [auth_token], [login_view, main_view, auth_token, map_display, wc_display]
        )
        login_btn.click(
            handle_login, [login_email, login_pass], [login_status, auth_token]
        ).then(
            on_auth_success, [auth_token], [login_view, main_view, auth_token, map_display, wc_display]
        )
        submit_report_btn.click(
            submit_report,
            [auth_token, report_content, report_type, report_location],
            [report_status, report_content],
        )

        async def refresh_dashboard():
            # Bypass the 300-second cache when the user asks for a refresh.
            map_html, wc_html = await generate_dashboard_data(force_refresh=True)
            return map_html, wc_html

        refresh_dashboard_btn.click(refresh_dashboard, [], [map_display, wc_display])

        # FIX: the parameter is now `js` (without the underscore).
        # On page load, ask the browser for its position and write it as a
        # JSON string into the textarea under #report_location_json.
        demo.load(js="""
            () => {
                const updateLocation = () => {
                    if (navigator.geolocation) {
                        navigator.geolocation.getCurrentPosition((position) => {
                            const locationData = { latitude: position.coords.latitude, longitude: position.coords.longitude };
                            const locationInput = document.querySelector("#report_location_json textarea");
                            if(locationInput) {
                                locationInput.value = JSON.stringify(locationData);
                                locationInput.dispatchEvent(new Event('input', { bubbles: true }));
                            }
                        });
                    }
                };
                updateLocation();
            }
        """)
    return demo


if __name__ == "__main__":
    if not LIBRERIAS_AVANZADAS_OK:
        logger.error("############################################################")
        logger.error("¡ATENCIÓN! Faltan librerías. La app va a andar a media máquina.")
        logger.error("Corré 'pip install -r requirements.txt' para instalar todo.")
        logger.error("############################################################")

    # Create the SQLite tables before the UI starts serving requests.
    asyncio.run(db.initialize())
    app = build_ui()
    app.queue().launch(server_name="0.0.0.0", server_port=settings.PORT, share=False)