import datetime
import html as html_lib
import json
import os
import re
from typing import Dict, List, Optional
import gradio as gr
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, hf_hub_download
from content import (
SUBMISSION_TEXT,
format_error,
format_log,
format_warning,
)
from evaluator import SimpleEvaluator
# Hub token for reads/writes; the Space may expose it under either secret name.
TOKEN = os.getenv("HF_TOKEN") or os.getenv("TOKEN")
# Source dataset (questions + ground-truth answers) and the results storage repo.
DATASET_REPO = "RUC-NLPIR/GISA"
RESULTS_REPO = "RUC-NLPIR/GISA-leaderboard"
# Per-question metadata (JSONL; each line carries "id" and "answer_type").
META_FILE = "encrypted_question.jsonl"
# Ground-truth answers live at answer/<id>.csv inside DATASET_REPO.
ANSWER_DIR = "answer"
# Local hf_hub_download cache for the answer CSVs.
CACHE_DIR = "cache/answers"
SEED_FILE = os.path.join(os.path.dirname(__file__), "seed.json")
ASSETS_DIR = os.path.join(os.path.dirname(__file__), "assets")
INDEX_HTML = os.path.join(ASSETS_DIR, "index.html")
STYLES_CSS = os.path.join(ASSETS_DIR, "styles.css")
SCRIPT_JS = os.path.join(ASSETS_DIR, "script.js")
# Question types the evaluator supports (validated when loading metadata).
ALLOWED_TYPES = {"item", "set", "list", "table"}
os.makedirs(CACHE_DIR, exist_ok=True)
api = HfApi()
evaluator = SimpleEvaluator()
def _extract_username(profile, request: Optional[gr.Request]) -> Optional[str]:
"""
Best-effort extraction of the HF username across Gradio versions.
On Hugging Face Spaces with hf_oauth enabled, Gradio can inject an OAuth profile object
(usually exposing `.username`). Some versions also provide `request.username`.
"""
if profile is not None:
username = getattr(profile, "username", None)
if username:
return str(username)
# Some versions may pass a dict-like profile
if isinstance(profile, dict):
for key in ("username", "preferred_username", "name"):
val = profile.get(key)
if val:
return str(val)
if request is None:
return None
username = getattr(request, "username", None)
if username:
return str(username)
headers = getattr(request, "headers", None)
if not headers:
return None
# Starlette Headers is case-insensitive; also tolerate plain dicts.
for key in (
"x-forwarded-user",
"x-hf-user",
"x-huggingface-user",
"x-user",
):
try:
val = headers.get(key)
except Exception:
val = None
if val:
return str(val)
return None
def _safe_float(val):
try:
if val is None:
return None
if isinstance(val, str) and not val.strip():
return None
return float(val)
except Exception:
return None
def _to_percent(val: Optional[float]) -> float:
if val is None:
return 0.0
return round(float(val) * 100, 2)
def _load_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
def load_meta_map() -> Dict[str, str]:
    """Download the question-metadata JSONL and return {question_id: answer_type}.

    Returns:
        Mapping from question id (str) to its lowercased answer type, which is
        guaranteed to be one of ALLOWED_TYPES.

    Raises:
        ValueError: on malformed JSON lines, missing id/answer_type fields,
            unsupported answer types, or an empty metadata file.
    """
    meta_path = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=META_FILE,
        repo_type="dataset",
        token=TOKEN,
    )
    meta_map: Dict[str, str] = {}
    with open(meta_path, "r", encoding="utf-8") as f:
        for idx, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as e:
                # Fixed: diagnostics used to say "meta.jsonl" even though the
                # downloaded file is META_FILE ("encrypted_question.jsonl").
                raise ValueError(f"Invalid JSON in {META_FILE} at line {idx}: {e}")
            if "id" not in item or "answer_type" not in item:
                raise ValueError(f"{META_FILE} line {idx} missing id/answer_type")
            qid = str(item["id"])
            qtype = str(item["answer_type"]).lower().strip()
            if qtype not in ALLOWED_TYPES:
                raise ValueError(f"Unsupported answer_type '{qtype}' for id {qid}")
            meta_map[qid] = qtype
    if not meta_map:
        raise ValueError(f"{META_FILE} is empty")
    return meta_map
def download_answer(qid: str) -> str:
    """Fetch the ground-truth CSV for one question id; returns its local path."""
    return hf_hub_download(
        repo_id=DATASET_REPO,
        filename=f"{ANSWER_DIR}/{qid}.csv",
        repo_type="dataset",
        token=TOKEN,
        cache_dir=CACHE_DIR,
    )
def load_results_dataset():
    """Load the leaderboard results train split, or None when unavailable."""
    try:
        return load_dataset(RESULTS_REPO, split="train", token=TOKEN)
    except Exception:
        # Missing repo, auth failure, or network error: caller treats None as "empty".
        return None
def build_leaderboard_rows() -> List[dict]:
    """Rows for display: live results when present, otherwise seed data."""
    ds = load_results_dataset()
    if ds is not None and len(ds) > 0:
        return _rows_from_source(ds)
    seed_rows = load_seed_rows()
    return _rows_from_source(seed_rows) if seed_rows else []
def _rows_from_source(source) -> List[dict]:
    """Normalize raw result records (dataset rows or seed dicts) into display rows."""
    metric_keys = (
        "item_em",
        "set_em",
        "set_f1",
        "list_em",
        "list_f1",
        "list_order",
        "table_em",
        "table_row_f1",
        "table_item_f1",
    )
    normalized: List[dict] = []
    for record in source:
        entry = {
            "model": record.get("model", "-"),
            "org": record.get("org", "-"),
            "framework": record.get("framework", "-"),
            "date": record.get("date", "-"),
            # Stored records use "overall_em"; seed rows may use "overall".
            "overall": _safe_float(record.get("overall_em", record.get("overall"))),
        }
        for key in metric_keys:
            entry[key] = _safe_float(record.get(key))
        normalized.append(entry)
    return normalized
def load_seed_rows() -> List[dict]:
    """Load seed leaderboard entries from seed.json, falling back to ../script.js."""
    if os.path.exists(SEED_FILE):
        try:
            with open(SEED_FILE, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
        except Exception:
            return _load_seed_from_root_script()
        return payload if isinstance(payload, list) else []
    return _load_seed_from_root_script()
def _load_seed_from_root_script() -> List[dict]:
root_script = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "script.js"))
if not os.path.exists(root_script):
return []
try:
text = _load_text(root_script)
match = re.search(r"const\\s+data\\s*=\\s*(\\[.*?\\]);", text, re.S)
if not match:
return []
arr_text = match.group(1)
arr_text = re.sub(r"(\\w+)\\s*:", r'\"\\1\":', arr_text)
arr_text = re.sub(r",\\s*([}\\]])", r"\\1", arr_text)
data = json.loads(arr_text)
return data if isinstance(data, list) else []
except Exception:
return []
def render_page() -> str:
    """Load index.html and blank out the legacy template placeholders."""
    page = _load_text(INDEX_HTML)
    for placeholder in ("__LEADERBOARD_DATA__", "__SCRIPT__"):
        page = page.replace(placeholder, "")
    return page
def _format_score(val: Optional[float]) -> str:
if val is None:
return "-"
try:
return f"{float(val):.2f}"
except Exception:
return "-"
def _render_leaderboard_rows(data: List[dict]) -> str:
    """Render static <tr> rows for the leaderboard table body.

    Server-side rendering keeps the table populated even if the client-side
    script runs before Gradio mounts the HTML. The original markup in this
    function had been stripped (unterminated string literals / missing tags);
    it is reconstructed here.
    NOTE(review): cell/class markup is a reconstruction — confirm class names
    ("model", "model-name", "org") and the column count against
    assets/index.html and assets/styles.css.
    """
    if not data:
        # 14 columns: rank, model, framework, date, overall + 9 metric cells.
        return '<tr><td colspan="14">No submissions yet.</td></tr>'

    # Default sort: Overall desc, then date desc (best-effort).
    def _date_key(s: str) -> int:
        # "YYYY-MM-DD" -> sortable integer YYYYMMDD; anything else sorts first.
        m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", str(s or ""))
        if not m:
            return 0
        return int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))

    sorted_rows = sorted(
        data,
        key=lambda r: (
            _safe_float(r.get("overall")) or 0.0,
            _date_key(r.get("date")),
        ),
        reverse=True,
    )
    metric_keys = (
        "overall",
        "item_em",
        "set_em",
        "set_f1",
        "list_em",
        "list_f1",
        "list_order",
        "table_em",
        "table_row_f1",
        "table_item_f1",
    )
    out = []
    for idx, row in enumerate(sorted_rows, start=1):
        # Escape user-provided text before embedding it in HTML.
        model = html_lib.escape(str(row.get("model") or "-"))
        org = html_lib.escape(str(row.get("org") or "-"))
        framework = html_lib.escape(str(row.get("framework") or "-"))
        date = html_lib.escape(str(row.get("date") or "-"))
        cells = [
            "<tr>",
            f"  <td>{idx}</td>",
            '  <td class="model">',
            f'    <span class="model-name">{model}</span>',
            f'    <span class="org">{org}</span>',
            "  </td>",
            f"  <td>{framework}</td>",
            f"  <td>{date}</td>",
        ]
        for key in metric_keys:
            cells.append(f"  <td>{_format_score(_safe_float(row.get(key)))}</td>")
        cells.append("</tr>")
        out.append("\n".join(cells))
    return "\n".join(out)
def build_js(data: List[dict]) -> str:
    """Prepend the leaderboard payload to the client script source."""
    script = _load_text(SCRIPT_JS)
    payload = json.dumps(data, ensure_ascii=False)
    return f"window.LEADERBOARD_DATA = {payload};\n{script}"
def ensure_results_repo():
    """Create the public results dataset repo if a token is configured; else no-op."""
    if TOKEN:
        api.create_repo(
            repo_id=RESULTS_REPO,
            repo_type="dataset",
            private=False,
            exist_ok=True,
            token=TOKEN,
        )
def seed_results_if_needed():
    """One-time bootstrap: push seed rows to the results repo when it is empty."""
    rows = load_seed_rows()
    if not rows:
        return
    existing = load_results_dataset()
    if existing is not None and len(existing) > 0:
        return
    if not TOKEN:
        return
    metric_fields = (
        "item_em",
        "set_em",
        "set_f1",
        "list_em",
        "list_f1",
        "list_order",
        "table_em",
        "table_row_f1",
        "table_item_f1",
    )
    entries = []
    for row in rows:
        entry = {
            "model": row.get("model", "-"),
            "org": row.get("org", "-"),
            "framework": row.get("framework", "N/A"),
            "date": row.get("date", "-"),
            # Seed rows use "overall"; stored records use "overall_em".
            "overall_em": _safe_float(row.get("overall")),
        }
        for field in metric_fields:
            entry[field] = _safe_float(row.get(field))
        entry["url"] = row.get("url", "")
        entry["email"] = row.get("email", "")
        entry["username"] = row.get("username", "seed")
        entries.append(entry)
    try:
        ensure_results_repo()
        Dataset.from_list(entries).push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        # Best-effort seeding; the app still works with an empty leaderboard.
        pass
def _get_metric(summary: dict, qtype: str, key: str, fallback: float = 0.0) -> float:
return float(summary.get(qtype, {}).get(key, fallback) or 0.0)
def compute_leaderboard_metrics(score_list: List[dict]) -> Dict[str, float]:
    """Aggregate per-question scores into leaderboard percentages (0-100)."""
    summary = evaluator.gather_results(score_list)
    # item_em falls back to the item group's global EM when the dedicated key is absent.
    item_fallback = _get_metric(summary, "item", "overall_global_em")
    return {
        "overall_em": _to_percent(summary.get("overall_global_em", 0.0)),
        "item_em": _to_percent(_get_metric(summary, "item", "overall_item_em", item_fallback)),
        "set_em": _to_percent(_get_metric(summary, "set", "overall_global_em")),
        "set_f1": _to_percent(_get_metric(summary, "set", "overall_set_f1")),
        "list_em": _to_percent(_get_metric(summary, "list", "overall_global_em")),
        "list_f1": _to_percent(_get_metric(summary, "list", "overall_list_content_f1")),
        "list_order": _to_percent(_get_metric(summary, "list", "overall_list_order_score")),
        "table_em": _to_percent(_get_metric(summary, "table", "overall_global_em")),
        "table_row_f1": _to_percent(_get_metric(summary, "table", "overall_table_row_f1")),
        "table_item_f1": _to_percent(_get_metric(summary, "table", "overall_table_item_f1")),
    }
def parse_jsonl(file_path: str) -> Dict[str, str]:
    """Parse a submission JSONL file into {question_id: prediction}.

    Raises:
        ValueError: for invalid JSON lines, records missing 'id'/'prediction',
            duplicate ids, or an empty file.
    """
    predictions: Dict[str, str] = {}
    with open(file_path, "r", encoding="utf-8") as fh:
        for lineno, raw in enumerate(fh, start=1):
            if not raw.strip():
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError as exc:
                raise ValueError(f"Line {lineno} is not valid JSON: {exc}")
            if "id" not in record or "prediction" not in record:
                raise ValueError(f"Line {lineno} must contain 'id' and 'prediction'")
            key = str(record["id"])
            if key in predictions:
                raise ValueError(f"Duplicate id: {key}")
            predictions[key] = record["prediction"]
    if not predictions:
        raise ValueError("Empty submission file")
    return predictions
def add_new_eval(
    model: str,
    org: str,
    framework: str,
    url: str,
    email: str,
    file_obj,
    profile: Optional[gr.OAuthProfile] = None,
    request: Optional[gr.Request] = None,
):
    """Validate, score, and persist a leaderboard submission.

    Invoked by the submit button; `profile`/`request` are injected by Gradio
    (not listed as click inputs). Returns a formatted markdown message
    (log / warning / error) for the result panel.
    """
    if not TOKEN:
        return format_error("Server misconfigured: HF_TOKEN is missing.")
    username = _extract_username(profile, request)
    if not username:
        return format_warning("Please log in with HuggingFace to submit.")
    if not model or not org:
        return format_warning("Please provide model name and organization.")
    if file_obj is None:
        return format_warning("Please upload a JSONL file.")
    today = datetime.date.today().isoformat()
    # Rate limit: one submission per user per calendar day.
    try:
        ds = load_results_dataset()
        if ds is not None:
            for row in ds:
                if row.get("username") == username and row.get("date") == today:
                    return format_warning("You already submitted today. Please try again tomorrow.")
    except Exception:
        return format_error("Failed to load leaderboard results. Please try again later.")
    try:
        meta_map = load_meta_map()
    except Exception as e:
        return format_error(f"Failed to load meta.jsonl: {e}")
    try:
        preds = parse_jsonl(file_obj.name)
    except Exception as e:
        return format_error(str(e))
    # The submission must cover exactly the set of known question ids.
    pred_ids = set(preds.keys())
    meta_ids = set(meta_map.keys())
    extra = sorted(pred_ids - meta_ids)
    missing = sorted(meta_ids - pred_ids)
    if extra:
        return format_error(f"Submission has {len(extra)} unknown ids (e.g., {extra[0]}).")
    if missing:
        return format_error(f"Submission missing {len(missing)} ids (e.g., {missing[0]}).")
    # Score every prediction against its downloaded ground-truth CSV.
    score_list: List[dict] = []
    for qid, prediction in preds.items():
        gt_path = download_answer(qid)
        qtype = meta_map[qid]
        metrics = evaluator.evaluate_one(
            prediction=str(prediction),
            gt_path=gt_path,
            question_type=qtype,
            qid=qid,
        )
        score_list.append(metrics)
    metrics = compute_leaderboard_metrics(score_list)
    entry = {
        "model": model,
        "org": org,
        "framework": framework or "N/A",
        "url": url or "",
        "email": email or "",
        "username": username,
        "date": today,
        **metrics,
    }
    # `ds` still holds the dataset loaded during the rate-limit check above.
    try:
        ensure_results_repo()
        if ds is None:
            Dataset.from_list([entry]).push_to_hub(RESULTS_REPO, token=TOKEN)
        else:
            ds = ds.add_item(entry)
            ds.push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        return format_error("Failed to save results. Please contact the maintainers.")
    return format_log("Submission received! Please refresh the leaderboard to see your score.")
# --- Module-level bootstrap: seed storage, then render the static page. ----
seed_results_if_needed()
leaderboard_data = build_leaderboard_rows()
css = _load_text(STYLES_CSS)
page_html = render_page()
rows_html = _render_leaderboard_rows(leaderboard_data)
# Inject the server-rendered rows into the page. The previous code called
# page_html.replace('', rows_html), and str.replace with an empty pattern
# inserts rows_html between EVERY character of the document — anchor on the
# empty table body instead.
# NOTE(review): assumes assets/index.html contains an empty
# <tbody id="leaderboard-body"></tbody> element — confirm the exact markup/id.
page_html = page_html.replace(
    '<tbody id="leaderboard-body"></tbody>',
    f'<tbody id="leaderboard-body">{rows_html}</tbody>',
)
js = build_js(leaderboard_data)
# --- Gradio UI: static leaderboard page plus a submission form. ------------
with gr.Blocks() as demo:
    # Full pre-rendered leaderboard page (assets/index.html with rows injected).
    gr.HTML(page_html)
    with gr.Accordion("Submit your results", open=True):
        gr.Markdown(SUBMISSION_TEXT)
        with gr.Row():
            with gr.Column():
                model_text = gr.Textbox(label="Model / System")
                org_text = gr.Textbox(label="Organization")
                framework_text = gr.Textbox(label="Framework", value="ReAct")
                url_text = gr.Textbox(label="Model URL", placeholder="Optional")
            with gr.Column():
                email_text = gr.Textbox(label="Contact email (public)")
                file_input = gr.File(label="Upload JSONL")
        with gr.Row():
            login_btn = gr.LoginButton()
            submit_btn = gr.Button("Submit")
        result_md = gr.Markdown()
        # `profile`/`request` are intentionally absent from `inputs`: on Spaces
        # with hf_oauth enabled, Gradio injects them based on add_new_eval's
        # gr.OAuthProfile / gr.Request annotations.
        submit_btn.click(
            add_new_eval,
            inputs=[
                model_text,
                org_text,
                framework_text,
                url_text,
                email_text,
                file_input,
            ],
            outputs=result_md,
        )
def _launch():
    """Start the queued Gradio app."""
    demo.queue()
    # NOTE(review): recent Gradio versions accept `css`/`js` on the gr.Blocks
    # constructor rather than on launch() — confirm this call signature against
    # the pinned gradio version; if launch() raises TypeError, move css/js to
    # gr.Blocks(...).
    demo.launch(css=css, js=js, ssr_mode=False)
if __name__ == "__main__":
    _launch()