|
|
|
|
|
|
| import asyncio
|
| import aiohttp
|
| from playwright.async_api import async_playwright, Page
|
| from bs4 import BeautifulSoup
|
| import requests
|
| import tweepy
|
| import pandas as pd
|
| import time
|
| import re
|
| from datetime import datetime, timedelta
|
| from huggingface_hub import InferenceClient
|
| import os
|
| import sys
|
| import random
|
|
|
| import sqlite3
|
| import logging
|
| import imaplib
|
| import email
|
| import smtplib
|
| from email.mime.text import MIMEText
|
| import schedule
|
| import hashlib
|
| import json
|
| import threading
|
| from contextlib import contextmanager
|
| from dataclasses import dataclass
|
| from typing import List, Dict, Optional, Tuple
|
| from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
|
|
|
| @dataclass
|
| class PersonalInfo:
|
| prenom: str = "Valentin"
|
| nom: str = "Cora"
|
| email: str = "valouassol@outlook.com"
|
| email_derivee: str = "valouassol+concours@outlook.com"
|
| telephone: str = "+41791234567"
|
| adresse: str = "Av Chantemerle 9"
|
| code_postal: str = "1009"
|
| ville: str = "Pully"
|
| pays: str = "Suisse"
|
|
|
| @dataclass
|
| class APIConfig:
|
| hf_token: Optional[str] = None
|
| x_token: Optional[str] = None
|
| google_api_key: Optional[str] = None
|
| google_cx: Optional[str] = None
|
| gemini_key: Optional[str] = None
|
| email_app_password: Optional[str] = None
|
| telegram_bot_token: Optional[str] = None
|
| telegram_chat_id: Optional[str] = None
|
|
|
| @classmethod
|
| def from_env(cls):
|
| return cls(
|
| hf_token=os.getenv("HF_TOKEN"),
|
| x_token=os.getenv("X_TOKEN"),
|
| google_api_key=os.getenv("GOOGLE_API_KEY"),
|
| google_cx=os.getenv("GOOGLE_CX"),
|
| gemini_key=os.getenv("GEMINI_KEY"),
|
| email_app_password=os.getenv("EMAIL_APP_PASSWORD"),
|
| telegram_bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
|
| telegram_chat_id=os.getenv("TELEGRAM_CHAT_ID")
|
| )
|
|
|
| @dataclass
|
| class Contest:
|
| title: str
|
| url: str
|
| description: str
|
| source: str
|
| deadline: Optional[str] = None
|
| prize: Optional[str] = None
|
| difficulty_score: int = 0
|
|
|
| @dataclass
|
| class FormField:
|
| selector: str
|
| field_type: str
|
| label: str
|
| required: bool
|
| current_value: str = ""
|
| ai_context: str = ""
|
|
|
| @dataclass
|
| class FormAnalysis:
|
| fields: List[FormField]
|
| complexity_score: int
|
| estimated_success_rate: float
|
| requires_captcha: bool
|
| requires_social_media: bool
|
| form_url: str
|
|
|
|
|
|
|
|
|
|
|
|
|
| PERSONAL_INFO = PersonalInfo()
|
| API_CONFIG = APIConfig.from_env()
|
|
|
|
|
| SITES_CH = [
|
| 'https://www.concours.ch/concours/tous',
|
| 'https://www.jeu-concours.biz/concours-pays_suisse.html',
|
| 'https://www.loisirs.ch/concours/',
|
| 'https://www.radin.ch/',
|
| 'https://win4win.ch/fr/',
|
| 'https://www.concours-suisse.ch/',
|
| 'https://corporate.migros.ch/fr/concours',
|
| 'https://www.20min.ch/fr/concours-et-jeux',
|
| 'https://dein-gewinnspiel.ch/en',
|
| 'https://www.myswitzerland.com/fr/planification/vie-pratique/concours/'
|
| ]
|
|
|
|
|
| PROXY_LIST = ["http://20.206.106.192:80", "http://51.15.242.202:8888"]
|
| USER_AGENTS = [
|
| 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
| 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
| 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
| ]
|
|
|
|
|
| logging.basicConfig(
|
| level=logging.INFO,
|
| format='%(asctime)s - %(levelname)s - %(message)s',
|
| handlers=[
|
| logging.FileHandler('concours_bot.log'),
|
| logging.StreamHandler()
|
| ]
|
| )
|
|
|
|
|
|
|
|
|
|
|
| class DatabaseManager:
|
| def __init__(self, db_path: str = 'concours_optimized.sqlite'):
|
| self.db_path = db_path
|
| self.local = threading.local()
|
| self._init_db()
|
|
|
| def _get_connection(self):
|
| if not hasattr(self.local, 'conn'):
|
| self.local.conn = sqlite3.connect(self.db_path)
|
| self.local.conn.row_factory = sqlite3.Row
|
| return self.local.conn
|
|
|
| @contextmanager
|
| def transaction(self):
|
| conn = self._get_connection()
|
| try:
|
| yield conn
|
| conn.commit()
|
| except Exception:
|
| conn.rollback()
|
| raise
|
|
|
| def _init_db(self):
|
| with self.transaction() as conn:
|
| conn.execute('''
|
| CREATE TABLE IF NOT EXISTS participations (
|
| url TEXT PRIMARY KEY,
|
| title TEXT,
|
| source TEXT,
|
| status TEXT,
|
| difficulty_score INTEGER,
|
| success_rate REAL,
|
| date TEXT,
|
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| )
|
| ''')
|
| conn.execute('''
|
| CREATE TABLE IF NOT EXISTS victories (
|
| email_id TEXT PRIMARY KEY,
|
| date TEXT,
|
| lot TEXT,
|
| source TEXT,
|
| confirmed BOOLEAN DEFAULT FALSE,
|
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| )
|
| ''')
|
| conn.execute('''
|
| CREATE TABLE IF NOT EXISTS contest_cache (
|
| url TEXT PRIMARY KEY,
|
| content TEXT,
|
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| )
|
| ''')
|
|
|
| conn.execute('CREATE INDEX IF NOT EXISTS idx_participations_date ON participations(date)')
|
| conn.execute('CREATE INDEX IF NOT EXISTS idx_cache_timestamp ON contest_cache(timestamp)')
|
|
|
| def add_participation(self, contest: Contest, status: str = 'pending', success_rate: float = 0.0):
|
| with self.transaction() as conn:
|
| conn.execute('''
|
| INSERT OR REPLACE INTO participations
|
| (url, title, source, status, difficulty_score, success_rate, date)
|
| VALUES (?, ?, ?, ?, ?, ?, date('now'))
|
| ''', (contest.url, contest.title, contest.source, status, contest.difficulty_score, success_rate))
|
|
|
| def participation_exists(self, url: str) -> bool:
|
| conn = self._get_connection()
|
| result = conn.execute("SELECT 1 FROM participations WHERE url = ?", (url,)).fetchone()
|
| return result is not None
|
|
|
| def add_victory(self, email_id: str, lot: str, source: str):
|
| with self.transaction() as conn:
|
| conn.execute('''
|
| INSERT OR IGNORE INTO victories (email_id, date, lot, source)
|
| VALUES (?, date('now'), ?, ?)
|
| ''', (email_id, lot, source))
|
|
|
| def get_stats(self) -> Dict:
|
| conn = self._get_connection()
|
| stats = {}
|
|
|
|
|
| stats['total_participations'] = conn.execute("SELECT COUNT(*) FROM participations").fetchone()[0]
|
| stats['successful_participations'] = conn.execute("SELECT COUNT(*) FROM participations WHERE status='success'").fetchone()[0]
|
| stats['total_victories'] = conn.execute("SELECT COUNT(*) FROM victories").fetchone()[0]
|
|
|
|
|
| source_stats = conn.execute('''
|
| SELECT source, COUNT(*) as count
|
| FROM participations
|
| GROUP BY source
|
| ORDER BY count DESC
|
| ''').fetchall()
|
| stats['by_source'] = {row[0]: row[1] for row in source_stats}
|
|
|
| return stats
|
|
|
|
|
|
|
|
|
|
|
| class AIEngine:
|
| def __init__(self, api_config: APIConfig):
|
| self.api_config = api_config
|
| self.cache = {}
|
|
|
| def generate_response(self, question: str, context: str = "", response_type: str = "qa") -> str:
|
| """Génère une réponse IA avec fallback entre Gemini et HuggingFace"""
|
| cache_key = hashlib.md5(f"{question}{context}{response_type}".encode()).hexdigest()
|
|
|
| if cache_key in self.cache:
|
| return self.cache[cache_key]
|
|
|
| response = self._try_gemini(question, context, response_type)
|
| if not response:
|
| response = self._try_huggingface(question, context, response_type)
|
|
|
| if not response:
|
| response = self._generate_fallback_response(question, response_type)
|
|
|
|
|
| self.cache[cache_key] = response
|
| return response
|
|
|
| def _try_gemini(self, question: str, context: str, response_type: str) -> Optional[str]:
|
|
|
| return None
|
|
|
| def _try_huggingface(self, question: str, context: str, response_type: str) -> Optional[str]:
|
| if not self.api_config.hf_token:
|
| return None
|
|
|
| try:
|
| client = InferenceClient(token=self.api_config.hf_token)
|
|
|
| if response_type == "qa" and context:
|
| result = client.question_answering(
|
| question=question,
|
| context=context,
|
| model="distilbert-base-cased-distilled-squad"
|
| )
|
| return result.answer
|
| else:
|
| prompt = f"Question: {question}\nContexte: {context}\nRéponse:"
|
| result = client.text_generation(
|
| prompt,
|
| model="gpt2",
|
| max_new_tokens=100,
|
| temperature=0.7
|
| )
|
| return result[0]['generated_text'].split('Réponse:')[-1].strip()
|
|
|
| except Exception as e:
|
| logging.warning(f"HuggingFace API error: {e}")
|
| return None
|
|
|
| def _generate_fallback_response(self, question: str, response_type: str) -> str:
|
| """Système de réponses intelligentes sans API"""
|
| question_lower = question.lower()
|
|
|
| if response_type == "motivation":
|
|
|
| if any(word in question_lower for word in ["voyage", "vacances", "séjour"]):
|
| return random.choice([
|
| "J'adore voyager et découvrir de nouveaux horizons. Ce prix serait une opportunité fantastique pour moi de vivre une expérience inoubliable.",
|
| "Voyager est ma passion et ce concours représente le voyage de mes rêves. J'espère avoir la chance de le remporter.",
|
| "En tant que passionné de voyages, ce prix m'offrirait l'occasion parfaite de découvrir de nouveaux paysages et cultures."
|
| ])
|
| elif any(word in question_lower for word in ["produit", "cosmétique", "beauté"]):
|
| return random.choice([
|
| "Je suis toujours à la recherche de nouveaux produits de qualité et j'aimerais beaucoup tester cette gamme.",
|
| "Ces produits m'intéressent énormément et je serais ravi de pouvoir les découvrir.",
|
| "J'ai entendu beaucoup de bien de cette marque et j'aimerais avoir l'opportunité de l'essayer."
|
| ])
|
| elif any(word in question_lower for word in ["technologie", "smartphone", "ordinateur"]):
|
| return random.choice([
|
| "En tant que passionné de technologie, ce prix m'intéresse beaucoup et m'aiderait dans mes projets.",
|
| "J'ai besoin de ce type d'équipement pour mes études et ce serait formidable de le gagner.",
|
| "La technologie fait partie de ma vie quotidienne et ce prix serait très utile."
|
| ])
|
| else:
|
| return random.choice([
|
| "Je participe avec enthousiasme à ce concours car le prix m'intéresse vraiment et correspond à mes besoins.",
|
| "Ce concours m'attire particulièrement et je serais très heureux de remporter ce magnifique prix.",
|
| "J'espère avoir la chance de gagner car ce prix me ferait énormément plaisir.",
|
| "Je suis motivé à participer car cette opportunité pourrait vraiment changer ma journée."
|
| ])
|
|
|
| elif response_type == "quiz":
|
|
|
| if "suisse" in question_lower:
|
| if "capitale" in question_lower:
|
| return "Berne"
|
| elif "langue" in question_lower:
|
| return "Français, Allemand, Italien, Romanche"
|
| elif "monnaie" in question_lower:
|
| return "Franc suisse"
|
| elif "population" in question_lower:
|
| return "8.7 millions"
|
|
|
| if "couleur" in question_lower:
|
| return random.choice(["Rouge", "Bleu", "Vert", "Jaune"])
|
|
|
| if any(word in question_lower for word in ["combien", "nombre", "quantité"]):
|
| return random.choice(["3", "5", "10", "12", "20"])
|
|
|
| if any(word in question_lower for word in ["année", "date", "quand"]):
|
| return random.choice(["2024", "2023", "2025"])
|
|
|
|
|
| return random.choice(["A", "B", "C", "Oui", "Non", "Vrai", "Faux"])
|
|
|
|
|
| response_mapping = {
|
| "age": random.choice(["25", "28", "30", "32"]),
|
| "profession": random.choice(["Étudiant", "Employé", "Consultant"]),
|
| "ville": "Pully",
|
| "pays": "Suisse",
|
| "default": "Merci"
|
| }
|
|
|
| return response_mapping.get(response_type, response_mapping["default"])
|
|
|
|
|
|
|
|
|
|
|
| class IntelligentScraper:
|
| def __init__(self, db_manager: DatabaseManager, cache_duration_hours: int = 6):
|
| self.db = db_manager
|
| self.cache_duration = timedelta(hours=cache_duration_hours)
|
| self.session = None
|
| self.field_patterns = {
|
| 'prenom': r'prenom|prénom|first.*name|nom.*prenom',
|
| 'nom': r'nom(?!.*prenom)|last.*name|family.*name|surname',
|
| 'email': r'email|e-mail|courriel',
|
| 'telephone': r'tel|phone|telephone|téléphone',
|
| 'adresse': r'adresse|address|rue|street',
|
| 'code_postal': r'code.postal|zip|postal',
|
| 'ville': r'ville|city|localité',
|
| 'pays': r'pays|country|nation',
|
| 'motivation': r'motivation|pourquoi|why|reason|raison',
|
| 'quiz': r'question|quiz|réponse|answer'
|
| }
|
|
|
| async def __aenter__(self):
|
| connector = aiohttp.TCPConnector(limit=10, limit_per_host=3)
|
| timeout = aiohttp.ClientTimeout(total=30, connect=10)
|
| self.session = aiohttp.ClientSession(
|
| connector=connector,
|
| timeout=timeout,
|
| headers={'User-Agent': random.choice(USER_AGENTS)}
|
| )
|
| return self
|
|
|
| async def __aexit__(self, exc_type, exc_val, exc_tb):
|
| if self.session:
|
| await self.session.close()
|
|
|
| async def scrape_all_sources(self) -> List[Contest]:
|
| """Scrape tous les sites et sources"""
|
| all_contests = []
|
|
|
|
|
| web_contests = await self._scrape_websites()
|
| all_contests.extend(web_contests)
|
|
|
|
|
| google_contests = await self._scrape_google_search()
|
| all_contests.extend(google_contests)
|
|
|
|
|
| twitter_contests = await self._scrape_twitter()
|
| all_contests.extend(twitter_contests)
|
|
|
|
|
| unique_contests = self._filter_unique_contests(all_contests)
|
|
|
| logging.info(f"Total contests found: {len(all_contests)}, unique new: {len(unique_contests)}")
|
| return unique_contests
|
|
|
| async def _scrape_websites(self) -> List[Contest]:
|
| """Scrape les sites web en parallèle"""
|
| batch_size = 3
|
| all_contests = []
|
|
|
| for i in range(0, len(SITES_CH), batch_size):
|
| batch = SITES_CH[i:i + batch_size]
|
| tasks = [self._scrape_single_site(site) for site in batch]
|
|
|
| batch_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
| for result in batch_results:
|
| if isinstance(result, list):
|
| all_contests.extend(result)
|
| elif isinstance(result, Exception):
|
| logging.error(f"Batch scraping error: {result}")
|
|
|
| await asyncio.sleep(2)
|
|
|
| return all_contests
|
|
|
| async def _scrape_single_site(self, url: str) -> List[Contest]:
|
| """Scrape un site web spécifique"""
|
| try:
|
| content = await self._fetch_with_retry(url)
|
| if not content:
|
| return []
|
|
|
| soup = BeautifulSoup(content, 'html.parser')
|
| contests = self._extract_contests_from_soup(soup, url)
|
|
|
| logging.info(f"Found {len(contests)} contests on {url}")
|
| return contests
|
|
|
| except Exception as e:
|
| logging.error(f"Error scraping {url}: {e}")
|
| return []
|
|
|
| async def _fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[str]:
|
| """Fetch avec retry et gestion d'erreurs"""
|
| for attempt in range(max_retries):
|
| try:
|
| proxy = random.choice(PROXY_LIST) if PROXY_LIST else None
|
| async with self.session.get(url, proxy=proxy) as response:
|
| if response.status == 200:
|
| return await response.text()
|
| elif response.status == 429:
|
| wait_time = 2 ** attempt * 5
|
| logging.warning(f"Rate limited on {url}, waiting {wait_time}s")
|
| await asyncio.sleep(wait_time)
|
| else:
|
| logging.warning(f"HTTP {response.status} for {url}")
|
|
|
| except Exception as e:
|
| logging.error(f"Attempt {attempt+1} failed for {url}: {e}")
|
| if attempt < max_retries - 1:
|
| await asyncio.sleep(2 ** attempt)
|
|
|
| return None
|
|
|
| def _extract_contests_from_soup(self, soup: BeautifulSoup, base_url: str) -> List[Contest]:
|
| """Extrait les concours d'une page HTML"""
|
| contests = []
|
|
|
|
|
| selectors = [
|
| '.contest', '.concours', '.jeu', '.competition', '.giveaway',
|
| '[data-contest]', '[data-concours]', '.prize', '.lot',
|
| 'article[class*="concours"]', '.entry', '.participate'
|
| ]
|
|
|
| containers = []
|
| for selector in selectors:
|
| containers.extend(soup.select(selector))
|
|
|
|
|
| if not containers:
|
| containers = soup.find_all('a', href=re.compile(r'concours|jeu|contest|participate', re.I))
|
|
|
| for container in containers[:20]:
|
| try:
|
| contest = self._parse_contest_container(container, base_url)
|
| if contest and self._is_valid_contest(contest):
|
| contests.append(contest)
|
| except Exception as e:
|
| logging.debug(f"Error parsing container: {e}")
|
|
|
| return contests
|
|
|
| def _parse_contest_container(self, container, base_url: str) -> Optional[Contest]:
|
| """Parse un conteneur de concours"""
|
|
|
| title_selectors = ['h1', 'h2', 'h3', '.title', '.titre', '.contest-title']
|
| title = ""
|
| for selector in title_selectors:
|
| title_elem = container.select_one(selector)
|
| if title_elem:
|
| title = title_elem.get_text(strip=True)
|
| break
|
|
|
| if not title:
|
| title = container.get_text(strip=True)[:100]
|
|
|
|
|
| url = ""
|
| link_elem = container if container.name == 'a' else container.find('a')
|
| if link_elem and link_elem.get('href'):
|
| url = urljoin(base_url, link_elem['href'])
|
|
|
|
|
| description = container.get_text(strip=True)[:500]
|
|
|
|
|
| deadline = self._extract_deadline(description)
|
| prize = self._extract_prize(description)
|
|
|
| if not title or not url:
|
| return None
|
|
|
| return Contest(
|
| title=title[:200],
|
| url=url,
|
| description=description,
|
| source=base_url,
|
| deadline=deadline,
|
| prize=prize,
|
| difficulty_score=self._estimate_difficulty(description)
|
| )
|
|
|
| def _extract_deadline(self, text: str) -> Optional[str]:
|
| """Extrait la date limite du texte"""
|
| patterns = [
|
| r"jusqu[\'']?au (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
| r"avant le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})",
|
| r"fin le (\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4})"
|
| ]
|
|
|
| for pattern in patterns:
|
| match = re.search(pattern, text, re.I)
|
| if match:
|
| return match.group(1)
|
| return None
|
|
|
| def _extract_prize(self, text: str) -> Optional[str]:
|
| """Extrait le prix du texte"""
|
| patterns = [
|
| r"gagne[rz]?\s+([^.!?]{1,50})",
|
| r"prix[:\s]+([^.!?]{1,50})",
|
| r"lot[:\s]+([^.!?]{1,50})",
|
| r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)"
|
| ]
|
|
|
| for pattern in patterns:
|
| match = re.search(pattern, text, re.I)
|
| if match:
|
| return match.group(1).strip()
|
| return None
|
|
|
| def _estimate_difficulty(self, description: str) -> int:
|
| """Estime la difficulté de participation (0-10)"""
|
| difficulty = 0
|
|
|
| if re.search(r'justifi|motivation|pourquoi|essay', description, re.I):
|
| difficulty += 3
|
| if re.search(r'photo|image|créat', description, re.I):
|
| difficulty += 2
|
| if re.search(r'quiz|question|répond', description, re.I):
|
| difficulty += 1
|
| if re.search(r'partag|social|facebook|twitter', description, re.I):
|
| difficulty += 1
|
| if re.search(r'inscription|compte|profil', description, re.I):
|
| difficulty += 1
|
|
|
| return min(difficulty, 10)
|
|
|
| def _is_valid_contest(self, contest: Contest) -> bool:
|
| """Valide qu'un concours est légitime"""
|
| swiss_indicators = [
|
| 'suisse', 'switzerland', 'ch', 'romandie', 'genève', 'lausanne',
|
| 'zurich', 'bern', 'ouvert en suisse', 'résidents suisses'
|
| ]
|
|
|
| full_text = (contest.title + " " + contest.description).lower()
|
| has_swiss_access = any(indicator in full_text for indicator in swiss_indicators)
|
|
|
| excluded_terms = [
|
| 'payant', 'payment', 'carte bancaire', 'spam', 'phishing',
|
| 'adult', 'casino', 'bitcoin', 'crypto', 'investment'
|
| ]
|
|
|
| has_excluded = any(term in full_text for term in excluded_terms)
|
| valid_url = contest.url.startswith(('http://', 'https://'))
|
|
|
| return has_swiss_access and not has_excluded and valid_url
|
|
|
| async def _scrape_google_search(self) -> List[Contest]:
|
| """Scrape via Google Custom Search API"""
|
| if not API_CONFIG.google_api_key or not API_CONFIG.google_cx:
|
| return []
|
|
|
| try:
|
| query = "concours gratuit Suisse 2025 site:.ch"
|
| response = requests.get(
|
| "https://www.googleapis.com/customsearch/v1",
|
| params={
|
| "key": API_CONFIG.google_api_key,
|
| "cx": API_CONFIG.google_cx,
|
| "q": query,
|
| "num": 10
|
| },
|
| timeout=10
|
| )
|
|
|
| results = response.json().get('items', [])
|
| contests = []
|
|
|
| for res in results:
|
| title = res.get('title', '')
|
| url = res.get('link', '')
|
| description = res.get('snippet', '')
|
|
|
| contest = Contest(
|
| title=title,
|
| url=url,
|
| description=description,
|
| source='Google Search',
|
| difficulty_score=self._estimate_difficulty(description)
|
| )
|
|
|
| if self._is_valid_contest(contest):
|
| contests.append(contest)
|
|
|
| logging.info(f"Found {len(contests)} contests via Google Search")
|
| return contests
|
|
|
| except Exception as e:
|
| logging.error(f"Google Search API error: {e}")
|
| return []
|
|
|
| async def _scrape_twitter(self) -> List[Contest]:
|
| """Scrape Twitter/X pour les concours"""
|
| if not API_CONFIG.x_token:
|
| return []
|
|
|
| try:
|
| client = tweepy.Client(bearer_token=API_CONFIG.x_token)
|
| tweets = client.search_recent_tweets(
|
| query="concours gratuit Suisse lang:fr",
|
| max_results=10
|
| )
|
|
|
| contests = []
|
| if tweets.data:
|
| for tweet in tweets.data:
|
| if self._is_swiss_accessible(tweet.text):
|
| contest = Contest(
|
| title=tweet.text[:50] + "...",
|
| url=f"https://x.com/i/status/{tweet.id}",
|
| description=tweet.text,
|
| source='Twitter/X',
|
| difficulty_score=5
|
| )
|
| contests.append(contest)
|
|
|
| logging.info(f"Found {len(contests)} contests on Twitter/X")
|
| return contests
|
|
|
| except Exception as e:
|
| logging.error(f"Twitter/X API error: {e}")
|
| return []
|
|
|
| def _is_swiss_accessible(self, text: str) -> bool:
|
| """Vérifie si accessible depuis la Suisse"""
|
| swiss_pattern = r"(suisse|ch|romandie|ouvert\s+a\s+la\s+suisse|geneve|lausanne)"
|
| return bool(re.search(swiss_pattern, text.lower(), re.IGNORECASE))
|
|
|
| def _filter_unique_contests(self, contests: List[Contest]) -> List[Contest]:
|
| """Filtre les concours uniques et non déjà traités"""
|
| unique_contests = []
|
| seen_urls = set()
|
|
|
| for contest in contests:
|
| if contest.url not in seen_urls and not self.db.participation_exists(contest.url):
|
| unique_contests.append(contest)
|
| seen_urls.add(contest.url)
|
|
|
| return unique_contests
|
|
|
|
|
|
|
|
|
|
|
| class SmartParticipator:
|
| def __init__(self, db_manager: DatabaseManager, ai_engine: AIEngine):
|
| self.db = db_manager
|
| self.ai = ai_engine
|
| self.personal_info = PERSONAL_INFO
|
|
|
| self.field_patterns = {
|
| 'prenom': [r'prenom|prénom|first.*name'],
|
| 'nom': [r'nom(?!.*prenom)|last.*name|family.*name'],
|
| 'email': [r'email|e-mail|courriel'],
|
| 'telephone': [r'tel|phone|telephone|téléphone'],
|
| 'adresse': [r'adresse|address|rue|street'],
|
| 'code_postal': [r'code.postal|zip|postal'],
|
| 'ville': [r'ville|city|localité'],
|
| 'pays': [r'pays|country|nation'],
|
| 'motivation': [r'motivation|pourquoi|why|reason'],
|
| 'quiz': [r'question|quiz|réponse|answer']
|
| }
|
|
|
| async def participate_in_contest(self, contest: Contest) -> bool:
|
| """Participe à un concours de manière intelligente"""
|
| async with async_playwright() as p:
|
| browser = await p.chromium.launch(
|
| headless=True,
|
| args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
|
| )
|
|
|
| context = await browser.new_context(
|
| user_agent=random.choice(USER_AGENTS),
|
| viewport={'width': 1920, 'height': 1080}
|
| )
|
|
|
| page = await context.new_page()
|
|
|
| try:
|
|
|
| analysis = await self._analyze_form(page, contest.url)
|
|
|
| if analysis.estimated_success_rate < 0.3:
|
| logging.warning(f"Low success rate for {contest.url}: {analysis.estimated_success_rate}")
|
| self.db.add_participation(contest, 'skipped_low_success', analysis.estimated_success_rate)
|
| return False
|
|
|
| if analysis.requires_captcha:
|
| logging.warning(f"CAPTCHA detected for {contest.url}, skipping")
|
| self.db.add_participation(contest, 'skipped_captcha', analysis.estimated_success_rate)
|
| return False
|
|
|
|
|
| success = await self._fill_and_submit_form(page, analysis, contest)
|
|
|
| status = 'success' if success else 'failed'
|
| self.db.add_participation(contest, status, analysis.estimated_success_rate)
|
|
|
| logging.info(f"Participation {'successful' if success else 'failed'} for {contest.title}")
|
| return success
|
|
|
| except Exception as e:
|
| logging.error(f"Participation error for {contest.url}: {e}")
|
| self.db.add_participation(contest, 'error', 0.0)
|
| return False
|
|
|
| finally:
|
| await browser.close()
|
|
|
| async def _analyze_form(self, page: Page, url: str) -> FormAnalysis:
|
| """Analyse un formulaire de concours"""
|
| try:
|
| await page.goto(url, wait_until='networkidle', timeout=15000)
|
|
|
|
|
| fields = await self._detect_form_fields(page)
|
|
|
|
|
| complexity = sum(self._calculate_field_complexity(field) for field in fields)
|
|
|
|
|
| has_captcha = await self._detect_captcha(page)
|
| has_social_requirements = await self._detect_social_requirements(page)
|
|
|
|
|
| success_rate = self._estimate_success_rate(fields, complexity, has_captcha)
|
|
|
| return FormAnalysis(
|
| fields=fields,
|
| complexity_score=complexity,
|
| estimated_success_rate=success_rate,
|
| requires_captcha=has_captcha,
|
| requires_social_media=has_social_requirements,
|
| form_url=url
|
| )
|
|
|
| except Exception as e:
|
| logging.error(f"Form analysis error: {e}")
|
| return FormAnalysis([], 10, 0.0, True, True, url)
|
|
|
| async def _detect_form_fields(self, page: Page) -> List[FormField]:
|
| """Détecte tous les champs de formulaire"""
|
| fields = []
|
|
|
| selectors = [
|
| 'input[type="text"]', 'input[type="email"]', 'input[type="tel"]',
|
| 'input[type="number"]', 'input:not([type])', 'textarea', 'select'
|
| ]
|
|
|
| for selector in selectors:
|
| elements = await page.query_selector_all(selector)
|
|
|
| for element in elements:
|
| try:
|
| field = await self._analyze_single_field(element, page)
|
| if field:
|
| fields.append(field)
|
| except Exception:
|
| continue
|
|
|
| return fields
|
|
|
| async def _analyze_single_field(self, element, page: Page) -> Optional[FormField]:
|
| """Analyse un champ individuel"""
|
| try:
|
| tag_name = await element.evaluate('el => el.tagName.toLowerCase()')
|
| field_type = await element.evaluate('el => el.type || el.tagName.toLowerCase()')
|
| name = await element.evaluate('el => el.name || el.id || ""')
|
| placeholder = await element.evaluate('el => el.placeholder || ""')
|
| required = await element.evaluate('el => el.required')
|
|
|
|
|
| label_text = await self._find_field_label(element, page)
|
|
|
|
|
| selector = await self._create_unique_selector(element)
|
|
|
| return FormField(
|
| selector=selector,
|
| field_type=field_type,
|
| label=label_text,
|
| required=required,
|
| ai_context=f"Name: {name}, Placeholder: {placeholder}, Label: {label_text}"
|
| )
|
|
|
| except Exception:
|
| return None
|
|
|
| async def _find_field_label(self, element, page: Page) -> str:
|
| """Trouve le label associé à un champ"""
|
| try:
|
|
|
| element_id = await element.evaluate('el => el.id')
|
| if element_id:
|
| label = await page.query_selector(f'label[for="{element_id}"]')
|
| if label:
|
| return await label.inner_text()
|
|
|
|
|
| parent_label = await element.evaluate('''
|
| el => {
|
| let parent = el.parentElement;
|
| while (parent && parent.tagName !== 'BODY') {
|
| if (parent.tagName === 'LABEL') {
|
| return parent.innerText;
|
| }
|
| parent = parent.parentElement;
|
| }
|
| return '';
|
| }
|
| ''')
|
|
|
| if parent_label:
|
| return parent_label.strip()
|
|
|
|
|
| prev_text = await element.evaluate('''
|
| el => {
|
| const prev = el.previousElementSibling;
|
| return prev ? prev.innerText : '';
|
| }
|
| ''')
|
|
|
| return prev_text.strip()
|
|
|
| except Exception:
|
| return ""
|
|
|
| async def _create_unique_selector(self, element) -> str:
|
| """Crée un sélecteur CSS unique"""
|
|
|
| element_id = await element.evaluate('el => el.id')
|
| if element_id:
|
| return f'#{element_id}'
|
|
|
| name = await element.evaluate('el => el.name')
|
| if name:
|
| return f'[name="{name}"]'
|
|
|
| class_name = await element.evaluate('el => el.className')
|
| tag_name = await element.evaluate('el => el.tagName.toLowerCase()')
|
|
|
| if class_name:
|
| return f'{tag_name}.{class_name.split()[0]}'
|
|
|
|
|
| return f'{tag_name}:nth-of-type(1)'
|
|
|
| def _calculate_field_complexity(self, field: FormField) -> int:
|
| """Calcule la complexité d'un champ"""
|
| complexity = 1
|
|
|
| if field.field_type == 'textarea':
|
| complexity += 3
|
| elif field.field_type == 'select':
|
| complexity += 2
|
| elif field.required:
|
| complexity += 1
|
|
|
| if re.search(r'motivation|justifi|pourquoi', field.label, re.I):
|
| complexity += 3
|
| elif re.search(r'quiz|question', field.label, re.I):
|
| complexity += 2
|
|
|
| return complexity
|
|
|
| async def _detect_captcha(self, page: Page) -> bool:
|
| """Détecte la présence de CAPTCHA"""
|
| captcha_selectors = [
|
| '.g-recaptcha', '.h-captcha', '#captcha', '.captcha',
|
| 'iframe[src*="recaptcha"]', 'iframe[src*="hcaptcha"]'
|
| ]
|
|
|
| for selector in captcha_selectors:
|
| element = await page.query_selector(selector)
|
| if element:
|
| return True
|
|
|
| return False
|
|
|
| async def _detect_social_requirements(self, page: Page) -> bool:
|
| """Détecte les exigences de réseaux sociaux"""
|
| content = await page.content()
|
| social_patterns = [
|
| r'follow.*us', r'partag.*facebook', r'retweet',
|
| r'like.*page', r'subscribe.*channel'
|
| ]
|
|
|
| for pattern in social_patterns:
|
| if re.search(pattern, content, re.I):
|
| return True
|
|
|
| return False
|
|
|
| def _estimate_success_rate(self, fields: List[FormField], complexity: int, has_captcha: bool) -> float:
|
| """Estime le taux de succès"""
|
| base_rate = 0.8
|
|
|
| if has_captcha:
|
| base_rate *= 0.1
|
|
|
| if complexity > 15:
|
| base_rate *= 0.4
|
| elif complexity > 10:
|
| base_rate *= 0.6
|
| elif complexity > 5:
|
| base_rate *= 0.8
|
|
|
| required_fields = [f for f in fields if f.required]
|
| if len(required_fields) <= 3:
|
| base_rate *= 1.1
|
|
|
| return min(base_rate, 1.0)
|
|
|
| async def _fill_and_submit_form(self, page: Page, analysis: FormAnalysis, contest: Contest) -> bool:
|
| """Remplit et soumet le formulaire"""
|
| try:
|
| filled_fields = 0
|
|
|
| for field in analysis.fields:
|
| try:
|
|
|
| await page.wait_for_selector(field.selector, timeout=3000)
|
| element = await page.query_selector(field.selector)
|
|
|
| if not element:
|
| continue
|
|
|
|
|
| value = await self._generate_field_value(field, contest, page)
|
| if not value:
|
| continue
|
|
|
|
|
| if field.field_type == 'select':
|
| await self._fill_select_field(element, value, page)
|
| else:
|
| await element.fill(value)
|
|
|
| filled_fields += 1
|
| await asyncio.sleep(random.uniform(0.3, 0.8))
|
|
|
| except Exception as e:
|
| logging.debug(f"Error filling field {field.selector}: {e}")
|
| continue
|
|
|
|
|
| submit_success = await self._submit_form(page)
|
|
|
| logging.info(f"Filled {filled_fields}/{len(analysis.fields)} fields, submitted: {submit_success}")
|
| return filled_fields > 0 and submit_success
|
|
|
| except Exception as e:
|
| logging.error(f"Form filling error: {e}")
|
| return False
|
|
|
| async def _generate_field_value(self, field: FormField, contest: Contest, page: Page) -> Optional[str]:
|
| """Génère une valeur pour un champ"""
|
|
|
| field_type = self._identify_field_type(field)
|
|
|
|
|
| personal_mapping = {
|
| 'prenom': self.personal_info.prenom,
|
| 'nom': self.personal_info.nom,
|
| 'email': self.personal_info.email_derivee,
|
| 'telephone': self.personal_info.telephone,
|
| 'adresse': self.personal_info.adresse,
|
| 'code_postal': self.personal_info.code_postal,
|
| 'ville': self.personal_info.ville,
|
| 'pays': self.personal_info.pays
|
| }
|
|
|
| if field_type in personal_mapping:
|
| return personal_mapping[field_type]
|
|
|
|
|
| if field_type == 'motivation':
|
| return self.ai.generate_response(
|
| field.label,
|
| contest.description,
|
| "motivation"
|
| )
|
| elif field_type == 'quiz':
|
| return self.ai.generate_response(
|
| field.label,
|
| contest.description,
|
| "quiz"
|
| )
|
|
|
|
|
| default_values = {
|
| 'age': '25',
|
| 'genre': 'Monsieur',
|
| 'profession': 'Étudiant'
|
| }
|
|
|
| for key, value in default_values.items():
|
| if key in field.label.lower():
|
| return value
|
|
|
| return None
|
|
|
| def _identify_field_type(self, field: FormField) -> str:
|
| """Identifie le type de champ"""
|
| combined_text = f"{field.ai_context} {field.label}".lower()
|
|
|
| for field_type, patterns in self.field_patterns.items():
|
| for pattern in patterns:
|
| if re.search(pattern, combined_text, re.I):
|
| return field_type
|
|
|
| return 'unknown'
|
|
|
| async def _fill_select_field(self, element, value: str, page: Page):
|
| """Remplit un champ select"""
|
| try:
|
| options = await element.query_selector_all('option')
|
|
|
| for option in options:
|
| option_text = await option.inner_text()
|
| option_value = await option.get_attribute('value')
|
|
|
| if (value.lower() in option_text.lower() or
|
| value.lower() in (option_value or "").lower()):
|
| await element.select_option(value=option_value)
|
| return
|
|
|
|
|
| if options and len(options) > 1:
|
| first_option = await options[1].get_attribute('value')
|
| await element.select_option(value=first_option)
|
|
|
| except Exception as e:
|
| logging.debug(f"Select field error: {e}")
|
|
|
| async def _submit_form(self, page: Page) -> bool:
|
| """Soumet le formulaire"""
|
| submit_selectors = [
|
| 'input[type="submit"]',
|
| 'button[type="submit"]',
|
| 'button:has-text("Participer")',
|
| 'button:has-text("Envoyer")',
|
| 'button:has-text("Valider")',
|
| '.submit-btn',
|
| '.participate-btn'
|
| ]
|
|
|
| for selector in submit_selectors:
|
| try:
|
| element = await page.query_selector(selector)
|
| if element:
|
| is_visible = await element.is_visible()
|
| is_enabled = await element.is_enabled()
|
|
|
| if is_visible and is_enabled:
|
| await element.click()
|
| await page.wait_for_timeout(3000)
|
| return True
|
|
|
| except Exception:
|
| continue
|
|
|
| return False
|
|
|
|
|
|
|
|
|
|
|
| class EmailManager:
|
| def __init__(self, api_config: APIConfig, ai_engine: AIEngine, db_manager: DatabaseManager):
|
| self.api_config = api_config
|
| self.ai = ai_engine
|
| self.db = db_manager
|
| self.personal_info = PERSONAL_INFO
|
|
|
| def check_and_analyze_emails(self):
|
| """Vérifie et analyse les emails pour détecter les victoires"""
|
| if not self.api_config.email_app_password:
|
| logging.warning("Email app password not configured")
|
| return
|
|
|
| try:
|
| mail = imaplib.IMAP4_SSL('outlook.office365.com')
|
| mail.login(self.personal_info.email, self.api_config.email_app_password)
|
| mail.select('inbox')
|
|
|
|
|
| since_date = (datetime.now() - timedelta(days=7)).strftime('%d-%b-%Y')
|
| status, messages = mail.search(None, f'(SINCE "{since_date}")')
|
|
|
| victories = []
|
| email_count = 0
|
|
|
| for num in messages[0].split()[-20:]:
|
| try:
|
| email_count += 1
|
| _, msg = mail.fetch(num, '(RFC822)')
|
| email_msg = email.message_from_bytes(msg[0][1])
|
|
|
| subject = email_msg['Subject'] or ""
|
| from_addr = email_msg['From'] or ""
|
|
|
|
|
| body = self._extract_email_body(email_msg)
|
|
|
|
|
| analysis = self.ai.generate_response(
|
| f"Cet email indique-t-il une victoire dans un concours ? Sujet: {subject}",
|
| body[:1000],
|
| "quiz"
|
| )
|
|
|
| if 'oui' in analysis.lower() or 'gagne' in analysis.lower():
|
| prize = self._extract_prize_from_email(body, subject)
|
| victories.append({
|
| 'email_id': num.decode(),
|
| 'date': datetime.now().strftime('%Y-%m-%d'),
|
| 'prize': prize,
|
| 'source': from_addr,
|
| 'subject': subject
|
| })
|
|
|
|
|
| self.db.add_victory(num.decode(), prize, from_addr)
|
|
|
|
|
| self._send_victory_alert(prize, from_addr, subject)
|
|
|
| logging.info(f"Victory detected: {prize} from {from_addr}")
|
|
|
| except Exception as e:
|
| logging.error(f"Error processing email {num}: {e}")
|
| continue
|
|
|
| mail.logout()
|
| logging.info(f"Processed {email_count} emails, found {len(victories)} victories")
|
|
|
| except Exception as e:
|
| logging.error(f"Email checking error: {e}")
|
|
|
| def _extract_email_body(self, email_msg) -> str:
|
| """Extrait le corps de l'email"""
|
| body = ""
|
|
|
| if email_msg.is_multipart():
|
| for part in email_msg.walk():
|
| if part.get_content_type() == 'text/plain':
|
| try:
|
| body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
| break
|
| except:
|
| continue
|
| else:
|
| try:
|
| body = email_msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
| except:
|
| body = str(email_msg.get_payload())
|
|
|
| return body
|
|
|
| def _extract_prize_from_email(self, body: str, subject: str) -> str:
|
| """Extrait le prix gagné de l'email"""
|
| text = f"{subject} {body}"
|
|
|
| prize_patterns = [
|
| r"vous avez gagné\s+([^.!?\n]{1,100})",
|
| r"prix[:\s]+([^.!?\n]{1,100})",
|
| r"lot[:\s]+([^.!?\n]{1,100})",
|
| r"remporté\s+([^.!?\n]{1,100})",
|
| r"(\d+\s*CHF|\d+\s*euros?|\d+\s*francs?)"
|
| ]
|
|
|
| for pattern in prize_patterns:
|
| match = re.search(pattern, text, re.I)
|
| if match:
|
| return match.group(1).strip()
|
|
|
| return "Prix non spécifié"
|
|
|
| def _send_victory_alert(self, prize: str, source: str, subject: str):
|
| """Envoie une alerte de victoire"""
|
| if not self.api_config.telegram_bot_token or not self.api_config.telegram_chat_id:
|
| return
|
|
|
| message = f"""
|
| 🎉 **VICTOIRE DÉTECTÉE !**
|
|
|
| 🏆 **Prix**: {prize}
|
| 📧 **Source**: {source}
|
| 📋 **Sujet**: {subject}
|
| 📅 **Date**: {datetime.now().strftime('%d/%m/%Y %H:%M')}
|
|
|
| Félicitations ! 🎊
|
| """
|
|
|
| try:
|
| url = f"https://api.telegram.org/bot{self.api_config.telegram_bot_token}/sendMessage"
|
| payload = {
|
| 'chat_id': self.api_config.telegram_chat_id,
|
| 'text': message,
|
| 'parse_mode': 'Markdown'
|
| }
|
|
|
| response = requests.post(url, json=payload, timeout=10)
|
| if response.status_code == 200:
|
| logging.info("Victory alert sent successfully")
|
| else:
|
| logging.error(f"Telegram alert failed: {response.text}")
|
|
|
| except Exception as e:
|
| logging.error(f"Telegram alert error: {e}")
|
|
|
|
|
|
|
|
|
|
|
| class MonitoringSystem:
|
| def __init__(self, db_manager: DatabaseManager):
|
| self.db = db_manager
|
|
|
| def generate_daily_report(self) -> str:
|
| """Génère un rapport quotidien"""
|
| stats = self.db.get_stats()
|
|
|
|
|
| today = datetime.now().strftime('%Y-%m-%d')
|
| conn = self.db._get_connection()
|
|
|
| today_participations = conn.execute(
|
| "SELECT COUNT(*) FROM participations WHERE date = ?", (today,)
|
| ).fetchone()[0]
|
|
|
| today_successes = conn.execute(
|
| "SELECT COUNT(*) FROM participations WHERE date = ? AND status = 'success'", (today,)
|
| ).fetchone()[0]
|
|
|
| success_rate = (today_successes / max(today_participations, 1)) * 100
|
|
|
| report = f"""
|
| 📊 **RAPPORT QUOTIDIEN - {today}**
|
|
|
| 🎯 **Aujourd'hui**:
|
| • Participations: {today_participations}
|
| • Succès: {today_successes}
|
| • Taux de succès: {success_rate:.1f}%
|
|
|
| 📈 **Total**:
|
| • Participations totales: {stats['total_participations']}
|
| • Participations réussies: {stats['successful_participations']}
|
| • Victoires détectées: {stats['total_victories']}
|
|
|
| 🌐 **Par source**:
|
| """
|
|
|
| for source, count in stats['by_source'].items():
|
| report += f" • {source}: {count}\n"
|
|
|
| return report
|
|
|
| def send_daily_report(self):
|
| """Envoie le rapport quotidien"""
|
| if not API_CONFIG.telegram_bot_token or not API_CONFIG.telegram_chat_id:
|
| return
|
|
|
| report = self.generate_daily_report()
|
|
|
| try:
|
| url = f"https://api.telegram.org/bot{API_CONFIG.telegram_bot_token}/sendMessage"
|
| payload = {
|
| 'chat_id': API_CONFIG.telegram_chat_id,
|
| 'text': report,
|
| 'parse_mode': 'Markdown'
|
| }
|
|
|
| response = requests.post(url, json=payload, timeout=10)
|
| if response.status_code == 200:
|
| logging.info("Daily report sent successfully")
|
| else:
|
| logging.error(f"Daily report failed: {response.text}")
|
|
|
| except Exception as e:
|
| logging.error(f"Daily report error: {e}")
|
|
|
|
|
|
|
|
|
|
|
| class ContestBotOrchestrator:
|
| def __init__(self):
|
| self.db = DatabaseManager()
|
| self.ai = AIEngine(API_CONFIG)
|
| self.scraper = None
|
| self.participator = SmartParticipator(self.db, self.ai)
|
| self.email_manager = EmailManager(API_CONFIG, self.ai, self.db)
|
| self.monitor = MonitoringSystem(self.db)
|
|
|
| async def run_full_cycle(self):
|
| """Execute un cycle complet de scraping et participation"""
|
| logging.info("Starting full contest bot cycle")
|
|
|
| try:
|
|
|
| async with IntelligentScraper(self.db) as scraper:
|
| self.scraper = scraper
|
| contests = await scraper.scrape_all_sources()
|
|
|
| if not contests:
|
| logging.info("No new contests found")
|
| return
|
|
|
|
|
| contests.sort(key=lambda x: x.difficulty_score)
|
|
|
|
|
| participation_count = 0
|
| max_daily_participations = 20
|
|
|
| for contest in contests[:max_daily_participations]:
|
| try:
|
|
|
| if participation_count > 0:
|
| wait_time = random.uniform(30, 120)
|
| logging.info(f"Waiting {wait_time:.0f}s before next participation")
|
| await asyncio.sleep(wait_time)
|
|
|
| success = await self.participator.participate_in_contest(contest)
|
| participation_count += 1
|
|
|
| if success:
|
| logging.info(f"✅ Successfully participated in: {contest.title}")
|
| else:
|
| logging.warning(f"❌ Failed to participate in: {contest.title}")
|
|
|
|
|
| if success:
|
| await asyncio.sleep(random.uniform(60, 180))
|
|
|
| except Exception as e:
|
| logging.error(f"Error participating in {contest.title}: {e}")
|
| continue
|
|
|
| logging.info(f"Participation cycle completed: {participation_count} attempts")
|
|
|
| except Exception as e:
|
| logging.error(f"Full cycle error: {e}")
|
|
|
| def run_email_check(self):
|
| """Vérifie les emails pour les victoires"""
|
| try:
|
| logging.info("Checking emails for victories")
|
| self.email_manager.check_and_analyze_emails()
|
| except Exception as e:
|
| logging.error(f"Email check error: {e}")
|
|
|
| def run_daily_report(self):
|
| """Génère et envoie le rapport quotidien"""
|
| try:
|
| logging.info("Generating daily report")
|
| self.monitor.send_daily_report()
|
| except Exception as e:
|
| logging.error(f"Daily report error: {e}")
|
|
|
|
|
|
|
|
|
|
|
| def run_bot_cycle():
|
| """Point d'entrée pour le scheduler"""
|
| bot = ContestBotOrchestrator()
|
|
|
|
|
| asyncio.run(bot.run_full_cycle())
|
|
|
|
|
| bot.run_email_check()
|
|
|
| def run_daily_report():
|
| """Point d'entrée pour le rapport quotidien"""
|
| bot = ContestBotOrchestrator()
|
| bot.run_daily_report()
|
|
|
| def main():
|
| """Fonction principale avec scheduler"""
|
| logging.info("Starting Contest Bot with scheduler")
|
|
|
|
|
| schedule.every().day.at("08:00").do(run_bot_cycle)
|
| schedule.every().day.at("14:00").do(run_bot_cycle)
|
| schedule.every().day.at("20:00").do(run_daily_report)
|
|
|
|
|
| if len(sys.argv) > 1 and sys.argv[1] == "--run-now":
|
| logging.info("Running immediate cycle")
|
| run_bot_cycle()
|
| return
|
|
|
|
|
| logging.info("Scheduler started. Waiting for scheduled tasks...")
|
|
|
| while True:
|
| try:
|
| schedule.run_pending()
|
| time.sleep(60)
|
| except KeyboardInterrupt:
|
| logging.info("Bot stopped by user")
|
| break
|
| except Exception as e:
|
| logging.error(f"Scheduler error: {e}")
|
| time.sleep(300)
|
|
|
| if __name__ == "__main__":
|
| main() |