# cx_ai_agent_v1/services/company_discovery.py
# muzakkirhussain011 -- "Add application files (text files only)" (commit 8bab08d)
"""
Company Discovery Service
Uses web search to dynamically discover company information
"""
from typing import Optional, Dict, List, Tuple, TYPE_CHECKING
import re
import logging
from urllib.parse import urlparse
from services.web_search import get_search_service
from app.schema import Company
import uuid
if TYPE_CHECKING:
    # Needed only for the 'MCPRegistry' string annotation; TYPE_CHECKING is
    # False at runtime, so this import is seen by type checkers only.
    from mcp.registry import MCPRegistry

# Module-level logger used by the discovery service in this file
logger = logging.getLogger(__name__)
class CompanyDiscoveryService:
    """
    Discovers company information from web search

    Finds domain, industry, size, and pain points dynamically.
    Supports MCP (Model Context Protocol) for a unified search interface and
    falls back to the direct web search service when no registry is given.

    The client stored on ``self.search`` is awaited as
    ``search(query, max_results=...)`` and ``search_news(query, max_results=...)``;
    every result is read with ``.get('url')``, ``.get('title')`` or
    ``.get('body')``.
    """

    # Hosts that talk about companies but are never the company's own website
    _SKIP_DOMAINS = (
        'linkedin.com', 'facebook.com', 'twitter.com', 'wikipedia.org',
        'crunchbase.com', 'bloomberg.com', 'forbes.com', 'youtube.com',
    )

    # Employee-count patterns; group 1 always captures the number
    _EMPLOYEE_PATTERNS = (
        r'(\d+(?:,\d+)*)\s*(?:employees|staff|workers|people)',
        r'(?:employs|employing)\s*(\d+(?:,\d+)*)',
        r'(?:headcount|workforce).*?(\d+(?:,\d+)*)',
        r'team.*?(\d+(?:,\d+)*)\s*(?:employees|people)',
    )

    # Words that mark a sentence as describing a pain point
    _PAIN_KEYWORDS = (
        'challenge', 'problem', 'issue', 'struggle', 'difficulty',
        'concern', 'complaint', 'frustration', 'inefficiency',
    )

    # (name fragments, industry, employee estimate, pain points); the first
    # entry with a fragment contained in the company name wins
    _FALLBACK_PROFILES = (
        (
            ('shop', 'store', 'retail', 'commerce'),
            "E-commerce",
            500,
            (
                "Managing high transaction volumes during peak seasons",
                "Customer retention and engagement challenges",
                "Providing seamless omnichannel experiences",
                "Scaling customer support operations",
            ),
        ),
        (
            ('tech', 'software', 'cloud', 'data'),
            "Technology",
            1000,
            (
                "Rapid scaling of customer success operations",
                "Technical support complexity",
                "Customer onboarding efficiency",
                "Product adoption and engagement",
            ),
        ),
        (
            ('pay', 'bank', 'financial', 'stripe', 'square'),
            "FinTech",
            800,
            (
                "Regulatory compliance for customer communications",
                "Building customer trust and security",
                "Multi-channel support consistency",
                "Complex integration support",
            ),
        ),
    )

    # Profile used when no fragment of _FALLBACK_PROFILES matches
    _DEFAULT_FALLBACK_PROFILE = (
        "Technology",
        500,
        (
            "Customer experience consistency across touchpoints",
            "Scalable support operations",
            "Customer retention and satisfaction",
            "Data-driven customer insights",
        ),
    )

    # Default pain points per industry, used when search yields none
    _INDUSTRY_PAINS = {
        'SaaS': (
            'Customer churn rate impacting revenue',
            'User onboarding complexity',
            'Customer support ticket volume',
            'Feature adoption challenges',
        ),
        'FinTech': (
            'Regulatory compliance requirements',
            'Customer trust and security concerns',
            'Transaction processing delays',
            'Multi-channel support consistency',
        ),
        'E-commerce': (
            'Cart abandonment rate',
            'Customer retention challenges',
            'Seasonal support demand spikes',
            'Post-purchase experience gaps',
        ),
        'Healthcare': (
            'Patient communication inefficiencies',
            'Compliance with healthcare regulations',
            'System integration challenges',
            'Patient satisfaction scores',
        ),
        'Technology': (
            'Rapid scaling challenges',
            'Customer support efficiency',
            'Product-market fit validation',
            'User experience consistency',
        ),
    }

    # Pain points for industries missing from _INDUSTRY_PAINS
    _GENERIC_PAINS = (
        'Customer experience challenges',
        'Operational efficiency gaps',
        'Market competitiveness',
        'Growth scaling issues',
    )

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """
        Initialize company discovery service

        Args:
            mcp_registry: Optional MCP registry for unified search (recommended)
                If None, falls back to direct web search service
        """
        # Compare against None explicitly: the contract is "If None", and a
        # registry object must not be ignored just because it is falsy.
        if mcp_registry is not None:
            # Use MCP search client for unified interface
            self.search = mcp_registry.get_search_client()
            logger.info("CompanyDiscoveryService initialized with MCP search client")
        else:
            # Fallback to direct search service (legacy)
            self.search = get_search_service()
            logger.warning("CompanyDiscoveryService initialized with direct search (consider using MCP)")

        # Industry keywords mapping (industry name -> trigger keywords).
        # Keywords written in all caps (e.g. 'IT') are matched case-sensitively.
        self.industry_keywords = {
            'SaaS': ['saas', 'software as a service', 'cloud software', 'b2b software'],
            'FinTech': ['fintech', 'financial technology', 'payment', 'banking', 'finance'],
            'E-commerce': ['ecommerce', 'e-commerce', 'online retail', 'marketplace'],
            'Healthcare': ['healthcare', 'health tech', 'medical', 'hospital', 'pharma'],
            'Manufacturing': ['manufacturing', 'industrial', 'factory', 'production'],
            'Retail': ['retail', 'store', 'shopping', 'merchant'],
            'Technology': ['technology', 'tech', 'software', 'IT', 'digital'],
            'Education': ['education', 'edtech', 'learning', 'university', 'school'],
            'Enterprise Software': ['enterprise software', 'business software', 'crm', 'erp'],
            'Media': ['media', 'publishing', 'content', 'news'],
            'Telecommunications': ['telecom', 'telecommunications', 'networking', 'isp'],
            'Logistics': ['logistics', 'shipping', 'supply chain', 'transportation']
        }

    async def discover_company(self, company_name: str, skip_search: bool = False) -> Optional['Company']:
        """
        Discover company information from web search or use fallback

        Args:
            company_name: Name of the company to research
            skip_search: If True, skip web search and use fallback data immediately

        Returns:
            Company object with discovered information. Fallback data is
            returned when no domain is found or any search step raises.
            None is returned only for an empty or whitespace-only name.
        """
        if not company_name or not company_name.strip():
            logger.error("Empty company name provided")
            return None

        logger.info(f"Discovering company information for: '{company_name}' (skip_search={skip_search})")

        # If skip_search or rate limited, use fallback immediately
        if skip_search:
            logger.info(f"Skipping web search, using fallback data for: '{company_name}'")
            return self._create_fallback_company(company_name)

        try:
            # Step 1: Find company domain and basic info
            domain = await self._find_domain(company_name)
            if not domain:
                logger.warning(f"Could not find domain for company: '{company_name}' - using fallback")
                # Use fallback immediately if search fails
                return self._create_fallback_company(company_name)

            # Step 2: Find industry
            industry = await self._find_industry(company_name, domain)
            # Step 3: Estimate company size
            size = await self._estimate_size(company_name)
            # Step 4: Discover pain points and challenges
            pains = await self._discover_pain_points(company_name, industry)
            # Step 5: Gather contextual notes
            notes = await self._gather_notes(company_name, industry)

            # Create Company object
            company = Company(
                id=self._generate_id(company_name),
                name=company_name,
                domain=domain,
                industry=industry,
                size=size,
                pains=pains,
                notes=notes
            )
            logger.info(f"Successfully discovered company: {company_name} ({industry}, {size} employees)")
            return company
        except Exception as e:
            # Deliberate best-effort boundary: any failure degrades to fallback data
            logger.error(f"Error discovering company '{company_name}': {str(e)} - using fallback")
            return self._create_fallback_company(company_name)

    def _guess_fallback_profile(self, company_name: str) -> Tuple[str, int, List[str]]:
        """
        Pick (industry, size, pains) defaults from fragments of the company name

        Profiles are tried in declaration order and the first one with a
        fragment contained in the lower-cased name wins. The returned pains
        list is a fresh copy on every call.
        """
        name_lower = company_name.lower()
        for fragments, industry, size, pains in self._FALLBACK_PROFILES:
            if any(fragment in name_lower for fragment in fragments):
                return industry, size, list(pains)
        industry, size, pains = self._DEFAULT_FALLBACK_PROFILE
        return industry, size, list(pains)

    def _create_fallback_company(self, company_name: str) -> 'Company':
        """
        Create a comprehensive fallback company when web search fails
        Uses defaults derived from the company name (no search is performed)
        """
        industry, size, pains = self._guess_fallback_profile(company_name)

        # Create contextual notes
        notes = [
            f"{company_name} is a {industry} company",
            f"Estimated {size} employees",
            "Focus on customer experience improvement",
            "Information gathered from public sources"
        ]

        # Create Company object; id and domain come from the shared helpers
        company = Company(
            id=self._generate_id(company_name),
            name=company_name,
            domain=self._sanitize_domain(company_name),
            industry=industry,
            size=size,
            pains=pains,
            notes=notes
        )
        logger.info(f"Created intelligent fallback company for '{company_name}' ({industry}, {size} employees)")
        return company

    async def _find_domain(self, company_name: str) -> Optional[str]:
        """Find company's primary domain, or None when no result URL qualifies"""
        # Search for company website
        query = f"{company_name} official website"
        results = await self.search.search(query, max_results=5)
        if not results:
            return None

        # The first result whose URL yields an acceptable domain wins
        for result in results:
            url = result.get('url') or ''
            if not url:
                continue
            domain = self._extract_domain(url, company_name)
            if domain:
                logger.info(f"Found domain for {company_name}: {domain}")
                return domain
        return None

    def _extract_domain(self, url: str, company_name: str) -> Optional[str]:
        """
        Extract domain from URL with validation

        Args:
            url: Absolute URL taken from a search result
            company_name: Accepted for interface compatibility; not used

        Returns:
            Lower-case host without port, credentials or a leading 'www.',
            or None when the URL has no host, the host has no dot, or the
            host is (a sub-domain of) one of the skipped platforms.
        """
        try:
            # hostname (unlike netloc) is lower-cased and excludes port/userinfo
            host = urlparse(url).hostname
        except (ValueError, TypeError, AttributeError) as e:
            logger.debug(f"Error extracting domain from {url}: {e}")
            return None
        if not host:
            return None

        domain = host.rstrip('.')
        # Remove www prefix
        if domain.startswith('www.'):
            domain = domain[4:]

        # Skip common platforms: exact host or any sub-domain, but not hosts
        # that merely contain the platform name as a substring
        if any(domain == skip or domain.endswith('.' + skip) for skip in self._SKIP_DOMAINS):
            return None

        # Should have a TLD
        if '.' not in domain:
            return None
        return domain

    def _sanitize_domain(self, company_name: str) -> str:
        """
        Create a sanitized domain fallback

        Keeps only ASCII letters and digits of the lower-cased name and appends
        '.com'. When nothing remains, 'unknown.com' is returned instead of the
        label-less '.com'.
        """
        # Remove special characters and spaces
        sanitized = re.sub(r'[^a-zA-Z0-9]', '', company_name.lower())
        return f"{sanitized or 'unknown'}.com"

    @staticmethod
    def _combine_results(results: List[dict]) -> str:
        """Join title and body of every result into one string (None-safe)"""
        return " ".join(
            f"{result.get('title') or ''} {result.get('body') or ''}"
            for result in results
        )

    def _score_industries(self, text: str) -> Dict[str, int]:
        """
        Count whole-word keyword hits per industry in text

        Keywords match as whole words with an optional plural suffix, so 'erp'
        no longer fires on "enterprise" nor 'tech' on "fintech". Keywords
        written in all caps (e.g. 'IT') are matched case-sensitively so that
        the pronoun "it" is not counted; every other keyword ignores case.
        Industries without any hit are omitted.
        """
        scores = {}
        for industry, keywords in self.industry_keywords.items():
            hits = 0
            for keyword in keywords:
                flags = 0 if keyword.isupper() else re.IGNORECASE
                pattern = r'(?<!\w)' + re.escape(keyword) + r'(?:e?s)?(?!\w)'
                hits += len(re.findall(pattern, text, flags))
            if hits > 0:
                scores[industry] = hits
        return scores

    def _classify_industry(self, text: str) -> Optional[str]:
        """Return the best-scoring industry for text, or None when nothing matches"""
        scores = self._score_industries(text)
        if not scores:
            return None
        # On ties the industry listed first in industry_keywords wins
        return max(scores, key=scores.get)

    async def _find_industry(self, company_name: str, domain: str) -> str:
        """
        Determine company industry

        Returns the industry whose keywords occur most often in the search
        results, or "Technology" when there are no results or no keyword hits.
        The domain argument is accepted but not used.
        """
        # Search for company industry info
        query = f"{company_name} industry sector business"
        results = await self.search.search(query, max_results=5)
        if not results:
            return "Technology"  # Default fallback

        best_industry = self._classify_industry(self._combine_results(results))
        if best_industry:
            logger.info(f"Identified industry for {company_name}: {best_industry}")
            return best_industry
        return "Technology"  # Default fallback

    def _parse_employee_count(self, text: str) -> Optional[int]:
        """
        Extract a representative employee count from text

        Every number captured by _EMPLOYEE_PATTERNS within 1..1,000,000 is
        collected once per position in the text, so a mention hit by two
        patterns is not counted twice. Returns the (upper) median of the
        collected numbers, or None when there is none.
        """
        found = {}
        for pattern in self._EMPLOYEE_PATTERNS:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                try:
                    count = int(match.group(1).replace(',', ''))
                except ValueError:
                    continue
                # Reasonable range: 1 to 1,000,000
                if 1 <= count <= 1000000:
                    found[match.start(1)] = count
        if not found:
            return None
        # Use median to avoid outliers
        counts = sorted(found.values())
        return counts[len(counts) // 2]

    async def _estimate_size(self, company_name: str) -> int:
        """
        Estimate company size (number of employees)

        Falls back to 50 / 1000 when the results only hint at a startup or an
        enterprise, and to 100 when nothing can be inferred.
        """
        # Search for employee count
        query = f"{company_name} number of employees headcount size"
        results = await self.search.search(query, max_results=5)
        if not results:
            return 100  # Default medium-small company

        # Combine all text and look for employee numbers
        combined_text = self._combine_results(results)
        median_count = self._parse_employee_count(combined_text)
        if median_count is not None:
            logger.info(f"Estimated company size for {company_name}: {median_count}")
            return median_count

        # Fallback: try to estimate from company description
        lowered = combined_text.lower()
        if 'startup' in lowered or 'founded' in lowered:
            return 50
        if 'enterprise' in lowered or 'global' in lowered:
            return 1000
        return 100  # Default

    @staticmethod
    def _unique_in_order(items: List[str], limit: int) -> List[str]:
        """Drop duplicates keeping first-seen order, then keep at most limit items"""
        return list(dict.fromkeys(items))[:limit]

    async def _discover_pain_points(self, company_name: str, industry: str) -> List[str]:
        """
        Discover company pain points and challenges

        Returns up to four distinct sentences in the order they were found
        (company-specific queries run before the industry query), or the
        industry defaults when search yields nothing.
        """
        pain_points = []
        # Search for challenges
        queries = [
            f"{company_name} challenges problems issues",
            f"{company_name} customer complaints reviews",
            f"{industry} industry challenges pain points"
        ]
        for query in queries:
            results = await self.search.search(query, max_results=3)
            # An empty/None result set for one query must not abort the others
            for result in results or []:
                # Extract pain points from text
                pain_points.extend(self._extract_pain_points(result.get('body') or ''))

        # Remove duplicates and limit; order-preserving so results are stable
        unique_pains = self._unique_in_order(pain_points, 4)
        if not unique_pains:
            # Industry-specific fallback pain points
            unique_pains = self._get_industry_pain_points(industry)
        logger.info(f"Discovered {len(unique_pains)} pain points for {company_name}")
        return unique_pains

    def _extract_pain_points(self, text: str) -> List[str]:
        """
        Extract pain points from text

        Splits text on '.', keeps stripped sentences of 11-149 characters that
        contain one of the pain keywords, and returns at most the first two.
        Empty or None text yields an empty list.
        """
        if not text:
            return []
        pain_points = []
        for sentence in text.split('.'):
            sentence_lower = sentence.lower()
            if not any(keyword in sentence_lower for keyword in self._PAIN_KEYWORDS):
                continue
            # Clean and add sentence
            cleaned = sentence.strip()
            if 10 < len(cleaned) < 150:  # Reasonable length
                pain_points.append(cleaned)
        return pain_points[:2]  # Max 2 per text

    def _get_industry_pain_points(self, industry: str) -> List[str]:
        """Get default pain points for industry (generic ones if unknown); returns a new list"""
        return list(self._INDUSTRY_PAINS.get(industry, self._GENERIC_PAINS))

    async def _gather_notes(self, company_name: str, industry: str) -> List[str]:
        """
        Gather contextual notes about the company

        Uses news titles first, then first sentences of general results, and
        finally two generic notes. At most three notes are returned.
        """
        notes = []
        # Search for recent company news
        query = f"{company_name} news recent updates"
        news_results = await self.search.search_news(query, max_results=3)
        for result in news_results or []:
            title = result.get('title') or ''
            if len(title) > 10:
                notes.append(title)

        # If no news, search for general info
        if not notes:
            query = f"{company_name} about company information"
            results = await self.search.search(query, max_results=3)
            for result in results or []:
                body = result.get('body') or ''
                if len(body) > 20:
                    # Get first sentence
                    first_sentence = body.split('.')[0].strip()
                    if 10 < len(first_sentence) < 150:
                        notes.append(first_sentence)

        # Limit to 3 notes
        notes = notes[:3]
        if not notes:
            notes = [f"Company in the {industry} industry", "Focus on customer experience improvement"]
        logger.info(f"Gathered {len(notes)} notes for {company_name}")
        return notes

    def _generate_id(self, company_name: str) -> str:
        """Generate a unique ID for the company: '<slug up to 20 chars>_<8 uuid4 chars>'"""
        # Create a slug from company name
        slug = re.sub(r'[^a-zA-Z0-9]', '', company_name.lower())[:20]
        # Add short UUID for uniqueness
        unique_id = str(uuid.uuid4())[:8]
        return f"{slug}_{unique_id}"
# Lazily created, process-wide service instance (None until first requested)
_discovery_service: Optional[CompanyDiscoveryService] = None


def get_company_discovery_service() -> CompanyDiscoveryService:
    """
    Return the shared CompanyDiscoveryService, creating it on first use

    The instance is built with the constructor's default arguments (no MCP
    registry) and cached in the module-level ``_discovery_service``.
    """
    global _discovery_service
    # Fast path: the instance already exists
    if _discovery_service is not None:
        return _discovery_service
    _discovery_service = CompanyDiscoveryService()
    return _discovery_service