#!/usr/bin/env python3
"""
Enhanced GlucoBuddy Mistral Chat Integration
Improved rate-limit handling and fallback strategies
"""
import os
import json
import logging
import sys
import time
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta
import pandas as pd
from dataclasses import asdict
import requests
import random
import numpy as np
import warnings

# Load environment variables from .env file
from dotenv import load_dotenv

load_dotenv()
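# A minimal .env sketch for local runs (values are placeholders, not real keys;
# only MISTRAL_API_KEY is required, the rest match the os.getenv calls below):
#
#   MISTRAL_API_KEY=your_api_key_here
#   MISTRAL_AGENT_ID=your_agent_id_here   # optional; enables agent completions
#   ENVIRONMENT=development
#   DEBUG=false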
# Suppress pandas warnings
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

from apifunctions import (
    DexcomAPI,
    GlucoseAnalyzer,
    DEMO_USERS,
    DemoUser,
    format_glucose_data_for_display
)

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get configuration from environment variables
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
MISTRAL_AGENT_ID = os.getenv("MISTRAL_AGENT_ID")
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
def validate_environment():
    """Simple validation of required environment variables."""
    missing = []
    if not MISTRAL_API_KEY:
        missing.append("MISTRAL_API_KEY")

    if missing:
        print("❌ Missing required environment variables:")
        for var in missing:
            print(f"   - {var}")
        print("\n💡 Setup instructions:")
        if os.getenv("SPACE_ID"):  # Hugging Face Space detection
            print("🤗 For Hugging Face Spaces:")
            print("   1. Go to Space settings")
            print("   2. Add a Repository secret")
            print("   3. Set MISTRAL_API_KEY to your API key")
        else:
            print("💻 For local development:")
            print("   1. Create a .env file")
            print("   2. Add: MISTRAL_API_KEY=your_api_key_here")
            print("   3. Add: MISTRAL_AGENT_ID=your_agent_id_here")
        return False

    print("✅ Environment validation passed!")
    if MISTRAL_AGENT_ID:
        print("✅ Agent ID configured")
    else:
        print("⚠️ No agent ID - will use standard chat completion")
    return True
class GlucoseDataGenerator:
    """Generate realistic mock glucose data for testing and demo purposes."""

    @staticmethod
    def create_realistic_pattern(days: int = 14, user_type: str = "normal") -> List[Dict]:
        """Generate glucose data with realistic daily patterns."""
        data_points = []
        start_time = datetime.now() - timedelta(days=days)
        current_glucose = 120  # Starting baseline

        # Generate readings every 5 minutes (288 readings per day)
        for i in range(days * 288):
            timestamp = start_time + timedelta(minutes=i * 5)
            hour = timestamp.hour

            # Simulate daily patterns
            daily_variation = GlucoseDataGenerator._calculate_daily_variation(hour, user_type)

            # Add meal effects
            meal_effect = GlucoseDataGenerator._calculate_meal_effects(hour, i)

            # Random variation
            random_noise = random.uniform(-10, 10)

            # Calculate the target glucose value
            target_glucose = 120 + daily_variation + meal_effect + random_noise

            # Smooth transitions (glucose doesn't jump dramatically)
            glucose_change = (target_glucose - current_glucose) * 0.3
            current_glucose += glucose_change

            # Keep within realistic bounds
            current_glucose = max(50, min(400, current_glucose))

            # Determine trend arrow
            trend = GlucoseDataGenerator._calculate_trend(glucose_change)

            data_points.append({
                'systemTime': timestamp.isoformat(),
                'displayTime': timestamp.isoformat(),
                'value': round(current_glucose),
                'trend': trend,
                'realtimeValue': round(current_glucose),
                'smoothedValue': round(current_glucose)
            })

        return data_points

    @staticmethod
    def _calculate_daily_variation(hour: int, user_type: str) -> float:
        """Calculate glucose variation based on time of day."""
        if user_type == "dawn_phenomenon":
            if 4 <= hour <= 8:
                return 30 + 20 * np.sin((hour - 4) * np.pi / 4)
            return 10 * np.sin((hour - 12) * np.pi / 12)
        elif user_type == "night_low":
            if 22 <= hour or hour <= 6:
                return -20
            return 5 * np.sin((hour - 12) * np.pi / 12)
        else:  # Normal pattern
            return 15 * np.sin((hour - 6) * np.pi / 12)

    @staticmethod
    def _calculate_meal_effects(hour: int, reading_index: int) -> float:
        """Calculate glucose spikes around typical meal times."""
        meal_times = [7, 12, 18]  # Breakfast, lunch, dinner
        meal_effect = 0
        for meal_time in meal_times:
            if abs(hour - meal_time) <= 2:
                time_since_meal = abs(hour - meal_time)
                if time_since_meal <= 1:
                    meal_effect += 40 * (1 - time_since_meal)
                else:
                    meal_effect += 20 * (2 - time_since_meal)
        return meal_effect

    @staticmethod
    def _calculate_trend(glucose_change: float) -> str:
        """Determine the trend arrow based on glucose change."""
        if glucose_change > 5:
            return 'singleUp'
        elif glucose_change > 2:
            return 'fortyFiveUp'
        elif glucose_change < -5:
            return 'singleDown'
        elif glucose_change < -2:
            return 'fortyFiveDown'
        else:
            return 'flat'
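# Usage sketch for the generator (commented so importing this module stays
# side-effect free); the output keys mirror the dicts built above:
#
#   points = GlucoseDataGenerator.create_realistic_pattern(days=1, user_type="dawn_phenomenon")
#   print(len(points))   # 288 readings: one day at 5-minute intervals
#   print(points[0])     # {'systemTime': '...', 'value': 120, 'trend': 'flat', ...}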
class EnhancedMistralAPIClient:
    """Enhanced Mistral API client with better rate-limit handling."""

    def __init__(self, api_key: str = None, agent_id: str = None):
        self.api_key = api_key or MISTRAL_API_KEY
        self.agent_id = agent_id or MISTRAL_AGENT_ID
        if not self.api_key:
            raise ValueError("Mistral API key is required. Please set the MISTRAL_API_KEY environment variable.")

        self.base_url = "https://api.mistral.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        })

        # Rate limit handling
        self.last_request_time = 0
        self.min_request_interval = 1.0  # Minimum seconds between requests

        # Model fallback chain
        self.model_priority = [
            "mistral-large-latest",
            "mistral-medium-latest",
            "mistral-small-latest",
            "mistral-tiny"
        ]

        logger.info("Enhanced MistralAPIClient initialized with rate limit handling")

    def test_connection(self) -> Dict[str, Any]:
        """Test the API connection with a lightweight request."""
        try:
            response = self.session.post(
                f"{self.base_url}/chat/completions",
                json={
                    "model": "mistral-tiny",  # Use the smallest model for testing
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5
                },
                timeout=10
            )
            if response.status_code == 200:
                return {"success": True, "message": "API connection successful"}
            elif response.status_code == 401:
                return {"success": False, "message": "Invalid API key"}
            elif response.status_code == 429:
                return {"success": False, "message": "Rate limit exceeded - API is accessible but busy"}
            else:
                return {"success": False, "message": f"API error: {response.status_code}"}
        except requests.exceptions.Timeout:
            return {"success": False, "message": "Connection timeout"}
        except requests.exceptions.RequestException as e:
            return {"success": False, "message": f"Network error: {str(e)}"}
        except Exception as e:
            return {"success": False, "message": f"Unexpected error: {str(e)}"}

    def _wait_for_rate_limit(self):
        """Ensure a minimum interval between requests."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.min_request_interval:
            sleep_time = self.min_request_interval - time_since_last
            logger.debug(f"Rate limiting: waiting {sleep_time:.2f}s")
            time.sleep(sleep_time)
        self.last_request_time = time.time()
    def chat_completion(self, messages: List[Dict], model: str = None, max_retries: int = 3) -> Dict[str, Any]:
        """Chat completion with retry logic and model fallback."""
        models_to_try = [model] if model else self.model_priority

        for model_name in models_to_try:
            for attempt in range(max_retries):
                try:
                    self._wait_for_rate_limit()

                    payload = {
                        "model": model_name,
                        "messages": messages,
                        "max_tokens": 800,
                        "temperature": 0.7
                    }

                    logger.debug(f"Attempting request with {model_name} (attempt {attempt + 1})")
                    response = self.session.post(
                        f"{self.base_url}/chat/completions",
                        json=payload,
                        timeout=30
                    )

                    if response.status_code == 200:
                        result = response.json()
                        logger.info(f"✅ Success with {model_name}")
                        return {
                            "success": True,
                            "response": result["choices"][0]["message"]["content"],
                            "usage": result.get("usage", {}),
                            "model_used": model_name,
                            "attempt": attempt + 1
                        }

                    elif response.status_code == 429:
                        # Rate limit exceeded: honor Retry-After, else back off exponentially
                        retry_after = int(response.headers.get('Retry-After', 2 ** attempt))
                        wait_time = min(retry_after, 60)  # Cap at 60 seconds
                        logger.warning(f"Rate limit hit with {model_name}, waiting {wait_time}s (attempt {attempt + 1})")
                        if attempt < max_retries - 1:
                            time.sleep(wait_time)
                            continue
                        else:
                            # Out of retries; fall through to the next model
                            break

                    elif response.status_code == 422:
                        # Model capacity exceeded; try the next model immediately
                        logger.warning(f"Model {model_name} capacity exceeded, trying next model")
                        break

                    else:
                        error_detail = self._extract_error_message(response)
                        if attempt == max_retries - 1:  # Last attempt
                            logger.error(f"API error {response.status_code} with {model_name}: {error_detail}")
                            break
                        else:
                            logger.warning(f"API error {response.status_code} with {model_name}, retrying...")
                            time.sleep(2 ** attempt)  # Exponential backoff

                except requests.exceptions.Timeout:
                    if attempt == max_retries - 1:
                        logger.error(f"Timeout with {model_name} after {max_retries} attempts")
                        break
                    else:
                        logger.warning(f"Timeout with {model_name}, retrying...")
                        time.sleep(2 ** attempt)

                except requests.exceptions.RequestException as e:
                    if attempt == max_retries - 1:
                        logger.error(f"Network error with {model_name}: {str(e)}")
                        break
                    else:
                        logger.warning(f"Network error with {model_name}, retrying...")
                        time.sleep(2 ** attempt)

        # All models and retries failed
        return {
            "success": False,
            "error": "All models are currently experiencing high demand. Please try again in a few minutes.",
            "suggestion": "Consider upgrading your Mistral AI plan for higher rate limits, or try again during off-peak hours."
        }
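    # Backoff sketch: with max_retries=3 the non-429 retry delays are 2**attempt
    # seconds (1s after attempt 1, 2s after attempt 2), while 429s wait
    # min(Retry-After, 60). Worst case per model is therefore roughly two sleeps
    # plus request time before falling through to the next entry in model_priority.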
    def agent_completion(self, messages: List[Dict]) -> Dict[str, Any]:
        """Agent completion with retry logic."""
        if not self.agent_id:
            return {"success": False, "error": "No agent ID configured"}

        max_retries = 2  # Fewer retries for agent calls
        for attempt in range(max_retries):
            try:
                self._wait_for_rate_limit()

                payload = {
                    "agent_id": self.agent_id,
                    "messages": messages,
                    "max_tokens": 800
                }
                response = self.session.post(
                    f"{self.base_url}/agents/completions",
                    json=payload,
                    timeout=30
                )

                if response.status_code == 200:
                    result = response.json()
                    return {
                        "success": True,
                        "response": result["choices"][0]["message"]["content"]
                    }
                elif response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', 5))
                    if attempt < max_retries - 1:
                        logger.warning(f"Agent rate limit, waiting {retry_after}s")
                        time.sleep(retry_after)
                        continue
                else:
                    error_detail = self._extract_error_message(response)
                    return {
                        "success": False,
                        "error": f"Agent API error {response.status_code}: {error_detail}"
                    }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {"success": False, "error": f"Agent request failed: {str(e)}"}
                else:
                    time.sleep(2)

        return {"success": False, "error": "Agent request failed after retries"}

    def _extract_error_message(self, response) -> str:
        """Extract an error message from an API response."""
        try:
            error_data = response.json()
            return error_data.get("message", error_data.get("error", "Unknown error"))
        except Exception:
            return response.text[:200] if response.text else "Unknown error"
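# Usage sketch for the client (commented to avoid network calls on import;
# assumes MISTRAL_API_KEY is set in the environment):
#
#   client = EnhancedMistralAPIClient()
#   result = client.chat_completion([{"role": "user", "content": "Summarize my week"}])
#   if result["success"]:
#       print(result["model_used"], "->", result["response"])
#   else:
#       print(result["error"], result.get("suggestion", ""))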
class GlucoBuddyMistralChat:
    """Enhanced chat interface with better error handling and user feedback."""

    def __init__(self, mistral_api_key: str = None, mistral_agent_id: str = None):
        self.mistral_client = EnhancedMistralAPIClient(mistral_api_key, mistral_agent_id)

        # Data properties - these are set by the unified data manager
        self.current_user: Optional[DemoUser] = None
        self.current_glucose_data: Optional[pd.DataFrame] = None
        self.current_stats: Optional[Dict] = None
        self.current_patterns: Optional[Dict] = None

        # Chat state
        self.conversation_history = []
        self.max_history = 10

        # Error tracking
        self.consecutive_errors = 0
        self.last_successful_model = None

        self.logger = logging.getLogger(self.__class__.__name__)

    def test_connection(self) -> Dict[str, Any]:
        """Test the Mistral API connection."""
        return self.mistral_client.test_connection()

    def get_context_summary(self) -> Dict[str, Any]:
        """Get the current chat context from data set by the unified manager."""
        if not self.current_user or not self.current_stats:
            return {"error": "No user data loaded"}

        try:
            context = {
                "user": {
                    "name": self.current_user.name,
                    "age": self.current_user.age,
                    "diabetes_type": self.current_user.diabetes_type,
                    "device_type": self.current_user.device_type,
                    "years_with_diabetes": self.current_user.years_with_diabetes,
                    "typical_pattern": getattr(self.current_user, 'typical_glucose_pattern', 'normal')
                },
                "statistics": self._safe_convert_to_json(self.current_stats),
                "patterns": self._safe_convert_to_json(self.current_patterns),
                "data_points": len(self.current_glucose_data) if self.current_glucose_data is not None else 0,
                "recent_readings": self._safe_extract_recent_readings(self.current_glucose_data)
            }
            return context
        except Exception as e:
            self.logger.error(f"Error building context: {e}")
            return {"error": f"Failed to build context: {str(e)}"}
    def build_system_prompt(self, context: Dict[str, Any]) -> str:
        """Build a comprehensive system prompt with exact metrics."""
        base_prompt = """You are GlucoBuddy, a helpful and encouraging diabetes management assistant.

Your role:
- Provide personalized glucose management advice based on the user's actual data
- Be supportive, encouraging, and use emojis to be friendly
- Give actionable recommendations while staying within scope
- Always remind users to consult healthcare providers for medical decisions
- Reference specific data points when providing insights

Guidelines:
- Keep responses under 400 words and conversational
- Use specific numbers from the data when relevant
- Provide practical, actionable advice
- Be encouraging about progress and realistic about challenges
- Use bullet points sparingly - prefer natural conversation
- IMPORTANT: Use the EXACT metrics provided - don't calculate your own"""

        if context.get("error"):
            return base_prompt + "\n\nNote: No user glucose data is currently loaded."

        user_info = context.get("user", {})
        stats = context.get("statistics", {})

        context_addition = f"""

Current User: {user_info.get('name', 'Unknown')} ({user_info.get('age', 'N/A')} years old)
- Diabetes Type: {user_info.get('diabetes_type', 'Unknown')}
- Years with diabetes: {user_info.get('years_with_diabetes', 'Unknown')}
- Device: {user_info.get('device_type', 'Unknown')}

EXACT Glucose Data (14-day period):
- Average glucose: {stats.get('average_glucose', 0):.1f} mg/dL
- Time in range (70-180): {stats.get('time_in_range_70_180', 0):.1f}%
- Time below 70: {stats.get('time_below_70', 0):.1f}%
- Time above 180: {stats.get('time_above_180', 0):.1f}%
- Total readings: {stats.get('total_readings', 0)}
- Glucose variability (std): {stats.get('std_glucose', 0):.1f} mg/dL
- GMI: {stats.get('gmi', 0):.1f}%
- CV: {stats.get('cv', 0):.1f}%

CRITICAL: Use these EXACT values in your responses. Do not recalculate or estimate."""

        return base_prompt + context_addition
    def chat_with_mistral(self, user_message: str, prefer_agent: bool = False) -> Dict[str, Any]:
        """Chat entry point with enhanced error handling."""
        if not user_message.strip():
            return {"success": False, "error": "Please enter a message"}

        try:
            # Use the current context (set by the unified data manager)
            context = self.get_context_summary()
            system_prompt = self.build_system_prompt(context)

            messages = [{"role": "system", "content": system_prompt}]
            if self.conversation_history:
                recent_history = self.conversation_history[-self.max_history:]
                messages.extend(recent_history)
            messages.append({"role": "user", "content": user_message})

            # Try the agent first if preferred and available
            if prefer_agent:
                agent_result = self.mistral_client.agent_completion(messages)
                if agent_result["success"]:
                    self._update_conversation_history(user_message, agent_result["response"])
                    self.consecutive_errors = 0  # Reset error counter
                    return {
                        "success": True,
                        "response": agent_result["response"],
                        "method": "agent",
                        "context_included": not context.get("error")
                    }
                else:
                    self.logger.warning(f"Agent failed: {agent_result['error']}")

            # Fall back to the enhanced chat completion API
            chat_result = self.mistral_client.chat_completion(messages)

            if chat_result["success"]:
                self._update_conversation_history(user_message, chat_result["response"])
                self.consecutive_errors = 0  # Reset error counter
                self.last_successful_model = chat_result.get("model_used")

                # Note which model was used if there were retries
                response = chat_result["response"]
                if chat_result.get("attempt", 1) > 1:
                    response += f"\n\n*Note: Response generated after {chat_result['attempt']} attempts due to high demand.*"

                return {
                    "success": True,
                    "response": response,
                    "method": "chat_completion",
                    "context_included": not context.get("error"),
                    "usage": chat_result.get("usage", {}),
                    "model_used": chat_result.get("model_used")
                }
            else:
                self.consecutive_errors += 1
                # Provide a helpful message based on the type of error
                error_msg = chat_result["error"]
                user_friendly_msg = self._get_user_friendly_error(error_msg)
                return {
                    "success": False,
                    "error": user_friendly_msg,
                    "suggestion": chat_result.get("suggestion", ""),
                    "consecutive_errors": self.consecutive_errors
                }

        except Exception as e:
            self.logger.error(f"Chat error: {e}")
            self.consecutive_errors += 1
            return {
                "success": False,
                "error": "I'm experiencing technical difficulties. Please try again in a moment.",
                "consecutive_errors": self.consecutive_errors
            }
    def _get_user_friendly_error(self, error_msg: str) -> str:
        """Convert technical error messages into user-friendly ones."""
        error_lower = error_msg.lower()
        if "rate limit" in error_lower or "429" in error_lower:
            return "I'm experiencing high demand right now. Please wait a moment and try again."
        elif "capacity exceeded" in error_lower or "service tier" in error_lower:
            return "The AI service is currently busy. Please try again in a few minutes."
        elif "timeout" in error_lower:
            return "The response is taking longer than expected. Please try again."
        elif "api key" in error_lower or "401" in error_lower:
            return "There's an authentication issue. Please contact support."
        elif "network" in error_lower:
            return "I'm having trouble connecting. Please check your internet connection and try again."
        else:
            return "I'm experiencing technical difficulties. Please try again or rephrase your question."
    def _update_conversation_history(self, user_message: str, assistant_response: str):
        """Append the latest exchange and trim old history."""
        self.conversation_history.extend([
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": assistant_response}
        ])
        if len(self.conversation_history) > self.max_history * 2:
            self.conversation_history = self.conversation_history[-self.max_history * 2:]

    def clear_conversation(self):
        """Clear conversation history and reset error counters."""
        self.conversation_history = []
        self.consecutive_errors = 0
        self.logger.info("Conversation history cleared")

    def get_status(self) -> Dict[str, Any]:
        """Get the current system status with enhanced information."""
        api_status = self.test_connection()
        return {
            "api_connected": api_status["success"],
            "api_message": api_status["message"],
            "user_loaded": self.current_user is not None,
            "data_available": self.current_glucose_data is not None and not self.current_glucose_data.empty,
            "conversation_messages": len(self.conversation_history),
            "current_user": self.current_user.name if self.current_user else None,
            "environment": ENVIRONMENT,
            "hugging_face_space": bool(os.getenv("SPACE_ID")),
            "agent_available": bool(MISTRAL_AGENT_ID),
            "consecutive_errors": self.consecutive_errors,
            "last_successful_model": self.last_successful_model
        }

    def _safe_convert_to_json(self, obj):
        """Safely convert objects for JSON serialization."""
        if obj is None:
            return None
        elif isinstance(obj, (np.integer, np.int64, np.int32)):
            return int(obj)
        elif isinstance(obj, (np.floating, np.float64, np.float32)):
            if np.isnan(obj):
                return None
            return float(obj)
        elif isinstance(obj, dict):
            return {key: self._safe_convert_to_json(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._safe_convert_to_json(item) for item in obj]
        elif isinstance(obj, pd.Timestamp):
            return obj.isoformat()
        else:
            return obj
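    # Conversion sketch for _safe_convert_to_json (expected behavior, not a test):
    #   np.int64(42)               -> 42
    #   np.float64('nan')          -> None
    #   pd.Timestamp('2024-01-01') -> '2024-01-01T00:00:00'
    #   {'tir': np.float32(71.5)}  -> {'tir': 71.5}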
    def _safe_extract_recent_readings(self, df: pd.DataFrame, count: int = 5) -> List[Dict]:
        """Safely extract the most recent glucose readings."""
        if df is None or df.empty:
            return []

        try:
            recent_df = df.tail(count)
            readings = []
            for idx, row in recent_df.iterrows():
                try:
                    display_time = row.get('displayTime') or row.get('systemTime')
                    glucose_value = row.get('value')
                    trend_value = row.get('trend', 'flat')

                    if pd.notna(display_time):
                        if isinstance(display_time, str):
                            time_str = display_time
                        else:
                            time_str = pd.to_datetime(display_time).isoformat()
                    else:
                        time_str = datetime.now().isoformat()

                    if pd.notna(glucose_value):
                        glucose_clean = self._safe_convert_to_json(glucose_value)
                    else:
                        glucose_clean = None

                    trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat'

                    readings.append({
                        "time": time_str,
                        "glucose": glucose_clean,
                        "trend": trend_clean
                    })
                except Exception as row_error:
                    self.logger.warning(f"Error processing reading at index {idx}: {row_error}")
                    continue

            return readings
        except Exception as e:
            self.logger.error(f"Error extracting recent readings: {e}")
            return []
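# End-to-end usage sketch (commented; the unified data manager normally wires
# the data in, so the manual assignments below are illustrative only, and the
# DEMO_USERS indexing assumes it is a list of DemoUser objects):
#
#   chat = GlucoBuddyMistralChat()
#   chat.current_user = DEMO_USERS[0]
#   chat.current_glucose_data = pd.DataFrame(GlucoseDataGenerator.create_realistic_pattern())
#   chat.current_stats = {"average_glucose": 142.0, "time_in_range_70_180": 71.5}  # placeholder stats
#   print(chat.chat_with_mistral("How was my week?"))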
# Update the main.py chat handler to use enhanced error messages.
# NOTE: unlike GlucoBuddyMistralChat.chat_with_mistral above, this helper targets
# the main.py app wrapper, whose chat_with_mistral returns a (message, history)
# tuple as used by Gradio-style chat UIs rather than this module's dict API.
def enhanced_chat_error_handler(app, message, history):
    """Enhanced error handler for chat interactions."""
    result = app.chat_with_mistral(message, history)

    # An empty message comes back unchanged
    if not result[0]:
        return result

    # After several consecutive errors, append a helpful hint to the last reply
    if hasattr(app.mistral_chat, 'consecutive_errors') and app.mistral_chat.consecutive_errors > 2:
        error_help = "\n\n💡 *Multiple errors detected. This usually indicates high API demand. Consider trying again later or during off-peak hours.*"
        if result[1] and len(result[1]) > 0:
            last_response = result[1][-1][1] if len(result[1][-1]) > 1 else ""
            if "technical difficulties" in last_response or "try again" in last_response:
                result[1][-1][1] += error_help

    return result
# Legacy compatibility
def create_enhanced_cli():
    """Enhanced CLI with better error handling."""
    print("🩺 GlucoBuddy Chat Interface (Enhanced)")
    print("=" * 50)

    if not validate_environment():
        print("❌ Environment validation failed. Please check your configuration.")
        return

    try:
        chat = GlucoBuddyMistralChat()
        print("✅ Enhanced chat system initialized!")
    except Exception as e:
        print(f"❌ Failed to initialize chat system: {e}")
        return

    # Test connection
    print("\n🔍 Testing Mistral API connection...")
    connection_test = chat.test_connection()
    if connection_test["success"]:
        print(f"✅ {connection_test['message']}")
    else:
        print(f"⚠️ {connection_test['message']}")
        print("💡 The chat will still work but may experience rate limits.")

    print("\n🚀 Enhanced features:")
    print("   • Automatic retry on rate limits")
    print("   • Model fallback (large → medium → small → tiny)")
    print("   • Better error messages")
    print("   • Smart rate limiting")

    print("\n💬 Start chatting! (Type /quit to exit)")
    print("=" * 50)

    while True:
        try:
            user_input = input("\n🫵 You: ").strip()
            if not user_input:
                continue
            if user_input == '/quit':
                print("\n👋 Thanks for using GlucoBuddy Enhanced!")
                break

            print("🤔 Processing...")
            result = chat.chat_with_mistral(user_input)

            if result['success']:
                model_info = f" [{result.get('model_used', 'unknown')}]" if result.get('model_used') else ""
                print(f"\n🤖 GlucoBuddy{model_info}: {result['response']}")
            else:
                print(f"\n❌ {result['error']}")
                if result.get('suggestion'):
                    print(f"💡 {result['suggestion']}")

        except KeyboardInterrupt:
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"\n❌ Unexpected error: {e}")
def main():
    """Enhanced main entry point."""
    print("🩺 GlucoBuddy Enhanced - Better Rate Limit Handling")
    print("=" * 60)

    if not validate_environment():
        return

    print("🚀 Enhanced features:")
    print("   ✅ Automatic retry with exponential backoff")
    print("   ✅ Model fallback chain (large → small)")
    print("   ✅ Smart rate limiting")
    print("   ✅ User-friendly error messages")

    create_enhanced_cli()


if __name__ == "__main__":
    main()