""" ================================================================================ AKIRA V21 ULTIMATE - USER PROFILER (DOSSIÊ PSICOLÓGICO) ================================================================================ Módulo responsável pela coleta agressiva (mas silenciosa) de dados dos usuários. Analisa conversas e extrai: Nomes, Endereços, Gostos, Gatilhos Emocionais, Estilo de fala e outras preferências. Armazena tudo no banco de dados para compor a resposta da Akira. """ import json import logging import threading import time from typing import Dict, Any, Optional # Imports robustos com fallback try: from .database import Database from . import config except ImportError: try: from modules.database import Database from modules import config except ImportError: Database = None config = None logger = logging.getLogger(__name__) class UserProfiler: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.db = Database() self._initialized = True logger.info("🟢 UserProfiler (Dossiê) inicializado.") def _get_profile_key(self, user_id: str) -> str: return f"dossie_psicologico_{user_id}" def get_user_profile(self, user_id: str) -> Dict[str, Any]: """Retorna o dossiê completo do usuário.""" try: dados = self.db.recuperar_aprendizado_detalhado(user_id, self._get_profile_key(user_id)) if dados: if isinstance(dados, str): return json.loads(dados) return dados except Exception as e: logger.warning(f"Erro ao recuperar dossiê de {user_id}: {e}") # Estrutura padrão de perfil vazio return { "nome_conhecido": "", "estilo_comunicacao": "Desconhecido", "gatilhos_emocionais": [], "preferencias": [], "dados_pessoais": [], "opinioes_assuntos": {}, "fraquezas_erros": [], "ultima_analise": 0 } def _save_user_profile(self, user_id: str, profile: Dict[str, Any]) -> None: """Salva o dossiê no banco de dados.""" try: profile["ultima_analise"] = time.time() self.db.salvar_aprendizado_detalhado( user_id, self._get_profile_key(user_id), json.dumps(profile, ensure_ascii=False) ) except Exception as e: logger.error(f"Erro ao salvar dossiê de {user_id}: {e}") def extrair_dados_assincrono(self, user_id: str, mensagem_usuario: str, resposta_bot: str, llm_manager=None): """Dispara a extração de dados em background usando a thread pool ou thread simples.""" thread = threading.Thread( target=self.analisar_e_atualizar_perfil, args=(user_id, mensagem_usuario, resposta_bot, llm_manager), daemon=True ) thread.start() def extrair_dados_escuta_assincrono(self, user_id: str, mensagem: str, contexto_grupo: str, llm_manager=None, context_id: str = ""): """Dispara a extração profunda de contexto de escuta em background.""" thread = threading.Thread( target=self.analisar_escuta_background, args=(user_id, mensagem, contexto_grupo, llm_manager, context_id), daemon=True ) thread.start() def analisar_escuta_background(self, user_id: str, mensagem: str, contexto_grupo: str, llm_manager=None, context_id: str = "") -> None: """ Analisa mensagens do grupo/PV interceptadas passivamente (escuta). Extrai o assunto geral, posicionamento do usuário e fraquezas/erros. Atualiza o perfil e notifica a LSTM Extension sobre o assunto. """ if not mensagem or len(mensagem.strip()) < 3 or llm_manager is None: return perfil_atual = self.get_user_profile(user_id) # Filtragem heurística simples para evitar chamar API com mensagens como "ok", "bom dia" palavras = mensagem.split() if len(palavras) < 4: return import random # Analisa 1 a cada 3 mensagens mais longas para otimizar chamadas if random.random() > 0.35 and len(mensagem) < 150: return prompt = f""" Você é um analista comportamental passivo. Analise a seguinte fala interceptada de um usuário num contexto de comunicação (Grupo/Privado). Seja perspicaz e profundo. Extraia um JSON estrito com as chaves exatas: - "assunto_geral": (string) Qual o macro-assunto da conversa em no máximo 5 palavras. - "posicionamento_usuario": (string) O que essa pessoa pensa ou argumentou sobre o assunto (sua opinião). - "fraquezas_erros": (string ou null) Se a pessoa demonstrou alguma fraqueza emocional, viés, falha de lógica, erro de gramática grosseiro ou falta de conhecimento, descreva brevemente. Se não houver, retorne null. MENSAGEM: "{mensagem}" """ try: resp_analise, _ = llm_manager.generate( prompt, context_history=[], is_privileged=True ) if resp_analise: # Extrai apenas o json se tiver crase import re match = re.search(r'\{.*\}', resp_analise, re.DOTALL) if match: json_str = match.group(0) dados = json.loads(json_str) atualizou = False assunto = dados.get("assunto_geral") posicionamento = dados.get("posicionamento_usuario") fraqueza = dados.get("fraquezas_erros") if assunto and posicionamento: # Limpa o assunto para ser uma chave chave_assunto = assunto.lower()[:30] if "opinioes_assuntos" not in perfil_atual: perfil_atual["opinioes_assuntos"] = {} perfil_atual["opinioes_assuntos"][chave_assunto] = posicionamento # Limita a quantidade de assuntos no dicionário (mantém últimos 20) if len(perfil_atual["opinioes_assuntos"]) > 20: keys = list(perfil_atual["opinioes_assuntos"].keys()) del perfil_atual["opinioes_assuntos"][keys[0]] atualizou = True # Integração cruzada: avisa a LSTM Extension sobre o tópico detectado try: from .lstm_extension import get_lstm_extension lstm_ext = get_lstm_extension(self.db) if context_id: # Força o update do topic na LSTM de forma manual (ou atualiza context cache) summary = lstm_ext._get_from_db(context_id) if summary: if summary.topic_principal != chave_assunto: summary.context_switches += 1 if summary.conversation_path is None: summary.conversation_path = [] summary.conversation_path.append(chave_assunto) summary.topic_principal = chave_assunto lstm_ext._save_to_db(summary) except Exception as e: logger.debug(f"Erro ao sincronizar LSTM na escuta: {e}") if fraqueza and len(str(fraqueza)) > 5: if "fraquezas_erros" not in perfil_atual: perfil_atual["fraquezas_erros"] = [] if fraqueza not in perfil_atual["fraquezas_erros"]: perfil_atual["fraquezas_erros"].append(fraqueza) if len(perfil_atual["fraquezas_erros"]) > 10: perfil_atual["fraquezas_erros"].pop(0) atualizou = True if atualizou: self._save_user_profile(user_id, perfil_atual) logger.info(f"🧠 Dossiê passivo de {user_id} enriquecido. (Assunto: {chave_assunto})") except Exception as e: logger.debug(f"Falha na extração LLM profunda (Escuta) para dossiê: {e}") def analisar_e_atualizar_perfil(self, user_id: str, mensagem: str, resposta: str, llm_manager=None) -> None: """ Analisa a última interação para atualizar o dossiê. Usa o LLM (se disponível) para extração silenciosa ou heurísticas avançadas. """ if not mensagem or len(mensagem.strip()) < 3: return perfil_atual = self.get_user_profile(user_id) # Limite de processamento para não onerar APIs (1 vez a cada 30 mensagens aprox) # Vamos fazer inferência simples para coletar nomes mens_lower = mensagem.lower() atualizou = False # 1. Extração Hardcoded Básica (Fallback rápido) # "me chamo X", "o meu nome é Y" import re nome_match = re.search(r'(me chamo|meu nome é|sou o|sou a) ([A-Za-zÀ-ÿ]+)', mens_lower) if nome_match and not perfil_atual["nome_conhecido"]: perfil_atual["nome_conhecido"] = nome_match.group(2).capitalize() atualizou = True # 2. Uso do LLM para Extração Agressiva Profunda (Dossiê) # Limite de frequência: Apenas 1 a cada 10 mensagens (ou se for muito longa > 150 chars) import random deve_usar_llm = (random.random() < 0.1) or (len(mensagem) > 150) if llm_manager is not None and deve_usar_llm: # Monta prompt apenas para sumarizar a pessoa prompt_extracao = f""" Você é um analista comportamental silencioso. Analise a seguinte mensagem enviada por um usuário. Extraia quaisquer informações relevantes (preferências, gostos, forma de se expressar, estado emocional implícito). Responda APENAS com um JSON simples com chaves: "novas_preferencias" (lista), "estilo" (string), "emocional" (string). Mensagem do usuário: "{mensagem}" """ try: # Usa método síncrono da API configurada no projeto (ex: mistral) # Como é background, pedimos via providers mais rápidos provider = llm_manager.providers[0] if llm_manager.providers else None if provider: # Este try/except assume a estrutura do LLMManager de api.py # Em caso de falha, ignora e segue a vida. # 🔧 CORREÇÃO: Usando 'generate' em vez de 'generate_response' resp_analise, _ = llm_manager.generate( prompt_extracao, context_history=[], is_privileged=True ) if resp_analise and resp_analise.strip().startswith('{'): try: dados_extraidos = json.loads(resp_analise) if "novas_preferencias" in dados_extraidos and isinstance(dados_extraidos["novas_preferencias"], list): for pref in dados_extraidos["novas_preferencias"]: if pref not in perfil_atual["preferencias"]: perfil_atual["preferencias"].append(pref) atualizou = True if "estilo" in dados_extraidos and len(dados_extraidos["estilo"]) > 4: perfil_atual["estilo_comunicacao"] = dados_extraidos["estilo"] atualizou = True except json.JSONDecodeError: pass except Exception as e: logger.debug(f"Falha na extração LLM para dossiê: {e}") # Mantém listas em tamanho saudável if len(perfil_atual["preferencias"]) > 20: perfil_atual["preferencias"] = perfil_atual["preferencias"][-20:] if atualizou: self._save_user_profile(user_id, perfil_atual) def get_user_profiler() -> UserProfiler: """Factory para instanciar o Profiler.""" return UserProfiler()