#!/bin/bash
# LLM4HEP / test_stats_parallel.sh
# Author: ho22joshua (initial commit cfcbbc8)
#
# test_stats_parallel.sh - Run ATLAS analysis tasks in parallel using GNU parallel
#
# This script runs 5 independent Snakemake workflows in parallel:
# 1. summarize_root: Convert ROOT files to summary format (2 sequential steps: summarize_root -> insert_root_summary)
# 2. create_numpy: Create NumPy arrays from ROOT data
# 3. preprocess: Preprocess data for ML
# 4. scores: Generate signal/background scores
# 5. categorization: Perform event categorization
#
# Usage:
# ./test_stats_parallel.sh
#
# Requirements:
# - GNU parallel must be installed
# - config.yml must exist with model configuration
# - All workflow/*.smk files must be present
# - Python environment with required packages
#
# The script will:
# - Copy and modify workflow files
# - Run all 5 tasks concurrently (no job limits)
# - Log each task's output separately
# - Measure individual execution time for each task
# - Collect statistics and check results
# - Clean up temporary files
#
# Output:
# - Individual task logs in $OUT_DIR/*.log
# - Individual task times in $OUT_DIR/*.time
# - Combined statistics in stats.csv
# - Results validation via check_soln.py
#
# ---------------------------------------------------------------------------
# Setup: read the run settings from config.yml, stage temporary copies of the
# prompts and workflow files with {BASE_DIR} resolved to the output directory,
# and reset the per-run bookkeeping files.
# ---------------------------------------------------------------------------

# Fail before touching the filesystem if a prerequisite is missing.
if [[ ! -r config.yml ]]; then
  echo "ERROR: config.yml not found or not readable in $(pwd)" >&2
  exit 1
fi
if ! command -v parallel >/dev/null 2>&1; then
  echo "ERROR: GNU parallel is required but was not found in PATH" >&2
  exit 1
fi

# Take the second whitespace-delimited field of each "key: value" line and
# drop single AND double quotes so both YAML quoting styles give a bare value.
NAME=$(grep '^name:' config.yml | awk '{print $2}' | tr -d "'\"")
# MODEL is not referenced again in this script; it is kept so the variable
# stays available to anything that expects it.
MODEL=$(grep '^model:' config.yml | awk '{print $2}' | tr -d "'\"")
OUT_DIR=$(grep '^out_dir:' config.yml | awk '{print $2}' | tr -d "'\"")

# An empty OUT_DIR would turn the paths below into /generated_code and
# /logs/...; an empty NAME would only surface when the stats are written.
if [[ -z "$OUT_DIR" ]]; then
  echo "ERROR: out_dir is missing or empty in config.yml" >&2
  exit 1
fi
if [[ -z "$NAME" ]]; then
  echo "ERROR: name is missing or empty in config.yml" >&2
  exit 1
fi

# Workflows whose .smk files are staged as <name>_temp.smk in the current dir.
STAGED_WORKFLOWS=(summarize_root create_numpy preprocess scores categorization)

# Remove the staged copies on every exit path, including the early exit taken
# when a task fails, so the next run starts from a clean state.
cleanup_staged_files() {
  local wf_name
  rm -rf prompts_temp
  for wf_name in "${STAGED_WORKFLOWS[@]}"; do
    rm -f "${wf_name}_temp.smk"
  done
}
trap cleanup_staged_files EXIT

# A leftover prompts_temp/ would make `cp -r` nest the new copy inside it
# (prompts_temp/prompts), so clear it first.
rm -rf prompts_temp
if ! cp -r prompts prompts_temp; then
  echo "ERROR: could not copy prompts/ to prompts_temp/" >&2
  exit 1
fi
sed -i "s#{BASE_DIR}#$OUT_DIR#g" prompts_temp/*.txt

# Copy all 5 smk files to temp versions and substitute only those copies.
staged_smk_files=()
for wf_name in "${STAGED_WORKFLOWS[@]}"; do
  if ! cp "workflow/${wf_name}.smk" "${wf_name}_temp.smk"; then
    echo "ERROR: could not stage workflow/${wf_name}.smk" >&2
    exit 1
  fi
  staged_smk_files+=("${wf_name}_temp.smk")
done
sed -i "s#{BASE_DIR}#$OUT_DIR#g" "${staged_smk_files[@]}"

mkdir -p "$OUT_DIR/generated_code"
cp utils.py "$OUT_DIR/generated_code/utils.py"

# Drop the bookkeeping arrays of any previous run.
rm -f "$OUT_DIR/logs/success.npy" \
  "$OUT_DIR/logs/calls.npy" \
  "$OUT_DIR/logs/input_tokens.npy" \
  "$OUT_DIR/logs/output_tokens.npy"

echo "Starting all tasks in parallel using GNU parallel..."
echo "Running 5 independent tasks concurrently (no job limits)"
echo "Tasks: summarize_root (2 steps), create_numpy, preprocess, scores, categorization"
echo ""
# Start timing for all tasks
START_TIME=$SECONDS
# Define the tasks as a function for GNU parallel
#######################################
# Run one Snakemake workflow for a task: the first target, then the optional
# second target, stopping at the first failure.
# Globals:
#   OUT_DIR     (read) directory that receives <task>.log and <task>.time
#   CONFIG_FILE (read) Snakemake config file; defaults to config.yml
# Arguments:
#   $1 - task name, used for messages and for the log/time file names
#   $2 - Snakefile passed to `snakemake -s`
#   $3 - first target to force-run
#   $4 - optional second target, run only if the first one succeeds
# Outputs:
#   Progress and error messages on stdout; snakemake's stdout and stderr go to
#   $OUT_DIR/<task>.log; elapsed whole seconds go to $OUT_DIR/<task>.time.
# Returns:
#   0 if every target succeeded, 1 if a snakemake run failed,
#   2 if a required argument or OUT_DIR is missing.
#######################################
run_task() {
  local task_name=${1:-}
  local smk_file=${2:-}
  local target1=${3:-}
  local target2=${4:-} # Optional second target

  if [[ -z "$task_name" || -z "$smk_file" || -z "$target1" ]]; then
    echo "ERROR: usage: run_task <task_name> <smk_file> <target1> [target2]" >&2
    return 2
  fi
  # Without OUT_DIR the log and time files would be written to the root dir.
  if [[ -z "${OUT_DIR:-}" ]]; then
    echo "ERROR: OUT_DIR must be set before calling run_task" >&2
    return 2
  fi

  local config_file=${CONFIG_FILE:-config.yml}
  local log_file="$OUT_DIR/${task_name}.log"
  local time_file="$OUT_DIR/${task_name}.time"
  local -a targets=("$target1")
  if [[ -n "$target2" ]]; then
    targets+=("$target2")
  fi

  echo "Starting $task_name..."
  local task_start=$SECONDS
  local task_time target idx

  # Start from an empty log so every stage can simply append to it.
  : > "$log_file"

  for idx in "${!targets[@]}"; do
    target=${targets[idx]}
    if ((idx > 0)); then
      echo "Running $task_name second stage: $target..."
    fi
    if ! snakemake -s "$smk_file" -j 1 --forcerun "$target" --rerun-incomplete \
      --configfile "$config_file" --latency-wait 120 --verbose >> "$log_file" 2>&1; then
      # Record the elapsed time even on failure so the stats step can read it.
      task_time=$((SECONDS - task_start))
      echo "$task_time" > "$time_file"
      echo "ERROR: $task_name failed on $target after $task_time seconds"
      return 1
    fi
  done

  task_time=$((SECONDS - task_start))
  echo "$task_time" > "$time_file"
  echo "$task_name completed successfully in $task_time seconds"
  return 0
}
# Make run_task and the settings it reads visible to the shells that GNU
# parallel starts for each job.
export -f run_task
export OUT_DIR
export CONFIG_FILE=config.yml
# Export necessary environment variables. Append the previous PYTHONPATH only
# when it is non-empty so no empty entry is left behind a trailing ':'.
export PYTHONPATH="$OUT_DIR${PYTHONPATH:+:$PYTHONPATH}"

# One command line per independent workflow; summarize_root has two stages.
PARALLEL_TASKS=(
  "run_task summarize_root summarize_root_temp.smk summarize_root insert_root_summary"
  "run_task create_numpy create_numpy_temp.smk create_numpy"
  "run_task preprocess preprocess_temp.smk preprocess"
  "run_task scores scores_temp.smk scores"
  "run_task categorization categorization_temp.smk categorization"
)

# Run all 5 tasks in parallel using GNU parallel.
# GNU parallel defaults to one job slot per CPU thread, so request one slot
# per task explicitly; otherwise tasks would queue on machines with fewer
# threads than tasks instead of all starting at once.
if parallel --no-notice --jobs "${#PARALLEL_TASKS[@]}" --halt soon,fail=1 \
  --line-buffer ::: "${PARALLEL_TASKS[@]}"; then
  echo "All tasks completed successfully"
else
  echo "ERROR: One or more tasks failed!"
  # Show the tail of every task log that mentions an error or failure.
  for log in "$OUT_DIR"/*.log; do
    if [ -f "$log" ] && grep -q "ERROR\|failed" "$log"; then
      echo "=== Failed task log: $(basename "$log") ==="
      tail -20 "$log"
    fi
  done
  exit 1
fi
# Calculate total time
TOTAL_TIME=$((SECONDS - START_TIME))
echo "Total time: $TOTAL_TIME seconds"
echo "Checking results"
python check_soln.py --out_dir "$OUT_DIR"
echo "Writing stats"

# Task order here fixes the 1..5 numbering used in the update_stats.py flags.
STAT_TASKS=(summarize_root create_numpy preprocess scores categorization)

# Read individual task times; a missing or empty .time file counts as 0.
task_times=()
for stat_task in "${STAT_TASKS[@]}"; do
  recorded_time=$(cat "$OUT_DIR/${stat_task}.time" 2>/dev/null)
  task_times+=("${recorded_time:-0}")
done
echo "Task times: summarize_root=${task_times[0]}s, create_numpy=${task_times[1]}s, preprocess=${task_times[2]}s, scores=${task_times[3]}s, categorization=${task_times[4]}s"

# Get arrays for all 5 tasks (assuming the structure is now 5 elements).
# Each get_arr.py call is read as one whitespace-separated line of values.
read -r -a success_arr < <(python get_arr.py --name success --out_dir "$OUT_DIR")
read -r -a call_arr < <(python get_arr.py --name calls --out_dir "$OUT_DIR")
read -r -a input_token_arr < <(python get_arr.py --name input_tokens --out_dir "$OUT_DIR")
read -r -a output_token_arr < <(python get_arr.py --name output_tokens --out_dir "$OUT_DIR")

# Update stats with all 5 tasks using individual task times. Values that
# get_arr.py did not provide default to 0. The flag order matches the original
# command line: per-task success/time/calls/input_tokens, then output_tokens.
stats_args=(--name "$NAME")
for idx in "${!STAT_TASKS[@]}"; do
  slot=$((idx + 1))
  stats_args+=(
    "--success${slot}" "${success_arr[idx]:-0}"
    "--time${slot}" "${task_times[idx]}"
    "--calls${slot}" "${call_arr[idx]:-0}"
    "--input_tokens${slot}" "${input_token_arr[idx]:-0}"
  )
done
for idx in "${!STAT_TASKS[@]}"; do
  slot=$((idx + 1))
  stats_args+=("--output_tokens${slot}" "${output_token_arr[idx]:-0}")
done
python update_stats.py "${stats_args[@]}"

# Clean up temp files; -f keeps this quiet if something was already removed.
rm -rf prompts_temp
for stat_task in "${STAT_TASKS[@]}"; do
  rm -f "${stat_task}_temp.smk"
done
rm -f "$OUT_DIR"/*.time
echo "Finished!"