import os from typing import List, Dict, Optional import numpy as np import pandas as pd from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from pydantic import BaseModel, Field from chronos import Chronos2Pipeline # ========================= # Configuración del modelo # ========================= MODEL_ID = os.getenv("CHRONOS_MODEL_ID", "amazon/chronos-2") DEVICE_MAP = os.getenv("DEVICE_MAP", "cpu") # "cpu" o "cuda" app = FastAPI( title="Chronos-2 Universal Forecasting API + Excel Add-in", description=( "Servidor para pronósticos con Chronos-2: univariante, " "multivariante, covariables, escenarios, anomalías y backtesting. " "Incluye Excel Add-in v2.1.0 con archivos estáticos." ), version="2.1.0", ) # Configurar CORS para Excel Add-in app.add_middleware( CORSMiddleware, allow_origins=[ "https://localhost:3001", "https://localhost:3000", "https://ttzzs-chronos2-excel-forecasting-api.hf.space", "*" # Permitir todos los orígenes para Office Add-ins ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Carga única del modelo al iniciar el proceso pipeline = Chronos2Pipeline.from_pretrained(MODEL_ID, device_map=DEVICE_MAP) # ========================= # Archivos estáticos para Excel Add-in # ========================= # Montar directorios estáticos si existen if os.path.exists("static"): app.mount("/assets", StaticFiles(directory="static/assets"), name="assets") app.mount("/taskpane", StaticFiles(directory="static/taskpane"), name="taskpane") app.mount("/commands", StaticFiles(directory="static/commands"), name="commands") # Endpoint para manifest.xml @app.get("/manifest.xml", response_class=FileResponse) async def get_manifest(): """Devuelve el manifest.xml del Excel Add-in""" return FileResponse("static/manifest.xml", media_type="application/xml") @app.get("/", tags=["Info"]) async def root_with_addon(): """Información del API + Add-in""" return { "name": "Chronos-2 Forecasting API", "version": "2.1.0", "model": MODEL_ID, "endpoints": { "api": [ "/health", "/forecast_univariate", "/forecast_multi_id", "/forecast_with_covariates", "/forecast_multivariate", "/forecast_scenarios", "/detect_anomalies", "/backtest_simple" ], "add_in": [ "/manifest.xml", "/taskpane/taskpane.html", "/assets/icon-*.png" ] }, "docs": "/docs", "excel_add_in": { "manifest_url": "https://ttzzs-chronos2-excel-forecasting-api.hf.space/manifest.xml", "version": "2.1.0", "features": [ "Univariate Forecast", "Multi-Series Forecast", "Forecast with Covariates", "Scenario Analysis", "Multivariate Forecast", "Anomaly Detection", "Backtest" ] } } else: @app.get("/", tags=["Info"]) async def root_api_only(): """Información del API (sin Add-in)""" return { "name": "Chronos-2 Forecasting API", "version": "2.1.0", "model": MODEL_ID, "docs": "/docs" } # ========================= # Modelos Pydantic comunes # ========================= class BaseForecastConfig(BaseModel): prediction_length: int = Field( 7, description="Horizonte de predicción (número de pasos futuros)" ) quantile_levels: List[float] = Field( default_factory=lambda: [0.1, 0.5, 0.9], description="Cuantiles para el pronóstico probabilístico", ) start_timestamp: Optional[str] = Field( default=None, description=( "Fecha/hora inicial del histórico (formato ISO). " "Si no se especifica, se usan índices enteros." ), ) freq: str = Field( "D", description="Frecuencia temporal (p.ej. 'D' diario, 'H' horario, 'W' semanal...).", ) class UnivariateSeries(BaseModel): values: List[float] class MultiSeriesItem(BaseModel): series_id: str values: List[float] class CovariatePoint(BaseModel): """ Punto temporal usado tanto para contexto (histórico) como para covariables futuras. """ timestamp: Optional[str] = None # opcional si se usan índices enteros id: Optional[str] = None # id de serie, por defecto 'series_0' target: Optional[float] = None # valor de la variable objetivo (histórico) covariates: Dict[str, float] = Field( default_factory=dict, description="Nombre -> valor de cada covariable dinámica.", ) # ========================= # 1) Healthcheck # ========================= @app.get("/health") def health(): """ Devuelve información básica del estado del servidor y el modelo cargado. """ return { "status": "ok", "model_id": MODEL_ID, "device_map": DEVICE_MAP, } # ========================= # 2) Pronóstico univariante # ========================= class ForecastUnivariateRequest(BaseForecastConfig): series: UnivariateSeries class ForecastUnivariateResponse(BaseModel): timestamps: List[str] median: List[float] quantiles: Dict[str, List[float]] # "0.1" -> [..], "0.9" -> [..] @app.post("/forecast_univariate", response_model=ForecastUnivariateResponse) def forecast_univariate(req: ForecastUnivariateRequest): """ Pronóstico para una sola serie temporal (univariante, sin covariables). Pensado para uso directo desde Excel u otras herramientas sencillas. """ values = req.series.values n = len(values) if n == 0: raise HTTPException(status_code=400, detail="La serie no puede estar vacía.") # Construimos contexto como DataFrame largo (id, timestamp, target) if req.start_timestamp: timestamps = pd.date_range( start=pd.to_datetime(req.start_timestamp), periods=n, freq=req.freq, ) else: timestamps = pd.RangeIndex(start=0, stop=n, step=1) context_df = pd.DataFrame( { "id": ["series_0"] * n, "timestamp": timestamps, "target": values, } ) pred_df = pipeline.predict_df( context_df, prediction_length=req.prediction_length, quantile_levels=req.quantile_levels, id_column="id", timestamp_column="timestamp", target="target", ) pred_df = pred_df.sort_values("timestamp") timestamps_out = pred_df["timestamp"].astype(str).tolist() median = pred_df["predictions"].astype(float).tolist() quantiles_dict: Dict[str, List[float]] = {} for q in req.quantile_levels: key = f"{q:.3g}" if key in pred_df.columns: quantiles_dict[key] = pred_df[key].astype(float).tolist() return ForecastUnivariateResponse( timestamps=timestamps_out, median=median, quantiles=quantiles_dict, ) # ========================= # 3) Multi-serie (multi-id) # ========================= class ForecastMultiSeriesRequest(BaseForecastConfig): series_list: List[MultiSeriesItem] class SeriesForecast(BaseModel): series_id: str timestamps: List[str] median: List[float] quantiles: Dict[str, List[float]] class ForecastMultiSeriesResponse(BaseModel): forecasts: List[SeriesForecast] @app.post("/forecast_multi_id", response_model=ForecastMultiSeriesResponse) def forecast_multi_id(req: ForecastMultiSeriesRequest): """ Pronóstico para múltiples series (por ejemplo, varios SKU o tiendas). """ if not req.series_list: raise HTTPException(status_code=400, detail="Debes enviar al menos una serie.") frames = [] for item in req.series_list: n = len(item.values) if n == 0: continue if req.start_timestamp: timestamps = pd.date_range( start=pd.to_datetime(req.start_timestamp), periods=n, freq=req.freq, ) else: timestamps = pd.RangeIndex(start=0, stop=n, step=1) frames.append( pd.DataFrame( { "id": [item.series_id] * n, "timestamp": timestamps, "target": item.values, } ) ) if not frames: raise HTTPException(status_code=400, detail="Todas las series están vacías.") context_df = pd.concat(frames, ignore_index=True) pred_df = pipeline.predict_df( context_df, prediction_length=req.prediction_length, quantile_levels=req.quantile_levels, id_column="id", timestamp_column="timestamp", target="target", ) forecasts: List[SeriesForecast] = [] for series_id, group in pred_df.groupby("id"): group = group.sort_values("timestamp") timestamps_out = group["timestamp"].astype(str).tolist() median = group["predictions"].astype(float).tolist() quantiles_dict: Dict[str, List[float]] = {} for q in req.quantile_levels: key = f"{q:.3g}" if key in group.columns: quantiles_dict[key] = group[key].astype(float).tolist() forecasts.append( SeriesForecast( series_id=series_id, timestamps=timestamps_out, median=median, quantiles=quantiles_dict, ) ) return ForecastMultiSeriesResponse(forecasts=forecasts) # ========================= # 4) Pronóstico con covariables # ========================= class ForecastWithCovariatesRequest(BaseForecastConfig): context: List[CovariatePoint] future: Optional[List[CovariatePoint]] = None class ForecastWithCovariatesResponse(BaseModel): # filas con todas las columnas de pred_df serializadas como string pred_df: List[Dict[str, str]] @app.post("/forecast_with_covariates", response_model=ForecastWithCovariatesResponse) def forecast_with_covariates(req: ForecastWithCovariatesRequest): """ Pronóstico con información de covariables (promos, precio, clima...) tanto en el histórico (context) como en futuros posibles (future). """ if not req.context: raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.") ctx_rows = [] for p in req.context: if p.target is None: continue row = { "id": p.id or "series_0", "timestamp": p.timestamp, "target": p.target, } for k, v in p.covariates.items(): row[k] = v ctx_rows.append(row) context_df = pd.DataFrame(ctx_rows) if "timestamp" not in context_df or context_df["timestamp"].isna().any(): context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1) future_df = None if req.future: fut_rows = [] for p in req.future: row = { "id": p.id or "series_0", "timestamp": p.timestamp, } for k, v in p.covariates.items(): row[k] = v fut_rows.append(row) future_df = pd.DataFrame(fut_rows) if "timestamp" not in future_df or future_df["timestamp"].isna().any(): future_df["timestamp"] = pd.RangeIndex( start=context_df["timestamp"].max() + 1, stop=context_df["timestamp"].max() + 1 + len(future_df), step=1, ) pred_df = pipeline.predict_df( context_df, future_df=future_df, prediction_length=req.prediction_length, quantile_levels=req.quantile_levels, id_column="id", timestamp_column="timestamp", target="target", ) pred_df = pred_df.sort_values(["id", "timestamp"]) out_records: List[Dict[str, str]] = [] for _, row in pred_df.iterrows(): record = {k: str(v) for k, v in row.items()} out_records.append(record) return ForecastWithCovariatesResponse(pred_df=out_records) # ========================= # 5) Multivariante (varios targets) # ========================= class MultivariateContextPoint(BaseModel): timestamp: Optional[str] = None id: Optional[str] = None targets: Dict[str, float] # p.ej. {"demand": 100, "returns": 5} covariates: Dict[str, float] = Field(default_factory=dict) class ForecastMultivariateRequest(BaseForecastConfig): context: List[MultivariateContextPoint] target_columns: List[str] # nombres de columnas objetivo class ForecastMultivariateResponse(BaseModel): pred_df: List[Dict[str, str]] @app.post("/forecast_multivariate", response_model=ForecastMultivariateResponse) def forecast_multivariate(req: ForecastMultivariateRequest): """ Pronóstico multivariante: múltiples columnas objetivo (p.ej. demanda y devoluciones). """ if not req.context: raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.") if not req.target_columns: raise HTTPException(status_code=400, detail="Debes indicar columnas objetivo.") rows = [] for p in req.context: base = { "id": p.id or "series_0", "timestamp": p.timestamp, } for t_name, t_val in p.targets.items(): base[t_name] = t_val for k, v in p.covariates.items(): base[k] = v rows.append(base) context_df = pd.DataFrame(rows) if "timestamp" not in context_df or context_df["timestamp"].isna().any(): context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1) pred_df = pipeline.predict_df( context_df, prediction_length=req.prediction_length, quantile_levels=req.quantile_levels, id_column="id", timestamp_column="timestamp", target=req.target_columns, ) pred_df = pred_df.sort_values(["id", "timestamp"]) out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()] return ForecastMultivariateResponse(pred_df=out_records) # ========================= # 6) Escenarios (what-if) # ========================= class ScenarioDefinition(BaseModel): name: str future_covariates: List[CovariatePoint] class ScenarioForecast(BaseModel): name: str pred_df: List[Dict[str, str]] class ForecastScenariosRequest(BaseForecastConfig): context: List[CovariatePoint] scenarios: List[ScenarioDefinition] class ForecastScenariosResponse(BaseModel): scenarios: List[ScenarioForecast] @app.post("/forecast_scenarios", response_model=ForecastScenariosResponse) def forecast_scenarios(req: ForecastScenariosRequest): """ Evaluación de múltiples escenarios (what-if) cambiando las covariables futuras (por ejemplo, promo ON/OFF, diferentes precios, etc.). """ if not req.context: raise HTTPException(status_code=400, detail="El contexto no puede estar vacío.") if not req.scenarios: raise HTTPException(status_code=400, detail="Debes definir al menos un escenario.") ctx_rows = [] for p in req.context: if p.target is None: continue row = { "id": p.id or "series_0", "timestamp": p.timestamp, "target": p.target, } for k, v in p.covariates.items(): row[k] = v ctx_rows.append(row) context_df = pd.DataFrame(ctx_rows) if "timestamp" not in context_df or context_df["timestamp"].isna().any(): context_df["timestamp"] = pd.RangeIndex(start=0, stop=len(context_df), step=1) results: List[ScenarioForecast] = [] for scen in req.scenarios: fut_rows = [] for p in scen.future_covariates: row = { "id": p.id or "series_0", "timestamp": p.timestamp, } for k, v in p.covariates.items(): row[k] = v fut_rows.append(row) future_df = pd.DataFrame(fut_rows) if "timestamp" not in future_df or future_df["timestamp"].isna().any(): future_df["timestamp"] = pd.RangeIndex( start=context_df["timestamp"].max() + 1, stop=context_df["timestamp"].max() + 1 + len(future_df), step=1, ) pred_df = pipeline.predict_df( context_df, future_df=future_df, prediction_length=req.prediction_length, quantile_levels=req.quantile_levels, id_column="id", timestamp_column="timestamp", target="target", ) pred_df = pred_df.sort_values(["id", "timestamp"]) out_records = [{k: str(v) for k, v in row.items()} for _, row in pred_df.iterrows()] results.append(ScenarioForecast(name=scen.name, pred_df=out_records)) return ForecastScenariosResponse(scenarios=results) # ========================= # 7) Detección de anomalías # ========================= class AnomalyDetectionRequest(BaseModel): context: UnivariateSeries recent_observed: List[float] prediction_length: int = 7 quantile_low: float = 0.05 quantile_high: float = 0.95 class AnomalyPoint(BaseModel): index: int value: float predicted_median: float lower: float upper: float is_anomaly: bool class AnomalyDetectionResponse(BaseModel): anomalies: List[AnomalyPoint] @app.post("/detect_anomalies", response_model=AnomalyDetectionResponse) def detect_anomalies(req: AnomalyDetectionRequest): """ Marca como anomalías los puntos observados recientes que caen fuera del intervalo [quantile_low, quantile_high] del pronóstico. """ n_hist = len(req.context.values) if n_hist == 0: raise HTTPException(status_code=400, detail="La serie histórica no puede estar vacía.") if len(req.recent_observed) != req.prediction_length: raise HTTPException( status_code=400, detail="recent_observed debe tener la misma longitud que prediction_length.", ) context_df = pd.DataFrame( { "id": ["series_0"] * n_hist, "timestamp": pd.RangeIndex(start=0, stop=n_hist, step=1), "target": req.context.values, } ) quantiles = sorted({req.quantile_low, 0.5, req.quantile_high}) pred_df = pipeline.predict_df( context_df, prediction_length=req.prediction_length, quantile_levels=quantiles, id_column="id", timestamp_column="timestamp", target="target", ).sort_values("timestamp") q_low_col = f"{req.quantile_low:.3g}" q_high_col = f"{req.quantile_high:.3g}" anomalies: List[AnomalyPoint] = [] for i, (obs, (_, row)) in enumerate(zip(req.recent_observed, pred_df.iterrows())): lower = float(row[q_low_col]) upper = float(row[q_high_col]) median = float(row["predictions"]) is_anom = (obs < lower) or (obs > upper) anomalies.append( AnomalyPoint( index=i, value=obs, predicted_median=median, lower=lower, upper=upper, is_anomaly=is_anom, ) ) return AnomalyDetectionResponse(anomalies=anomalies) # ========================= # 8) Backtest simple # ========================= class BacktestRequest(BaseModel): series: UnivariateSeries prediction_length: int = 7 test_length: int = 28 class BacktestMetrics(BaseModel): mae: float mape: float wql: float # Weighted Quantile Loss aproximada para el cuantil 0.5 class BacktestResponse(BaseModel): metrics: BacktestMetrics forecast_median: List[float] forecast_timestamps: List[str] actuals: List[float] @app.post("/backtest_simple", response_model=BacktestResponse) def backtest_simple(req: BacktestRequest): """ Backtest sencillo: separamos un tramo final de la serie como test, pronosticamos ese tramo y calculamos métricas MAE / MAPE / WQL. """ values = np.array(req.series.values, dtype=float) n = len(values) if n <= req.test_length: raise HTTPException( status_code=400, detail="La serie debe ser más larga que test_length.", ) train = values[: n - req.test_length] test = values[n - req.test_length :] context_df = pd.DataFrame( { "id": ["series_0"] * len(train), "timestamp": pd.RangeIndex(start=0, stop=len(train), step=1), "target": train.tolist(), } ) pred_df = pipeline.predict_df( context_df, prediction_length=req.test_length, quantile_levels=[0.5], id_column="id", timestamp_column="timestamp", target="target", ).sort_values("timestamp") forecast = pred_df["predictions"].to_numpy(dtype=float) timestamps = pred_df["timestamp"].astype(str).tolist() mae = float(np.mean(np.abs(test - forecast))) eps = 1e-8 mape = float(np.mean(np.abs((test - forecast) / (test + eps)))) * 100.0 tau = 0.5 diff = test - forecast wql = float(np.mean(np.maximum(tau * diff, (tau - 1) * diff))) metrics = BacktestMetrics(mae=mae, mape=mape, wql=wql) return BacktestResponse( metrics=metrics, forecast_median=forecast.tolist(), forecast_timestamps=timestamps, actuals=test.tolist(), )