import difflib
import os
import re
from collections import Counter
from io import StringIO
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd


class SimpleEvaluator:
    """Scores model predictions against ground-truth CSV/TSV answers for
    item-, set-, list-, and table-style questions."""

    def _normalize_val(self, val: Union[str, int, float]) -> str:
        """Canonicalize a cell value so numeric and string forms compare equal."""
        val_str = str(val).strip()
        if not val_str or val_str.lower() in ["nan", "none", "null"]:
            return ""
        # Strip thousands separators and currency symbols before parsing.
        clean_num = val_str.replace(",", "").replace("$", "")
        is_percent = False
        if clean_num.endswith("%"):
            is_percent = True
            clean_num = clean_num[:-1]
        try:
            f_val = float(clean_num)
            if is_percent:
                f_val /= 100.0
            if f_val.is_integer():
                return str(int(f_val))
            # Keep at most six decimals, then drop trailing zeros.
            formatted = "{:.6f}".format(f_val).rstrip("0").rstrip(".")
            return formatted if formatted else "0"
        except ValueError:
            pass
        # Non-numeric values: lowercase and strip whitespace/markup noise.
        return val_str.lower().replace(" ", "").replace("*", "").replace("\n", "")

    def _extract_model_output(self, model_output: str) -> Optional[pd.DataFrame]:
        """Pull a TSV table out of a (possibly fenced) model response."""
        pattern = r"```(?:tsv)?\s*(.*?)```"
        match = re.search(pattern, model_output, re.DOTALL)
        raw_content = match.group(1) if match else model_output
        try:
            raw_content = "\n".join([line for line in raw_content.split("\n") if line.strip()])
            if not raw_content:
                return None
            output = pd.read_csv(StringIO(raw_content), sep="\t")
            output.columns = [str(col).strip().lower().replace(" ", "") for col in output.columns]
            output = output.map(self._normalize_val)
        except Exception:
            output = None
        return output

    def load_ground_truth(self, file_path: str, question_type: str = "table") -> pd.DataFrame:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"GT file not found: {file_path}")
        # Only table questions carry a header row; the other types are headerless.
        header = "infer" if question_type == "table" else None
        try:
            df = pd.read_csv(file_path, header=header)
        except Exception as e:
            # Fall back to GBK for ground-truth files that are not UTF-8.
            if "codec" in str(e):
                df = pd.read_csv(file_path, header=header, encoding="gbk")
            else:
                raise
        df.columns = [str(col).strip().lower().replace(" ", "") for col in df.columns]
        return df.map(self._normalize_val)

    def _calculate_f1(self, tp: int, n_pred: int, n_gt: int) -> Tuple[float, float, float]:
        precision = tp / n_pred if n_pred > 0 else 0.0
        recall = tp / n_gt if n_gt > 0 else 0.0
        if (precision + recall) == 0:
            f1 = 0.0
        else:
            f1 = 2 * (precision * recall) / (precision + recall)
        return precision, recall, f1

    def flatten_table(self, df: pd.DataFrame) -> list:
        """Flatten a table into (column, value) pairs for cell-level scoring."""
        items = []
        for col in df.columns:
            for val in df[col]:
                items.append((col, val))
        return items

    def evaluate_item(self, pred_df: Optional[pd.DataFrame], gt_df: pd.DataFrame) -> dict:
        """Exact match on the concatenated first row."""
        if pred_df is None or pred_df.empty:
            return {"item_em": 0}
        pred_item = "".join(pred_df.iloc[0, :].tolist())
        gt_item = "".join(gt_df.iloc[0, :].tolist())
        return {"item_em": 1 if pred_item == gt_item else 0}

    def evaluate_set(self, pred_df: Optional[pd.DataFrame], gt_df: pd.DataFrame) -> dict:
        """Order-insensitive precision/recall/F1 over unique answers."""
        if pred_df is None or pred_df.empty:
            return {"set_precision": 0.0, "set_recall": 0.0, "set_f1": 0.0}
        pred_set = set(pred_df.iloc[:, -1].tolist())
        gt_set = set(gt_df.iloc[:, -1].tolist())
        tp = len(pred_set.intersection(gt_set))
        p, r, f1 = self._calculate_f1(tp, len(pred_set), len(gt_set))
        return {"set_precision": p, "set_recall": r, "set_f1": f1}

    def evaluate_list(self, pred_df: Optional[pd.DataFrame], gt_df: pd.DataFrame) -> dict:
        """Multiset F1 for content plus a sequence-similarity score for order."""
        if pred_df is None or pred_df.empty:
            return {"list_content_f1": 0.0, "list_order_score": 0.0}
        pred_list = pred_df.iloc[:, -1].tolist()
        gt_list = gt_df.iloc[:, -1].tolist()
        # Counter intersection counts duplicates, unlike a plain set.
        num_common = sum((Counter(gt_list) & Counter(pred_list)).values())
        precision = num_common / len(pred_list) if pred_list else 0.0
        recall = num_common / len(gt_list) if gt_list else 0.0
        if (precision + recall) == 0:
            content_f1 = 0.0
        else:
            content_f1 = 2 * (precision * recall) / (precision + recall)
        order_score = difflib.SequenceMatcher(None, gt_list, pred_list).ratio()
        return {
            "list_content_f1": round(content_f1, 4),
            "list_order_score": round(order_score, 4),
        }

    def evaluate_table(self, pred_df: Optional[pd.DataFrame], gt_df: pd.DataFrame) -> dict:
        """Row-level F1 over columns shared with GT, plus cell-level F1 over all cells."""
        default_res = {
            "table_row_f1": 0.0,
            "table_row_precision": 0.0,
            "table_row_recall": 0.0,
            "table_item_f1": 0.0,
            "table_item_precision": 0.0,
            "table_item_recall": 0.0,
        }
        if pred_df is None or pred_df.empty:
            return default_res.copy()
        common_cols = [c for c in gt_df.columns if c in pred_df.columns]
        if not common_cols:
            row_p, row_r, row_f1 = 0.0, 0.0, 0.0
        else:
            pred_rows = set(
                tuple(row) for row in pred_df[common_cols].fillna("__NAN__").astype(str).to_numpy()
            )
            gt_rows = set(
                tuple(row) for row in gt_df[common_cols].fillna("__NAN__").astype(str).to_numpy()
            )
            tp_rows = len(pred_rows.intersection(gt_rows))
            row_p, row_r, row_f1 = self._calculate_f1(tp_rows, len(pred_rows), len(gt_rows))
        # Cell-level scores compare (column, value) multisets over ALL columns.
        pred_counter = Counter(self.flatten_table(pred_df))
        gt_counter = Counter(self.flatten_table(gt_df))
        tp_items = sum((pred_counter & gt_counter).values())
        item_p, item_r, item_f1 = self._calculate_f1(
            tp_items, sum(pred_counter.values()), sum(gt_counter.values())
        )
        return {
            "table_row_f1": row_f1,
            "table_row_precision": row_p,
            "table_row_recall": row_r,
            "table_item_f1": item_f1,
            "table_item_precision": item_p,
            "table_item_recall": item_r,
        }

    def evaluate_one(self, prediction: str, gt_path: str, question_type: str, qid=None) -> dict:
        """Score a single prediction (raw model text, or a path to a CSV of it)."""
        if prediction.endswith(".csv"):
            pred_df = self.load_ground_truth(prediction, question_type=question_type.lower())
        else:
            pred_df = self._extract_model_output(prediction)
        if pred_df is None:
            print(f"qid:{qid} prediction is empty")
        gt_df = self.load_ground_truth(gt_path, question_type=question_type.lower())
        q_type = question_type.lower()
        if q_type == "item":
            metrics = self.evaluate_item(pred_df, gt_df)
        elif q_type == "set":
            metrics = self.evaluate_set(pred_df, gt_df)
        elif q_type == "list":
            metrics = self.evaluate_list(pred_df, gt_df)
        elif q_type == "table":
            metrics = self.evaluate_table(pred_df, gt_df)
        else:
            # Unknown question types fall back to item-level exact match.
            metrics = self.evaluate_item(pred_df, gt_df)
        if pred_df is not None:
            if q_type != "set":
                # Strict whole-answer match: identical shape and cell values.
                metrics["global_em"] = int(np.array_equal(pred_df.to_numpy(), gt_df.to_numpy()))
            else:
                # Sets are order-insensitive, so compare as sets instead.
                pred_set = set(pred_df.iloc[:, 0].tolist())
                gt_set = set(gt_df.iloc[:, 0].tolist())
                metrics["global_em"] = int(pred_set == gt_set)
        else:
            metrics["global_em"] = 0
        metrics["question_type"] = question_type
        return metrics

    def gather_results(self, score_list: list) -> dict:
        """Aggregate per-question metrics into per-type and overall summaries."""
        df = pd.DataFrame(score_list)
        overall_em = df["global_em"].mean()
        type_report = df.groupby("question_type").mean().round(4)
        detail_score_dict = type_report.to_dict(orient="index")
        count_by_type = df["question_type"].value_counts().to_dict()
        summary = {"overall_global_em": overall_em}
        for type_name, num_samples in count_by_type.items():
            summary[type_name] = {
                "num_samples": num_samples,
                # Metrics from other question types are NaN after groupby; drop them.
                **{
                    f"overall_{k}": round(v, 4)
                    for k, v in detail_score_dict[type_name].items()
                    if not pd.isna(v)
                },
            }
        return summary
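

# --- Usage sketch (illustrative, not part of the evaluator) ---
# A minimal, self-contained example of the intended flow, under the assumption
# that predictions arrive as fenced TSV text: parse a model response with
# _extract_model_output, normalize an in-memory ground truth the same way
# load_ground_truth would, and score the pair as a table question. The sample
# names and values below are made up for demonstration.
if __name__ == "__main__":
    evaluator = SimpleEvaluator()
    pred_df = evaluator._extract_model_output(
        "```tsv\nName\tScore\nAlice\t90.0\nBob\t85%\n```"
    )
    gt_df = pd.DataFrame(
        {"name": ["Alice", "Bob"], "score": ["90", "0.85"]}
    ).map(evaluator._normalize_val)
    # Normalization makes these a perfect match: "90.0" -> "90", "85%" -> "0.85",
    # and column names are lowercased, so all row/item F1 scores come out 1.0.
    print(evaluator.evaluate_table(pred_df, gt_df))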