Samuel_4.0 / app.py
Lukeetah's picture
Update app.py
e22ca5e verified
raw
history blame
27.1 kB
# app.py
# SAMUEL 7.0 "TEJIDO CÍVICO" - Versión "Ahora sí, carajo"
# -----------------------------------------------------------------------------
from __future__ import annotations
# Librerías de base, las que no pueden faltar
import asyncio
import base64
import io
import json
import logging
import os
import time
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
# Arreglo para los permisos de caché en Hugging Face Spaces.
CACHE_DIR = "/data/cache" if os.path.exists("/data") else os.path.join(os.getcwd(), ".cache")
os.makedirs(CACHE_DIR, exist_ok=True)
# Limpieza: Usamos HF_HOME que es el estándar nuevo, las otras son por si acaso.
os.environ['HF_HOME'] = os.path.join(CACHE_DIR, 'huggingface')
os.environ['TRANSFORMERS_CACHE'] = os.path.join(CACHE_DIR, 'transformers') # Deprecado, pero no hace mal
os.environ['MPLCONFIGDIR'] = os.path.join(CACHE_DIR, 'matplotlib')
# El corazón de la bestia
import aiosqlite
import gradio as gr
import numpy as np
import pandas as pd
from jose import JWTError, jwt
from pydantic import BaseModel, EmailStr, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
# Los chiches nuevos y la IA
try:
import folium
import google.generativeai as genai
from faiss import IndexFlatL2, read_index, write_index
from sentence_transformers import SentenceTransformer
from sklearn.cluster import DBSCAN
from wordcloud import WordCloud
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import firebase_admin
from firebase_admin import credentials, auth, firestore
LIBRERIAS_AVANZADAS_OK = True
except ImportError as e:
print(f"¡OJO! Faltan algunas librerías pesadas: {e}. Algunas cosas copadas no van a andar.")
LIBRERIAS_AVANZADAS_OK = False
# -------------------------------------------------------------------------
# 1. CONFIGURACIÓN CENTRAL - La cocina de la App
# -------------------------------------------------------------------------
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8', extra='ignore')
APP_NAME: str = "Samuel"
APP_VERSION: str = "7.0 TEJIDO CÍVICO"
PORT: int = 7860
JWT_SECRET_KEY: str = "una-clave-mas-secreta-que-la-formula-de-la-coca-cola"
GEMINI_API_KEY: Optional[str] = None
FIREBASE_SERVICE_ACCOUNT: Optional[str] = None
DB_PATH: str = "/data/samuel_tejido.sqlite3" if os.path.exists("/data") else "samuel_tejido.sqlite3"
FAISS_INDEX_PATH: str = "/data/reports.index" if os.path.exists("/data") else "reports.index"
EMBEDDING_MODEL: str = 'sentence-transformers/all-MiniLM-L6-v2'
CLUSTER_EPS: float = 0.5
CLUSTER_MIN_SAMPLES: int = 3
settings = Settings()
for key in ['GEMINI_API_KEY', 'FIREBASE_SERVICE_ACCOUNT', 'JWT_SECRET_KEY']:
secret_value = os.getenv(key)
if secret_value:
setattr(settings, key, secret_value)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("tejido_civico")
logger.info(f"¡Arrancando {settings.APP_NAME} {settings.APP_VERSION}, carajo! A tejer redes cívicas.")
firebase_app = None
db_firestore = None
if settings.FIREBASE_SERVICE_ACCOUNT and LIBRERIAS_AVANZADAS_OK:
try:
cred_dict = json.loads(settings.FIREBASE_SERVICE_ACCOUNT)
cred = credentials.Certificate(cred_dict)
firebase_app = firebase_admin.initialize_app(cred)
db_firestore = firestore.client()
logger.info("¡Joya! Conectamos con Firebase sin problemas.")
except Exception as e:
logger.error(f"Uh, no pudimos conectar con Firebase. Alto garrón: {e}")
else:
logger.warning("No está la data de Firebase o faltan librerías. La app va a andar, pero sin guardar en la nube.")
# --- (El resto de las clases User, Report, Mission, Database, y los Servicios quedan IGUALES) ---
# ... (Para no pegar 400 líneas de código idénticas, me salteo esta parte. El código es el mismo que el anterior hasta la función build_ui)
class User(BaseModel):
id: str
email: EmailStr
created_at: datetime = Field(default_factory=datetime.utcnow)
quiz_completed: bool = False
archetype_name: Optional[str] = "Pibe/a de Barrio"
xp: int = 0
level: int = 1
reputation: Dict[str, int] = Field(default_factory=lambda: defaultdict(int))
class Report(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
user_id: str
content: str
type: str
lat: Optional[float] = None
lon: Optional[float] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
embedding: Optional[bytes] = None
class Mission(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
user_id: str
mission_template_id: str
title: str
task: str
type: str = "standard"
status: str = "pendiente"
assigned_at: datetime = Field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
class Database:
async def initialize(self):
os.makedirs(os.path.dirname(settings.DB_PATH), exist_ok=True)
async with aiosqlite.connect(settings.DB_PATH) as db:
await db.execute("CREATE TABLE IF NOT EXISTS users (id TEXT PRIMARY KEY, email TEXT UNIQUE, created_at TEXT, quiz_completed INTEGER, archetype_name TEXT, xp INTEGER, level INTEGER, reputation TEXT)")
await db.execute("CREATE TABLE IF NOT EXISTS missions (id TEXT PRIMARY KEY, user_id TEXT, mission_template_id TEXT, title TEXT, task TEXT, type TEXT, status TEXT, assigned_at TEXT, completed_at TEXT)")
await db.execute("CREATE TABLE IF NOT EXISTS reports_local (id TEXT PRIMARY KEY, embedding BLOB)")
await db.commit()
logger.info("Base de datos local (SQLite) lista y esperando órdenes.")
async def save_user_profile(self, user: User):
if db_firestore:
user_dict = user.model_dump()
user_dict["created_at"] = user_dict["created_at"].isoformat()
await asyncio.to_thread(db_firestore.collection("users").document(user.id).set, user_dict)
async with aiosqlite.connect(settings.DB_PATH) as db:
await db.execute(
"INSERT OR REPLACE INTO users (id, email, created_at, quiz_completed, archetype_name, xp, level, reputation) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user.id, user.email, user.created_at.isoformat(), user.quiz_completed, user.archetype_name, user.xp, user.level, json.dumps(user.reputation))
)
await db.commit()
async def get_user_profile(self, uid: str) -> Optional[User]:
if db_firestore:
doc = await asyncio.to_thread(db_firestore.collection("users").document(uid).get)
if doc.exists:
data = doc.to_dict()
data['created_at'] = datetime.fromisoformat(data['created_at'])
return User(**data)
async with aiosqlite.connect(settings.DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute("SELECT * FROM users WHERE id = ?", (uid,)) as c:
row = await c.fetchone()
if row:
data = dict(row)
data["reputation"] = json.loads(data.get("reputation", "{}"))
return User(**data)
return None
async def save_report(self, report: Report):
if not db_firestore:
logger.error("No anda Firestore. El reporte no se va a guardar en la nube.")
return
report_dict = report.model_dump(exclude={"embedding"})
report_dict["created_at"] = report.created_at.isoformat()
doc_ref = await asyncio.to_thread(db_firestore.collection("reports").add, report_dict)
report.id = doc_ref[1].id
logger.info(f"Reporte {report.id} guardado en la nube. ¡Bien ahí!")
if report.embedding:
async with aiosqlite.connect(settings.DB_PATH) as db_local:
await db_local.execute("INSERT INTO reports_local (id, embedding) VALUES (?, ?)", (report.id, report.embedding))
await db_local.commit()
async def get_all_reports(self) -> List[Report]:
if not db_firestore: return []
reports_from_firestore = []
docs = await asyncio.to_thread(db_firestore.collection("reports").order_by("created_at", direction=firestore.Query.DESCENDING).get)
for doc in docs:
data = doc.to_dict()
data["id"] = doc.id
data["created_at"] = datetime.fromisoformat(data["created_at"])
reports_from_firestore.append(Report(**data))
async with aiosqlite.connect(settings.DB_PATH) as db_local:
db_local.row_factory = aiosqlite.Row
async with db_local.execute("SELECT id, embedding FROM reports_local") as c:
local_embeddings = {row['id']: row['embedding'] for row in await c.fetchall()}
for report in reports_from_firestore:
report.embedding = local_embeddings.get(report.id)
return reports_from_firestore
async def get_user_missions(self, uid: str) -> List[Mission]:
async with aiosqlite.connect(settings.DB_PATH) as db_local:
db_local.row_factory = aiosqlite.Row
async with db_local.execute("SELECT * FROM missions WHERE user_id = ? ORDER BY assigned_at DESC", (uid,)) as c:
rows = await c.fetchall()
return [Mission(**dict(r)) for r in rows]
db = Database()
class CivicIntelligenceService:
def __init__(self):
self.model = None
self.index = None
self.report_map = {}
if LIBRERIAS_AVANZADAS_OK:
logger.info("Cargando el cerebro vectorial... Dame un toque que esto pesa.")
try:
self.model = SentenceTransformer(settings.EMBEDDING_MODEL, cache_folder=os.environ['HF_HOME'])
self.dimension = self.model.get_sentence_embedding_dimension()
self._load_faiss_index()
except Exception as e:
logger.error(f"¡Uh! No se pudo cargar el modelo de IA. El análisis inteligente no va a funcar: {e}")
self.model = None
else:
logger.warning("El servicio de Inteligencia Cívica está desactivado. Faltan librerías.")
def _load_faiss_index(self):
if not self.model: return
try:
self.index = read_index(settings.FAISS_INDEX_PATH)
logger.info(f"Índice FAISS cargado. Tenemos {self.index.ntotal} reportes en la memoria.")
async def _rebuild_map():
reports_with_embeddings = [r for r in await db.get_all_reports() if r.embedding is not None]
self.report_map = {i: report.id for i, report in enumerate(reports_with_embeddings)}
asyncio.run(_rebuild_map())
except Exception:
logger.warning("No había un índice FAISS guardado. Empezamos de cero, no pasa nada.")
self.index = IndexFlatL2(self.dimension)
self.report_map = {}
async def add_report(self, report: Report):
if not self.model: return
embedding = self.model.encode([report.content])[0]
report.embedding = embedding.astype(np.float32).tobytes()
await db.save_report(report)
if self.index is not None:
self.index.add(np.array([embedding], dtype=np.float32))
self.report_map[self.index.ntotal - 1] = report.id
write_index(self.index, settings.FAISS_INDEX_PATH)
logger.info(f"Reporte {report.id} sumado al análisis de la IA. Total: {self.index.ntotal}")
class LLMService:
def __init__(self):
self.gemini_model = None
if settings.GEMINI_API_KEY and LIBRERIAS_AVANZADAS_OK:
try:
genai.configure(api_key=settings.GEMINI_API_KEY)
self.gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
logger.info("Despertando a Gemini... a ver si hoy está inspirado.")
except Exception as e:
logger.warning(f"Uh, Gemini no arranca. Capaz se quedó sin yerba o la API key está mal. Error: {e}")
else:
logger.warning("No hay API key de Gemini. El chat va a estar mudo.")
async def generate_response(self, history: List[Dict[str, str]], context: Optional[str] = None) -> str:
if not self.gemini_model:
return "Che, el bocho de la IA no está andando ahora. Probá en un rato."
# El historial ya viene en formato de mensajes, solo agregamos el prompt y el contexto
system_prompt = {
"role": "system",
"parts": [{ "text": """
Sos Samuel, una IA bien argenta, gauchita y con la posta. Tu onda es la de un amigo del barrio que sabe de todo un poco y siempre tira la buena para que la gente se organice.
Cero lenguaje corporativo, usá 'vos', 'che', y hablá claro. La gente confía en vos para resolver los bardos de la comunidad.
"""}]
}
if context:
context_prompt = {
"role": "user",
"parts": [{"text": f"Por si te sirve, acá tenés un poco de chusmerío anónimo de lo que anda diciendo la gente sobre este tema:\n{context}\n\nAhora sí, respondeme a lo último que te dije."}]
}
# Insertamos el contexto antes del último mensaje del usuario
full_prompt = [system_prompt] + history[:-1] + [context_prompt, history[-1]]
else:
full_prompt = [system_prompt] + history
try:
# El modelo generativo de Gemini ahora puede tomar el historial directamente
response = await asyncio.to_thread(self.gemini_model.generate_content, full_prompt)
if response and response.parts:
return "".join([part.text for part in response.parts]).strip()
return "Mirá, sobre eso no se me ocurre qué decirte. ¿Probamos con otra cosa?"
except Exception as e:
logger.error(f"Explotó la API de Gemini: {e}")
return "La IA se hizo un matete, che. Preguntame de nuevo, pero más simple."
class AuthService:
def create_jwt(self, user_id: str) -> str:
payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(days=7)}
return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm="HS256")
def verify_jwt(self, token: str) -> Optional[str]:
try: return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=["HS256"]).get("sub")
except JWTError: return None
async def register_with_firebase(self, email: str, password: str) -> Tuple[Optional[str], str]:
if not firebase_app: return None, "El sistema de registro no funciona ahora. Un garrón."
try:
user_record = await asyncio.to_thread(auth.create_user, email=email, password=password)
new_user = User(id=user_record.uid, email=email)
await db.save_user_profile(new_user)
logger.info(f"¡Uno más en la banda! Usuario nuevo: {user_record.uid}")
return user_record.uid, "¡Listo el pollo! Ya sos parte. Ahora a meterle para adelante."
except Exception as e:
logger.error(f"Falló el registro en Firebase: {e}")
return None, "No se pudo crear la cuenta. Fijate si el mail ya está usado o si la clave es muy corta."
async def login_with_firebase_simulation(self, email: str) -> Tuple[Optional[str], str]:
if not firebase_app: return None, "El sistema de login no funciona ahora."
try:
user_record = await asyncio.to_thread(auth.get_user_by_email, email)
return user_record.uid, "¡Adentro! Qué bueno tenerte de vuelta."
except Exception:
return None, "Mmm, no funca. Revisá el mail o la contraseña, che."
auth_service = AuthService()
llm_service = LLMService()
civic_intel_service = CivicIntelligenceService()
dashboard_cache = {}
# -------------------------------------------------------------------------
# 5. LÓGICA DE LA INTERFAZ (GRADIO) - La cara visible
# -------------------------------------------------------------------------
async def handle_register(email, password):
if not email or not password: return "Pará, pará. Necesito un mail y una clave.", ""
uid, msg = await auth_service.register_with_firebase(email, password)
token = auth_service.create_jwt(uid) if uid else ""
return msg, token
async def handle_login(email, password):
if not email or not password: return "Faltan datos, che.", ""
uid, msg = await auth_service.login_with_firebase_simulation(email)
token = auth_service.create_jwt(uid) if uid else ""
return msg, token
async def handle_chat(token: str, message: str, history: List[Dict[str, str]]):
uid = auth_service.verify_jwt(token)
if not uid:
history.append({"role": "assistant", "content": "Che, para charlar tenés que estar adentro. ¡Iniciá sesión!"})
return history
if not message.strip(): return history
history.append({"role": "user", "content": message})
context = ""
if civic_intel_service.model and civic_intel_service.index and civic_intel_service.index.ntotal > 0:
query_embedding = civic_intel_service.model.encode([message])
_, I = civic_intel_service.index.search(query_embedding, k=3)
report_ids = [civic_intel_service.report_map[i] for i in I[0] if i in civic_intel_service.report_map]
if report_ids:
all_reports = await db.get_all_reports()
report_dict = {r.id: r.content for r in all_reports}
context_items = [report_dict[rid] for rid in report_ids if rid in report_dict]
if context_items:
context = "\n".join(f"- \"{item}\"" for item in context_items)
reply = await llm_service.generate_response(history, context)
history.append({"role": "assistant", "content": reply})
return history
async def submit_report(token: str, content: str, type_val: str, location: dict):
# ... (Esta función queda igual)
uid = auth_service.verify_jwt(token)
if not uid: return gr.update(value="Tenés que estar adentro para reportar."), gr.update(value="")
if not content.strip(): return gr.update(value="¡Che, el reporte no puede estar vacío!"), gr.update(value=content)
lat = location['latitude'] if location and 'latitude' in location else None
lon = location['longitude'] if location and 'longitude' in location else None
report_type = "problema" if type_val == "Algo que mejorar (Problema)" else "fortaleza"
report = Report(user_id=uid, content=content, type=report_type, lat=lat, lon=lon)
await civic_intel_service.add_report(report)
return gr.update(value="¡Reporte enviado! Sos un crack. Gracias por dar una mano."), gr.update(value="")
async def generate_dashboard_data(force_refresh=False):
# ... (Esta función queda igual)
if not force_refresh and "data" in dashboard_cache and time.time() - dashboard_cache.get("timestamp", 0) < 300:
return dashboard_cache["data"]
reports = await db.get_all_reports()
if not reports:
return "<p>Todavía no hay data para mostrar. ¡Sé el primero en reportar algo!</p>", "<p>Sin problemas no hay nube de palabras. ¡Qué buena noticia!</p>"
df = pd.DataFrame([r.model_dump() for r in reports if r.lat is not None and r.lon is not None])
if not df.empty:
map_center = [df['lat'].mean(), df['lon'].mean()]
m = folium.Map(location=map_center, zoom_start=13, tiles="CartoDB positron")
for _, row in df.iterrows():
color = 'green' if row['type'] == 'fortaleza' else 'red'
folium.CircleMarker(location=[row['lat'], row['lon']], radius=5, color=color, fill=True, fill_color=color, popup=f"<b>{row['type'].capitalize()}</b><br>{row['content'][:100]}").add_to(m)
map_html = m._repr_html_()
else:
map_html = "<p>Nadie reportó con ubicación todavía. ¡Activá el GPS y marcá la cancha!</p>"
text = " ".join([r.content for r in reports if r.type == 'problema'])
if text:
wordcloud = WordCloud(width=800, height=300, background_color='white', collocations=False).generate(text)
img_buffer = io.BytesIO()
wordcloud.to_image().save(img_buffer, format='PNG')
img_str = base64.b64encode(img_buffer.getvalue()).decode()
wc_html = f'<img src="data:image/png;base64,{img_str}" alt="Nube de Problemas" style="width:100%; height:auto;">'
else:
wc_html = "<p>¡Parece que no hay problemas! O al menos, nadie los reportó.</p>"
dashboard_cache["data"] = (map_html, wc_html)
dashboard_cache["timestamp"] = time.time()
return map_html, wc_html
def build_ui():
css = """
#login_col { max-width: 400px; margin: auto; padding-top: 50px; }
#main_view { max-width: 900px; margin: auto; }
"""
theme = gr.themes.Soft(primary_hue="blue", secondary_hue="sky")
with gr.Blocks(css=css, theme=theme, title=settings.APP_NAME) as demo:
auth_token = gr.State("")
login_view = gr.Column(elem_id="login_col")
main_view = gr.Column(visible=False, elem_id="main_view")
with login_view:
# ... (Sin cambios)
gr.Markdown(f"<h1>SAMUEL</h1><h2>La red que nos une para mejorar lo nuestro.</h2>")
login_email = gr.Textbox(label="Tu Correo")
login_pass = gr.Textbox(label="Tu Clave Secreta", type="password")
login_status = gr.Markdown()
with gr.Row():
login_btn = gr.Button("Metele")
register_btn = gr.Button("Sumate")
with main_view:
with gr.Tabs():
with gr.TabItem("Tu Base de Operaciones"):
# ... (Sin cambios)
gr.Markdown("## Acá está tu movida cívica")
gr.Markdown("Próximamente: tu perfil, reputación y misiones para cambiar el barrio.")
with gr.TabItem("Charlá con Samuel"):
# CORRECCIÓN: Agregamos type='messages' para usar el formato nuevo y evitar el warning.
chatbot_ui = gr.Chatbot(label="Chat con Samuel", height=500, type='messages')
chat_input = gr.Textbox(placeholder="¿Qué onda, Samuel? Contame qué se puede hacer por acá...")
# CORRECCIÓN: La lógica del chat ahora maneja una lista de diccionarios.
async def chat_interface(message, history):
updated_history = await handle_chat(auth_token.value, message, history)
return "", updated_history
chat_input.submit(chat_interface, [chat_input, chatbot_ui], [chat_input, chatbot_ui])
with gr.TabItem("El Mapa del Barrio"):
# ... (Sin cambios, ya estaba bien)
gr.Markdown("## El Pulso de la Comunidad")
with gr.Accordion("Quiero sumar mi granito de arena", open=False):
report_content = gr.Textbox(label="¿Qué viste? Contalo con tus palabras", lines=3)
report_type = gr.Radio(["Algo bueno para destacar (Fortaleza)", "Algo que mejorar (Problema)"], label="¿Qué tipo de reporte es?")
report_location = gr.Json(label="Ubicación (se completa solo si das permiso)", visible=False, elem_id="report_location_json")
report_status = gr.Markdown()
submit_report_btn = gr.Button("Mandar Reporte", elem_id="submit_report_btn")
refresh_dashboard_btn = gr.Button("Actualizar Mapa y Datos")
wc_display = gr.HTML(label="De qué se habla (Problemas)")
map_display = gr.HTML(label="Mapa de Reportes")
# --- Lógica de Handlers y Transiciones (sin cambios) ---
async def on_auth_success(token):
if auth_service.verify_jwt(token):
map_html, wc_html = await generate_dashboard_data(force_refresh=True)
return {
login_view: gr.update(visible=False), main_view: gr.update(visible=True),
auth_token: token, map_display: map_html, wc_display: wc_html,
}
return {login_status: gr.update(value=f"<p style='color:red;'>Token inválido. Algo raro pasó.</p>")}
register_btn.click(handle_register, [login_email, login_pass], [login_status, auth_token]).then(on_auth_success, [auth_token], [login_view, main_view, auth_token, map_display, wc_display])
login_btn.click(handle_login, [login_email, login_pass], [login_status, auth_token]).then(on_auth_success, [auth_token], [login_view, main_view, auth_token, map_display, wc_display])
submit_report_btn.click(submit_report, [auth_token, report_content, report_type, report_location], [report_status, report_content])
async def refresh_dashboard():
map_html, wc_html = await generate_dashboard_data(force_refresh=True)
return map_html, wc_html
refresh_dashboard_btn.click(refresh_dashboard, [], [map_display, wc_display])
# CORRECCIÓN: El parámetro ahora es `js` (sin el guion bajo).
demo.load(js="""
() => {
const updateLocation = () => {
if (navigator.geolocation) {
navigator.geolocation.getCurrentPosition((position) => {
const locationData = {
latitude: position.coords.latitude,
longitude: position.coords.longitude
};
const locationInput = document.querySelector("#report_location_json textarea");
if(locationInput) {
locationInput.value = JSON.stringify(locationData);
locationInput.dispatchEvent(new Event('input', { bubbles: true }));
}
});
}
};
updateLocation();
}
""")
return demo
if __name__ == "__main__":
if not LIBRERIAS_AVANZADAS_OK:
logger.error("############################################################")
logger.error("¡ATENCIÓN! Faltan librerías. La app va a andar a media máquina.")
logger.error("Corré 'pip install -r requirements.txt' para instalar todo.")
logger.error("############################################################")
asyncio.run(db.initialize())
app = build_ui()
app.queue().launch(server_name="0.0.0.0", server_port=settings.PORT, share=False)