#!/usr/bin/env python3 """ Generic particle analyzer for ROOT files Usage: python analyze_particles.py [filepath] Examples: python analyze_particles.py lep python analyze_particles.py photon python analyze_particles.py tau python analyze_particles.py jet """ import sys import os import uproot import numpy as np import argparse def get_available_prefixes(filepath): """Get all available particle prefixes in the ROOT file""" with uproot.open(filepath) as file: tree = file['mini;1'] branches = list(tree.keys()) prefixes = set() for branch in branches: if '_' in branch: prefix = branch.split('_')[0] prefixes.add(prefix) return sorted(list(prefixes)) def analyze_particles(filepath, prefix, max_events=None): """Analyze particle properties in detail""" print(f"Analyzing {prefix} properties in: {filepath}") print("=" * 60) with uproot.open(filepath) as file: tree = file['mini;1'] branches = list(tree.keys()) # Find all branches with the given prefix prefix_branches = [b for b in branches if b.startswith(prefix + '_')] if not prefix_branches: print(f"No branches found with prefix '{prefix}_'") print(f"Available prefixes: {get_available_prefixes(filepath)}") return print(f"Found {len(prefix_branches)} branches with prefix '{prefix}_':") for branch in sorted(prefix_branches): print(f" - {branch}") print() # Load data data = {} for branch in prefix_branches: try: data[branch] = tree[branch].array() if max_events: data[branch] = data[branch][:max_events] except Exception as e: print(f"Warning: Could not load {branch}: {e}") continue if not data: print("No data could be loaded!") return n_events = len(data[list(data.keys())[0]]) print(f"Total events analyzed: {n_events}") print() # Analyze multiplicity if available multiplicity_branch = f"{prefix}_n" has_variable_multiplicity = multiplicity_branch in data # For photons, check if ID variables are stored as fixed arrays has_fixed_multiplicity = False if prefix == 'photon': # Check if identification variables exist and are 2D id_branches = [b for b in prefix_branches if b in [f'{prefix}_isTightID', f'{prefix}_truthMatched', f'{prefix}_trigMatched']] if id_branches: sample_id = data[id_branches[0]] # Check if it's a 2D array (events × 2 photons) using awkward array properties try: # For awkward arrays, check if we can access [:,1] (second photon) test_access = sample_id[:,1] has_fixed_multiplicity = True except: has_fixed_multiplicity = False if has_variable_multiplicity: analyze_multiplicity(data[multiplicity_branch], prefix) # Analyze kinematic variables kinematic_vars = ['pt', 'eta', 'phi', 'E', 'm'] for var in kinematic_vars: branch_name = f"{prefix}_{var}" if branch_name in data: analyze_kinematic(data[branch_name], var.upper(), prefix, has_variable_multiplicity, has_fixed_multiplicity) # Analyze identification variables id_vars = ['type', 'charge', 'isTightID', 'truthMatched', 'trigMatched'] for var in id_vars: branch_name = f"{prefix}_{var}" if branch_name in data: analyze_identification(data[branch_name], var, prefix, has_variable_multiplicity, has_fixed_multiplicity, prefix) def analyze_multiplicity(mult_data, prefix): """Analyze particle multiplicity""" print(f"{prefix.upper()} multiplicity distribution:") unique, counts = np.unique(mult_data, return_counts=True) for n, count in zip(unique, counts): percentage = count / len(mult_data) * 100 print(" {} {}(s): {:6d} events ({:.1f}%)".format(n, prefix, count, percentage)) print() def analyze_kinematic(var_data, var_name, prefix, has_variable_multiplicity=False, has_fixed_multiplicity=False): """Analyze kinematic variables""" print(f"{prefix.upper()} {var_name} analysis:") if has_variable_multiplicity: # Handle variable number of particles per event all_values = [] leading_values = [] subleading_values = [] for event_values in var_data: if len(event_values) > 0: # Sort by pT if this is pT, otherwise just take as is if var_name == 'PT': sorted_values = sorted(event_values, reverse=True) else: sorted_values = event_values all_values.extend(sorted_values) # Store leading and subleading if len(sorted_values) >= 1: leading_values.append(sorted_values[0]) if len(sorted_values) >= 2: subleading_values.append(sorted_values[1]) values = np.array(all_values) leading = np.array(leading_values) if leading_values else None subleading = np.array(subleading_values) if subleading_values else None print(f" Total number of {prefix}(s): {len(values)}") print(f" Events with ≥1 {prefix}: {len(leading) if leading is not None else 0}") print(f" Events with ≥2 {prefix}(s): {len(subleading) if subleading is not None else 0}") elif has_fixed_multiplicity: # Handle fixed number of particles per event (like exactly 2 photons) values = np.array(var_data) if var_name == 'PT': # For fixed multiplicity, assume first column is leading, second is subleading leading = values[:, 0] if values.shape[1] > 0 else None subleading = values[:, 1] if values.shape[1] > 1 else None else: leading = values[:, 0] if values.shape[1] > 0 else None subleading = values[:, 1] if values.shape[1] > 1 else None print(f" Total number of {prefix}(s): {len(values)}") else: # Handle single values per event values = np.array(var_data) leading = None subleading = None print(f" Total number of {prefix}(s): {len(values)}") if len(values) == 0: print(" No data available") return # Convert to GeV if it's energy/momentum if var_name in ['PT', 'E', 'M']: values_gev = values / 1000 leading_gev = leading / 1000 if leading is not None else None subleading_gev = subleading / 1000 if subleading is not None else None unit = "GeV" else: values_gev = values leading_gev = leading subleading_gev = subleading unit = "" print(f" {var_name} statistics ({unit}) - All {prefix}(s):") print(" Mean: {:.3f}".format(np.mean(values_gev))) print(" Median: {:.3f}".format(np.median(values_gev))) print(" Min: {:.3f}".format(np.min(values_gev))) print(" Max: {:.3f}".format(np.max(values_gev))) print(" Std: {:.3f}".format(np.std(values_gev))) # Show leading particle stats if leading_gev is not None and len(leading_gev) > 0: print(f" {var_name} statistics ({unit}) - Leading {prefix}:") print(" Mean: {:.3f}".format(np.mean(leading_gev))) print(" Median: {:.3f}".format(np.median(leading_gev))) print(" Min: {:.3f}".format(np.min(leading_gev))) print(" Max: {:.3f}".format(np.max(leading_gev))) print(" Std: {:.3f}".format(np.std(leading_gev))) # Show subleading particle stats if subleading_gev is not None and len(subleading_gev) > 0: print(f" {var_name} statistics ({unit}) - Subleading {prefix}:") print(" Mean: {:.3f}".format(np.mean(subleading_gev))) print(" Median: {:.3f}".format(np.median(subleading_gev))) print(" Min: {:.3f}".format(np.min(subleading_gev))) print(" Max: {:.3f}".format(np.max(subleading_gev))) print(" Std: {:.3f}".format(np.std(subleading_gev))) # Show ratio between leading and subleading if both exist if leading_gev is not None and len(leading_gev) == len(subleading_gev): ratio = subleading_gev / leading_gev print(f" {var_name} ratio (Subleading/Leading):") print(" Mean: {:.3f}".format(np.mean(ratio))) print(" Median: {:.3f}".format(np.median(ratio))) print(" Min: {:.3f}".format(np.min(ratio))) print(" Max: {:.3f}".format(np.max(ratio))) print() def analyze_identification(var_data, var_name, prefix, has_variable_multiplicity=False, has_fixed_multiplicity=False, particle_prefix=None): """Analyze identification variables""" print(f"{prefix.upper()} {var_name} analysis:") # For photons, prioritize fixed multiplicity logic even if variable multiplicity exists use_fixed_multiplicity = has_fixed_multiplicity and (particle_prefix == 'photon' or not has_variable_multiplicity) if use_fixed_multiplicity: # Handle fixed number of particles per event (like exactly 2 photons) values = np.array(var_data) # For fixed multiplicity, analyze leading and subleading separately if values.shape[1] >= 2: leading_values = values[:, 0] subleading_values = values[:, 1] print(f" Overall {var_name} distribution:") analyze_id_distribution(values.flatten(), var_name, prefix) print(f" Leading {prefix} {var_name} distribution:") analyze_id_distribution(leading_values, var_name, prefix) print(f" Subleading {prefix} {var_name} distribution:") analyze_id_distribution(subleading_values, var_name, prefix) # Show correlation between leading and subleading if var_name in ['isTightID', 'truthMatched', 'trigMatched']: analyze_correlation(leading_values, subleading_values, var_name, prefix) print() return elif has_variable_multiplicity: # Handle variable number of particles per event all_values = [] for event_values in var_data: if len(event_values) > 0: all_values.extend(event_values) values = np.array(all_values) else: # Handle single values per event values = np.array(var_data) # For variable multiplicity or single values analyze_id_distribution(values, var_name, prefix) print() def analyze_id_distribution(values, var_name, prefix): """Analyze a single identification variable distribution""" if var_name == 'type': analyze_particle_types(values, prefix) elif var_name in ['isTightID', 'truthMatched', 'trigMatched']: analyze_boolean_flags(values, var_name, prefix) elif var_name == 'charge': analyze_charges(values, prefix) else: # Generic analysis unique, counts = np.unique(values, return_counts=True) total = len(values) print(f" Distribution:") for val, count in zip(unique[:10], counts[:10]): # Show first 10 percentage = count / total * 100 print(" {}: {:6d} ({:.1f}%)".format(val, count, percentage)) if len(unique) > 10: print(f" ... and {len(unique) - 10} more values") def analyze_correlation(leading, subleading, var_name, prefix): """Analyze correlation between leading and subleading particle properties""" print(f" {prefix.upper()} {var_name} correlation (Leading × Subleading):") # Create contingency table both_true = np.sum((leading == True) & (subleading == True)) leading_true_sub_false = np.sum((leading == True) & (subleading == False)) leading_false_sub_true = np.sum((leading == False) & (subleading == True)) both_false = np.sum((leading == False) & (subleading == False)) total = len(leading) print(" Both True: {:6d} ({:.1f}%)".format(both_true, both_true/total*100)) print(" Leading True, Subleading False: {:6d} ({:.1f}%)".format( leading_true_sub_false, leading_true_sub_false/total*100)) print(" Leading False, Subleading True: {:6d} ({:.1f}%)".format( leading_false_sub_true, leading_false_sub_true/total*100)) print(" Both False: {:6d} ({:.1f}%)".format(both_false, both_false/total*100)) def analyze_particle_types(types, prefix): """Analyze particle types""" type_dict = {11: 'electron', 13: 'muon', 15: 'tau', 22: 'photon'} print(f" {prefix.upper()} type distribution:") unique_types, counts = np.unique(types, return_counts=True) for ptype, count in zip(unique_types, counts): type_name = type_dict.get(ptype, f'unknown({ptype})') percentage = count / len(types) * 100 print(" {}: {:6d} ({:.1f}%)".format(type_name, count, percentage)) print() def analyze_boolean_flags(flags, flag_name, prefix): """Analyze boolean flags""" true_count = np.sum(flags) false_count = len(flags) - true_count true_pct = true_count / len(flags) * 100 false_pct = false_count / len(flags) * 100 print(f" {prefix.upper()} {flag_name} distribution:") print(" True: {:6d} ({:.1f}%)".format(true_count, true_pct)) print(" False: {:6d} ({:.1f}%)".format(false_count, false_pct)) print() def analyze_charges(charges, prefix): """Analyze particle charges""" unique_charges, counts = np.unique(charges, return_counts=True) print(f" {prefix.upper()} charge distribution:") for charge, count in zip(unique_charges, counts): percentage = count / len(charges) * 100 print(" {}: {:6d} ({:.1f}%)".format(charge, count, percentage)) print() def main(): parser = argparse.ArgumentParser(description='Generic particle analyzer for ROOT files') parser.add_argument('--list-prefixes', action='store_true', help='List all available prefixes in the file') parser.add_argument('prefix', nargs='?', help='Particle prefix (e.g., lep, photon, tau, jet)') parser.add_argument('filepath', nargs='?', default="/global/cfs/projectdirs/atlas/eligd/llm_for_analysis_copy/data/mc_341081.ttH125_gamgam.GamGam.root", help='Path to ROOT file') parser.add_argument('--max-events', type=int, help='Limit analysis to first N events') args = parser.parse_args() if args.list_prefixes: if not os.path.exists(args.filepath): print(f"Error: File '{args.filepath}' does not exist!") return print("Available prefixes in the file:") prefixes = get_available_prefixes(args.filepath) for prefix in prefixes: print(f" - {prefix}") return if not args.prefix: print("Error: Please specify a particle prefix (e.g., lep, photon, tau, jet)") print("Use --list-prefixes to see available options") return if not os.path.exists(args.filepath): print(f"Error: File '{args.filepath}' does not exist!") return analyze_particles(args.filepath, args.prefix, args.max_events) if __name__ == "__main__": main()