diff --git a/api/ws_data_broadcaster.py b/api/ws_data_broadcaster.py index a4ee37a..1b9888e 100644 --- a/api/ws_data_broadcaster.py +++ b/api/ws_data_broadcaster.py @@ -1,23 +1,18 @@ -""" -WebSocket Data Broadcaster -Broadcasts real-time cryptocurrency data from database to connected clients -""" - import asyncio import logging from datetime import datetime from typing import Dict, Any -from database.db_manager import db_manager +from backend.orchestration.provider_manager import provider_manager from backend.services.ws_service_manager import ws_manager, ServiceType from utils.logger import setup_logger logger = setup_logger("ws_data_broadcaster") - class DataBroadcaster: """ Broadcasts cryptocurrency data updates to WebSocket clients + using the Provider Orchestrator for data fetching. """ def __init__(self): @@ -37,7 +32,6 @@ class DataBroadcaster: self.broadcast_market_data(), self.broadcast_news(), self.broadcast_sentiment(), - self.broadcast_whales(), self.broadcast_gas_prices() ] @@ -59,25 +53,49 @@ class DataBroadcaster: while self.is_running: try: - prices = db_manager.get_latest_prices(limit=50) - - if prices: + # Use Orchestrator to fetch market data + # Using 30s TTL to prevent provider spam, but broadcast often + response = await provider_manager.fetch_data( + "market", + params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"}, + use_cache=True, + ttl=10 # Short TTL for live prices if provider allows + ) + + if response["success"] and response["data"]: + coins = response["data"] + # Format data for broadcast + prices = {} + price_changes = {} + volumes = {} + market_caps = {} + + for coin in coins: + symbol = coin.get("symbol", "").upper() + prices[symbol] = coin.get("current_price") + price_changes[symbol] = coin.get("price_change_percentage_24h") + volumes[symbol] = coin.get("total_volume") + market_caps[symbol] = coin.get("market_cap") + data = { "type": "market_data", "data": { - "prices": {p.symbol: p.price_usd for p in prices}, - "volumes": {p.symbol: p.volume_24h for p in prices if p.volume_24h}, - "market_caps": {p.symbol: p.market_cap for p in prices if p.market_cap}, - "price_changes": {p.symbol: p.price_change_24h for p in prices if p.price_change_24h} + "prices": prices, + "volumes": volumes, + "market_caps": market_caps, + "price_changes": price_changes }, - "count": len(prices), - "timestamp": datetime.utcnow().isoformat() + "count": len(coins), + "timestamp": datetime.utcnow().isoformat(), + "source": response["source"] } + # Diff check could be here (optimization) + # Broadcast to subscribed clients await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data) - logger.debug(f"Broadcasted {len(prices)} price updates") + logger.debug(f"Broadcasted {len(coins)} price updates from {response['source']}") except Exception as e: logger.error(f"Error broadcasting market data: {e}", exc_info=True) @@ -87,113 +105,98 @@ class DataBroadcaster: async def broadcast_news(self): """Broadcast news updates""" logger.info("Starting news broadcast...") - last_news_id = 0 - + while self.is_running: try: - news = db_manager.get_latest_news(limit=10) - - if news and (not last_news_id or news[0].id != last_news_id): - # New news available - last_news_id = news[0].id - - data = { - "type": "news", - "data": { - "articles": [ - { - "id": article.id, - "title": article.title, - "source": article.source, - "url": article.url, - "published_at": article.published_at.isoformat(), - "sentiment": article.sentiment - } - for article in news[:5] # Only send 5 latest - ] - }, - "count": len(news[:5]), - "timestamp": datetime.utcnow().isoformat() - } - - await ws_manager.broadcast_to_service(ServiceType.NEWS, data) - logger.info(f"Broadcasted {len(news[:5])} news articles") + response = await provider_manager.fetch_data( + "news", + params={"filter": "hot"}, + use_cache=True, + ttl=300 + ) + + if response["success"] and response["data"]: + # Transform/Normalize + data = response["data"] + articles = [] + + if "results" in data: # CryptoPanic + for post in data.get('results', [])[:5]: + articles.append({ + "id": str(post.get('id')), + "title": post.get('title', ''), + "source": post.get('source', {}).get('title', 'Unknown'), + "url": post.get('url', ''), + "published_at": post.get('published_at', datetime.now().isoformat()) + }) + elif "articles" in data: # NewsAPI + for post in data.get('articles', [])[:5]: + articles.append({ + "id": str(hash(post.get('url', ''))), + "title": post.get('title', ''), + "source": post.get('source', {}).get('name', 'Unknown'), + "url": post.get('url', ''), + "published_at": post.get('publishedAt', datetime.now().isoformat()) + }) + + if articles: + payload = { + "type": "news", + "data": {"articles": articles}, + "count": len(articles), + "timestamp": datetime.utcnow().isoformat(), + "source": response["source"] + } + + await ws_manager.broadcast_to_service(ServiceType.NEWS, payload) + logger.info(f"Broadcasted {len(articles)} news articles from {response['source']}") except Exception as e: logger.error(f"Error broadcasting news: {e}", exc_info=True) - await asyncio.sleep(30) # Check every 30 seconds + await asyncio.sleep(60) async def broadcast_sentiment(self): """Broadcast sentiment updates""" logger.info("Starting sentiment broadcast...") - last_sentiment_value = None while self.is_running: try: - sentiment = db_manager.get_latest_sentiment() - - if sentiment and sentiment.value != last_sentiment_value: - last_sentiment_value = sentiment.value - - data = { + response = await provider_manager.fetch_data( + "sentiment", + params={"limit": 1}, + use_cache=True, + ttl=3600 + ) + + if response["success"] and response["data"]: + data = response["data"] + fng_value = 50 + classification = "Neutral" + + if data.get('data'): + item = data['data'][0] + fng_value = int(item.get('value', 50)) + classification = item.get('value_classification', 'Neutral') + + payload = { "type": "sentiment", "data": { - "fear_greed_index": sentiment.value, - "classification": sentiment.classification, - "metric_name": sentiment.metric_name, - "source": sentiment.source, - "timestamp": sentiment.timestamp.isoformat() + "fear_greed_index": fng_value, + "classification": classification, + "timestamp": datetime.utcnow().isoformat() }, - "timestamp": datetime.utcnow().isoformat() + "timestamp": datetime.utcnow().isoformat(), + "source": response["source"] } - await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, data) - logger.info(f"Broadcasted sentiment: {sentiment.value} ({sentiment.classification})") + await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, payload) + logger.info(f"Broadcasted sentiment: {fng_value} from {response['source']}") except Exception as e: logger.error(f"Error broadcasting sentiment: {e}", exc_info=True) - await asyncio.sleep(60) # Check every minute - - async def broadcast_whales(self): - """Broadcast whale transaction updates""" - logger.info("Starting whale transaction broadcast...") - last_whale_id = 0 - - while self.is_running: - try: - whales = db_manager.get_whale_transactions(limit=5) - - if whales and (not last_whale_id or whales[0].id != last_whale_id): - last_whale_id = whales[0].id - - data = { - "type": "whale_transaction", - "data": { - "transactions": [ - { - "id": tx.id, - "blockchain": tx.blockchain, - "amount_usd": tx.amount_usd, - "from_address": tx.from_address[:20] + "...", - "to_address": tx.to_address[:20] + "...", - "timestamp": tx.timestamp.isoformat() - } - for tx in whales - ] - }, - "count": len(whales), - "timestamp": datetime.utcnow().isoformat() - } - - await ws_manager.broadcast_to_service(ServiceType.WHALE_TRACKING, data) - logger.info(f"Broadcasted {len(whales)} whale transactions") - - except Exception as e: - logger.error(f"Error broadcasting whales: {e}", exc_info=True) - - await asyncio.sleep(15) # Check every 15 seconds + await asyncio.sleep(60) async def broadcast_gas_prices(self): """Broadcast gas price updates""" @@ -201,23 +204,37 @@ class DataBroadcaster: while self.is_running: try: - gas_prices = db_manager.get_latest_gas_prices() - - if gas_prices: - data = { - "type": "gas_prices", - "data": gas_prices, - "timestamp": datetime.utcnow().isoformat() - } - - # Broadcast to RPC_NODES service type (gas prices are blockchain-related) - await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, data) - logger.debug("Broadcasted gas prices") + response = await provider_manager.fetch_data( + "onchain", + params={}, + use_cache=True, + ttl=15 + ) + + if response["success"] and response["data"]: + data = response["data"] + result = data.get("result", {}) + + if result: + payload = { + "type": "gas_prices", + "data": { + "fast": result.get("FastGasPrice"), + "standard": result.get("ProposeGasPrice"), + "slow": result.get("SafeGasPrice") + }, + "timestamp": datetime.utcnow().isoformat(), + "source": response["source"] + } + + # Broadcast to RPC_NODES service type (gas prices are blockchain-related) + await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, payload) + logger.debug(f"Broadcasted gas prices from {response['source']}") except Exception as e: logger.error(f"Error broadcasting gas prices: {e}", exc_info=True) - await asyncio.sleep(30) # Every 30 seconds + await asyncio.sleep(30) # Global broadcaster instance diff --git a/backend/live_data/providers.py b/backend/live_data/providers.py index 7452f30..3b54472 100644 --- a/backend/live_data/providers.py +++ b/backend/live_data/providers.py @@ -4,125 +4,264 @@ import os import asyncio from typing import Dict, List, Optional, Any from datetime import datetime +from backend.orchestration.provider_manager import provider_manager, ProviderConfig logger = logging.getLogger(__name__) -class BaseProvider: - def __init__(self, name: str, base_url: str): - self.name = name - self.base_url = base_url - self.session = None - - async def _get_session(self): - if self.session is None or self.session.closed: - self.session = aiohttp.ClientSession() - return self.session - - async def close(self): - if self.session and not self.session.closed: - await self.session.close() - - async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Any: - try: - session = await self._get_session() - url = f"{self.base_url}{endpoint}" - async with session.get(url, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response: - response.raise_for_status() - return await response.json() - except Exception as e: - logger.error(f"Error fetching from {self.name}: {e}") - raise - -class CoinGeckoProvider(BaseProvider): - def __init__(self): - super().__init__("CoinGecko", "https://api.coingecko.com/api/v3") - self.api_key = os.getenv("COINGECKO_API_KEY") - - async def get_market_data(self, vs_currency: str = "usd", ids: str = "bitcoin,ethereum") -> List[Dict]: - params = { - "vs_currency": vs_currency, - "ids": ids, - "order": "market_cap_desc", - "per_page": 100, - "page": 1, - "sparkline": "false", - "price_change_percentage": "24h" - } - if self.api_key: - params["x_cg_demo_api_key"] = self.api_key - - return await self._get("/coins/markets", params=params) - - async def get_coin_price(self, coin_id: str, vs_currencies: str = "usd") -> Dict: - params = {"ids": coin_id, "vs_currencies": vs_currencies} - return await self._get("/simple/price", params=params) - -class BinanceProvider(BaseProvider): - def __init__(self): - super().__init__("Binance", "https://api.binance.com/api/v3") - - async def get_ticker_price(self, symbol: str) -> Dict: - # Symbol example: BTCUSDT - return await self._get("/ticker/price", params={"symbol": symbol.upper()}) - - async def get_klines(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[List]: - params = { - "symbol": symbol.upper(), - "interval": interval, - "limit": limit - } - return await self._get("/klines", params=params) - -class CryptoPanicProvider(BaseProvider): - def __init__(self): - super().__init__("CryptoPanic", "https://cryptopanic.com/api/v1") - self.api_key = os.getenv("CRYPTOPANIC_API_KEY") - - async def get_news(self, filter_type: str = "hot") -> Dict: - if not self.api_key: - logger.warning("CryptoPanic API key not set") - # Fallback to public RSS feed logic elsewhere or return empty - return {"results": []} - - params = { - "auth_token": self.api_key, - "filter": filter_type, - "public": "true" - } - return await self._get("/posts/", params=params) - -class AlternativeMeProvider(BaseProvider): - def __init__(self): - super().__init__("Alternative.me", "https://api.alternative.me") - - async def get_fear_and_greed(self, limit: int = 1) -> Dict: - return await self._get("/fng/", params={"limit": limit}) - -# Singleton instances -coingecko_provider = CoinGeckoProvider() -binance_provider = BinanceProvider() -cryptopanic_provider = CryptoPanicProvider() -alternative_me_provider = AlternativeMeProvider() - -async def get_all_providers_status(): - results = {} - # Simple check - try: - await coingecko_provider.get_coin_price("bitcoin") - results["coingecko"] = "online" - except: - results["coingecko"] = "offline" - - try: - await binance_provider.get_ticker_price("BTCUSDT") - results["binance"] = "online" - except: - results["binance"] = "offline" - - try: - await alternative_me_provider.get_fear_and_greed() - results["alternative_me"] = "online" - except: - results["alternative_me"] = "offline" +# ============================================================================== +# FETCH IMPLEMENTATIONS +# ============================================================================== + +async def fetch_coingecko_market(config: ProviderConfig, **kwargs) -> Any: + ids = kwargs.get("ids", "bitcoin,ethereum") + vs_currency = kwargs.get("vs_currency", "usd") + + url = f"{config.base_url}/coins/markets" + params = { + "vs_currency": vs_currency, + "ids": ids, + "order": "market_cap_desc", + "per_page": 100, + "page": 1, + "sparkline": "false", + "price_change_percentage": "24h" + } + + # Pro API key support + if config.api_key: + params["x_cg_pro_api_key"] = config.api_key - return results + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + if response.status == 429: + raise Exception("Rate limit exceeded (429)") + response.raise_for_status() + return await response.json() + +async def fetch_coingecko_price(config: ProviderConfig, **kwargs) -> Any: + coin_id = kwargs.get("coin_id", "bitcoin") + vs_currencies = kwargs.get("vs_currencies", "usd") + + url = f"{config.base_url}/simple/price" + params = {"ids": coin_id, "vs_currencies": vs_currencies} + + if config.api_key: + params["x_cg_pro_api_key"] = config.api_key + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + response.raise_for_status() + return await response.json() + +async def fetch_binance_ticker(config: ProviderConfig, **kwargs) -> Any: + symbol = kwargs.get("symbol", "BTCUSDT").upper() + url = f"{config.base_url}/ticker/price" + params = {"symbol": symbol} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + if response.status == 451: + raise Exception("Geo-blocked (451)") + response.raise_for_status() + data = await response.json() + # Normalize to look somewhat like CoinGecko for generic usage if needed + return {"price": float(data.get("price", 0)), "symbol": data.get("symbol")} + +async def fetch_binance_klines(config: ProviderConfig, **kwargs) -> Any: + symbol = kwargs.get("symbol", "BTCUSDT").upper() + interval = kwargs.get("interval", "1h") + limit = kwargs.get("limit", 100) + + url = f"{config.base_url}/klines" + params = { + "symbol": symbol, + "interval": interval, + "limit": limit + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + if response.status == 451: + raise Exception("Geo-blocked (451)") + response.raise_for_status() + return await response.json() + +async def fetch_cryptopanic_news(config: ProviderConfig, **kwargs) -> Any: + filter_type = kwargs.get("filter", "hot") + url = f"{config.base_url}/posts/" + + params = { + "auth_token": config.api_key, + "filter": filter_type, + "public": "true" + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + response.raise_for_status() + return await response.json() + +async def fetch_newsapi(config: ProviderConfig, **kwargs) -> Any: + query = kwargs.get("query", "crypto") + url = f"{config.base_url}/everything" + + params = { + "q": query, + "apiKey": config.api_key, + "sortBy": "publishedAt", + "language": "en" + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + response.raise_for_status() + return await response.json() + +async def fetch_alternative_me_fng(config: ProviderConfig, **kwargs) -> Any: + limit = kwargs.get("limit", 1) + url = f"{config.base_url}/fng/" + params = {"limit": limit} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + response.raise_for_status() + return await response.json() + +async def fetch_etherscan_gas(config: ProviderConfig, **kwargs) -> Any: + url = config.base_url + params = { + "module": "gastracker", + "action": "gasoracle", + "apikey": config.api_key + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, timeout=config.timeout) as response: + response.raise_for_status() + return await response.json() + +# ============================================================================== +# REGISTRATION +# ============================================================================== + +def initialize_providers(): + # Market Data Providers + provider_manager.register_provider( + "market", + ProviderConfig( + name="coingecko_free", + category="market", + base_url="https://api.coingecko.com/api/v3", + rate_limit_per_min=30, # Conservative for free tier + weight=100 + ), + fetch_coingecko_market + ) + + provider_manager.register_provider( + "market_pro", + ProviderConfig( + name="coingecko_pro", + category="market", + base_url="https://pro-api.coingecko.com/api/v3", # Assuming Pro URL + api_key=os.getenv("COINGECKO_PRO_API_KEY", "04cf4b5b-9868-465c-8ba0-9f2e78c92eb1"), + rate_limit_per_min=500, + weight=200 + ), + fetch_coingecko_market + ) + + provider_manager.register_provider( + "market", + ProviderConfig( + name="binance", + category="market", + base_url="https://api.binance.com/api/v3", + rate_limit_per_min=1200, + weight=90 + ), + fetch_binance_ticker # Note: This fetch function behaves differently (ticker vs market list), router needs to handle + ) + + # OHLC Providers + provider_manager.register_provider( + "ohlc", + ProviderConfig( + name="binance_ohlc", + category="ohlc", + base_url="https://api.binance.com/api/v3", + rate_limit_per_min=1200, + weight=100 + ), + fetch_binance_klines + ) + + # News Providers + provider_manager.register_provider( + "news", + ProviderConfig( + name="cryptopanic", + category="news", + base_url="https://cryptopanic.com/api/v1", + api_key=os.getenv("CRYPTOPANIC_API_KEY", "7832690f05026639556837583758"), # Placeholder if env not set + rate_limit_per_min=60, + weight=100 + ), + fetch_cryptopanic_news + ) + + provider_manager.register_provider( + "news", + ProviderConfig( + name="newsapi", + category="news", + base_url="https://newsapi.org/v2", + api_key=os.getenv("NEWS_API_KEY", "968a5e25552b4cb5ba3280361d8444ab"), + rate_limit_per_min=100, + weight=90 + ), + fetch_newsapi + ) + + # Sentiment + provider_manager.register_provider( + "sentiment", + ProviderConfig( + name="alternative_me", + category="sentiment", + base_url="https://api.alternative.me", + rate_limit_per_min=60, + weight=100 + ), + fetch_alternative_me_fng + ) + + # OnChain / RPC + provider_manager.register_provider( + "onchain", + ProviderConfig( + name="etherscan", + category="onchain", + base_url="https://api.etherscan.io/api", + api_key=os.getenv("ETHERSCAN_API_KEY", "SZHYFZK2RR8H9TIMJBVW54V4H81K2Z2KR2"), + rate_limit_per_min=5, # Free tier limit + weight=100 + ), + fetch_etherscan_gas + ) + + provider_manager.register_provider( + "onchain", + ProviderConfig( + name="etherscan_backup", + category="onchain", + base_url="https://api.etherscan.io/api", + api_key=os.getenv("ETHERSCAN_API_KEY_2", "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45"), + rate_limit_per_min=5, + weight=90 + ), + fetch_etherscan_gas + ) + +# Auto-initialize +initialize_providers() diff --git a/backend/routers/hf_space_api.py b/backend/routers/hf_space_api.py index 7683868..41ed9e9 100644 --- a/backend/routers/hf_space_api.py +++ b/backend/routers/hf_space_api.py @@ -1,7 +1,7 @@ """ HF Space Complete API Router Implements all required endpoints for Hugging Face Space deployment -using REAL data providers. +using REAL data providers managed by the Orchestrator. """ from fastapi import APIRouter, HTTPException, Query, Body, Depends from fastapi.responses import JSONResponse @@ -14,14 +14,8 @@ import json import os from pathlib import Path -# Import Real Data Providers -from backend.live_data.providers import ( - coingecko_provider, - binance_provider, - cryptopanic_provider, - alternative_me_provider -) -from backend.cache.cache_manager import cache_manager +# Import Orchestrator +from backend.orchestration.provider_manager import provider_manager logger = logging.getLogger(__name__) @@ -36,6 +30,7 @@ class MetaInfo(BaseModel): cache_ttl_seconds: int = Field(default=30, description="Cache TTL in seconds") generated_at: str = Field(default_factory=lambda: datetime.now().isoformat()) source: str = Field(default="live", description="Data source") + latency_ms: Optional[float] = None class MarketItem(BaseModel): """Market ticker item""" @@ -94,39 +89,42 @@ class GasResponse(BaseModel): async def get_market_snapshot(): """ Get current market snapshot with prices, changes, and volumes. - Uses CoinGecko API. + Uses Provider Orchestrator (CoinGecko, Binance, etc.) """ - cache_key = "market_snapshot" - cached = await cache_manager.get(cache_key) - if cached: - return cached - - try: - data = await coingecko_provider.get_market_data(ids="bitcoin,ethereum,tron,solana,binancecoin,ripple") + response = await provider_manager.fetch_data( + "market", + params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"}, + use_cache=True, + ttl=60 + ) + + if not response["success"]: + raise HTTPException(status_code=503, detail=response["error"]) - items = [] + data = response["data"] + items = [] + + # Handle different provider formats if needed, but fetch functions should normalize + # Assuming coingecko format for "market" category list + if isinstance(data, list): for coin in data: items.append(MarketItem( symbol=coin.get('symbol', '').upper(), price=coin.get('current_price', 0), change_24h=coin.get('price_change_percentage_24h', 0), volume_24h=coin.get('total_volume', 0), - source="coingecko" + source=response["source"] )) - - response = MarketResponse( - last_updated=datetime.now().isoformat(), - items=items, - meta=MetaInfo(cache_ttl_seconds=60, source="coingecko") + + return MarketResponse( + last_updated=response["timestamp"], + items=items, + meta=MetaInfo( + cache_ttl_seconds=60, + source=response["source"], + latency_ms=response.get("latency_ms") ) - - await cache_manager.set(cache_key, response, ttl=60) - return response - - except Exception as e: - logger.error(f"Error in get_market_snapshot: {e}") - # Return empty list or cached stale data if available, but NEVER fake data - raise HTTPException(status_code=503, detail="Market data unavailable") + ) @router.get("/api/market/ohlc") async def get_ohlc( @@ -134,55 +132,61 @@ async def get_ohlc( interval: int = Query(60, description="Interval in minutes"), limit: int = Query(100, description="Number of candles") ): - """Get OHLC candlestick data from Binance""" - cache_key = f"ohlc_{symbol}_{interval}_{limit}" - cached = await cache_manager.get(cache_key) - if cached: - return cached + """Get OHLC candlestick data via Orchestrator""" + + # Map minutes to common string format if needed by providers, + # but fetch_binance_klines handles it. + interval_str = "1h" + if interval < 60: + interval_str = f"{interval}m" + elif interval == 60: + interval_str = "1h" + elif interval == 240: + interval_str = "4h" + elif interval == 1440: + interval_str = "1d" - try: - # Map minutes to Binance intervals - binance_interval = "1h" - if interval == 1: binance_interval = "1m" - elif interval == 5: binance_interval = "5m" - elif interval == 15: binance_interval = "15m" - elif interval == 60: binance_interval = "1h" - elif interval == 240: binance_interval = "4h" - elif interval == 1440: binance_interval = "1d" + response = await provider_manager.fetch_data( + "ohlc", + params={ + "symbol": symbol, + "interval": interval_str, + "limit": limit + }, + use_cache=True, + ttl=60 + ) - # Binance symbol needs to be e.g., BTCUSDT - formatted_symbol = symbol.upper() - if not formatted_symbol.endswith("USDT") and not formatted_symbol.endswith("USD"): - formatted_symbol += "USDT" - - klines = await binance_provider.get_klines(formatted_symbol, interval=binance_interval, limit=limit) - - ohlc_data = [] + if not response["success"]: + raise HTTPException(status_code=503, detail=response["error"]) + + # Transform Binance Klines to standard OHLC + # [time, open, high, low, close, volume, ...] + klines = response["data"] + ohlc_data = [] + + if isinstance(klines, list): for k in klines: - # Binance kline: [open_time, open, high, low, close, volume, ...] - ohlc_data.append({ - "ts": int(k[0] / 1000), - "open": float(k[1]), - "high": float(k[2]), - "low": float(k[3]), - "close": float(k[4]), - "volume": float(k[5]) - }) - - response = { - "symbol": symbol, - "interval": interval, - "data": ohlc_data, - "meta": MetaInfo(cache_ttl_seconds=60, source="binance").dict() - } - - await cache_manager.set(cache_key, response, ttl=60) - return response + if isinstance(k, list) and len(k) >= 6: + ohlc_data.append({ + "ts": int(k[0] / 1000), + "open": float(k[1]), + "high": float(k[2]), + "low": float(k[3]), + "close": float(k[4]), + "volume": float(k[5]) + }) - except Exception as e: - logger.error(f"Error in get_ohlc: {e}") - # Try fallbacks? For now, fail gracefully. - raise HTTPException(status_code=503, detail="OHLC data unavailable") + return { + "symbol": symbol, + "interval": interval, + "data": ohlc_data, + "meta": MetaInfo( + cache_ttl_seconds=60, + source=response["source"], + latency_ms=response.get("latency_ms") + ).dict() + } # ============================================================================ # News & Sentiment Endpoints @@ -193,19 +197,24 @@ async def get_news( limit: int = Query(20, description="Number of articles"), source: Optional[str] = Query(None, description="Filter by source") ): - """Get cryptocurrency news from CryptoPanic""" - cache_key = f"news_{limit}_{source}" - cached = await cache_manager.get(cache_key) - if cached: - return cached + """Get cryptocurrency news via Orchestrator""" + + response = await provider_manager.fetch_data( + "news", + params={"filter": "hot", "query": "crypto"}, # Params for different providers + use_cache=True, + ttl=300 + ) + + if not response["success"]: + return NewsResponse(articles=[], meta=MetaInfo(source="error")) - try: - data = await cryptopanic_provider.get_news() - - articles = [] - results = data.get('results', [])[:limit] - - for post in results: + data = response["data"] + articles = [] + + # Normalize CryptoPanic / NewsAPI formats + if "results" in data: # CryptoPanic + for post in data.get('results', [])[:limit]: articles.append(NewsArticle( id=str(post.get('id')), title=post.get('title', ''), @@ -214,49 +223,60 @@ async def get_news( summary=post.get('slug', ''), published_at=post.get('published_at', datetime.now().isoformat()) )) - - response = NewsResponse( - articles=articles, - meta=MetaInfo(cache_ttl_seconds=300, source="cryptopanic") + elif "articles" in data: # NewsAPI + for post in data.get('articles', [])[:limit]: + articles.append(NewsArticle( + id=str(hash(post.get('url', ''))), + title=post.get('title', ''), + url=post.get('url', ''), + source=post.get('source', {}).get('name', 'Unknown'), + summary=post.get('description', ''), + published_at=post.get('publishedAt', datetime.now().isoformat()) + )) + + return NewsResponse( + articles=articles, + meta=MetaInfo( + cache_ttl_seconds=300, + source=response["source"], + latency_ms=response.get("latency_ms") ) - - await cache_manager.set(cache_key, response, ttl=300) - return response - - except Exception as e: - logger.error(f"Error in get_news: {e}") - return NewsResponse(articles=[], meta=MetaInfo(source="error")) + ) @router.get("/api/sentiment/global") async def get_global_sentiment(): - """Get global market sentiment (Fear & Greed Index)""" - cache_key = "sentiment_global" - cached = await cache_manager.get(cache_key) - if cached: - return cached - - try: - data = await alternative_me_provider.get_fear_and_greed() - fng_value = 50 - classification = "Neutral" + """Get global market sentiment via Orchestrator""" + + response = await provider_manager.fetch_data( + "sentiment", + params={"limit": 1}, + use_cache=True, + ttl=3600 + ) + + if not response["success"]: + raise HTTPException(status_code=503, detail=response["error"]) - if data.get('data'): - item = data['data'][0] - fng_value = int(item.get('value', 50)) - classification = item.get('value_classification', 'Neutral') - - result = { - "score": fng_value, - "label": classification, - "meta": MetaInfo(cache_ttl_seconds=3600, source="alternative.me").dict() - } + data = response["data"] + fng_value = 50 + classification = "Neutral" + + # Alternative.me format + if data.get('data'): + item = data['data'][0] + fng_value = int(item.get('value', 50)) + classification = item.get('value_classification', 'Neutral') - await cache_manager.set(cache_key, result, ttl=3600) - return result - except Exception as e: - logger.error(f"Error in get_global_sentiment: {e}") - raise HTTPException(status_code=503, detail="Sentiment data unavailable") + return { + "score": fng_value, + "label": classification, + "meta": MetaInfo( + cache_ttl_seconds=3600, + source=response["source"], + latency_ms=response.get("latency_ms") + ).dict() + } # ============================================================================ # Blockchain Endpoints @@ -264,14 +284,56 @@ async def get_global_sentiment(): @router.get("/api/crypto/blockchain/gas", response_model=GasResponse) async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain network")): - """Get gas prices - Placeholder for real implementation""" - # TODO: Implement Etherscan or similar provider - # For now, return empty/null to indicate no data rather than fake data + """Get gas prices via Orchestrator""" + + if chain.lower() != "ethereum": + # Fallback or implement other chains + return GasResponse( + chain=chain, + gas_prices=None, + timestamp=datetime.now().isoformat(), + meta=MetaInfo(source="unavailable") + ) + + response = await provider_manager.fetch_data( + "onchain", + params={}, + use_cache=True, + ttl=15 + ) + + if not response["success"]: + return GasResponse( + chain=chain, + gas_prices=None, + timestamp=datetime.now().isoformat(), + meta=MetaInfo(source="unavailable") + ) + + data = response["data"] + result = data.get("result", {}) + + gas_price = None + if result: + # Etherscan returns data in result + try: + gas_price = GasPrice( + fast=float(result.get("FastGasPrice", 0)), + standard=float(result.get("ProposeGasPrice", 0)), + slow=float(result.get("SafeGasPrice", 0)) + ) + except: + pass + return GasResponse( chain=chain, - gas_prices=None, + gas_prices=gas_price, timestamp=datetime.now().isoformat(), - meta=MetaInfo(source="unavailable") + meta=MetaInfo( + cache_ttl_seconds=15, + source=response["source"], + latency_ms=response.get("latency_ms") + ) ) # ============================================================================ @@ -281,14 +343,12 @@ async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain @router.get("/api/status") async def get_system_status(): """Get overall system status""" - from backend.live_data.providers import get_all_providers_status - - provider_status = await get_all_providers_status() + stats = provider_manager.get_stats() return { 'status': 'operational', 'timestamp': datetime.now().isoformat(), - 'providers': provider_status, - 'version': '1.0.0', + 'providers': stats, + 'version': '2.0.0', 'meta': MetaInfo(source="system").dict() }