"""Gradio app for the GISA leaderboard.

Serves a static leaderboard page (assets/index.html + styles.css + script.js),
seeds the results dataset from seed.json if empty, and accepts JSONL
submissions that are evaluated against encrypted ground-truth answers and
appended to the results dataset on the Hugging Face Hub.
"""

import datetime
import html as html_lib
import json
import os
import re
from typing import Dict, List, Optional

import gradio as gr
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, hf_hub_download

from content import (
    SUBMISSION_TEXT,
    format_error,
    format_log,
    format_warning,
)
from evaluator import SimpleEvaluator

# Write token for the results repo; either env var name is accepted.
TOKEN = os.getenv("HF_TOKEN") or os.getenv("TOKEN")

DATASET_REPO = "RUC-NLPIR/GISA"
RESULTS_REPO = "RUC-NLPIR/GISA-leaderboard"
META_FILE = "encrypted_question.jsonl"
ANSWER_DIR = "answer"
CACHE_DIR = "cache/answers"

SEED_FILE = os.path.join(os.path.dirname(__file__), "seed.json")
ASSETS_DIR = os.path.join(os.path.dirname(__file__), "assets")
INDEX_HTML = os.path.join(ASSETS_DIR, "index.html")
STYLES_CSS = os.path.join(ASSETS_DIR, "styles.css")
SCRIPT_JS = os.path.join(ASSETS_DIR, "script.js")

# The only question types the evaluator understands.
ALLOWED_TYPES = {"item", "set", "list", "table"}

os.makedirs(CACHE_DIR, exist_ok=True)

api = HfApi()
evaluator = SimpleEvaluator()


def _extract_username(profile, request: Optional[gr.Request]) -> Optional[str]:
    """
    Best-effort extraction of the HF username across Gradio versions.

    On Hugging Face Spaces with hf_oauth enabled, Gradio can inject an OAuth
    profile object (usually exposing `.username`). Some versions also provide
    `request.username`.
    """
    if profile is not None:
        username = getattr(profile, "username", None)
        if username:
            return str(username)
        # Some versions may pass a dict-like profile.
        if isinstance(profile, dict):
            for key in ("username", "preferred_username", "name"):
                val = profile.get(key)
                if val:
                    return str(val)
    if request is None:
        return None
    username = getattr(request, "username", None)
    if username:
        return str(username)
    headers = getattr(request, "headers", None)
    if not headers:
        return None
    # Starlette Headers is case-insensitive; also tolerate plain dicts.
    for key in (
        "x-forwarded-user",
        "x-hf-user",
        "x-huggingface-user",
        "x-user",
    ):
        try:
            val = headers.get(key)
        except Exception:
            val = None
        if val:
            return str(val)
    return None


def _safe_float(val):
    """Coerce *val* to float, returning None for None/blank/unparseable input."""
    try:
        if val is None:
            return None
        if isinstance(val, str) and not val.strip():
            return None
        return float(val)
    except Exception:
        return None


def _to_percent(val: Optional[float]) -> float:
    """Convert a 0..1 ratio to a percentage rounded to 2 decimals (None -> 0.0)."""
    if val is None:
        return 0.0
    return round(float(val) * 100, 2)


def _load_text(path: str) -> str:
    """Read a UTF-8 text file and return its contents."""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def load_meta_map() -> Dict[str, str]:
    """Download the question metadata and return a {question_id: answer_type} map.

    Raises ValueError on malformed JSON, missing fields, unsupported types,
    or an empty file.
    """
    meta_path = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=META_FILE,
        repo_type="dataset",
        token=TOKEN,
    )
    meta_map: Dict[str, str] = {}
    with open(meta_path, "r", encoding="utf-8") as f:
        for idx, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON in meta.jsonl at line {idx}: {e}")
            if "id" not in item or "answer_type" not in item:
                raise ValueError(f"meta.jsonl line {idx} missing id/answer_type")
            qid = str(item["id"])
            qtype = str(item["answer_type"]).lower().strip()
            if qtype not in ALLOWED_TYPES:
                raise ValueError(f"Unsupported answer_type '{qtype}' for id {qid}")
            meta_map[qid] = qtype
    if not meta_map:
        raise ValueError("meta.jsonl is empty")
    return meta_map


def download_answer(qid: str) -> str:
    """Download (and cache) the ground-truth CSV for question *qid*; return its local path."""
    filename = f"{ANSWER_DIR}/{qid}.csv"
    return hf_hub_download(
        repo_id=DATASET_REPO,
        filename=filename,
        repo_type="dataset",
        token=TOKEN,
        cache_dir=CACHE_DIR,
    )


def load_results_dataset():
    """Load the results dataset's train split, or None if unavailable."""
    try:
        return load_dataset(RESULTS_REPO, split="train", token=TOKEN)
    except Exception:
        return None


def build_leaderboard_rows() -> List[dict]:
    """Return normalized leaderboard rows from the Hub, falling back to seed data."""
    ds = load_results_dataset()
    if ds is None or len(ds) == 0:
        seed_rows = load_seed_rows()
        if not seed_rows:
            return []
        return _rows_from_source(seed_rows)
    return _rows_from_source(ds)


def _rows_from_source(source) -> List[dict]:
    """Normalize raw result records (Hub rows or seed dicts) into display rows."""
    rows: List[dict] = []
    for row in source:
        rows.append(
            {
                "model": row.get("model", "-"),
                "org": row.get("org", "-"),
                "framework": row.get("framework", "-"),
                "date": row.get("date", "-"),
                # Hub records use "overall_em"; seed records use "overall".
                "overall": _safe_float(row.get("overall_em", row.get("overall"))),
                "item_em": _safe_float(row.get("item_em")),
                "set_em": _safe_float(row.get("set_em")),
                "set_f1": _safe_float(row.get("set_f1")),
                "list_em": _safe_float(row.get("list_em")),
                "list_f1": _safe_float(row.get("list_f1")),
                "list_order": _safe_float(row.get("list_order")),
                "table_em": _safe_float(row.get("table_em")),
                "table_row_f1": _safe_float(row.get("table_row_f1")),
                "table_item_f1": _safe_float(row.get("table_item_f1")),
            }
        )
    return rows


def load_seed_rows() -> List[dict]:
    """Load seed leaderboard entries from seed.json, falling back to the root script.js."""
    if not os.path.exists(SEED_FILE):
        return _load_seed_from_root_script()
    try:
        with open(SEED_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except Exception:
        return _load_seed_from_root_script()


def _load_seed_from_root_script() -> List[dict]:
    """Scrape the `const data = [...]` literal out of the repo-root script.js.

    The JS object literal is converted to JSON by quoting bare keys and
    stripping trailing commas. Returns [] on any failure.
    """
    root_script = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "script.js"))
    if not os.path.exists(root_script):
        return []
    try:
        text = _load_text(root_script)
        # BUG FIX: the patterns previously used doubled backslashes (e.g. \\s)
        # inside raw strings, which match a literal backslash — the regexes
        # could never match real JS, so seed extraction always returned [].
        match = re.search(r"const\s+data\s*=\s*(\[.*?\]);", text, re.S)
        if not match:
            return []
        arr_text = match.group(1)
        arr_text = re.sub(r"(\w+)\s*:", r'"\1":', arr_text)  # quote bare keys
        arr_text = re.sub(r",\s*([}\]])", r"\1", arr_text)  # drop trailing commas
        data = json.loads(arr_text)
        return data if isinstance(data, list) else []
    except Exception:
        return []


def render_page() -> str:
    """Return index.html with the legacy inline-data/script placeholders blanked out."""
    page = _load_text(INDEX_HTML)
    page = page.replace("__LEADERBOARD_DATA__", "")
    page = page.replace("__SCRIPT__", "")
    return page


def _format_score(val: Optional[float]) -> str:
    """Format a score to two decimals, or '-' when absent/unparseable."""
    if val is None:
        return "-"
    try:
        return f"{float(val):.2f}"
    except Exception:
        return "-"


def _render_leaderboard_rows(data: List[dict]) -> str:
    """Render a static <tbody> so the leaderboard is not empty even if client JS
    runs before Gradio mounts the HTML.

    NOTE(review): the HTML tag text in this function was garbled in the source
    (tags stripped by extraction); the markup below is a reconstruction —
    confirm class names and structure against assets/index.html and styles.css.
    """
    if not data:
        return '<tr><td colspan="14" class="empty">No submissions yet.</td></tr>'

    # Default sort: Overall desc, then date desc (best-effort).
    def _date_key(s: str) -> int:
        # BUG FIX: pattern previously used doubled backslashes (\\d), matching a
        # literal backslash — no ISO date ever matched, so ties never sorted by date.
        m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", str(s or ""))
        if not m:
            return 0
        return int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))

    sorted_rows = sorted(
        data,
        key=lambda r: (
            _safe_float(r.get("overall")) or 0.0,
            _date_key(r.get("date")),
        ),
        reverse=True,
    )
    out = []
    for idx, row in enumerate(sorted_rows, start=1):
        # Escape everything user-supplied before embedding in HTML.
        model = html_lib.escape(str(row.get("model") or "-"))
        org = html_lib.escape(str(row.get("org") or "-"))
        framework = html_lib.escape(str(row.get("framework") or "-"))
        date = html_lib.escape(str(row.get("date") or "-"))
        out.append(
            "\n".join(
                [
                    "<tr>",
                    f"  <td>{idx}</td>",
                    '  <td class="model-cell">',
                    f'    <div class="model-name">{model}</div>',
                    f'    <div class="model-org">{org}</div>',
                    "  </td>",
                    f"  <td>{framework}</td>",
                    f"  <td>{date}</td>",
                    f'  <td class="overall">{_format_score(_safe_float(row.get("overall")))}</td>',
                    f"  <td>{_format_score(_safe_float(row.get('item_em')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('set_em')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('set_f1')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('list_em')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('list_f1')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('list_order')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('table_em')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('table_row_f1')))}</td>",
                    f"  <td>{_format_score(_safe_float(row.get('table_item_f1')))}</td>",
                    "</tr>",
                ]
            )
        )
    return "\n".join(out)


def build_js(data: List[dict]) -> str:
    """Return script.js prefixed with a window.LEADERBOARD_DATA assignment."""
    script = _load_text(SCRIPT_JS)
    data_json = json.dumps(data, ensure_ascii=False)
    return f"window.LEADERBOARD_DATA = {data_json};\n" + script


def ensure_results_repo():
    """Create the public results dataset repo if it does not exist (no-op without a token)."""
    if not TOKEN:
        return
    api.create_repo(
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        private=False,
        exist_ok=True,
        token=TOKEN,
    )


def seed_results_if_needed():
    """Push seed rows to the results repo, but only when the repo is empty.

    Best-effort: any Hub failure is swallowed so app startup never breaks.
    """
    seed_rows = load_seed_rows()
    if not seed_rows:
        return
    ds = load_results_dataset()
    if ds is not None and len(ds) > 0:
        return
    if not TOKEN:
        return
    entries = []
    for row in seed_rows:
        entries.append(
            {
                "model": row.get("model", "-"),
                "org": row.get("org", "-"),
                "framework": row.get("framework", "N/A"),
                "date": row.get("date", "-"),
                "overall_em": _safe_float(row.get("overall")),
                "item_em": _safe_float(row.get("item_em")),
                "set_em": _safe_float(row.get("set_em")),
                "set_f1": _safe_float(row.get("set_f1")),
                "list_em": _safe_float(row.get("list_em")),
                "list_f1": _safe_float(row.get("list_f1")),
                "list_order": _safe_float(row.get("list_order")),
                "table_em": _safe_float(row.get("table_em")),
                "table_row_f1": _safe_float(row.get("table_row_f1")),
                "table_item_f1": _safe_float(row.get("table_item_f1")),
                "url": row.get("url", ""),
                "email": row.get("email", ""),
                "username": row.get("username", "seed"),
            }
        )
    try:
        ensure_results_repo()
        Dataset.from_list(entries).push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        # Deliberate best-effort: seeding failure must not prevent startup.
        pass


def _get_metric(summary: dict, qtype: str, key: str, fallback: float = 0.0) -> float:
    """Fetch summary[qtype][key] as float, treating missing/None as *fallback*."""
    return float(summary.get(qtype, {}).get(key, fallback) or 0.0)


def compute_leaderboard_metrics(score_list: List[dict]) -> Dict[str, float]:
    """Aggregate per-question scores into the percentage metrics shown on the board."""
    summary = evaluator.gather_results(score_list)
    overall_em = _to_percent(summary.get("overall_global_em", 0.0))
    # Older evaluator versions expose only overall_global_em for item questions.
    item_em = _to_percent(
        _get_metric(summary, "item", "overall_item_em", _get_metric(summary, "item", "overall_global_em"))
    )
    set_em = _to_percent(_get_metric(summary, "set", "overall_global_em"))
    set_f1 = _to_percent(_get_metric(summary, "set", "overall_set_f1"))
    list_em = _to_percent(_get_metric(summary, "list", "overall_global_em"))
    list_f1 = _to_percent(_get_metric(summary, "list", "overall_list_content_f1"))
    list_order = _to_percent(_get_metric(summary, "list", "overall_list_order_score"))
    table_em = _to_percent(_get_metric(summary, "table", "overall_global_em"))
    table_row_f1 = _to_percent(_get_metric(summary, "table", "overall_table_row_f1"))
    table_item_f1 = _to_percent(_get_metric(summary, "table", "overall_table_item_f1"))
    return {
        "overall_em": overall_em,
        "item_em": item_em,
        "set_em": set_em,
        "set_f1": set_f1,
        "list_em": list_em,
        "list_f1": list_f1,
        "list_order": list_order,
        "table_em": table_em,
        "table_row_f1": table_row_f1,
        "table_item_f1": table_item_f1,
    }


def parse_jsonl(file_path: str) -> Dict[str, str]:
    """Parse a submission JSONL file into an {id: prediction} map.

    Raises ValueError on invalid JSON, missing fields, duplicate ids, or an
    empty file.
    """
    preds: Dict[str, str] = {}
    with open(file_path, "r", encoding="utf-8") as f:
        for idx, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as e:
                raise ValueError(f"Line {idx} is not valid JSON: {e}")
            if "id" not in item or "prediction" not in item:
                raise ValueError(f"Line {idx} must contain 'id' and 'prediction'")
            qid = str(item["id"])
            if qid in preds:
                raise ValueError(f"Duplicate id: {qid}")
            preds[qid] = item["prediction"]
    if not preds:
        raise ValueError("Empty submission file")
    return preds


def add_new_eval(
    model: str,
    org: str,
    framework: str,
    url: str,
    email: str,
    file_obj,
    profile: Optional[gr.OAuthProfile] = None,
    request: gr.Request = None,
):
    """Validate, evaluate, and record a leaderboard submission.

    Returns a formatted status/error/warning string for the result Markdown.
    Enforces: login, required fields, one submission per user per day, and
    exact id coverage against the question metadata.
    """
    if not TOKEN:
        return format_error("Server misconfigured: HF_TOKEN is missing.")
    username = _extract_username(profile, request)
    if not username:
        return format_warning("Please log in with HuggingFace to submit.")
    if not model or not org:
        return format_warning("Please provide model name and organization.")
    if file_obj is None:
        return format_warning("Please upload a JSONL file.")

    today = datetime.date.today().isoformat()
    try:
        ds = load_results_dataset()
        if ds is not None:
            for row in ds:
                if row.get("username") == username and row.get("date") == today:
                    return format_warning("You already submitted today. Please try again tomorrow.")
    except Exception:
        return format_error("Failed to load leaderboard results. Please try again later.")

    try:
        meta_map = load_meta_map()
    except Exception as e:
        return format_error(f"Failed to load meta.jsonl: {e}")
    try:
        preds = parse_jsonl(file_obj.name)
    except Exception as e:
        return format_error(str(e))

    # Submission ids must match the benchmark's id set exactly.
    pred_ids = set(preds.keys())
    meta_ids = set(meta_map.keys())
    extra = sorted(pred_ids - meta_ids)
    missing = sorted(meta_ids - pred_ids)
    if extra:
        return format_error(f"Submission has {len(extra)} unknown ids (e.g., {extra[0]}).")
    if missing:
        return format_error(f"Submission missing {len(missing)} ids (e.g., {missing[0]}).")

    score_list: List[dict] = []
    for qid, prediction in preds.items():
        gt_path = download_answer(qid)
        qtype = meta_map[qid]
        metrics = evaluator.evaluate_one(
            prediction=str(prediction),
            gt_path=gt_path,
            question_type=qtype,
            qid=qid,
        )
        score_list.append(metrics)

    metrics = compute_leaderboard_metrics(score_list)
    entry = {
        "model": model,
        "org": org,
        "framework": framework or "N/A",
        "url": url or "",
        "email": email or "",
        "username": username,
        "date": today,
        **metrics,
    }
    try:
        ensure_results_repo()
        if ds is None:
            Dataset.from_list([entry]).push_to_hub(RESULTS_REPO, token=TOKEN)
        else:
            ds = ds.add_item(entry)
            ds.push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        return format_error("Failed to save results. Please contact the maintainers.")
    return format_log("Submission received! Please refresh the leaderboard to see your score.")


seed_results_if_needed()
leaderboard_data = build_leaderboard_rows()
css = _load_text(STYLES_CSS)
page_html = render_page()
rows_html = _render_leaderboard_rows(leaderboard_data)
# NOTE(review): the replace() target was garbled in the source (an empty
# string, which would insert rows between every character). Reconstructed as
# a tbody placeholder — TODO: confirm the exact marker in assets/index.html.
page_html = page_html.replace(
    '<tbody id="leaderboard-body"></tbody>',
    f'<tbody id="leaderboard-body">{rows_html}</tbody>',
)
js = build_js(leaderboard_data)

# BUG FIX: css/js are gr.Blocks constructor parameters, not launch() kwargs;
# passing them to launch() raises TypeError on current Gradio.
with gr.Blocks(css=css, js=js) as demo:
    gr.HTML(page_html)
    with gr.Accordion("Submit your results", open=True):
        gr.Markdown(SUBMISSION_TEXT)
        with gr.Row():
            with gr.Column():
                model_text = gr.Textbox(label="Model / System")
                org_text = gr.Textbox(label="Organization")
                framework_text = gr.Textbox(label="Framework", value="ReAct")
                url_text = gr.Textbox(label="Model URL", placeholder="Optional")
            with gr.Column():
                email_text = gr.Textbox(label="Contact email (public)")
                file_input = gr.File(label="Upload JSONL")
        with gr.Row():
            login_btn = gr.LoginButton()
            submit_btn = gr.Button("Submit")
        result_md = gr.Markdown()
        # profile/request are injected by Gradio from the type annotations;
        # they are intentionally absent from `inputs`.
        submit_btn.click(
            add_new_eval,
            inputs=[
                model_text,
                org_text,
                framework_text,
                url_text,
                email_text,
                file_input,
            ],
            outputs=result_md,
        )


def _launch():
    """Start the queued Gradio app (SSR off for static-HTML compatibility)."""
    demo.queue()
    demo.launch(ssr_mode=False)


if __name__ == "__main__":
    _launch()