Spaces:
Sleeping
Sleeping
| # src/core/data_fetcher.py | |
| import yfinance as yf | |
| import finnhub | |
| import pandas as pd | |
| import numpy as np | |
| from typing import List, Dict, Optional | |
| from datetime import datetime, timedelta | |
| import aiohttp | |
| import asyncio | |
| import json | |
| from bs4 import BeautifulSoup | |
| import requests | |
| from newspaper import Article | |
| import tweepy | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv(os.path.join(os.path.dirname(__file__), 'Deepvest_system', 'src', 'data_API.env')) | |
class DataFetcher:
    """Central entry point for market, news and alternative data retrieval.

    Credentials are read from environment variables instead of being
    embedded in the source:

    * ``FINNHUB_API_KEY``
    * ``TWITTER_API_KEY`` / ``TWITTER_API_SECRET``
    * ``TWITTER_ACCESS_TOKEN`` / ``TWITTER_ACCESS_TOKEN_SECRET``

    SECURITY NOTE: earlier revisions of this class shipped literal API keys
    and tokens. Those values must be considered compromised and be revoked /
    rotated with the respective providers.

    NOTE(review): the module calls ``load_dotenv`` at import time; it is
    assumed that the loaded ``.env`` file defines the variable names above --
    confirm and adjust the names if the file uses different ones.

    The public coroutines call the third-party clients synchronously (the
    client calls are not awaited), so each call runs to completion before the
    coroutine yields.
    """

    # Names of the environment variables holding the credentials.
    _FINNHUB_KEY_ENV = "FINNHUB_API_KEY"
    _TWITTER_ENV = (
        "TWITTER_API_KEY",
        "TWITTER_API_SECRET",
        "TWITTER_ACCESS_TOKEN",
        "TWITTER_ACCESS_TOKEN_SECRET",
    )

    def __init__(self):
        """Build the Finnhub and Twitter clients from environment credentials.

        Raises:
            RuntimeError: if any required environment variable is missing
                or empty.
        """
        finnhub_key = self._require_env(self._FINNHUB_KEY_ENV)
        consumer_key, consumer_secret, access_token, access_secret = (
            self._require_env(name) for name in self._TWITTER_ENV
        )

        self.finnhub_client = finnhub.Client(api_key=finnhub_key)
        self.twitter_auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        self.twitter_auth.set_access_token(access_token, access_secret)
        self.twitter_api = tweepy.API(self.twitter_auth)

    @staticmethod
    def _require_env(name: str) -> str:
        """Return the value of environment variable *name*.

        Raises:
            RuntimeError: if the variable is unset or empty, so that a
                misconfiguration is reported by name rather than surfacing
                later as an opaque client error.
        """
        value = os.getenv(name)
        if not value:
            raise RuntimeError(
                f"Missing required environment variable: {name}"
            )
        return value

    async def fetch_market_data(self, symbols: List[str], period: str = "2y") -> pd.DataFrame:
        """Download closing prices for each symbol.

        Args:
            symbols: ticker symbols to download.
            period: history length passed through to ``yf.download``.

        Returns:
            A DataFrame with one column per successfully fetched symbol.
            Symbols whose download fails are reported on stdout and skipped.
        """
        data = pd.DataFrame()
        for symbol in symbols:
            try:
                # Base price history from yfinance.
                frames = [yf.download(symbol, period=period)]

                # Optional supplementary data from Finnhub; only merged when
                # something was actually returned.
                finnhub_data = self._get_finnhub_data(symbol)
                if finnhub_data is not None and not finnhub_data.empty:
                    frames.append(finnhub_data)

                combined_data = pd.concat(frames, axis=1)
                close = combined_data['Close']
                if isinstance(close, pd.DataFrame):
                    # 'Close' may select several columns (multi-level columns
                    # or a duplicate 'Close' from the supplementary frame);
                    # keep the first one, which comes from the yfinance frame.
                    close = close.iloc[:, 0]
                data[symbol] = close
            except Exception as e:
                print(f"Erreur lors de la récupération des données pour {symbol}: {e}")
        return data

    def _get_finnhub_data(self, symbol: str) -> pd.DataFrame:
        """Return supplementary Finnhub market data for *symbol*.

        Placeholder: no supplementary data is fetched yet, so an empty
        DataFrame is returned and ``fetch_market_data`` relies on yfinance
        alone. Previously this method did not exist, which made every symbol
        fail with ``AttributeError``.
        """
        # TODO: implement the Finnhub enrichment (index must align with the
        # yfinance frame for the column-wise concat to be meaningful).
        return pd.DataFrame()

    async def fetch_news_data(self, symbols: List[str], days: int = 7) -> List[Dict]:
        """Collect recent company news and the full text of each article.

        Args:
            symbols: ticker symbols to query.
            days: size of the look-back window, in days, ending today.

        Returns:
            A list of dicts with keys ``symbol``, ``title``, ``content``,
            ``source``, ``url``, ``datetime`` and ``sentiment``. Articles or
            symbols that fail are reported on stdout and skipped.
        """
        news_data = []
        # Compute the window once so both bounds derive from the same instant.
        now = datetime.now()
        date_from = (now - timedelta(days=days)).strftime('%Y-%m-%d')
        date_to = now.strftime('%Y-%m-%d')

        for symbol in symbols:
            try:
                # News index from Finnhub.
                finnhub_news = self.finnhub_client.company_news(
                    symbol,
                    _from=date_from,
                    to=date_to
                )
                # Full article text via newspaper.
                for news in finnhub_news:
                    try:
                        article = Article(news['url'])
                        article.download()
                        article.parse()
                        news_data.append({
                            'symbol': symbol,
                            'title': news['headline'],
                            'content': article.text,
                            'source': news['source'],
                            'url': news['url'],
                            'datetime': datetime.fromtimestamp(news['datetime']),
                            'sentiment': news.get('sentiment')
                        })
                    except Exception as e:
                        print(f"Erreur lors de l'analyse de l'article: {e}")
            except Exception as e:
                print(f"Erreur lors de la récupération des nouvelles pour {symbol}: {e}")
        return news_data

    async def fetch_alternative_data(self, symbols: List[str]) -> Dict:
        """Gather all alternative data sets for *symbols*.

        Returns:
            A dict with keys ``social_media``, ``web_traffic`` and
            ``satellite``, fetched one after another in that order.
        """
        return {
            'social_media': await self._fetch_social_media_data(symbols),
            'web_traffic': await self._fetch_web_traffic_data(symbols),
            'satellite': await self._fetch_satellite_data(symbols)
        }

    async def _fetch_social_media_data(self, symbols: List[str]) -> List[Dict]:
        """Search tweets mentioning each symbol's cashtag.

        Returns:
            A list of dicts with keys ``platform``, ``text``, ``timestamp``,
            ``engagement`` (favorites + retweets) and ``symbol``.
        """
        social_data = []
        for symbol in symbols:
            try:
                tweets = self.twitter_api.search_tweets(
                    q=f"${symbol}",
                    lang="en",
                    count=100,
                    tweet_mode="extended"
                )
                for tweet in tweets:
                    social_data.append({
                        'platform': 'twitter',
                        'text': tweet.full_text,
                        'timestamp': tweet.created_at,
                        'engagement': tweet.favorite_count + tweet.retweet_count,
                        'symbol': symbol
                    })
                # TODO: Reddit collection is not implemented yet.
            except Exception as e:
                print(f"Erreur lors de la récupération des données sociales pour {symbol}: {e}")
        return social_data

    async def _fetch_web_traffic_data(self, symbols: List[str]) -> pd.DataFrame:
        """Collect web-traffic metrics for each symbol's company domain.

        Returns:
            A DataFrame with one column per symbol for which metrics were
            obtained; symbols without a domain or without metrics are skipped.
        """
        traffic_data = pd.DataFrame()
        for symbol in symbols:
            try:
                company_domain = self._get_company_domain(symbol)
                if company_domain:
                    traffic_metrics = await self._get_similerweb_metrics(company_domain)
                    # The metrics provider is still a stub returning None;
                    # do not store a meaningless all-None column.
                    if traffic_metrics is not None:
                        traffic_data[symbol] = traffic_metrics
            except Exception as e:
                print(f"Erreur lors de la récupération du trafic web pour {symbol}: {e}")
        return traffic_data

    async def _fetch_satellite_data(self, symbols: List[str]) -> Dict:
        """Collect satellite metrics for each symbol's known locations.

        Returns:
            A dict mapping symbol to its satellite metrics; symbols without
            known locations are omitted.
        """
        satellite_data = {}
        for symbol in symbols:
            try:
                company_locations = self._get_company_locations(symbol)
                if company_locations:
                    satellite_metrics = await self._get_satellite_metrics(company_locations)
                    satellite_data[symbol] = satellite_metrics
            except Exception as e:
                print(f"Erreur lors de la récupération des données satellite pour {symbol}: {e}")
        return satellite_data

    @staticmethod
    def _normalize_domain(url: Optional[str]) -> Optional[str]:
        """Reduce a website URL to its bare host name.

        Strips the scheme (``http`` or ``https``), any path / query, and a
        leading ``www.`` label. Returns ``None`` when no host can be
        extracted.
        """
        from urllib.parse import urlparse

        if not url:
            return None
        candidate = url.strip()
        # urlparse only recognises the host when a scheme or '//' is present.
        if "://" not in candidate:
            candidate = "//" + candidate
        try:
            host = urlparse(candidate).hostname or ""
        except ValueError:
            return None
        if host.startswith("www."):
            host = host[len("www."):]
        return host or None

    def _get_company_domain(self, symbol: str) -> Optional[str]:
        """Return the company's main web domain from its Finnhub profile.

        Returns ``None`` when the profile lookup fails or has no usable URL.
        """
        try:
            company_profile = self.finnhub_client.company_profile2(symbol=symbol)
            return self._normalize_domain(company_profile.get('weburl'))
        except Exception:
            # Best-effort lookup: callers treat None as "domain unknown".
            return None

    async def _get_similerweb_metrics(self, domain: str) -> Optional[pd.Series]:
        """Return web-traffic metrics for *domain*.

        Placeholder: not implemented, currently always returns ``None``.
        An API account with the traffic provider is required.
        """
        # TODO: implement the traffic-metrics retrieval.
        return None

    def _get_company_locations(self, symbol: str) -> List[Dict]:
        """Return the main physical locations of the company.

        Placeholder: not implemented, currently always returns an empty list
        (previously it fell through and returned ``None`` despite the
        annotation).
        """
        # TODO: implement the location lookup.
        return []

    async def _get_satellite_metrics(self, locations: List[Dict]) -> Optional[Dict]:
        """Return satellite metrics for the given locations.

        Placeholder: not implemented, currently always returns ``None``.
        """
        # TODO: implement the satellite-data retrieval.
        return None
class SatelliteDataAnalyzer:
    """Derive economic activity indicators from satellite observations.

    NOTE: the helper methods called below (``_analyze_parking_occupancy``,
    ``_estimate_customer_traffic``, ``_calculate_yoy_change``,
    ``_analyze_thermal_activity``, ``_analyze_emissions``,
    ``_analyze_vessel_traffic``, ``_analyze_container_stacks`` and
    ``_calculate_congestion_score``) are not defined in this class; they must
    be supplied elsewhere (e.g. by a subclass). Until then every entry fails,
    is reported on stdout, and the returned dict stays empty.
    """

    @staticmethod
    def _label(entry) -> str:
        """Return a printable name for *entry* without ever raising.

        Used by the error handlers so that an entry lacking a ``name`` key
        cannot raise a second exception from inside the ``except`` block.
        """
        if isinstance(entry, dict):
            return str(entry.get('name', '<unknown>'))
        return repr(entry)

    async def get_retail_activity(self, locations: List[Dict]) -> Dict[str, Dict]:
        """Estimate retail activity from parking-lot occupancy.

        Args:
            locations: dicts providing ``name``, ``id``, ``latitude``,
                ``longitude`` and ``total_spaces``.

        Returns:
            A dict mapping location name to a dict with keys
            ``parking_occupancy``, ``customer_traffic`` and ``yoy_change``.
            Locations that fail are reported on stdout and skipped.
        """
        retail_metrics = {}
        for location in locations:
            try:
                # Vehicle count within a 500 m radius of the location.
                parking_count = await self._analyze_parking_occupancy(
                    lat=location['latitude'],
                    lon=location['longitude'],
                    radius=500  # metres
                )
                retail_metrics[location['name']] = {
                    'parking_occupancy': parking_count / location['total_spaces'],
                    'customer_traffic': self._estimate_customer_traffic(parking_count),
                    'yoy_change': self._calculate_yoy_change(location['id'], parking_count)
                }
            except Exception as e:
                print(f"Erreur analyse retail pour {self._label(location)}: {e}")
        return retail_metrics

    async def get_industrial_activity(self, locations: List[Dict]) -> Dict[str, Dict]:
        """Estimate industrial activity from thermal and emissions analysis.

        Args:
            locations: dicts providing ``name``, ``latitude`` and
                ``longitude``.

        Returns:
            A dict mapping location name to a dict with keys
            ``activity_level``, ``emissions_level`` and
            ``operational_status``. Locations that fail are reported on
            stdout and skipped.
        """
        industrial_metrics = {}
        for location in locations:
            try:
                heat_signature = await self._analyze_thermal_activity(
                    lat=location['latitude'],
                    lon=location['longitude']
                )
                emissions = await self._analyze_emissions(
                    lat=location['latitude'],
                    lon=location['longitude']
                )
                industrial_metrics[location['name']] = {
                    'activity_level': heat_signature['activity_score'],
                    'emissions_level': emissions['level'],
                    'operational_status': heat_signature['operational_status']
                }
            except Exception as e:
                print(f"Erreur analyse industrielle pour {self._label(location)}: {e}")
        return industrial_metrics

    async def get_shipping_activity(self, ports: List[Dict]) -> Dict[str, Dict]:
        """Estimate maritime and logistics activity for each port.

        Args:
            ports: dicts providing ``name``, ``latitude``, ``longitude`` and
                ``capacity``.

        Returns:
            A dict mapping port name to a dict with keys
            ``vessel_occupancy``, ``container_volume`` and
            ``port_congestion``. Ports that fail are reported on stdout and
            skipped.
        """
        shipping_metrics = {}
        for port in ports:
            try:
                # Vessel count within a 5000 m radius of the port.
                vessel_count = await self._analyze_vessel_traffic(
                    lat=port['latitude'],
                    lon=port['longitude'],
                    radius=5000  # metres
                )
                container_count = await self._analyze_container_stacks(
                    lat=port['latitude'],
                    lon=port['longitude']
                )
                shipping_metrics[port['name']] = {
                    'vessel_occupancy': vessel_count / port['capacity'],
                    'container_volume': container_count,
                    'port_congestion': self._calculate_congestion_score(vessel_count, port['capacity'])
                }
            except Exception as e:
                print(f"Erreur analyse maritime pour {self._label(port)}: {e}")
        return shipping_metrics
class SocialMediaAnalyzer:
    """Investor-sentiment analysis across social media sources.

    The per-source coroutines (``_analyze_reddit_sentiment``,
    ``_analyze_stocktwits_sentiment``, ``_analyze_twitter_sentiment``) are
    not defined in this class and must be provided elsewhere.
    """

    async def analyze_investment_sentiment(self, symbols: List[str]) -> Dict[str, Dict]:
        """Build a sentiment summary for every symbol.

        Each source result is read through its ``score`` and ``volume``
        keys. Symbols whose analysis raises are reported on stdout and left
        out of the result.

        Returns:
            A dict mapping symbol to a dict with keys ``overall_sentiment``,
            ``sentiment_momentum``, ``retail_interest`` and
            ``institutional_hints``.
        """
        sentiment_data = {}
        for symbol in symbols:
            try:
                # Query the three sources one after another.
                reddit = await self._analyze_reddit_sentiment(symbol)
                stocktwits = await self._analyze_stocktwits_sentiment(symbol)
                twitter = await self._analyze_twitter_sentiment(symbol)

                # Aggregate and normalise, in the same order as the summary keys.
                source_scores = [
                    reddit['score'],
                    stocktwits['score'],
                    twitter['score'],
                ]
                overall = self._aggregate_sentiment_scores(source_scores)
                momentum = self._calculate_sentiment_momentum(symbol)
                interest = self._gauge_retail_interest(
                    reddit['volume'],
                    stocktwits['volume'],
                    twitter['volume'],
                )
                hints = self._detect_institutional_activity(symbol)

                sentiment_data[symbol] = {
                    'overall_sentiment': overall,
                    'sentiment_momentum': momentum,
                    'retail_interest': interest,
                    'institutional_hints': hints,
                }
            except Exception as e:
                print(f"Erreur analyse sentiment pour {symbol}: {e}")
        return sentiment_data

    def _aggregate_sentiment_scores(self, scores: List[float]) -> float:
        """Return the weighted sum of *scores*.

        Weights are positional (0.4, 0.3, 0.3), reflecting the relative
        importance given to each source; scores beyond the third are ignored.
        """
        source_weights = [0.4, 0.3, 0.3]
        weighted_terms = (
            value * factor for value, factor in zip(scores, source_weights)
        )
        return sum(weighted_terms)

    def _calculate_sentiment_momentum(self, symbol: str) -> float:
        """Compute the sentiment momentum for *symbol*.

        Not implemented yet: currently returns ``None``.
        """
        return None

    def _gauge_retail_interest(self, *volumes: int) -> float:
        """Return the mean of the given volumes, or 0 when none are given."""
        if not volumes:
            return 0
        return sum(volumes) / len(volumes)

    def _detect_institutional_activity(self, symbol: str) -> Dict:
        """Detect signs of institutional activity for *symbol*.

        Intended to cover large orders, dark pools, etc. Not implemented
        yet: currently returns ``None``.
        """
        return None