Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Batch ECG Analysis Script for KVH High School | |
| Processes all ECGs in ecg_uploads_KVHSchool/ directory using ECG-FM Production API | |
| Updates KvhHighSchoollist.csv with comprehensive clinical analysis results | |
| NO DELAYS between analyses for maximum speed | |
| """ | |
import json
import os
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
# Configuration
# Base URL of the ECG-FM API; the script appends "/health" and "/analyze" to it.
API_BASE_URL = "https://mystic-cbk-ecg-fm-api.hf.space"
# Directory holding the ECG CSV files named in the index. Being a relative
# path, it is resolved against the current working directory.
ECG_DIR = "../ecg_uploads_KVHSchool/"
# Patient index CSV read as input (must contain an 'ECG File Path' column).
INDEX_FILE = "../KvhHighSchoollist.csv"
# Destination CSV: the index plus the ECG_FM_* result columns.
OUTPUT_FILE = "../KvhHighSchoollist_ECG_FM_Enhanced.csv"
# ECG-FM Analysis Results Structure
class ECGFMAnalysis:
    """Mutable record describing the outcome of one ECG analysis attempt.

    A new instance carries no results: every attribute is ``None`` except
    ``abnormalities``, which is a fresh empty list. Attributes are plain
    public fields that the code performing the analysis assigns directly.
    """

    def __init__(self):
        # --- Clinical findings ---
        self.rhythm: Any = None
        self.heart_rate: Any = None
        self.qrs_duration: Any = None
        self.qt_interval: Any = None
        self.pr_interval: Any = None
        self.axis_deviation: Any = None
        # Created per instance so two analyses never share one list.
        self.abnormalities: List[Any] = []
        self.confidence: Any = None

        # --- Technical metrics ---
        self.signal_quality: Any = None
        self.features_count: Any = None
        self.processing_time: Any = None

        # --- Bookkeeping about the attempt itself ---
        self.analysis_timestamp: Any = None
        self.api_status: Any = None
        self.error_message: Any = None
def load_ecg_data(file_path: str, fs: int = 500) -> Optional[Dict[str, Any]]:
    """Load ECG data from a CSV file and build the API request payload.

    Every CSV column becomes one entry of ``signal`` (a list of that column's
    values), in the order the columns appear in the file.

    Args:
        file_path: Path of the CSV file to read.
        fs: Sampling rate placed in the payload and used to derive the
            recording duration. Defaults to 500, the value this script has
            always sent.

    Returns:
        A dict with the keys ``signal``, ``fs``, ``lead_names`` and
        ``recording_duration``, or ``None`` when the file cannot be read or
        contains no samples. Failures are printed rather than raised so that
        a batch run can carry on with the next file.
    """
    try:
        df = pd.read_csv(file_path)
        # Convert to the format expected by the API: one list per column.
        signal = [df[col].tolist() for col in df.columns]
        # A header-only file would otherwise yield empty leads and a
        # zero-length recording; treat it as a load failure instead of
        # submitting it for analysis.
        if not signal or not signal[0]:
            raise ValueError("ECG file contains no samples")
        # Create enhanced payload with clinical metadata
        return {
            "signal": signal,
            "fs": fs,
            # NOTE(review): always the 12 standard lead names, regardless of
            # how many columns the file has - confirm against the API contract.
            "lead_names": ["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"],
            "recording_duration": len(signal[0]) / float(fs),
        }
    except Exception as e:
        # Deliberately broad: any unreadable file is reported and skipped.
        print(f"β Error loading ECG data from {file_path}: {e}")
        return None
def analyze_ecg_with_api(ecg_file: str, patient_info: Dict[str, Any]) -> ECGFMAnalysis:
    """Analyze a single ECG file using the ECG-FM Production API.

    The file is loaded from ``ECG_DIR``, the API's ``/health`` endpoint is
    probed, and the payload is then posted to ``/analyze``. This function
    never raises: every failure is recorded on the returned object.

    Args:
        ecg_file: File name of the ECG CSV, relative to ``ECG_DIR``.
        patient_info: Mapping-like record (a dict or a pandas row) for the
            patient. Only 'Patient Name', 'Age' and 'Gender' are read, and
            only for the progress log.

    Returns:
        An ``ECGFMAnalysis`` whose ``api_status`` is "Success" when the API
        answered 200 and all expected fields were present, and a description
        of the failure otherwise. ``analysis_timestamp`` is always set.
    """
    analysis = ECGFMAnalysis()
    analysis.analysis_timestamp = datetime.now().isoformat()
    try:
        # Load ECG data
        ecg_path = os.path.join(ECG_DIR, ecg_file)
        payload = load_ecg_data(ecg_path)
        if payload is None:
            analysis.api_status = "Failed to load ECG data"
            return analysis

        print(f" π Processing: {ecg_file}")
        # The patient fields are cosmetic, so a missing column must not
        # abort the analysis; fall back to a placeholder instead.
        patient_name = patient_info.get('Patient Name', 'Unknown')
        patient_age = patient_info.get('Age', '?')
        patient_gender = patient_info.get('Gender', '?')
        print(f" π€ Patient: {patient_name} ({patient_age} {patient_gender})")

        # Test API health first
        try:
            health_response = requests.get(f"{API_BASE_URL}/health", timeout=30)
            if health_response.status_code != 200:
                analysis.api_status = f"API unhealthy: {health_response.status_code}"
                return analysis
        except Exception as e:
            analysis.api_status = f"API connection failed: {str(e)}"
            return analysis

        # Perform full ECG analysis
        response = requests.post(
            f"{API_BASE_URL}/analyze",
            json=payload,
            timeout=180,  # 3 minutes for full analysis
        )

        if response.status_code != 200:
            analysis.api_status = f"API error: {response.status_code}"
            analysis.error_message = response.text
            print(f" β API error: {response.status_code} - {response.text}")
            return analysis

        analysis_data = response.json()
        # Extract clinical analysis. A missing key raises KeyError, which the
        # outer handler records as a processing error.
        clinical = analysis_data['clinical_analysis']
        analysis.rhythm = clinical['rhythm']
        analysis.heart_rate = clinical['heart_rate']
        analysis.qrs_duration = clinical['qrs_duration']
        analysis.qt_interval = clinical['qt_interval']
        analysis.pr_interval = clinical['pr_interval']
        analysis.axis_deviation = clinical['axis_deviation']
        analysis.abnormalities = clinical['abnormalities']
        analysis.confidence = clinical['confidence']
        # Extract technical metrics
        analysis.signal_quality = analysis_data['signal_quality']
        analysis.features_count = len(analysis_data['features'])
        analysis.processing_time = analysis_data['processing_time']
        analysis.api_status = "Success"

        # Format the confidence defensively: applying ':.2f' to None or a
        # string would raise here and let the outer handler overwrite the
        # "Success" status of an analysis that actually completed.
        if isinstance(analysis.confidence, (int, float)):
            confidence_text = f"{analysis.confidence:.2f}"
        else:
            confidence_text = str(analysis.confidence)
        print(f" β Analysis completed in {analysis.processing_time}s")
        print(f" π₯ Rhythm: {analysis.rhythm}, HR: {analysis.heart_rate} BPM")
        print(f" π Quality: {analysis.signal_quality}, Confidence: {confidence_text}")
    except Exception as e:
        # Batch boundary: record the failure and let the caller move on.
        analysis.api_status = f"Processing error: {str(e)}"
        analysis.error_message = traceback.format_exc()
        print(f" β Processing error: {str(e)}")
    return analysis
def _format_abnormalities(abnormalities: Any) -> Any:
    """Render the abnormalities value as a single '; '-separated cell."""
    if not abnormalities:
        return None
    if isinstance(abnormalities, str):
        # Joining a bare string would put '; ' between its characters.
        return abnormalities
    return '; '.join(str(item) for item in abnormalities)


def update_index_with_ecg_fm_results(index_df: pd.DataFrame) -> pd.DataFrame:
    """Update the index DataFrame with ECG-FM analysis results.

    Adds the ``ECG_FM_*`` columns (initialised to ``None``) and fills them
    for each row whose 'ECG File Path' names a file that exists in
    ``ECG_DIR``. Rows with no path or a missing file are skipped and keep
    ``None`` in every result column. Progress and final totals are printed.

    Args:
        index_df: Patient index with an 'ECG File Path' column. It is
            modified in place.

    Returns:
        The same ``index_df`` object, with the result columns added.
    """
    # (result column, ECGFMAnalysis attribute), in output column order.
    result_columns = (
        ('ECG_FM_Rhythm', 'rhythm'),
        ('ECG_FM_HeartRate', 'heart_rate'),
        ('ECG_FM_QRS_Duration', 'qrs_duration'),
        ('ECG_FM_QT_Interval', 'qt_interval'),
        ('ECG_FM_PR_Interval', 'pr_interval'),
        ('ECG_FM_AxisDeviation', 'axis_deviation'),
        ('ECG_FM_Abnormalities', 'abnormalities'),
        ('ECG_FM_Confidence', 'confidence'),
        ('ECG_FM_SignalQuality', 'signal_quality'),
        ('ECG_FM_FeaturesCount', 'features_count'),
        ('ECG_FM_ProcessingTime', 'processing_time'),
        ('ECG_FM_AnalysisTimestamp', 'analysis_timestamp'),
        ('ECG_FM_APIStatus', 'api_status'),
        ('ECG_FM_ErrorMessage', 'error_message'),
    )
    # Add new columns for ECG-FM results
    for column, _ in result_columns:
        index_df[column] = None

    # Process each ECG file
    total_files = len(index_df)
    successful_analyses = 0
    failed_analyses = 0
    print(f"\nπ Starting batch ECG analysis for {total_files} patients...")
    print("=" * 80)
    print("β‘ NO DELAYS - Maximum speed processing enabled!")
    print("=" * 80)

    # `position` is the 1-based row number used in messages; `index` is the
    # DataFrame label used for writing, which need not be an integer.
    for position, (index, row) in enumerate(index_df.iterrows(), start=1):
        try:
            # Extract ECG filename from path
            ecg_path = row['ECG File Path']
            if pd.isna(ecg_path) or ecg_path == "":
                print(f"β οΈ Skipping row {position}: No ECG file path")
                continue
            ecg_file = os.path.basename(ecg_path)
            # Check if ECG file exists
            if not os.path.exists(os.path.join(ECG_DIR, ecg_file)):
                print(f"β οΈ Skipping row {position}: ECG file not found: {ecg_file}")
                continue
            print(f"\nπ Processing {position}/{total_files}: {ecg_file}")
            # Perform ECG analysis
            analysis = analyze_ecg_with_api(ecg_file, row)
            # Update DataFrame with results
            for column, attribute in result_columns:
                value = getattr(analysis, attribute)
                if attribute == 'abnormalities':
                    value = _format_abnormalities(value)
                index_df.at[index, column] = value
            if analysis.api_status == "Success":
                successful_analyses += 1
            else:
                failed_analyses += 1
        except Exception as e:
            # Keep the batch going; note the failure on the row itself.
            print(f"β Error processing row {position}: {str(e)}")
            index_df.at[index, 'ECG_FM_APIStatus'] = f"Row processing error: {str(e)}"
            failed_analyses += 1

    # An empty index must not crash the report with a division by zero.
    success_rate = (successful_analyses / total_files) * 100 if total_files else 0.0
    print("\n" + "=" * 80)
    print("π BATCH ANALYSIS COMPLETE!")
    print(f"π Total files: {total_files}")
    print(f"β Successful analyses: {successful_analyses}")
    print(f"β Failed analyses: {failed_analyses}")
    print(f"π Success rate: {success_rate:.1f}%")
    return index_df
def _print_numeric_summary(values: pd.Series, label: str, unit: str, precision: int) -> None:
    """Print mean and range of the numeric entries in ``values``, if any."""
    # The result columns are object dtype; coerce so that numeric strings are
    # usable and anything non-numeric is dropped instead of raising.
    numeric = pd.to_numeric(values, errors='coerce').dropna()
    if numeric.empty:
        return
    mean, low, high = numeric.mean(), numeric.min(), numeric.max()
    print(
        f"{label} - Mean: {mean:.{precision}f}{unit}, "
        f"Range: {low:.{precision}f}-{high:.{precision}f}{unit}"
    )


def generate_analysis_summary(index_df: pd.DataFrame) -> None:
    """Print summary statistics for the successfully analysed rows.

    Only rows whose 'ECG_FM_APIStatus' equals 'Success' are considered. For
    heart rate, QRS duration, QT interval, confidence and processing time the
    mean and min-max range are printed (a metric with no numeric values is
    omitted); signal quality is printed as a count and percentage per value.

    Args:
        index_df: DataFrame carrying the ``ECG_FM_*`` result columns.
    """
    print("\nπ ECG-FM ANALYSIS SUMMARY")
    print("=" * 50)
    # Filter successful analyses
    successful_df = index_df[index_df['ECG_FM_APIStatus'] == 'Success']
    if successful_df.empty:
        print("β No successful analyses to summarize")
        return
    successful_count = len(successful_df)
    print(f"π Total successful analyses: {successful_count}")

    # Heart rate, QRS duration and QT interval
    _print_numeric_summary(successful_df['ECG_FM_HeartRate'], "π Heart Rate", " BPM", 1)
    _print_numeric_summary(successful_df['ECG_FM_QRS_Duration'], "π QRS Duration", " ms", 1)
    _print_numeric_summary(successful_df['ECG_FM_QT_Interval'], "β±οΈ QT Interval", " ms", 1)

    # Signal Quality Distribution
    quality_counts = successful_df['ECG_FM_SignalQuality'].value_counts()
    print("π Signal Quality Distribution:")
    for quality, count in quality_counts.items():
        print(f" {quality}: {count} ({count/successful_count*100:.1f}%)")

    # Confidence and processing time
    _print_numeric_summary(successful_df['ECG_FM_Confidence'], "π― Analysis Confidence", "", 2)
    _print_numeric_summary(successful_df['ECG_FM_ProcessingTime'], "β‘ Processing Time", "s", 3)
def _confirm_continue() -> bool:
    """Ask on stdin whether to proceed after a failed API health check."""
    return input("Continue anyway? (y/n): ").lower() == 'y'


def main():
    """Main function to run batch ECG analysis for KVH High School.

    Steps: verify that the index file and ECG directory exist, load the
    index, probe the API's ``/health`` endpoint (asking for confirmation if
    the probe fails), analyse every ECG, write the enhanced CSV plus a
    timestamped backup, and finally print the summary statistics.

    Problems are reported on stdout; the function returns early (without
    raising) when a prerequisite is missing or the operator declines to
    continue.
    """
    print("π§ͺ ECG-FM BATCH ANALYSIS SYSTEM - KVH HIGH SCHOOL")
    print("=" * 70)
    print(f"π API URL: {API_BASE_URL}")
    print(f"π ECG Directory: {ECG_DIR}")
    print(f"π Index File: {INDEX_FILE}")
    print(f"πΎ Output File: {OUTPUT_FILE}")
    print("β‘ NO DELAYS - Maximum speed processing!")
    print()

    # Check if files exist
    if not os.path.exists(INDEX_FILE):
        print(f"β Index file not found: {INDEX_FILE}")
        return
    if not os.path.exists(ECG_DIR):
        print(f"β ECG directory not found: {ECG_DIR}")
        return

    # Load index file
    try:
        print("π Loading patient index file...")
        index_df = pd.read_csv(INDEX_FILE)
        print(f"β Loaded {len(index_df)} patient records")
    except Exception as e:
        print(f"β Error loading index file: {e}")
        return

    # Check API health
    print("π₯ Checking API health...")
    api_healthy = False
    try:
        health_response = requests.get(f"{API_BASE_URL}/health", timeout=30)
        if health_response.status_code == 200:
            health_data = health_response.json()
            # .get(): a 200 response lacking this key is still a healthy API
            # and must not be reported as a failed health check.
            print(f"β API healthy - Models loaded: {health_data.get('models_loaded')}")
            api_healthy = True
        else:
            print(f"β οΈ API health check failed: {health_response.status_code}")
    except Exception as e:
        print(f"β οΈ API health check failed: {e}")
    if not api_healthy and not _confirm_continue():
        return

    # Process all ECGs
    enhanced_df = update_index_with_ecg_fm_results(index_df)

    # Save enhanced dataset BEFORE summarising: the batch is the expensive
    # part, and an error while printing statistics must not discard it.
    try:
        print(f"\nπΎ Saving enhanced dataset to: {OUTPUT_FILE}")
        enhanced_df.to_csv(OUTPUT_FILE, index=False)
        print("β Enhanced dataset saved successfully!")
        # Also save a backup with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"../KvhHighSchoollist_ECG_FM_Backup_{timestamp}.csv"
        enhanced_df.to_csv(backup_file, index=False)
        print(f"πΎ Backup saved to: {backup_file}")
    except Exception as e:
        print(f"β Error saving enhanced dataset: {e}")

    # Generate summary
    generate_analysis_summary(enhanced_df)

    print(f"\nπ BATCH ANALYSIS COMPLETE!")
    print(f"π Enhanced dataset: {OUTPUT_FILE}")
    print(f"π Monitor your API at: {API_BASE_URL}")


if __name__ == "__main__":
    main()