"""Parse all results and upload it into a single leaderboard""" import argparse import json import logging import os import sys from typing import Any import pandas as pd from datasets import Dataset, DownloadMode, load_dataset from huggingface_hub import list_datasets logging.basicConfig( format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[logging.StreamHandler(sys.stdout)], level=logging.INFO, ) def get_args(): # fmt: off parser = argparse.ArgumentParser(description="Parse all results from datasets of a given HF org, and upload it into a new dataset.") parser.add_argument("--hf_org", type=str, default="UD-Filipino", help="HuggingFace org to parse results from.") parser.add_argument("--hf_repo_output", type=str, default="UD-Filipino/filbench-results", help="HuggingFace dataset to upload all parsed results.") # fmt: on return parser.parse_args() def main(): args = get_args() if not os.getenv("HF_TOKEN"): raise ValueError("HF_TOKEN environment variable not set!") # List datasets with 'details' in their name within a given org datasets = [ds.id for ds in list_datasets(search="details", author=args.hf_org)] logging.info(f"Found {len(datasets)} datasets") parsed_results = pd.DataFrame([parse_outputs(dataset) for dataset in datasets]) logging.info(f"Uploading to {args.hf_repo_output}") Dataset.from_pandas(parsed_results).push_to_hub( repo_id=args.hf_repo_output, private=True, split="train" ) def parse_outputs(dataset_id: str) -> dict[str, Any]: """Parse a dataset ID and output a dataframe containing the relevant fields Based from: https://huggingface.co/docs/lighteval/en/saving-and-reading-results """ logging.info(f"Parsing results from dataset {dataset_id}") ds = load_dataset( dataset_id, "results", trust_remote_code=True, download_mode=DownloadMode.FORCE_REDOWNLOAD, ) # Save all metrics and versions for each task metrics = {} versions = {} for run in ds.keys(): df = ds[run].to_pandas() for task, result in json.loads(df.results.iloc[0]).items(): if task != "all": _, benchmark, n_shots = task.split("|") if int(n_shots) == 0: metrics[benchmark] = result versions.update(json.loads(df.versions.iloc[0])) logging.info(f"Found {len(metrics)} tasks!") latest_config = json.loads(ds["latest"].to_pandas().config_general.iloc[0]) model_config = { "model_name": latest_config.get("model_name"), "model_dtype": latest_config.get("model_dtype"), "model_size": latest_config.get("model_size"), } return { "config": model_config, "results": metrics, "versions": versions, } if __name__ == "__main__": main()