Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Dexcom Client for glucose data analysis - RESTORED WORKING VERSION | |
| This restores the original working API code before unified data manager | |
| """ | |
| import json | |
| import logging | |
| import sys | |
| from typing import Any, Dict, List, Optional, Union | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| from dataclasses import asdict | |
| """ | |
| Dexcom API Integration Functions | |
| This module contains all the functions for interacting with the Dexcom Sandbox API | |
| and processing glucose data for AI insights. | |
| """ | |
| import requests | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Tuple | |
| import pandas as pd | |
| from dataclasses import dataclass | |
| import logging | |
| import base64 | |
| import secrets | |
| import hashlib | |
| import requests | |
| from typing import Dict, List, Optional | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
@dataclass
class DemoUser:
    """Profile and sandbox credentials for one demo persona.

    Declared as a dataclass so instances can be created with keyword
    arguments (as ``DEMO_USERS`` does) and get ``__init__``, ``__repr__``
    and ``__eq__`` generated from the annotated fields below.
    """

    # Required fields: display name, device label, sandbox login and blurb.
    name: str
    device_type: str
    username: str
    password: str
    description: str
    # Optional profile details; the defaults apply when a persona omits them.
    age: int = 30
    diabetes_type: str = "Type 1"
    years_with_diabetes: int = 5
    typical_glucose_pattern: str = "normal"
# Sandbox personas offered by the demo, keyed by a short identifier that
# callers pass to DexcomAPI.simulate_demo_login().
# NOTE(review): every username below reads "[email protected]", which looks
# like e-mail obfuscation residue rather than a real sandbox login -- confirm
# the intended values against the original configuration.
DEMO_USERS = dict(
    sarah_g7=DemoUser(
        name="Sarah Thompson",
        device_type="G7 Mobile App",
        username="[email protected]",
        password="Dexcom123!",
        description="Active professional with Type 1 diabetes, uses G7 CGM with smartphone integration",
        age=32,
        diabetes_type="Type 1",
        years_with_diabetes=8,
        typical_glucose_pattern="stable_with_meal_spikes",
    ),
    marcus_one=DemoUser(
        name="Marcus Rodriguez",
        device_type="ONE+ Mobile App",
        username="[email protected]",
        password="Dexcom123!",
        description="Father of two with Type 2 diabetes, manages with Dexcom ONE+ and lifestyle changes",
        age=45,
        diabetes_type="Type 2",
        years_with_diabetes=3,
        typical_glucose_pattern="moderate_variability",
    ),
    jennifer_g6=DemoUser(
        name="Jennifer Chen",
        device_type="G6 Mobile App",
        username="[email protected]",
        password="Dexcom123!",
        description="Graduate student with Type 1 diabetes, tech-savvy G6 user with active lifestyle",
        age=28,
        diabetes_type="Type 1",
        years_with_diabetes=12,
        typical_glucose_pattern="exercise_related_lows",
    ),
    robert_receiver=DemoUser(
        name="Robert Williams",
        device_type="G6 Touchscreen Receiver",
        username="[email protected]",
        password="Dexcom123!",
        description="Retired teacher with Type 2 diabetes, prefers dedicated receiver device",
        age=67,
        diabetes_type="Type 2",
        years_with_diabetes=15,
        typical_glucose_pattern="dawn_phenomenon",
    ),
)
# Dexcom API configuration used by DexcomAPI below.
# SANDBOX_BASE_URL is the root every endpoint path is appended to;
# CLIENT_ID / CLIENT_SECRET identify this application in the OAuth token
# requests; REDIRECT_URI is sent with the authorization and code-exchange
# requests.
# NOTE(review): the client secret is committed in source -- consider loading
# it from the environment and rotating the value.
# NOTE(review): REDIRECT_URI points at localhost:7860; presumably this must
# match the URI registered for the client -- confirm for deployed instances.
SANDBOX_BASE_URL = "https://sandbox-api.dexcom.com"
CLIENT_ID = "mLElKHKRwRDVUrAOPBzktFGY7qkTc7Zm"
CLIENT_SECRET = "HmFpgyVweuwKrQpf"
REDIRECT_URI = "http://localhost:7860/callback"
class DexcomAPI:
    """Client for the Dexcom v2 REST API with OAuth token handling.

    The instance keeps the current access token, refresh token and expiry
    time. Data methods (``get_data_range``, ``get_egv_data``,
    ``get_events_data``) require a token to be present, retry once after a
    401 when a refresh token is available, and fall back to a placeholder
    result when the HTTP request itself fails or returns a non-200 status.
    """

    # Upper bound, in seconds, for any single HTTP call. Without a timeout a
    # stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(
        self,
        base_url: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        redirect_uri: Optional[str] = None,
    ):
        """Create an unauthenticated client.

        Args:
            base_url: API root; defaults to ``SANDBOX_BASE_URL``.
            client_id: OAuth client id; defaults to ``CLIENT_ID``.
            client_secret: OAuth client secret; defaults to ``CLIENT_SECRET``.
            redirect_uri: OAuth redirect URI; defaults to ``REDIRECT_URI``.
        """
        self.base_url = SANDBOX_BASE_URL if base_url is None else base_url
        self.client_id = CLIENT_ID if client_id is None else client_id
        self.client_secret = CLIENT_SECRET if client_secret is None else client_secret
        self.redirect_uri = REDIRECT_URI if redirect_uri is None else redirect_uri
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None

    # ------------------------------------------------------------------
    # OAuth flow
    # ------------------------------------------------------------------
    def get_authorization_url(self, state: Optional[str] = None) -> str:
        """Return the login URL the user must visit to authorize the app.

        Args:
            state: Optional opaque value appended as the ``state`` parameter.

        Returns:
            The ``/v2/oauth2/login`` URL with a percent-encoded query string.
        """
        # Local import keeps this block self-contained; the module's import
        # section does not provide urlencode.
        from urllib.parse import urlencode

        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "scope": "offline_access",
        }
        if state:
            params["state"] = state
        # urlencode escapes ':', '/', '&', '=' etc. so that neither the
        # redirect URI nor a caller-supplied state can corrupt the query.
        return f"{self.base_url}/v2/oauth2/login?{urlencode(params)}"

    def _store_token(self, token_data: Dict, keep_refresh_if_missing: bool = False) -> None:
        """Copy token fields from a token response onto the instance.

        Args:
            token_data: Parsed JSON body of a token response.
            keep_refresh_if_missing: When True, the current refresh token is
                kept if the response does not contain a new one.
        """
        self.access_token = token_data.get("access_token")
        if not keep_refresh_if_missing or "refresh_token" in token_data:
            self.refresh_token = token_data.get("refresh_token")
        # Assume a one-hour lifetime when the response omits "expires_in".
        expires_in = token_data.get("expires_in", 3600)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in)

    def exchange_code_for_token(self, authorization_code: str) -> Dict:
        """Exchange an authorization code for access and refresh tokens.

        Args:
            authorization_code: Code received on the redirect URI.

        Returns:
            The parsed token response.

        Raises:
            Exception: If the HTTP request fails or returns an error status;
                the underlying ``requests`` error is attached as the cause.
        """
        url = f"{self.base_url}/v2/oauth2/token"
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": authorization_code,
            "grant_type": "authorization_code",
            "redirect_uri": self.redirect_uri,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = requests.post(
                url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to exchange authorization code: {str(e)}") from e
        self._store_token(token_data)
        return token_data

    def simulate_demo_login(self, demo_user_key: str) -> str:
        """Obtain a token for one of the ``DEMO_USERS`` personas.

        First tries the simulated authorization-code exchange; if that
        yields no code or fails for any reason, falls back to
        ``_direct_sandbox_token_request``.

        Args:
            demo_user_key: Key into ``DEMO_USERS``.

        Returns:
            The access token now stored on the instance.

        Raises:
            ValueError: If ``demo_user_key`` is not a known demo user.
        """
        if demo_user_key not in DEMO_USERS:
            raise ValueError(f"Invalid demo user: {demo_user_key}")
        user = DEMO_USERS[demo_user_key]
        try:
            auth_code = self._simulate_sandbox_login(user.username, user.password)
            if auth_code:
                self.exchange_code_for_token(auth_code)
                logger.info(f"Successfully obtained tokens for {user.name}")
                return self.access_token
        except Exception as e:
            # Deliberate best-effort: any failure here leads to the fallback.
            logger.warning(f"OAuth flow failed, trying direct sandbox authentication: {e}")
        return self._direct_sandbox_token_request(user.username, user.password)

    def _simulate_sandbox_login(self, username: str, password: str) -> Optional[str]:
        """Derive a stand-in authorization code from the demo credentials.

        The code is the first 32 characters of the base64 encoding of
        ``"<username>:<password>:<YYYYMMDD>"``.
        NOTE(review): this is not a code issued by an authorization server,
        and it embeds encoded credential material -- confirm this is only
        ever sent to the sandbox.

        Returns:
            The derived code, or None if building it fails.
        """
        try:
            auth_string = f"{username}:{password}:{datetime.now().strftime('%Y%m%d')}"
            return base64.b64encode(auth_string.encode()).decode()[:32]
        except Exception as e:
            logger.error(f"Failed to simulate sandbox login: {e}")
            return None

    def _use_demo_token(self, username: str) -> str:
        """Install a locally generated demo token valid for one hour."""
        sandbox_token = self._generate_sandbox_demo_token(username)
        self.access_token = sandbox_token
        # Drop any refresh token left over from an earlier login so a later
        # refresh cannot silently switch this session to that other account.
        self.refresh_token = None
        self.token_expires_at = datetime.now() + timedelta(hours=1)
        return sandbox_token

    def _direct_sandbox_token_request(self, username: str, password: str) -> str:
        """Request a token with the password grant, else use a demo token.

        Never raises: a non-200 response or any exception results in a
        locally generated demo token being installed and returned.

        Returns:
            The access token now stored on the instance.
        """
        url = f"{self.base_url}/v2/oauth2/token"
        credentials = base64.b64encode(
            f"{self.client_id}:{self.client_secret}".encode()
        ).decode()
        headers = {
            "Authorization": f"Basic {credentials}",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": "offline_access",
        }
        try:
            response = requests.post(
                url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                self._store_token(response.json())
                logger.info(f"Successfully obtained sandbox token for {username}")
                return self.access_token
            sandbox_token = self._use_demo_token(username)
            logger.info(f"Using demo token for sandbox user {username}")
            return sandbox_token
        except Exception as e:
            logger.error(f"Direct token request failed: {e}")
            return self._use_demo_token(username)

    def _generate_sandbox_demo_token(self, username: str) -> str:
        """Build a deterministic placeholder token for ``username``.

        The token is ``"sandbox_token_"`` followed by the first 16 hex digits
        of the SHA-256 of ``"<username>:<YYYYMMDD>:<client id>"``.
        """
        token_data = f"{username}:{datetime.now().strftime('%Y%m%d')}:{self.client_id}"
        token_hash = hashlib.sha256(token_data.encode()).hexdigest()
        return f"sandbox_token_{token_hash[:16]}"

    # ------------------------------------------------------------------
    # Token lifecycle
    # ------------------------------------------------------------------
    def _is_token_expired(self) -> bool:
        """Return True when no expiry is recorded or it has been reached."""
        if not self.token_expires_at:
            return True
        return datetime.now() >= self.token_expires_at

    def _ensure_valid_token(self):
        """Make sure a usable access token is present, refreshing if needed.

        Raises:
            Exception: If there is no token and no refresh token, or if the
                refresh attempt fails (the failure is attached as the cause).
        """
        if self.access_token and not self._is_token_expired():
            return
        if not self.refresh_token:
            raise Exception("No valid token available. Please authenticate first.")
        try:
            self.refresh_access_token()
        except Exception as e:
            # 'except Exception' (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate instead of being re-labelled.
            raise Exception("Token expired and refresh failed. Please re-authenticate.") from e

    def refresh_access_token(self) -> Dict:
        """Obtain a new access token using the stored refresh token.

        Returns:
            The parsed token response.

        Raises:
            Exception: If no refresh token is stored, or if the HTTP request
                fails or returns an error status.
        """
        if not self.refresh_token:
            raise Exception("No refresh token available")
        url = f"{self.base_url}/v2/oauth2/token"
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = requests.post(
                url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to refresh token: {str(e)}") from e
        # Keep the current refresh token when the response has no new one.
        self._store_token(token_data, keep_refresh_if_missing=True)
        return token_data

    # ------------------------------------------------------------------
    # Data endpoints
    # ------------------------------------------------------------------
    def _authorized_get(self, path: str, params: Optional[Dict] = None) -> "requests.Response":
        """GET ``path`` with the bearer token, retrying once after a 401.

        The retry happens only when a refresh token is stored. Errors raised
        by ``refresh_access_token`` propagate to the caller.
        """
        url = f"{self.base_url}{path}"
        headers = {"Authorization": f"Bearer {self.access_token}"}
        response = requests.get(
            url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
        )
        if response.status_code == 401 and self.refresh_token:
            self.refresh_access_token()
            headers["Authorization"] = f"Bearer {self.access_token}"
            response = requests.get(
                url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
            )
        return response

    @staticmethod
    def _date_params(start_date: Optional[str], end_date: Optional[str]) -> Dict:
        """Build the ``startDate``/``endDate`` query parameters that are set."""
        params = {}
        if start_date:
            params["startDate"] = start_date
        if end_date:
            params["endDate"] = end_date
        return params

    @staticmethod
    def _demo_data_range() -> Dict:
        """Return a placeholder data range covering the last 30 days."""
        now = datetime.now()
        start = (now - timedelta(days=30)).isoformat()
        end = now.isoformat()
        return {
            "egvStart": start,
            "egvEnd": end,
            "eventStart": start,
            "eventEnd": end,
        }

    def get_data_range(self) -> Dict:
        """Get the available data range for the authenticated user.

        Returns:
            The API response on HTTP 200; otherwise a placeholder range
            covering the last 30 days.

        Raises:
            Exception: If no valid token is available.
        """
        self._ensure_valid_token()
        try:
            response = self._authorized_get("/v2/users/self/dataRange")
            if response.status_code == 200:
                return response.json()
            return self._demo_data_range()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get data range from API: {e}, using demo range")
            return self._demo_data_range()

    def get_egv_data(self, start_date: str = None, end_date: str = None) -> List[Dict]:
        """Get Estimated Glucose Value (EGV) records.

        Args:
            start_date: Optional value for the ``startDate`` query parameter.
            end_date: Optional value for the ``endDate`` query parameter.

        Returns:
            The ``"egvs"`` list of the response on HTTP 200; an empty list on
            any other status or when the request fails.

        Raises:
            Exception: If no valid token is available.
        """
        self._ensure_valid_token()
        params = self._date_params(start_date, end_date)
        try:
            response = self._authorized_get("/v2/users/self/egvs", params)
            if response.status_code == 200:
                return response.json().get("egvs", [])
            logger.warning(f"API returned status {response.status_code}, generating demo data")
            return []
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get EGV data from API: {e}, will use demo data")
            return []

    def get_events_data(self, start_date: str = None, end_date: str = None) -> List[Dict]:
        """Get event records (meals, insulin, etc.).

        Args:
            start_date: Optional value for the ``startDate`` query parameter.
            end_date: Optional value for the ``endDate`` query parameter.

        Returns:
            The ``"events"`` list of the response on HTTP 200; an empty list
            on any other status or when the request fails.

        Raises:
            Exception: If no valid token is available.
        """
        self._ensure_valid_token()
        params = self._date_params(start_date, end_date)
        try:
            response = self._authorized_get("/v2/users/self/events", params)
            if response.status_code == 200:
                return response.json().get("events", [])
            logger.warning(f"Events API returned status {response.status_code}")
            return []
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get events data: {e}")
            return []
class GlucoseAnalyzer:
    """Stateless helpers that turn EGV records into statistics and patterns.

    All methods are static, so they can be called on the class or on an
    instance.
    """

    @staticmethod
    def process_egv_data(egv_data: List[Dict]) -> pd.DataFrame:
        """Convert EGV records to a DataFrame sorted by ``systemTime``.

        Args:
            egv_data: Records containing at least the keys ``systemTime``,
                ``displayTime`` and ``value``.

        Returns:
            An empty DataFrame for empty input; otherwise a DataFrame whose
            time columns are parsed to datetimes and whose ``value`` column
            is numeric (unparseable values become NaN).
        """
        if not egv_data:
            return pd.DataFrame()
        df = pd.DataFrame(egv_data)
        df['systemTime'] = pd.to_datetime(df['systemTime'])
        df['displayTime'] = pd.to_datetime(df['displayTime'])
        df['value'] = pd.to_numeric(df['value'], errors='coerce')
        return df.sort_values('systemTime')

    @staticmethod
    def calculate_basic_stats(df: pd.DataFrame) -> Dict:
        """Calculate summary statistics over the non-missing glucose values.

        Args:
            df: DataFrame with a ``value`` column.

        Returns:
            An empty dict when there are no usable readings; otherwise the
            mean, min, max, standard deviation, the percentage of readings
            within 70-180 (inclusive), below 70 and above 180, and the
            number of readings.
        """
        if df.empty:
            return {}
        glucose_values = df['value'].dropna()
        total_readings = len(glucose_values)
        if total_readings == 0:
            # Every reading was missing; avoid dividing by zero below.
            return {}

        def percent(mask) -> float:
            # int() keeps the result a plain Python float.
            return int(mask.sum()) / total_readings * 100

        return {
            "average_glucose": glucose_values.mean(),
            "min_glucose": glucose_values.min(),
            "max_glucose": glucose_values.max(),
            "std_glucose": glucose_values.std(),
            "time_in_range_70_180": percent((glucose_values >= 70) & (glucose_values <= 180)),
            "time_below_70": percent(glucose_values < 70),
            "time_above_180": percent(glucose_values > 180),
            "total_readings": total_readings,
        }

    @staticmethod
    def identify_patterns(df: pd.DataFrame) -> Dict:
        """Describe hourly peaks, variability and the recent trend.

        Args:
            df: DataFrame with ``systemTime`` (datetime) and ``value``
                columns. It is not modified.

        Returns:
            ``{"patterns": <list of sentences>}``, or ``{"patterns":
            "Insufficient data for pattern analysis"}`` when fewer than 10
            rows are supplied.
        """
        if df.empty or len(df) < 10:
            return {"patterns": "Insufficient data for pattern analysis"}
        patterns = []

        # Group by hour of day without adding a column to the caller's frame.
        hours = df['systemTime'].dt.hour
        hourly_avg = df['value'].groupby(hours).mean()
        peak_hour = hourly_avg.idxmax()
        low_hour = hourly_avg.idxmin()
        patterns.append(f"Glucose typically peaks around {peak_hour}:00")
        patterns.append(f"Glucose is typically lowest around {low_hour}:00")

        glucose_std = df['value'].std()
        if glucose_std > 50:
            patterns.append("High glucose variability detected - consider discussing with healthcare provider")
        elif glucose_std < 20:
            patterns.append("Good glucose stability observed")

        # The frame has at least 10 rows here, so the window does as well.
        recent_data = df.tail(20)
        # Average change per reading between the first and last of the window.
        trend_slope = (recent_data['value'].iloc[-1] - recent_data['value'].iloc[0]) / len(recent_data)
        if trend_slope > 2:
            patterns.append("Recent upward glucose trend observed")
        elif trend_slope < -2:
            patterns.append("Recent downward glucose trend observed")
        else:
            patterns.append("Glucose levels relatively stable recently")
        return {"patterns": patterns}
def format_glucose_data_for_display(df: pd.DataFrame) -> str:
    """Render the 10 most recent glucose readings as a Markdown table.

    Args:
        df: DataFrame with ``displayTime`` (datetime) and ``value`` columns
            and, optionally, a ``trend`` column.

    Returns:
        ``"No glucose data available"`` for an empty frame; otherwise a
        Markdown heading followed by a table with one row per reading
        (time, value rounded to a whole number, trend arrow).
    """
    if df.empty:
        return "No glucose data available"

    # Arrow shown for each trend name. Written as escapes so the glyphs
    # survive any re-encoding of this source file.
    trend_arrows = {
        'flat': '\u2192',           # rightwards arrow
        'fortyFiveUp': '\u2197',    # north-east arrow
        'singleUp': '\u2191',       # upwards arrow
        'doubleUp': '\u2b06',       # upwards black arrow
        'fortyFiveDown': '\u2198',  # south-east arrow
        'singleDown': '\u2193',     # downwards arrow
        'doubleDown': '\u2b07',     # downwards black arrow
    }
    default_arrow = '\u2192'  # used for missing or unrecognised trends

    parts = [
        "## Recent Glucose Readings\n\n",
        "| Time | Glucose (mg/dL) | Trend |\n",
        "|------|-----------------|-------|\n",
    ]
    for _, row in df.tail(10).iterrows():
        time_str = row['displayTime'].strftime("%m/%d %H:%M")
        glucose = row['value']
        trend_arrow = trend_arrows.get(row.get('trend', 'N/A'), default_arrow)
        parts.append(f"| {time_str} | {glucose:.0f} | {trend_arrow} |\n")
    return "".join(parts)