|
|
import os |
|
|
import pandas as pd |
|
|
import re |
|
|
import glob |
|
|
from tqdm import tqdm |
|
|
import datetime |
|
|
import openai |
|
|
import argparse |
|
|
import io |
|
|
|
|
|
def summarize_results(results_dirs, output_csv, model, no_llm = False): |
|
|
client = openai.OpenAI( |
|
|
api_key = os.environ.get('CBORG_API_KEY'), |
|
|
base_url = 'https://api.cborg.lbl.gov' |
|
|
) |
|
|
|
|
|
error_categorization_prompt = ( |
|
|
"You are an expert at classifying error messages from machine learning workflows in high energy physics.\n\n" |
|
|
"Workflow summary:\n" |
|
|
"- A user provides an analysis task prompt.\n" |
|
|
"- A supervisor agent breaks down the task and instructs a coder agent.\n" |
|
|
"- The coder agent generates code, which is executed.\n" |
|
|
"- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n" |
|
|
"Below is a list of error categories:\n" |
|
|
"all data weights = 0, " |
|
|
"dummy data created, " |
|
|
"function-calling error, " |
|
|
"incorrect branch name, " |
|
|
"intermediate file not found, " |
|
|
"semantic error, " |
|
|
"other." |
|
|
"Your task: For the given error description, select the single most appropriate error category from the list above. " |
|
|
"Base your choice on the underlying nature or root cause of the error, not on the symptoms, error messages, or observable effects. " |
|
|
"Focus on what fundamentally caused the error, such as logic mistakes, missing dependencies, data mismatches, or miscommunication, rather than how the error was reported or observed.\n" |
|
|
"Return ALL applicable category names, each wrapped with three asterisks on each side, separated by commas, like this: ***Category***" |
|
|
"Do not include any other text, explanation, or formatting." |
|
|
"log file:\n" |
|
|
) |
|
|
|
|
|
results = [] |
|
|
for results_dir in results_dirs: |
|
|
for name in tqdm(os.listdir(results_dir), desc=f"generating error descriptions for {results_dir}"): |
|
|
output_dir = os.path.join(results_dir, name) |
|
|
|
|
|
if os.path.isdir(output_dir): |
|
|
|
|
|
config_match = re.match(r'^(.*?)_step\d+', name) |
|
|
config = config_match.group(1) if config_match else None |
|
|
|
|
|
|
|
|
step_match = re.search(r'_step(\d+)', name) |
|
|
step = int(step_match.group(1)) if step_match else None |
|
|
|
|
|
result = { |
|
|
"supervisor": None, |
|
|
"coder": None, |
|
|
"step": step, |
|
|
"success": None, |
|
|
"iterations": None, |
|
|
"duration": None, |
|
|
"API_calls": None, |
|
|
"input_tokens": None, |
|
|
"output_tokens": None, |
|
|
"user_prompt_tokens": None, |
|
|
"supervisor_to_coder_tokens": None, |
|
|
"coder_output_tokens": None, |
|
|
"feedback_to_supervisor_tokens": None, |
|
|
"error": "Uncategorized", |
|
|
"error_description": None, |
|
|
"output_dir": output_dir, |
|
|
} |
|
|
|
|
|
log_dir = os.path.join(output_dir, "logs") |
|
|
if os.path.isdir(log_dir): |
|
|
comp_log_files = glob.glob(os.path.join(log_dir, "*comprehensive_log.txt")) |
|
|
comp_log_str = None |
|
|
if comp_log_files: |
|
|
with open(comp_log_files[0], "r") as f: |
|
|
comp_log_str = f.read() |
|
|
else: |
|
|
result["success"] = False |
|
|
result["error_description"] = "comprehensive log file not found" |
|
|
results.append(result) |
|
|
continue |
|
|
|
|
|
supervisor_match = re.search(r"Supervisor:\s*([^\s]+)", comp_log_str) |
|
|
coder_match = re.search(r"Coder:\s*([^\s]+)", comp_log_str) |
|
|
if supervisor_match: |
|
|
result["supervisor"] = supervisor_match.group(1) |
|
|
if coder_match: |
|
|
result["coder"] = coder_match.group(1) |
|
|
|
|
|
iterations_match = re.search(r"Total Iterations:\s*(\d+)", comp_log_str) |
|
|
if iterations_match: |
|
|
result["iterations"] = int(iterations_match.group(1)) |
|
|
|
|
|
duration_match = re.search(r"Duration:\s*([0-9:.\s]+)", comp_log_str) |
|
|
if duration_match: |
|
|
duration_str = duration_match.group(1).strip() |
|
|
try: |
|
|
t = datetime.datetime.strptime(duration_str, "%H:%M:%S.%f") |
|
|
except ValueError: |
|
|
t = datetime.datetime.strptime(duration_str, "%H:%M:%S") |
|
|
result["duration"] = t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6 |
|
|
|
|
|
api_calls_match = re.search(r"Total API Calls:\s*(\d+)", comp_log_str) |
|
|
if api_calls_match: |
|
|
result["API_calls"] = int(api_calls_match.group(1)) |
|
|
input_tokens_match = re.search(r"Total Input Tokens:\s*(\d+)", comp_log_str) |
|
|
if input_tokens_match: |
|
|
result["input_tokens"] = int(input_tokens_match.group(1)) |
|
|
output_tokens_match = re.search(r"Total Output Tokens:\s*(\d+)", comp_log_str) |
|
|
if output_tokens_match: |
|
|
result["output_tokens"] = int(output_tokens_match.group(1)) |
|
|
|
|
|
match = re.search(r"User Prompt Tokens:\s*(\d+)", comp_log_str) |
|
|
if match: |
|
|
result["user_prompt_tokens"] = int(match.group(1)) |
|
|
match = re.search(r"Supervisor to Coder Tokens:\s*(\d+)", comp_log_str) |
|
|
if match: |
|
|
result["supervisor_to_coder_tokens"] = int(match.group(1)) |
|
|
match = re.search(r"Coder Output Tokens:\s*(\d+)", comp_log_str) |
|
|
if match: |
|
|
result["coder_output_tokens"] = int(match.group(1)) |
|
|
match = re.search(r"Feedback to Supervisor Tokens:\s*(\d+)", comp_log_str) |
|
|
if match: |
|
|
result["feedback_to_supervisor_tokens"] = int(match.group(1)) |
|
|
|
|
|
|
|
|
val_log_files = glob.glob(os.path.join(log_dir, "*validation.log")) |
|
|
val_log_str = None |
|
|
if val_log_files: |
|
|
with open(val_log_files[0], "r") as f: |
|
|
val_log_str = f.read() |
|
|
matches = re.findall(r'(✅ Validation successful|❌ Validation failed)', val_log_str) |
|
|
if not matches: |
|
|
result["success"] = False |
|
|
else: |
|
|
last = matches[-1] |
|
|
result["success"] = last == "✅ Validation successful" |
|
|
if (no_llm): |
|
|
if (result["success"]): |
|
|
result["error"] = None |
|
|
else: |
|
|
result["error"] = "Validation Error" |
|
|
val_log_str = val_log_str.replace('\n', '').replace('\r', '') |
|
|
else: |
|
|
result["success"] = False |
|
|
val_log_str = "" |
|
|
if (not no_llm): |
|
|
try: |
|
|
response = client.chat.completions.create( |
|
|
model = model, |
|
|
messages = [ |
|
|
{ |
|
|
'role': 'user', |
|
|
'content': error_categorization_prompt + |
|
|
"\nComprehensive Log:\n" + comp_log_str + |
|
|
"\nValidation Log:\n" + val_log_str |
|
|
} |
|
|
], |
|
|
) |
|
|
error_description = response.choices[-1].message.content |
|
|
def parse_categories(llm_output): |
|
|
|
|
|
return [cat.strip() for cat in re.findall(r"\*\*\*(.*?)\*\*\*", llm_output)] |
|
|
result["Error"] = parse_categories(error_description) |
|
|
except Exception as e: |
|
|
result["Error"] = "uncategorized" |
|
|
print(error_description) |
|
|
exit() |
|
|
print(f"OpenAI API error: {e}") |
|
|
else: |
|
|
if ("API call failed" in comp_log_str): |
|
|
result["error"] = "API Call Error" |
|
|
else: |
|
|
result["success"] = False |
|
|
result["Error"] = "job submission failure" |
|
|
results.append(result) |
|
|
|
|
|
df = pd.DataFrame(results) |
|
|
df = df.sort_values(by=["supervisor", "coder", "step", "output_dir"]) |
|
|
df.to_csv(output_csv, index=False) |
|
|
print(f"Results written to {output_csv}") |
|
|
|
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser(description="Summarize experiment logs and errors") |
|
|
parser.add_argument("--results_dir", type=str, default=" ", nargs='+', required=False, help="One or more directories containing experiment results") |
|
|
parser.add_argument("--output_csv", type=str, default="results_summary.csv", help="Path to output CSV file") |
|
|
parser.add_argument("--model", type=str, default="gpt-oss-120b", help="LLM model to use for error summarization") |
|
|
parser.add_argument("--no_llm", action="store_true", default=False, help="If set, only generate the CSV without LLM error description or categorization") |
|
|
args = parser.parse_args() |
|
|
|
|
|
summarize_results( |
|
|
results_dirs=args.results_dir, |
|
|
output_csv=args.output_csv, |
|
|
model=args.model, |
|
|
no_llm=args.no_llm |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |