|
|
""" |
|
|
Company Discovery Service |
|
|
Uses web search to dynamically discover company information |
|
|
""" |
|
|
from typing import Optional, Dict, List, Tuple, TYPE_CHECKING |
|
|
import re |
|
|
import logging |
|
|
from urllib.parse import urlparse |
|
|
from services.web_search import get_search_service |
|
|
from app.schema import Company |
|
|
import uuid |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from mcp.registry import MCPRegistry |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class CompanyDiscoveryService:
    """
    Discovers company information from web search.

    Finds domain, industry, size, and pain points dynamically.

    Supports MCP (Model Context Protocol) for a unified search interface;
    when no registry is supplied it uses the direct web search service.
    """

    # Sites that talk *about* companies; a hit on one of these (or any of
    # its subdomains) is never treated as the company's own website.
    _SKIP_DOMAINS = (
        'linkedin.com', 'facebook.com', 'twitter.com', 'wikipedia.org',
        'crunchbase.com', 'bloomberg.com', 'forbes.com', 'youtube.com',
    )

    # Regexes whose first group captures an employee count.
    _EMPLOYEE_PATTERNS = (
        r'(\d+(?:,\d+)*)\s*(?:employees|staff|workers|people)',
        r'(?:employs|employing)\s*(\d+(?:,\d+)*)',
        r'(?:headcount|workforce).*?(\d+(?:,\d+)*)',
        r'team.*?(\d+(?:,\d+)*)\s*(?:employees|people)',
    )

    # Words that mark a sentence as describing a pain point.
    _PAIN_KEYWORDS = (
        'challenge', 'problem', 'issue', 'struggle', 'difficulty',
        'concern', 'complaint', 'frustration', 'inefficiency',
    )

    # Name-based fallback profiles, checked in order; first match wins.
    # Each entry: (name fragments, industry, employee estimate, pain points).
    _FALLBACK_PROFILES = (
        (
            ('shop', 'store', 'retail', 'commerce'),
            "E-commerce",
            500,
            (
                "Managing high transaction volumes during peak seasons",
                "Customer retention and engagement challenges",
                "Providing seamless omnichannel experiences",
                "Scaling customer support operations",
            ),
        ),
        (
            ('tech', 'software', 'cloud', 'data'),
            "Technology",
            1000,
            (
                "Rapid scaling of customer success operations",
                "Technical support complexity",
                "Customer onboarding efficiency",
                "Product adoption and engagement",
            ),
        ),
        (
            ('pay', 'bank', 'financial', 'stripe', 'square'),
            "FinTech",
            800,
            (
                "Regulatory compliance for customer communications",
                "Building customer trust and security",
                "Multi-channel support consistency",
                "Complex integration support",
            ),
        ),
    )

    # Profile used when no name fragment matches: (industry, size, pains).
    _DEFAULT_FALLBACK_PROFILE = (
        "Technology",
        500,
        (
            "Customer experience consistency across touchpoints",
            "Scalable support operations",
            "Customer retention and satisfaction",
            "Data-driven customer insights",
        ),
    )

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """
        Initialize company discovery service

        Args:
            mcp_registry: Optional MCP registry for unified search (recommended)
                         If not provided, falls back to direct web search service
        """
        if mcp_registry:
            self.search = mcp_registry.get_search_client()
            logger.info("CompanyDiscoveryService initialized with MCP search client")
        else:
            self.search = get_search_service()
            logger.warning("CompanyDiscoveryService initialized with direct search (consider using MCP)")

        # Industry -> keywords used to score search-result text.
        # All-uppercase entries (e.g. 'IT') are treated as acronyms and are
        # matched case-sensitively as whole words; see _count_keyword.
        self.industry_keywords = {
            'SaaS': ['saas', 'software as a service', 'cloud software', 'b2b software'],
            'FinTech': ['fintech', 'financial technology', 'payment', 'banking', 'finance'],
            'E-commerce': ['ecommerce', 'e-commerce', 'online retail', 'marketplace'],
            'Healthcare': ['healthcare', 'health tech', 'medical', 'hospital', 'pharma'],
            'Manufacturing': ['manufacturing', 'industrial', 'factory', 'production'],
            'Retail': ['retail', 'store', 'shopping', 'merchant'],
            'Technology': ['technology', 'tech', 'software', 'IT', 'digital'],
            'Education': ['education', 'edtech', 'learning', 'university', 'school'],
            'Enterprise Software': ['enterprise software', 'business software', 'crm', 'erp'],
            'Media': ['media', 'publishing', 'content', 'news'],
            'Telecommunications': ['telecom', 'telecommunications', 'networking', 'isp'],
            'Logistics': ['logistics', 'shipping', 'supply chain', 'transportation'],
        }

    async def discover_company(self, company_name: str, skip_search: bool = False) -> Optional['Company']:
        """
        Discover company information from web search or use fallback

        Args:
            company_name: Name of the company to research
            skip_search: If True, skip web search and use fallback data immediately

        Returns:
            Company object with discovered (or fallback) information, or None
            when company_name is empty or whitespace-only
        """
        if not company_name or not company_name.strip():
            logger.error("Empty company name provided")
            return None

        logger.info(f"Discovering company information for: '{company_name}' (skip_search={skip_search})")

        if skip_search:
            logger.info(f"Skipping web search, using fallback data for: '{company_name}'")
            return self._create_fallback_company(company_name)

        try:
            domain = await self._find_domain(company_name)
            if not domain:
                logger.warning(f"Could not find domain for company: '{company_name}' - using fallback")
                return self._create_fallback_company(company_name)

            industry = await self._find_industry(company_name, domain)
            size = await self._estimate_size(company_name)
            pains = await self._discover_pain_points(company_name, industry)
            notes = await self._gather_notes(company_name, industry)

            company = Company(
                id=self._generate_id(company_name),
                name=company_name,
                domain=domain,
                industry=industry,
                size=size,
                pains=pains,
                notes=notes,
            )

            logger.info(f"Successfully discovered company: {company_name} ({industry}, {size} employees)")
            return company

        except Exception as e:
            # Deliberate best-effort boundary: any failure in the search
            # pipeline degrades to fallback data instead of propagating.
            # exc_info keeps the traceback so the root cause is not lost.
            logger.error(
                f"Error discovering company '{company_name}': {str(e)} - using fallback",
                exc_info=True,
            )
            return self._create_fallback_company(company_name)

    def _create_fallback_company(self, company_name: str) -> 'Company':
        """
        Create a fallback company when web search is skipped or fails

        Industry, size and pain points are guessed from fragments of the
        company name (see _FALLBACK_PROFILES); the domain is derived from
        the name via _sanitize_domain.
        """
        name_lower = company_name.lower()

        industry, size, pains = self._DEFAULT_FALLBACK_PROFILE
        for fragments, profile_industry, profile_size, profile_pains in self._FALLBACK_PROFILES:
            if any(fragment in name_lower for fragment in fragments):
                industry, size, pains = profile_industry, profile_size, profile_pains
                break

        notes = [
            f"{company_name} is a {industry} company",
            f"Estimated {size} employees",
            "Focus on customer experience improvement",
            "Information gathered from public sources",
        ]

        company = Company(
            id=self._generate_id(company_name),
            name=company_name,
            domain=self._sanitize_domain(company_name),
            industry=industry,
            size=size,
            # Copy so each Company owns its list rather than the shared tuple.
            pains=list(pains),
            notes=notes,
        )

        logger.info(f"Created intelligent fallback company for '{company_name}' ({industry}, {size} employees)")
        return company

    async def _find_domain(self, company_name: str) -> Optional[str]:
        """Find company's primary domain from the first usable search hit"""
        query = f"{company_name} official website"
        results = await self.search.search(query, max_results=5)

        if not results:
            return None

        for result in results:
            url = result.get('url', '')
            if not url:
                continue
            domain = self._extract_domain(url, company_name)
            if domain:
                logger.info(f"Found domain for {company_name}: {domain}")
                return domain

        return None

    def _extract_domain(self, url: str, company_name: str) -> Optional[str]:
        """
        Extract domain from URL with validation

        Args:
            url: Search-result URL to inspect
            company_name: Accepted for interface compatibility; not used
                for validation here

        Returns:
            Lowercased host without a leading 'www.', port or credentials,
            or None if the URL has no dotted host or points at one of the
            _SKIP_DOMAINS sites (or a subdomain of one)
        """
        try:
            # hostname (unlike netloc) excludes 'user:pass@' and ':port'
            # and is already lowercased.
            host = urlparse(url).hostname
            if not host:
                return None

            if host.startswith('www.'):
                host = host[4:]

            # Match the listed site itself or its subdomains only, so an
            # unrelated host that merely contains the text is not rejected.
            for blocked in self._SKIP_DOMAINS:
                if host == blocked or host.endswith('.' + blocked):
                    return None

            if '.' not in host:
                return None

            return host

        except Exception as e:
            logger.debug(f"Error extracting domain from {url}: {e}")
            return None

    def _sanitize_domain(self, company_name: str) -> str:
        """Create a guessed '<name>.com' domain from the alphanumerics of the name"""
        sanitized = re.sub(r'[^a-zA-Z0-9]', '', company_name.lower())
        return f"{sanitized}.com"

    @staticmethod
    def _combine_results(results) -> str:
        """Join the title and body of every search result into one string"""
        return " ".join(
            f"{result.get('title') or ''} {result.get('body') or ''}"
            for result in results
        )

    @staticmethod
    def _count_keyword(keyword: str, text: str, text_lower: str) -> int:
        """
        Count occurrences of an industry keyword in text

        All-uppercase keywords are acronyms and must match case-sensitively
        as whole words (so 'IT' does not match 'it' or 'with'). Other
        keywords match case-insensitively at the start of a word, which
        still accepts plurals ('payments') but not embedded hits
        ('tech' inside 'fintech').
        """
        if keyword.isupper():
            return len(re.findall(rf'\b{re.escape(keyword)}\b', text))
        return len(re.findall(rf'\b{re.escape(keyword.lower())}', text_lower))

    def _pick_industry(self, text: str) -> Optional[str]:
        """
        Return the best-scoring industry for text, or None if nothing matches

        Ties go to the industry listed first in self.industry_keywords.
        """
        text_lower = text.lower()
        best_industry = None
        best_score = 0
        for industry, keywords in self.industry_keywords.items():
            score = sum(
                self._count_keyword(keyword, text, text_lower)
                for keyword in keywords
            )
            if score > best_score:
                best_industry, best_score = industry, score
        return best_industry

    async def _find_industry(self, company_name: str, domain: str) -> str:
        """Determine company industry; defaults to "Technology" when unknown"""
        query = f"{company_name} industry sector business"
        results = await self.search.search(query, max_results=5)

        if not results:
            return "Technology"

        best_industry = self._pick_industry(self._combine_results(results))
        if best_industry is None:
            return "Technology"

        logger.info(f"Identified industry for {company_name}: {best_industry}")
        return best_industry

    def _parse_employee_counts(self, text: str) -> List[int]:
        """Return the sorted employee counts (1..1,000,000) mentioned in text"""
        counts = []
        for pattern in self._EMPLOYEE_PATTERNS:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                try:
                    count = int(match.group(1).replace(',', ''))
                except ValueError:
                    continue
                # Discard values outside a plausible headcount range.
                if 1 <= count <= 1000000:
                    counts.append(count)
        counts.sort()
        return counts

    async def _estimate_size(self, company_name: str) -> int:
        """
        Estimate company size (number of employees)

        Uses the median of the counts found in search results; otherwise
        falls back to keyword hints (50 / 1000) and finally to 100.
        """
        query = f"{company_name} number of employees headcount size"
        results = await self.search.search(query, max_results=5)

        if not results:
            return 100

        combined_text = self._combine_results(results)

        employee_counts = self._parse_employee_counts(combined_text)
        if employee_counts:
            # Upper median of the sorted counts.
            median_count = employee_counts[len(employee_counts) // 2]
            logger.info(f"Estimated company size for {company_name}: {median_count}")
            return median_count

        text_lower = combined_text.lower()
        if 'startup' in text_lower or 'founded' in text_lower:
            return 50
        if 'enterprise' in text_lower or 'global' in text_lower:
            return 1000

        return 100

    @staticmethod
    def _unique_in_order(items: List[str], limit: int) -> List[str]:
        """Drop duplicates keeping first-seen order, then keep at most limit items"""
        return list(dict.fromkeys(items))[:limit]

    async def _discover_pain_points(self, company_name: str, industry: str) -> List[str]:
        """
        Discover company pain points and challenges

        Returns up to four distinct pain points, company-specific queries
        first; falls back to industry defaults when none are found.
        """
        pain_points = []

        queries = [
            f"{company_name} challenges problems issues",
            f"{company_name} customer complaints reviews",
            f"{industry} industry challenges pain points",
        ]

        for query in queries:
            # An empty/None response for one query must not abort the rest.
            results = await self.search.search(query, max_results=3) or []
            for result in results:
                pain_points.extend(self._extract_pain_points(result.get('body') or ''))

        # Order-preserving dedupe keeps the result deterministic and keeps
        # the company-specific findings ahead of the industry-wide ones.
        unique_pains = self._unique_in_order(pain_points, 4)

        if not unique_pains:
            unique_pains = self._get_industry_pain_points(industry)

        logger.info(f"Discovered {len(unique_pains)} pain points for {company_name}")
        return unique_pains

    def _extract_pain_points(self, text: str) -> List[str]:
        """
        Extract pain points from text

        Returns at most two sentences (split on '.') that contain a pain
        keyword and are between 11 and 149 characters long after stripping.
        """
        pain_points = []
        for sentence in text.split('.'):
            sentence_lower = sentence.lower()
            if not any(keyword in sentence_lower for keyword in self._PAIN_KEYWORDS):
                continue
            cleaned = sentence.strip()
            if 10 < len(cleaned) < 150:
                pain_points.append(cleaned)

        return pain_points[:2]

    def _get_industry_pain_points(self, industry: str) -> List[str]:
        """Get default pain points for industry (generic list if unknown)"""
        industry_pains = {
            'SaaS': [
                'Customer churn rate impacting revenue',
                'User onboarding complexity',
                'Customer support ticket volume',
                'Feature adoption challenges',
            ],
            'FinTech': [
                'Regulatory compliance requirements',
                'Customer trust and security concerns',
                'Transaction processing delays',
                'Multi-channel support consistency',
            ],
            'E-commerce': [
                'Cart abandonment rate',
                'Customer retention challenges',
                'Seasonal support demand spikes',
                'Post-purchase experience gaps',
            ],
            'Healthcare': [
                'Patient communication inefficiencies',
                'Compliance with healthcare regulations',
                'System integration challenges',
                'Patient satisfaction scores',
            ],
            'Technology': [
                'Rapid scaling challenges',
                'Customer support efficiency',
                'Product-market fit validation',
                'User experience consistency',
            ],
        }

        return industry_pains.get(industry, [
            'Customer experience challenges',
            'Operational efficiency gaps',
            'Market competitiveness',
            'Growth scaling issues',
        ])

    async def _gather_notes(self, company_name: str, industry: str) -> List[str]:
        """
        Gather contextual notes about the company

        Prefers recent news titles; if there are none, uses the first
        sentence of general search results; if still empty, returns two
        generic notes. At most three notes come from search.
        """
        notes = []

        query = f"{company_name} news recent updates"
        news_results = await self.search.search_news(query, max_results=3) or []

        for result in news_results:
            title = result.get('title') or ''
            if len(title) > 10:
                notes.append(title)

        if not notes:
            query = f"{company_name} about company information"
            results = await self.search.search(query, max_results=3) or []

            for result in results:
                body = result.get('body') or ''
                if len(body) > 20:
                    first_sentence = body.split('.')[0].strip()
                    if 10 < len(first_sentence) < 150:
                        notes.append(first_sentence)

        notes = notes[:3]

        if not notes:
            notes = [f"Company in the {industry} industry", "Focus on customer experience improvement"]

        logger.info(f"Gathered {len(notes)} notes for {company_name}")
        return notes

    def _generate_id(self, company_name: str) -> str:
        """Generate a unique ID: '<name slug, max 20 chars>_<8 chars of a UUID4>'"""
        slug = re.sub(r'[^a-zA-Z0-9]', '', company_name.lower())[:20]
        unique_id = str(uuid.uuid4())[:8]
        return f"{slug}_{unique_id}"
|
|
|
|
|
|
|
|
|
|
|
# Module-level cache for the shared service instance; populated lazily by
# get_company_discovery_service().
_discovery_service: Optional[CompanyDiscoveryService] = None


def get_company_discovery_service() -> CompanyDiscoveryService:
    """Return the shared CompanyDiscoveryService, creating it on first use.

    The instance is built with no arguments and stored in the module-level
    ``_discovery_service`` so later calls return the same object.
    """
    global _discovery_service

    # Fast path: an instance was already created by an earlier call.
    if _discovery_service is not None:
        return _discovery_service

    _discovery_service = CompanyDiscoveryService()
    return _discovery_service
|
|
|