"""
Enhanced Contact Finder Service

Finds real decision-makers using LinkedIn search, team page scraping,
and pattern-based extraction of names and e-mail addresses.
"""
import asyncio  # noqa: F401 - unused here; retained from the original import block
import logging
import re
import uuid
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING

from email_validator import validate_email, EmailNotValidError  # noqa: F401 - retained

from app.schema import Contact
from services.web_scraper import WebScraperService
from services.web_search import get_search_service

if TYPE_CHECKING:
    from mcp.registry import MCPRegistry

logger = logging.getLogger(__name__)

# (name, title) pair produced by the name-extraction helpers; either may be None.
NameTitle = Tuple[Optional[str], Optional[str]]

# Callable signature used by the query/page helpers: (text, search_result, email).
_Resolver = Callable[[str, Optional[Dict[str, Any]], str], NameTitle]

# The TLD class is letters only; the original "[A-Z|a-z]" also accepted a
# literal "|" inside the TLD.
_EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

# Substrings that mark an address as placeholder / role-based junk.
_IGNORED_EMAIL_FRAGMENTS = (
    'example.com', 'domain.com', 'email.com', 'test.com', 'sample.com',
    'noreply', 'no-reply', 'donotreply', 'unsubscribe', 'privacy',
    'support@', 'info@', 'contact@', 'hello@', 'sales@', 'help@',
)

_GENERIC_EMAIL_PREFIXES = frozenset({
    'info', 'contact', 'support', 'hello', 'sales', 'admin',
    'help', 'service', 'team', 'general', 'office', 'mail',
})

_NON_NAME_WORDS = frozenset({
    'inc', 'ltd', 'llc', 'corporation', 'company', 'the', 'and', 'of',
})

# Punctuation trimmed from word edges in the fallback name scan.
_NAME_EDGE_PUNCTUATION = '.,;:!?()[]{}<>"\'|'


def _short_query(query: str) -> str:
    """Return the first 60 characters of *query* followed by an ellipsis."""
    return f"{query[:60]}..."


def _quoted_query(query: str) -> str:
    """Return *query* in full, wrapped in single quotes."""
    return f"'{query}'"


class EnhancedContactFinder:
    """
    Contact discovery that only returns people for whom an e-mail address
    was actually found in scraped or searched text.

    Sources tried, in order, by :meth:`find_real_contacts`:

    1. Company website pages (team / about / leadership / contact)
    2. LinkedIn-targeted web searches
    3. Business-directory searches (Crunchbase, ZoomInfo, Apollo)
    4. Press-release / news searches
    5. Social-media searches
    6. Direct e-mail searches

    Names and titles are extracted with the regular expressions in
    ``self.name_patterns``. E-mail addresses are never generated or guessed.

    Supports MCP (Model Context Protocol) for a unified search interface.
    """

    _FOUND_WITH_TITLE = "[CONTACT FINDER] ✓ FOUND: {name} ({title}) - {email}"
    _FOUND_PLAIN = "[CONTACT FINDER] ✓ FOUND: {name} - {email}"
    _SCRAPED = "[CONTACT FINDER] ✓ SCRAPED: {name} - {email} from {source}"

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """
        Initialize enhanced contact finder

        Args:
            mcp_registry: Optional MCP registry for unified search (recommended).
                If None, falls back to the direct web search service.
        """
        # Compare against None (as documented) so a registry object that
        # happens to be falsy is still honoured.
        if mcp_registry is not None:
            self.search = mcp_registry.get_search_client()
            logger.info("EnhancedContactFinder initialized with MCP search client")
        else:
            # Fallback to direct search service (legacy)
            self.search = get_search_service()
            logger.warning("EnhancedContactFinder initialized without MCP (consider using MCP)")

        self.scraper = WebScraperService()

        # Common team page URL patterns
        self.team_page_patterns = [
            '/team', '/about-us', '/about', '/leadership',
            '/our-team', '/management', '/executives', '/people'
        ]

        # Regex patterns for name extraction; group 1 is always the name and
        # group 2 the title.
        self.name_patterns = [
            # LinkedIn format: "Name - Title at Company | LinkedIn"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]\s*([^|]+?)\s*(?:at|@)\s*([^|]+)',
            # Standard format: "Name, Title at Company"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+),?\s+([^,\n]+?)\s+(?:at|@)\s+([^\n]+)',
            # Bio format: "Name is the Title"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s+is\s+(?:the\s+)?([^.]+)',
            # Direct format: "Name\nTitle"
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*\n\s*([A-Z][^,\n]+)',
        ]

        # We do NOT estimate emails - only use verified emails found on web
        # This list is kept for reference but not used for generation
        self._common_email_patterns = [
            '{first}.{last}',  # john.smith@company.com
            '{first}{last}',   # johnsmith@company.com
        ]

    async def find_real_contacts(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        max_contacts: int = 3
    ) -> List[Contact]:
        """
        Find decision-makers for whom an e-mail address was found on the web.

        Runs the six discovery strategies in order and stops as soon as
        ``max_contacts`` contacts have been collected. E-mails and names are
        de-duplicated across all strategies.

        Args:
            company_name: Company name used in search queries.
            domain: Company domain (e.g. ``"acme.com"``); used to build page
                URLs and to filter e-mail addresses.
            target_titles: Job titles to look for.
            max_contacts: Maximum number of contacts to return.

        Returns:
            At most ``max_contacts`` Contact objects.
        """
        logger.info("EnhancedFinder: Finding VERIFIED contacts at '%s'", company_name)
        print("\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] Starting comprehensive search for {company_name}")
        print(f"[CONTACT FINDER] Domain: {domain}")
        print(f"[CONTACT FINDER] Target titles: {target_titles}")
        print("[CONTACT FINDER] ========================================")

        contacts: List[Contact] = []
        seen_emails: Set[str] = set()
        seen_names: Set[str] = set()

        # (headline, source label for the summary line, strategy coroutine)
        strategies = [
            ("📄 Strategy 1: Scraping company website...",
             "company website", self._scrape_company_website),
            ("💼 Strategy 2: Searching LinkedIn...",
             "LinkedIn", self._search_linkedin),
            ("📊 Strategy 3: Searching business directories...",
             "directories", self._search_business_directories),
            ("📰 Strategy 4: Searching press releases & news...",
             "news/PR", self._search_press_releases),
            ("📱 Strategy 5: Searching social media...",
             "social media", self._search_social_media),
            ("🔍 Strategy 6: Direct email search...",
             "direct email search", self._search_for_emails),
        ]

        for headline, source, strategy in strategies:
            remaining = max_contacts - len(contacts)
            if remaining <= 0:
                break
            print(f"\n[CONTACT FINDER] {headline}")
            found = await strategy(
                company_name=company_name,
                domain=domain,
                target_titles=target_titles,
                seen_emails=seen_emails,
                seen_names=seen_names,
                max_needed=remaining,
            )
            contacts.extend(found)
            print(f"[CONTACT FINDER] ✓ Found {len(found)} contacts from {source}")

        # Defensive cap; also keeps a negative max_contacts from slicing oddly.
        selected = contacts[:max(max_contacts, 0)]

        logger.info(
            "EnhancedFinder: Total %d VERIFIED contacts found for '%s'",
            len(contacts), company_name,
        )
        print("\n[CONTACT FINDER] ========================================")
        print(f"[CONTACT FINDER] FINAL RESULTS: {len(contacts)} verified contacts")
        print("[CONTACT FINDER] ========================================")
        for i, contact in enumerate(selected, 1):
            print(f"[CONTACT FINDER] {i}. {contact.name} ({contact.title})")
            print(f"[CONTACT FINDER] 📧 {contact.email}")
        if not contacts:
            print("[CONTACT FINDER] No verified contacts found.")
            print("[CONTACT FINDER] Try manual search on LinkedIn or company website.")
        print("[CONTACT FINDER] ========================================\n")

        return selected

    # ------------------------------------------------------------------
    # Shared collection helpers
    # ------------------------------------------------------------------

    def _harvest_contacts(
        self,
        text: str,
        domain: str,
        seen_emails: Set[str],
        seen_names: Optional[Set[str]],
        max_needed: int,
        resolve: Callable[[str], NameTitle],
        default_title: str,
        announce: str,
        source: str = "",
    ) -> List[Contact]:
        """Build up to *max_needed* contacts from the e-mails found in *text*.

        ``resolve(email)`` supplies the (name, title) for an address; an
        address without a name is skipped. ``seen_emails`` (and ``seen_names``
        when given) are updated in place for every contact created.
        """
        harvested: List[Contact] = []
        if max_needed <= 0:
            return harvested

        for email in self._extract_emails_from_text(text, domain):
            key = email.lower()
            if key in seen_emails or self._is_generic_email(email.split('@')[0]):
                continue
            name, title = resolve(email)
            if not name:
                continue
            if seen_names is not None:
                if name.lower() in seen_names:
                    continue
                seen_names.add(name.lower())
            seen_emails.add(key)

            contact = Contact(
                id=str(uuid.uuid4()),
                name=name,
                email=email,
                title=title or default_title,
                prospect_id=""
            )
            harvested.append(contact)
            # Report the title actually stored, not a possibly-None raw value.
            print(announce.format(
                name=name, title=contact.title, email=email, source=source
            ))
            if len(harvested) >= max_needed:
                break
        return harvested

    async def _contacts_from_queries(
        self,
        queries: List[str],
        domain: str,
        seen_emails: Set[str],
        seen_names: Optional[Set[str]],
        max_needed: int,
        resolver: _Resolver,
        default_title: str,
        announce: str,
        error_label: str,
        max_results: int = 5,
        echo: Optional[Callable[[str], str]] = _short_query,
    ) -> List[Contact]:
        """Run each search query and harvest contacts from result title + body.

        Stops once *max_needed* contacts are collected. A failing query is
        logged at debug level and skipped.
        """
        contacts: List[Contact] = []
        for query in queries:
            if len(contacts) >= max_needed:
                break
            try:
                if echo is not None:
                    print(f"[CONTACT FINDER] Query: {echo(query)}")
                results = await self.search.search(query, max_results=max_results)
                for result in results or []:
                    text = f"{result.get('title') or ''} {result.get('body') or ''}"
                    contacts.extend(self._harvest_contacts(
                        text, domain, seen_emails, seen_names,
                        max_needed - len(contacts),
                        partial(resolver, text, result),
                        default_title, announce,
                    ))
                    if len(contacts) >= max_needed:
                        break
            except Exception as exc:
                # Deliberately best-effort: one bad query must not abort the rest.
                logger.debug("%s: %s", error_label, exc)
        return contacts

    async def _contacts_from_pages(
        self,
        urls: List[str],
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Optional[Set[str]],
        max_needed: int,
        default_title: str,
        announce: str,
        error_prefix: str,
        announce_checks: bool,
    ) -> List[Contact]:
        """Scrape each URL and harvest contacts from the page text.

        Stops once *max_needed* contacts are collected. A page that fails to
        scrape is logged at debug level and skipped.
        """
        contacts: List[Contact] = []
        for url in urls:
            if len(contacts) >= max_needed:
                break
            try:
                if announce_checks:
                    print(f"[CONTACT FINDER] Checking: {url}")
                page_content = await self.scraper.scrape_page(url)
                if not page_content:
                    continue
                text = page_content.get('text', '')
                contacts.extend(self._harvest_contacts(
                    text, domain, seen_emails, seen_names,
                    max_needed - len(contacts),
                    partial(self._resolve_near_email, target_titles, text, None),
                    default_title, announce, source=url,
                ))
            except Exception as exc:
                # Best-effort: most candidate URLs will not exist.
                logger.debug("%s %s: %s", error_prefix, url, exc)
        return contacts

    def _resolve_near_email(
        self,
        target_titles: List[str],
        text: str,
        result: Optional[Dict[str, Any]],
        email: str,
    ) -> NameTitle:
        """Resolver: name and title found in the text surrounding *email*."""
        return self._extract_name_near_email(text, email, target_titles)

    def _resolve_linkedin(
        self,
        title: str,
        text: str,
        result: Optional[Dict[str, Any]],
        email: str,
    ) -> NameTitle:
        """Resolver: name parsed from a LinkedIn result, paired with the searched title."""
        result_title = (result or {}).get('title') or ''
        return self._extract_linkedin_name(text, result_title), title

    def _resolve_from_text(
        self,
        company_name: str,
        title: str,
        text: str,
        result: Optional[Dict[str, Any]],
        email: str,
    ) -> NameTitle:
        """Resolver: first valid name anywhere in *text*, paired with the searched title."""
        return self._extract_name_from_text(text, company_name), title

    # ------------------------------------------------------------------
    # Strategies
    # ------------------------------------------------------------------

    async def _scrape_company_website(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Scrape company website for contact information"""
        bare_paths = ['team', 'about', 'about-us', 'leadership', 'our-team',
                      'management', 'contact', 'contact-us']
        www_paths = ['team', 'about', 'about-us', 'leadership', 'contact']
        pages_to_check = (
            [f"https://{domain}/{path}" for path in bare_paths]
            + [f"https://www.{domain}/{path}" for path in www_paths]
        )
        return await self._contacts_from_pages(
            pages_to_check, domain, target_titles, seen_emails, seen_names,
            max_needed,
            default_title="Executive",
            announce=self._FOUND_WITH_TITLE,
            error_prefix="Error scraping",
            announce_checks=True,
        )

    async def _search_linkedin(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Search LinkedIn for company executives with contact info.

        Only the first five target titles are searched. Each contact is given
        the title that was searched for.
        """
        contacts: List[Contact] = []
        for title in target_titles[:5]:
            remaining = max_needed - len(contacts)
            if remaining <= 0:
                break
            queries = [
                f'site:linkedin.com/in "{company_name}" "{title}" email',
                f'site:linkedin.com "{company_name}" {title} contact',
                f'linkedin.com/in {title} {company_name} "@{domain}"',
            ]
            contacts.extend(await self._contacts_from_queries(
                queries, domain, seen_emails, seen_names, remaining,
                partial(self._resolve_linkedin, title),
                default_title=title,
                announce=self._FOUND_WITH_TITLE,
                error_label="LinkedIn search error",
            ))
        return contacts

    async def _search_business_directories(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Search Crunchbase, ZoomInfo, and other business directories"""
        queries = [
            f'site:crunchbase.com "{company_name}" founder CEO email',
            f'site:crunchbase.com/person "{company_name}" email',
            f'"{company_name}" founder email "@{domain}"',
            f'"{company_name}" CEO email contact',
            f'site:zoominfo.com "{company_name}" contact',
            f'site:apollo.io "{company_name}" email',
        ]
        return await self._contacts_from_queries(
            queries, domain, seen_emails, seen_names, max_needed,
            partial(self._resolve_near_email, target_titles),
            default_title="Founder/Executive",
            announce=self._FOUND_PLAIN,
            error_label="Directory search error",
        )

    async def _search_press_releases(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Search press releases and news for executive contact info"""
        queries = [
            f'"{company_name}" press release contact email',
            f'"{company_name}" announcement CEO founder email',
            f'site:prnewswire.com "{company_name}" contact',
            f'site:businesswire.com "{company_name}" contact',
            f'"{company_name}" media contact "@{domain}"',
            f'"{company_name}" PR contact email',
        ]
        return await self._contacts_from_queries(
            queries, domain, seen_emails, seen_names, max_needed,
            partial(self._resolve_near_email, target_titles),
            default_title="Media Contact",
            announce=self._FOUND_PLAIN,
            error_label="Press release search error",
        )

    async def _search_social_media(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        seen_names: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Search social media profiles for contact information"""
        queries = [
            f'site:twitter.com "{company_name}" email "@{domain}"',
            f'site:instagram.com "{company_name}" email contact',
            f'"{company_name}" twitter CEO founder email',
            f'"{company_name}" instagram business email',
            f'site:facebook.com "{company_name}" about email',
        ]
        return await self._contacts_from_queries(
            queries, domain, seen_emails, seen_names, max_needed,
            partial(self._resolve_near_email, target_titles),
            default_title="Executive",
            announce=self._FOUND_PLAIN,
            error_label="Social media search error",
        )

    def _extract_linkedin_name(self, text: str, title: str) -> Optional[str]:
        """Extract name from LinkedIn search result.

        Tries the result *title* first (LinkedIn titles look like
        "Name - Title at Company | LinkedIn"), then every pattern in
        ``self.name_patterns`` against *text*.
        """
        linkedin_pattern = r'^([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\s*[-–—]'
        headline = re.search(linkedin_pattern, title)
        if headline:
            name = headline.group(1).strip()
            if self._is_valid_name(name):
                return name

        for pattern in self.name_patterns:
            match = re.search(pattern, text)
            if match:
                name = match.group(1).strip()
                if self._is_valid_name(name):
                    return name
        return None

    async def _search_for_emails(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        max_needed: int,
        seen_names: Optional[Set[str]] = None,
    ) -> List[Contact]:
        """Search specifically for email addresses associated with company executives.

        Args:
            seen_names: Optional set of lower-cased names already collected.
                When given, people in it are skipped and new names are added
                to it. When omitted, no name de-duplication is done.
        """
        email_queries = [
            f'"{domain}" email CEO OR founder OR director',
            f'"{company_name}" contact email executive',
            f'site:{domain} email contact',
            f'"{company_name}" "@{domain}" CEO OR VP OR director',
        ]
        return await self._contacts_from_queries(
            email_queries, domain, seen_emails, seen_names, max_needed,
            partial(self._resolve_near_email, target_titles),
            default_title="Executive",
            announce=self._FOUND_PLAIN,
            error_label="Email search error",
            max_results=10,
            echo=_quoted_query,
        )

    async def _scrape_for_verified_emails(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Scrape company pages to find actual email addresses"""
        bare_paths = ['contact', 'contact-us', 'about', 'about-us', 'team',
                      'leadership', 'our-team']
        www_paths = ['contact', 'about', 'team']
        pages_to_check = (
            [f"https://{domain}/{path}" for path in bare_paths]
            + [f"https://www.{domain}/{path}" for path in www_paths]
        )
        return await self._contacts_from_pages(
            pages_to_check, domain, target_titles, seen_emails, None,
            max_needed,
            default_title="Contact",
            announce=self._SCRAPED,
            error_prefix="Scrape error for",
            announce_checks=False,
        )

    async def _find_contacts_with_emails(
        self,
        company_name: str,
        domain: str,
        target_titles: List[str],
        seen_emails: Set[str],
        max_needed: int
    ) -> List[Contact]:
        """Search for executives and only return those with verified emails"""
        contacts: List[Contact] = []
        for title in target_titles:
            remaining = max_needed - len(contacts)
            if remaining <= 0:
                break
            # Search for person WITH email mention
            queries = [
                f'"{company_name}" {title} email "@{domain}"',
                f'"{company_name}" {title} contact email',
                f'site:linkedin.com "{company_name}" {title} email',
            ]
            contacts.extend(await self._contacts_from_queries(
                queries, domain, seen_emails, None, remaining,
                partial(self._resolve_from_text, company_name, title),
                default_title=title,
                announce=self._FOUND_WITH_TITLE,
                error_label="Search error",
                echo=None,
            ))
        return contacts

    # ------------------------------------------------------------------
    # Text extraction / validation
    # ------------------------------------------------------------------

    def _extract_emails_from_text(self, text: str, domain: str) -> List[str]:
        """Extract company-domain email addresses from text.

        Only addresses whose host is *domain* or a subdomain of it are kept
        (every address is kept when *domain* is empty). Placeholder and
        role-based addresses are dropped.

        Returns:
            Lower-cased, de-duplicated addresses in order of first appearance.
        """
        if not text:
            return []

        wanted = (domain or '').strip().lower()
        kept: List[str] = []
        for raw in _EMAIL_RE.findall(text):
            email = raw.lower()
            host = email.rsplit('@', 1)[1]
            # Compare the host part, not a substring: "acme.com" must not
            # accept "x@notacme.com" or "x@acme.com.evil.org".
            if wanted and host != wanted and not host.endswith('.' + wanted):
                continue
            if any(fragment in email for fragment in _IGNORED_EMAIL_FRAGMENTS):
                continue
            kept.append(email)

        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(kept))

    def _extract_name_near_email(
        self, text: str, email: str, target_titles: List[str]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Extract name that appears near an email address.

        Looks at up to 200 characters on either side of the first
        (case-insensitive) occurrence of *email*. ``target_titles`` is
        accepted for interface compatibility and is not consulted.

        Returns:
            ``(name, title)``; either element may be None.
        """
        if not text or not email:
            return None, None

        email_pos = text.lower().find(email.lower())
        if email_pos == -1:
            return None, None

        start = max(0, email_pos - 200)
        end = min(len(text), email_pos + len(email) + 200)
        context = text[start:end]

        # First choice: a structured "name + title" pattern.
        for pattern in self.name_patterns:
            match = re.search(pattern, context)
            if not match:
                continue
            candidate = match.group(1).strip()
            if self._is_valid_name(candidate):
                title = match.group(2).strip() if len(match.groups()) > 1 else None
                return candidate, title

        # Fallback: first pair of adjacent capitalised words. Tokens that
        # contain "@" are skipped so the address itself is never taken for
        # a name, and edge punctuation is trimmed ("Smith," -> "Smith").
        words = [
            '' if '@' in token else token.strip(_NAME_EDGE_PUNCTUATION)
            for token in context.split()
        ]
        for first, second in zip(words, words[1:]):
            if (first and second and len(first) > 2
                    and first[0].isupper() and second[0].isupper()):
                candidate = f"{first} {second}"
                if self._is_valid_name(candidate):
                    return candidate, None

        return None, None

    def _extract_name_from_text(self, text: str, company_name: str) -> Optional[str]:
        """Extract a person's name from text.

        Returns the first valid pattern match that does not contain the
        company name, or None.
        """
        company = (company_name or '').strip().lower()
        for pattern in self.name_patterns:
            match = re.search(pattern, text)
            if not match:
                continue
            name = match.group(1).strip()
            if not self._is_valid_name(name):
                continue
            # Only compare when a company name was supplied: an empty string
            # is "in" every name and used to reject every candidate.
            if company and company in name.lower():
                continue
            return name
        return None

    def _is_valid_name(self, name: str) -> bool:
        """Validate that a string looks like a real person's name.

        Requires 4-50 characters overall, at least two words of 2-20
        characters each, every word capitalised, and no corporate / filler
        words (inc, ltd, the, of, ...).
        """
        if not name:
            return False

        parts = name.split()
        normalized = ' '.join(parts)
        if not 4 <= len(normalized) <= 50:
            return False
        if len(parts) < 2:
            return False
        if not all(2 <= len(part) <= 20 for part in parts):
            return False
        if not all(part[0].isupper() for part in parts):
            return False
        return not any(part.lower() in _NON_NAME_WORDS for part in parts)

    def _is_generic_email(self, prefix: str) -> bool:
        """Check if email prefix is generic (info, contact, etc.)"""
        return prefix.lower() in _GENERIC_EMAIL_PREFIXES


# Legacy singleton (deprecated - use MCP instead)
_enhanced_finder: Optional[EnhancedContactFinder] = None


def get_enhanced_contact_finder(
    mcp_registry: Optional['MCPRegistry'] = None
) -> EnhancedContactFinder:
    """
    Get enhanced contact finder instance

    Args:
        mcp_registry: Optional MCP registry (recommended).
            If provided, a new instance is created on every call.
            If None, the legacy module-level singleton is returned (deprecated).

    Returns:
        EnhancedContactFinder instance
    """
    if mcp_registry is not None:
        # Create new instance with MCP (recommended)
        return EnhancedContactFinder(mcp_registry=mcp_registry)

    # Legacy singleton fallback (deprecated)
    global _enhanced_finder
    if _enhanced_finder is None:
        _enhanced_finder = EnhancedContactFinder()
    return _enhanced_finder