# src/core/data_fetcher.py import yfinance as yf import finnhub import pandas as pd import numpy as np from typing import List, Dict, Optional from datetime import datetime, timedelta import aiohttp import asyncio import json from bs4 import BeautifulSoup import requests from newspaper import Article import tweepy import os from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(__file__), 'Deepvest_system', 'src', 'data_API.env')) class DataFetcher: """Gestionnaire centralisé de récupération de données""" def __init__(self): self.finnhub_client = finnhub.Client(api_key="cu9bhbhr01qnf5nmldv0cu9bhbhr01qnf5nmldvg") self.twitter_auth = tweepy.OAuthHandler( "HaynwOOFiEd2Ty3m6Xu7sPhrd", "nuu7y46JtjB7qbYnhK79w86AqR9PC2maDzl2qDpJZLceQeEe5Y" ) self.twitter_auth.set_access_token( "1871112111419486208-taeYKwppobslPmDa4NaDMCqCEG9qoa", "N2izETc3KGe6tPE8yc1rvKTo20lnZewhxWw19HhzlebRp" ) self.twitter_api = tweepy.API(self.twitter_auth) async def fetch_market_data(self, symbols: List[str], period: str = "2y") -> pd.DataFrame: """Récupération des données de marché""" data = pd.DataFrame() for symbol in symbols: try: # Données de base avec yfinance yf_data = yf.download(symbol, period=period) # Données supplémentaires avec Finnhub finnhub_data = self._get_finnhub_data(symbol) # Combiner les données combined_data = pd.concat([yf_data, finnhub_data], axis=1) data[symbol] = combined_data['Close'] except Exception as e: print(f"Erreur lors de la récupération des données pour {symbol}: {e}") return data async def fetch_news_data(self, symbols: List[str], days: int = 7) -> List[Dict]: """Récupération des nouvelles financières""" news_data = [] async with aiohttp.ClientSession() as session: for symbol in symbols: try: # Nouvelles de Finnhub finnhub_news = self.finnhub_client.company_news( symbol, _from=(datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d'), to=datetime.now().strftime('%Y-%m-%d') ) # Articles de presse avec newspaper3k for news in finnhub_news: try: article = Article(news['url']) article.download() article.parse() news_data.append({ 'symbol': symbol, 'title': news['headline'], 'content': article.text, 'source': news['source'], 'url': news['url'], 'datetime': datetime.fromtimestamp(news['datetime']), 'sentiment': news.get('sentiment') }) except Exception as e: print(f"Erreur lors de l'analyse de l'article: {e}") except Exception as e: print(f"Erreur lors de la récupération des nouvelles pour {symbol}: {e}") return news_data async def fetch_alternative_data(self, symbols: List[str]) -> Dict: """Récupération des données alternatives""" return { 'social_media': await self._fetch_social_media_data(symbols), 'web_traffic': await self._fetch_web_traffic_data(symbols), 'satellite': await self._fetch_satellite_data(symbols) } async def _fetch_social_media_data(self, symbols: List[str]) -> List[Dict]: """Récupération des données des réseaux sociaux""" social_data = [] for symbol in symbols: try: # Tweets tweets = self.twitter_api.search_tweets( q=f"${symbol}", lang="en", count=100, tweet_mode="extended" ) for tweet in tweets: social_data.append({ 'platform': 'twitter', 'text': tweet.full_text, 'timestamp': tweet.created_at, 'engagement': tweet.favorite_count + tweet.retweet_count, 'symbol': symbol }) # Reddit (exemple avec PRAW) # Reddit data collection here... except Exception as e: print(f"Erreur lors de la récupération des données sociales pour {symbol}: {e}") return social_data async def _fetch_web_traffic_data(self, symbols: List[str]) -> pd.DataFrame: """Récupération des données de trafic web""" traffic_data = pd.DataFrame() for symbol in symbols: try: # Similerweb API (si disponible) company_domain = self._get_company_domain(symbol) if company_domain: traffic_metrics = await self._get_similerweb_metrics(company_domain) traffic_data[symbol] = traffic_metrics except Exception as e: print(f"Erreur lors de la récupération du trafic web pour {symbol}: {e}") return traffic_data async def _fetch_satellite_data(self, symbols: List[str]) -> Dict: """Récupération des données satellite""" satellite_data = {} for symbol in symbols: try: # Données RS Data (exemple) company_locations = self._get_company_locations(symbol) if company_locations: satellite_metrics = await self._get_satellite_metrics(company_locations) satellite_data[symbol] = satellite_metrics except Exception as e: print(f"Erreur lors de la récupération des données satellite pour {symbol}: {e}") return satellite_data def _get_company_domain(self, symbol: str) -> Optional[str]: """Récupère le domaine principal de l'entreprise""" try: company_profile = self.finnhub_client.company_profile2(symbol=symbol) return company_profile.get('weburl', '').replace('www.', '').replace('https://', '') except: return None async def _get_similerweb_metrics(self, domain: str) -> pd.Series: """Récupère les métriques de trafic web via Similerweb""" # Implémentez ici la logique de récupération des données Similerweb # Nécessite un compte Similerweb API pass def _get_company_locations(self, symbol: str) -> List[Dict]: """Récupère les emplacements principaux de l'entreprise""" try: # Implémenter la logique de récupération des emplacements pass except: return [] async def _get_satellite_metrics(self, locations: List[Dict]) -> Dict: """Récupère les métriques satellite pour les emplacements donnés""" # Implémenter la logique de récupération des données satellite pass class SatelliteDataAnalyzer: """Analyse des données satellite pour indicateurs économiques""" async def get_retail_activity(self, locations: List[Dict]) -> Dict[str, float]: """Analyse l'activité de vente au détail via le trafic de parkings""" retail_metrics = {} for location in locations: try: # Récupération du comptage de voitures parking_count = await self._analyze_parking_occupancy( lat=location['latitude'], lon=location['longitude'], radius=500 # mètres ) retail_metrics[location['name']] = { 'parking_occupancy': parking_count / location['total_spaces'], 'customer_traffic': self._estimate_customer_traffic(parking_count), 'yoy_change': self._calculate_yoy_change(location['id'], parking_count) } except Exception as e: print(f"Erreur analyse retail pour {location['name']}: {e}") return retail_metrics async def get_industrial_activity(self, locations: List[Dict]) -> Dict[str, float]: """Analyse l'activité industrielle via imagerie thermique et pollution""" industrial_metrics = {} for location in locations: try: # Analyse de l'activité thermique heat_signature = await self._analyze_thermal_activity( lat=location['latitude'], lon=location['longitude'] ) # Analyse des émissions emissions = await self._analyze_emissions( lat=location['latitude'], lon=location['longitude'] ) industrial_metrics[location['name']] = { 'activity_level': heat_signature['activity_score'], 'emissions_level': emissions['level'], 'operational_status': heat_signature['operational_status'] } except Exception as e: print(f"Erreur analyse industrielle pour {location['name']}: {e}") return industrial_metrics async def get_shipping_activity(self, ports: List[Dict]) -> Dict[str, float]: """Analyse l'activité maritime et logistique""" shipping_metrics = {} for port in ports: try: # Analyse du trafic maritime vessel_count = await self._analyze_vessel_traffic( lat=port['latitude'], lon=port['longitude'], radius=5000 # mètres ) # Analyse des conteneurs container_count = await self._analyze_container_stacks( lat=port['latitude'], lon=port['longitude'] ) shipping_metrics[port['name']] = { 'vessel_occupancy': vessel_count / port['capacity'], 'container_volume': container_count, 'port_congestion': self._calculate_congestion_score(vessel_count, port['capacity']) } except Exception as e: print(f"Erreur analyse maritime pour {port['name']}: {e}") return shipping_metrics class SocialMediaAnalyzer: """Analyse avancée des médias sociaux pour le sentiment des investisseurs""" async def analyze_investment_sentiment(self, symbols: List[str]) -> Dict[str, Dict]: """Analyse complète du sentiment des investisseurs""" sentiment_data = {} for symbol in symbols: try: # Reddit (r/wallstreetbets, r/stocks, etc.) reddit_sentiment = await self._analyze_reddit_sentiment(symbol) # StockTwits stocktwits_sentiment = await self._analyze_stocktwits_sentiment(symbol) # Twitter $cashtags twitter_sentiment = await self._analyze_twitter_sentiment(symbol) # Agrégation et normalisation sentiment_data[symbol] = { 'overall_sentiment': self._aggregate_sentiment_scores([ reddit_sentiment['score'], stocktwits_sentiment['score'], twitter_sentiment['score'] ]), 'sentiment_momentum': self._calculate_sentiment_momentum(symbol), 'retail_interest': self._gauge_retail_interest( reddit_sentiment['volume'], stocktwits_sentiment['volume'], twitter_sentiment['volume'] ), 'institutional_hints': self._detect_institutional_activity(symbol) } except Exception as e: print(f"Erreur analyse sentiment pour {symbol}: {e}") return sentiment_data def _aggregate_sentiment_scores(self, scores: List[float]) -> float: """Agrège les scores de sentiment avec pondération""" weights = [0.4, 0.3, 0.3] # Pondération par importance de la source return sum(score * weight for score, weight in zip(scores, weights)) def _calculate_sentiment_momentum(self, symbol: str) -> float: """Calcule la dynamique du sentiment""" # Implémentation de la logique de momentum pass def _gauge_retail_interest(self, *volumes: int) -> float: """Évalue l'intérêt des investisseurs particuliers""" return sum(volumes) / len(volumes) if volumes else 0 def _detect_institutional_activity(self, symbol: str) -> Dict: """Détecte les signes d'activité institutionnelle""" # Analyse des ordres importants, des dark pools, etc. pass