#!/bin/bash
#
# test_stats_parallel.sh - Run ATLAS analysis tasks in parallel using GNU parallel
#
# This script runs 5 independent Snakemake workflows in parallel:
#   1. summarize_root: Convert ROOT files to summary format
#      (2 sequential steps: summarize_root -> insert_root_summary)
#   2. create_numpy:   Create NumPy arrays from ROOT data
#   3. preprocess:     Preprocess data for ML
#   4. scores:         Generate signal/background scores
#   5. categorization: Perform event categorization
#
# Usage:
#   ./test_stats_parallel.sh
#
# Requirements:
#   - GNU parallel, snakemake and python must be on PATH
#   - config.yml must exist and define at least `name:` and `out_dir:`
#   - All workflow/*.smk files must be present
#   - Python environment with required packages
#
# The script will:
#   - Copy the prompts and workflow files and substitute {BASE_DIR} in the copies
#   - Run all 5 tasks concurrently (one GNU parallel job slot per task)
#   - Log each task's output separately
#   - Measure individual execution time for each task
#   - Collect statistics and check results
#   - Clean up temporary files (also when the run fails or is interrupted)
#
# Output:
#   - Individual task logs in $OUT_DIR/*.log
#   - Individual task times in $OUT_DIR/*.time (removed once stats are written)
#   - Combined statistics via update_stats.py
#   - Results validation via check_soln.py
#

# Exported so the workers started by GNU parallel (and anything they launch)
# see the same configuration file as this script.
export CONFIG_FILE=config.yml

# Task names, in the order update_stats.py numbers them (1..5).
TASK_NAMES=(summarize_root create_numpy preprocess scores categorization)

# Temporary workflow copies created in the current directory.
TEMP_WORKFLOWS=()
for task in "${TASK_NAMES[@]}"; do
  TEMP_WORKFLOWS+=("${task}_temp.smk")
done

# Print an error message to stderr and abort the script.
die() {
  printf 'ERROR: %s\n' "$*" >&2
  exit 1
}

# Print the value of a top-level `key: value` line from $CONFIG_FILE with
# surrounding quotes removed. Only the first matching line is used.
#   $1 - key name (without the trailing colon)
config_value() {
  awk -v key="$1" 'index($0, key ":") == 1 { print $2; exit }' "$CONFIG_FILE" \
    | tr -d "'\""
}

# Remove the temporary prompt/workflow copies. Safe to call more than once.
cleanup() {
  rm -rf -- prompts_temp
  rm -f -- "${TEMP_WORKFLOWS[@]}"
}

#######################################
# Run one Snakemake workflow (one or two targets in sequence), logging to
# $OUT_DIR/<task>.log and recording elapsed seconds in $OUT_DIR/<task>.time.
# The time file is written on failure as well as on success.
# Globals:   OUT_DIR (read), CONFIG_FILE (read, defaults to config.yml)
# Arguments: $1 - task name (used for log/time file names and messages)
#            $2 - Snakemake file to run
#            $3 - first target passed to --forcerun
#            $4 - optional second target, run only if the first succeeds
# Outputs:   progress/error messages on stdout
# Returns:   0 on success, 1 if a Snakemake invocation fails,
#            2 on missing arguments
#######################################
run_task() {
  if (( $# < 3 )); then
    echo "ERROR: run_task needs <task_name> <smk_file> <target1> [target2]"
    return 2
  fi

  local task_name=$1
  local smk_file=$2
  local target1=$3
  local target2=${4:-} # Optional second target
  local log_file="$OUT_DIR/${task_name}.log"
  local time_file="$OUT_DIR/${task_name}.time"
  local config_file=${CONFIG_FILE:-config.yml}
  local task_start=$SECONDS
  local task_time
  local target
  local stage=0
  local failed_target=""

  echo "Starting $task_name..."

  # Start from an empty log so both stages can simply append to it.
  : > "$log_file"

  for target in "$target1" "$target2"; do
    stage=$((stage + 1))
    [[ -n "$target" ]] || continue

    if (( stage == 2 )); then
      echo "Running $task_name second stage: $target..."
    fi

    if ! snakemake -s "$smk_file" -j 1 --forcerun "$target" \
      --rerun-incomplete --configfile "$config_file" \
      --latency-wait 120 --verbose >> "$log_file" 2>&1; then
      failed_target=$target
      break
    fi
  done

  task_time=$((SECONDS - task_start))
  echo "$task_time" > "$time_file"

  if [[ -n "$failed_target" ]]; then
    echo "ERROR: $task_name failed on $failed_target after $task_time seconds"
    return 1
  fi

  echo "$task_name completed successfully in $task_time seconds"
  return 0
}

# ---------------------------------------------------------------------------
# Preconditions
# ---------------------------------------------------------------------------
for tool in parallel snakemake python; do
  command -v "$tool" > /dev/null || die "required tool not found: $tool"
done
[[ -r "$CONFIG_FILE" ]] || die "cannot read $CONFIG_FILE"

NAME=$(config_value name)
OUT_DIR=$(config_value out_dir)

# An empty OUT_DIR would make the paths below resolve against "/".
[[ -n "$NAME" ]] || die "'name:' is missing or empty in $CONFIG_FILE"
[[ -n "$OUT_DIR" ]] || die "'out_dir:' is missing or empty in $CONFIG_FILE"

# ---------------------------------------------------------------------------
# Prepare temporary prompt/workflow copies
# ---------------------------------------------------------------------------
# Registered before anything is created so the copies are removed on every
# exit path, including task failure.
trap cleanup EXIT

# Escape characters that are special in the sed replacement text ('&', '\')
# or that collide with the '#' delimiter used below.
OUT_DIR_SED=$(printf '%s' "$OUT_DIR" | sed 's/[&#\\]/\\&/g')

# A leftover prompts_temp from an aborted run would make `cp -r` nest the copy
# inside it, so always start from a clean slate.
rm -rf -- prompts_temp
cp -r prompts prompts_temp || die "cannot copy prompts to prompts_temp"
sed -i "s#{BASE_DIR}#${OUT_DIR_SED}#g" prompts_temp/*.txt \
  || die "cannot substitute {BASE_DIR} in prompts_temp/*.txt"

# Copy all 5 smk files to temp versions
for task in "${TASK_NAMES[@]}"; do
  cp "workflow/${task}.smk" "${task}_temp.smk" \
    || die "cannot copy workflow/${task}.smk"
done
sed -i "s#{BASE_DIR}#${OUT_DIR_SED}#g" "${TEMP_WORKFLOWS[@]}" \
  || die "cannot substitute {BASE_DIR} in the temporary workflow files"

mkdir -p "$OUT_DIR/generated_code" || die "cannot create $OUT_DIR/generated_code"
cp utils.py "$OUT_DIR/generated_code/utils.py" || die "cannot copy utils.py"

# Drop statistics left over from a previous run.
for metric in success calls input_tokens output_tokens; do
  rm -f -- "$OUT_DIR/logs/${metric}.npy"
done

# ---------------------------------------------------------------------------
# Run the tasks
# ---------------------------------------------------------------------------
echo "Starting all tasks in parallel using GNU parallel..."
echo "Running 5 independent tasks concurrently (no job limits)"
echo "Tasks: summarize_root (2 steps), create_numpy, preprocess, scores, categorization"
echo ""

# Start timing for all tasks
START_TIME=$SECONDS

export -f run_task
export OUT_DIR

# Export necessary environment variables
export PYTHONPATH="$OUT_DIR:${PYTHONPATH:-}"

# Each entry is a complete command line executed by GNU parallel.
TASK_COMMANDS=(
  "run_task summarize_root summarize_root_temp.smk summarize_root insert_root_summary"
  "run_task create_numpy create_numpy_temp.smk create_numpy"
  "run_task preprocess preprocess_temp.smk preprocess"
  "run_task scores scores_temp.smk scores"
  "run_task categorization categorization_temp.smk categorization"
)

# Run all 5 tasks in parallel using GNU parallel.
# GNU parallel defaults to one job slot per CPU core, which would serialise
# some tasks on machines with fewer than 5 cores; request one slot per task so
# all of them really start at once.
if parallel --no-notice --jobs "${#TASK_COMMANDS[@]}" --halt soon,fail=1 \
  --line-buffer ::: "${TASK_COMMANDS[@]}"; then
  echo "All tasks completed successfully"
else
  echo "ERROR: One or more tasks failed!"
  # Show logs of failed tasks
  for log in "$OUT_DIR"/*.log; do
    if [[ -f "$log" ]] && grep -qE 'ERROR|failed' "$log"; then
      echo "=== Failed task log: $(basename "$log") ==="
      tail -n 20 "$log"
    fi
  done
  exit 1
fi

# Calculate total time
TOTAL_TIME=$((SECONDS - START_TIME))
echo "Total time: $TOTAL_TIME seconds"

echo "Checking results"
python check_soln.py --out_dir "$OUT_DIR"

# ---------------------------------------------------------------------------
# Collect statistics
# ---------------------------------------------------------------------------
echo "Writing stats"

# Read individual task times; a missing or empty time file counts as 0.
task_times=()
time_summary=""
for task in "${TASK_NAMES[@]}"; do
  task_time=$(cat "$OUT_DIR/${task}.time" 2> /dev/null) || task_time=0
  task_time=${task_time:-0}
  task_times+=("$task_time")
  time_summary+="${time_summary:+, }${task}=${task_time}s"
done
echo "Task times: $time_summary"

# Get arrays for all 5 tasks (assuming the structure is now 5 elements)
read -r -a success_arr < <(python get_arr.py --name success --out_dir "$OUT_DIR")
read -r -a call_arr < <(python get_arr.py --name calls --out_dir "$OUT_DIR")
read -r -a input_token_arr < <(python get_arr.py --name input_tokens --out_dir "$OUT_DIR")
read -r -a output_token_arr < <(python get_arr.py --name output_tokens --out_dir "$OUT_DIR")

# Update stats with all 5 tasks using individual task times.
# Missing array elements default to 0. Arguments are emitted in the same order
# as before: per-task success/time/calls/input_tokens, then all output_tokens.
stats_args=(--name "$NAME")
for i in "${!TASK_NAMES[@]}"; do
  n=$((i + 1))
  stats_args+=(
    "--success${n}" "${success_arr[i]:-0}"
    "--time${n}" "${task_times[i]}"
    "--calls${n}" "${call_arr[i]:-0}"
    "--input_tokens${n}" "${input_token_arr[i]:-0}"
  )
done
for i in "${!TASK_NAMES[@]}"; do
  stats_args+=("--output_tokens$((i + 1))" "${output_token_arr[i]:-0}")
done
python update_stats.py "${stats_args[@]}"

# Clean up temp files. The time files are only removed after a successful
# run; after a failure they are left in place next to the logs.
cleanup
rm -f -- "$OUT_DIR"/*.time

echo "Finished!"