import logging
import os
import re
import sys

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datasets import load_dataset
from datasets.data_files import EmptyDatasetError
from gradio_leaderboard import ColumnFilter, Leaderboard, SelectColumns
from huggingface_hub import HfApi

from src import about
from src.display.css_html_js import custom_css
from src.plots import plot_cost_efficiency, plot_parameter_efficiency
from src.schema import AutoEvalColumn, EvalResult, fields

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[logging.StreamHandler(sys.stdout)],
    level=logging.INFO,
)

# 1. Initialization
_hf_token = os.environ.get("HF_TOKEN")
if not _hf_token:
    raise ValueError("HF_TOKEN not set!")

api = HfApi(token=_hf_token)
REPO_ID = "filbench/filbench-leaderboard"
REPO_RESULTS = "filbench/filbench-results"
SUBMISSION_RESULTS = "filbench/filbench-results-submission"


def restart_space():
    api.restart_space(repo_id=REPO_ID)


# 2. Load and populate leaderboard data
def get_results(
    source: str, aggregate: bool = False, submissions: str | None = None
) -> tuple[pd.DataFrame, list]:
    """Load results from a given source and return a DataFrame with the relevant columns.

    If `aggregate` is True, it returns the aggregated results.

    source (str): The source dataset to load results from.
    aggregate (bool): Whether to return aggregated results or not.
    submissions (str, optional): The submissions dataset to load results from.
    RETURNS (tuple[pd.DataFrame, list]): A tuple containing the DataFrame with
        results and a list of master columns.
    """
    results = load_dataset(source, split="train").to_pandas().to_dict(orient="records")
    raw_data = [EvalResult.init_from_dict(result) for result in results]

    if submissions:
        try:
            submission_results = (
                load_dataset(
                    submissions, split="train", download_mode="force_redownload"
                )
                .to_pandas()
                .to_dict(orient="records")
            )
        except EmptyDatasetError:
            logging.info("Empty dataset for submissions, skipping...")
            submission_results = []

        if len(submission_results) == 0:
            logging.info("No external submissions found!")
        else:
            logging.info(f"Found {len(submission_results)} submission/s!")
            raw_data += [
                EvalResult.init_from_dict(result, is_submission=True)
                for result in submission_results
            ]

    all_data_json = [v.to_dict() for v in raw_data]
    df = pd.DataFrame.from_records(all_data_json)
    df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
    # Despite the name, this flag is True when a row has *no* missing scores;
    # the boolean filter in the leaderboard (checked by default) uses it to
    # hide incomplete evaluation runs.
    df["Incomplete"] = ~df.isna().any(axis=1)

    master_columns = []
    for col in fields(AutoEvalColumn):
        if col.meta:
            master_columns.append(col.name)
        if aggregate:
            if col.aggregate:
                master_columns.append(col.name)
        else:
            if not col.aggregate:
                master_columns.append(col.name)

    cols = [
        c.name
        for c in fields(AutoEvalColumn)
        if not c.hidden and c.name in master_columns
    ]
    cols.append("Incomplete")
    df = df[cols].round(decimals=2)
    return df, master_columns
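# Illustrative sketch of what `get_results` returns. Column names come from
# `src.schema.AutoEvalColumn` and the values below are made up, not real scores:
#
#   >>> df, master_columns = get_results(REPO_RESULTS, aggregate=True)
#   >>> df[["Model", "Average", "Incomplete"]].head(1)
#                            Model  Average  Incomplete
#   0  <a href="...">org/model</a>    67.42        True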
def init_leaderboard(
    source: str, aggregate: bool = False, submissions: str | None = None
) -> Leaderboard:
    df, master_columns = get_results(
        source=source, aggregate=aggregate, submissions=submissions
    )
    return Leaderboard(
        value=df,
        datatype=[c.type for c in fields(AutoEvalColumn) if c.name in master_columns],
        select_columns=SelectColumns(
            default_selection=[
                c.name
                for c in fields(AutoEvalColumn)
                if c.displayed_by_default and c.name in master_columns
            ],
            cant_deselect=[
                c.name
                for c in fields(AutoEvalColumn)
                if c.never_hidden and c.name in master_columns
            ],
            label="Select Columns to Display:",
        ),
        filter_columns=[
            # fmt: off
            ColumnFilter("Incomplete", type="boolean", label="Hide incomplete evaluations", default=True),
            ColumnFilter("Submission", type="boolean", label="Show only submitted results", default=False),
            # ColumnFilter(AutoEvalColumn.precision.name, type="checkboxgroup", label="Precision"),
            ColumnFilter(AutoEvalColumn.model_type.name, type="checkboxgroup", label="Model type"),
            ColumnFilter(AutoEvalColumn.multilingual.name, type="checkboxgroup", label="Multilinguality"),
            ColumnFilter(AutoEvalColumn.param_size.name, type="slider", min=0.01, max=150, label="Select the number of parameters (B)", default=[-1, 83]),
            # fmt: on
        ],
        search_columns=[AutoEvalColumn.model.name],
        hide_columns=[
            c.name
            for c in fields(AutoEvalColumn)
            if c.hidden and c.name in master_columns
        ],
        interactive=False,
    )


def get_clean_df() -> pd.DataFrame:
    df, _ = get_results(
        source=REPO_RESULTS, aggregate=False, submissions=SUBMISSION_RESULTS
    )
    df_agg, _ = get_results(
        source=REPO_RESULTS, aggregate=True, submissions=SUBMISSION_RESULTS
    )

    # Cleanup
    def extract_names(html_string):
        # The Model cell is rendered as an HTML anchor; pull out its link text.
        match = re.search(r"<a[^>]*>(.*?)</a>", html_string)
        if match:
            return match.group(1)
        # Fall back to the raw value if the cell is not an HTML anchor.
        return html_string

    def remove_emojis(string):
        emoji_pattern = re.compile(
            "["
            "\U0001f600-\U0001f64f"  # emoticons
            "\U0001f300-\U0001f5ff"  # symbols & pictographs
            "\U0001f680-\U0001f6ff"  # transport & map symbols
            "\U0001f700-\U0001f77f"  # alchemical symbols
            "\U0001f780-\U0001f7ff"  # Geometric Shapes Extended
            "\U0001f800-\U0001f8ff"  # Supplemental Arrows-C
            "\U0001f900-\U0001f9ff"  # Supplemental Symbols and Pictographs
            "\U0001fa00-\U0001fa6f"  # Chess Symbols
            "\U0001fa70-\U0001faff"  # Symbols and Pictographs Extended-A
            "\U00002702-\U000027b0"  # Dingbats
            "\U000024c2-\U0001f251"
            "]+",
            flags=re.UNICODE,
        )
        return emoji_pattern.sub(r"", string).strip()

    df["Model"] = df["Model"].apply(extract_names)
    df = df.rename(columns={col: remove_emojis(col).strip() for col in df.columns})
    df["Multilingual"] = df["Multilingual"].apply(remove_emojis)
    df["Model Type"] = df["Model Type"].apply(remove_emojis)
    df = df.reset_index(drop=True)

    # Cleanup the aggregated dataset
    df_agg["Model"] = df_agg["Model"].apply(extract_names)
    df_agg = df_agg.rename(
        columns={col: remove_emojis(col).strip() for col in df_agg.columns}
    )
    df_agg = df_agg.reset_index(drop=True)
    df_agg = df_agg[
        [
            "Model",
            "Cultural Knowledge",
            "Classical NLP",
            "Reading Comprehension",
            "Generation",
        ]
    ]
    df_agg = df_agg.rename(
        columns={col: f"agg_{col}" for col in df_agg.columns if col != "Model"}
    )

    df_merge = df.merge(df_agg, on="Model")
    return df_merge


def download_results():
    df = get_clean_df()
    filepath = "filbench_results.csv"
    df.to_csv(filepath, index=False)
    return filepath
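# Illustrative example of the cleanup done in `get_clean_df` (values made up):
# a Model cell such as
#   '<a target="_blank" href="https://hf.co/org/model">org/model</a>'
# becomes the plain name 'org/model', and an emoji-prefixed header such as
# '🌐 Multilingual' becomes 'Multilingual', so the detailed and aggregated
# frames share identical keys and can be merged on "Model".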
# 3. Actual setup of the HF Space
demo = gr.Blocks(css=custom_css)
with demo:
    with gr.Column(scale=6):
        num_models = len(
            get_results(REPO_RESULTS, aggregate=True, submissions=SUBMISSION_RESULTS)[0]
        )
        gr.Markdown(about.TOP_TEXT.format(str(num_models)))
    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        with gr.TabItem(
            "🏅 FilBench Leaderboard", elem_id="llm-benchmark-tab-table", id=0
        ):
            leaderboard = init_leaderboard(
                REPO_RESULTS, aggregate=True, submissions=SUBMISSION_RESULTS
            )
        with gr.TabItem(
            "🔍 FilBench - Detailed", elem_id="llm-benchmark-tab-table", id=1
        ):
            leaderboard = init_leaderboard(
                REPO_RESULTS, aggregate=False, submissions=SUBMISSION_RESULTS
            )
        with gr.TabItem("📊 Analysis", id=2):
            df = get_clean_df()
            with gr.Row():
                with gr.Column():
                    gr.Markdown("## Parameter-Efficiency Plot")
                    plot_parameter_efficiency(df)
                    gr.Markdown(
                        "Model performance on FilBench with respect to parameter size. "
                        "For mixture-of-experts models, we plot the full parameter count. "
                        "In general, we find that model size and performance are positively correlated."
                    )
                with gr.Column():
                    gr.Markdown("## Cost-Efficiency Plot")
                    plot_cost_efficiency(df)
                    gr.Markdown(
                        "Model performance on FilBench with respect to per-token output cost ($/1M tokens). "
                        "We use the token pricing published on [OpenRouter](https://openrouter.ai/models). "
                        "For models not on OpenRouter, we either exclude them from the chart or use the cost of the base model they were finetuned from."
                    )
        with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=3):
            gr.Markdown(about.LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")

    with gr.Row():
        download_button = gr.DownloadButton("Download results (CSV)")
        download_button.click(download_results, outputs=download_button)

    with gr.Accordion("📙 Citation", open=False):
        citation_button = gr.Textbox(
            value=about.CITATION_BUTTON_TEXT,
            label=about.CITATION_BUTTON_LABEL,
            lines=20,
            elem_id="citation-button",
            show_copy_button=True,
        )

# Restart the Space every hour so the leaderboard picks up newly added results.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=3600)
scheduler.start()

demo.queue(default_concurrency_limit=40).launch()
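# To run this app locally (a sketch; assumes the FilBench repository layout so
# that the `src` package is importable, the usual Space entry-point name
# `app.py`, and a valid Hugging Face token in the environment):
#
#   HF_TOKEN=<your-token> python app.py
#
# Gradio serves the interface at http://localhost:7860 by default.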