# file: vector/store.py
import json
import pickle
from pathlib import Path
import numpy as np
import faiss
from app.config import VECTOR_INDEX_PATH, EMBEDDING_DIM, DATA_DIR
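
# app.config is expected to supply the constants imported above; the example
# values below are illustrative assumptions, not taken from the real config:
#   VECTOR_INDEX_PATH  e.g. "data/vectors.index" (where the FAISS index is written)
#   EMBEDDING_DIM      e.g. 384 (dimensionality of the embedding vectors)
#   DATA_DIR           e.g. Path("data") (directory containing companies.json)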

class VectorStore:
    """FAISS vector store with persistence"""

    def __init__(self):
        self.index_path = Path(VECTOR_INDEX_PATH)
        self.metadata_path = self.index_path.with_suffix(".meta")
        self.index = None
        self.metadata = []  # one dict per stored vector, aligned by position
        self._initialize()

    def _initialize(self):
        """Initialize or load the index"""
        if self.index_path.exists():
            self._load()
        else:
            self._create_new()

    def _create_new(self):
        """Create a new FAISS index"""
        # Using IndexFlatIP for inner product (cosine with normalized vectors)
        self.index = faiss.IndexFlatIP(EMBEDDING_DIM)
        self.metadata = []

    def _load(self):
        """Load existing index and metadata"""
        try:
            self.index = faiss.read_index(str(self.index_path))
            if self.metadata_path.exists():
                with open(self.metadata_path, "rb") as f:
                    self.metadata = pickle.load(f)
        except Exception as e:
            # Fall back to a fresh index if the files are missing or corrupt
            print(f"Could not load index: {e}")
            self._create_new()

    def save(self):
        """Persist index and metadata"""
        if self.index is not None:
            self.index_path.parent.mkdir(parents=True, exist_ok=True)
            faiss.write_index(self.index, str(self.index_path))
            # Metadata is pickled; only load files this process wrote itself
            with open(self.metadata_path, "wb") as f:
                pickle.dump(self.metadata, f)

    def add(self, embeddings: np.ndarray, metadata: list):
        """Add embeddings with metadata"""
        if self.index is None:
            self._create_new()
        # Normalize embeddings so inner-product search behaves as cosine
        # similarity; the epsilon avoids division by zero for zero vectors
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        normalized = embeddings / (norms + 1e-10)
        self.index.add(normalized.astype(np.float32))
        self.metadata.extend(metadata)
        self.save()

    def search(self, query_embedding: np.ndarray, k: int = 5):
        """Search for similar vectors"""
        if self.index is None or self.index.ntotal == 0:
            return []
        # Normalize the query the same way the stored vectors were normalized
        norm = np.linalg.norm(query_embedding)
        normalized = query_embedding / (norm + 1e-10)
        scores, indices = self.index.search(
            normalized.reshape(1, -1).astype(np.float32),
            min(k, self.index.ntotal),
        )
        results = []
        for score, idx in zip(scores[0], indices[0]):
            # FAISS pads missing results with -1, so guard both bounds
            if 0 <= idx < len(self.metadata):
                result = self.metadata[idx].copy()
                result["score"] = float(score)
                results.append(result)
        return results

    def rebuild_index(self):
        """Rebuild the index from scratch"""
        self._create_new()
        # Load seed data and re-embed
        companies_file = DATA_DIR / "companies.json"
        if companies_file.exists():
            with open(companies_file) as f:
                companies = json.load(f)
            # Local import so the embedding model is only loaded when rebuilding
            from vector.embeddings import get_embedding_model
            model = get_embedding_model()
            texts = []
            metadata = []
            for company in companies:
                # Add company description
                desc = (
                    f"{company['name']} is a {company['industry']} company "
                    f"with {company['size']} employees"
                )
                texts.append(desc)
                metadata.append({
                    "company_id": company["id"],
                    "type": "description",
                    "text": desc,
                })
                # Add pain points
                for pain in company.get("pains", []):
                    text = f"{company['name']} pain point: {pain}"
                    texts.append(text)
                    metadata.append({
                        "company_id": company["id"],
                        "type": "pain",
                        "text": text,
                    })
                # Add notes
                for note in company.get("notes", []):
                    texts.append(note)
                    metadata.append({
                        "company_id": company["id"],
                        "type": "note",
                        "text": note,
                    })
            if texts:
                embeddings = model.encode(texts)
                self.add(embeddings, metadata)
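
    # For reference, rebuild_index assumes companies.json records shaped like
    # this (an illustrative example, not taken from the real seed data):
    #   {
    #     "id": "acme-1",
    #     "name": "Acme Corp",
    #     "industry": "logistics",
    #     "size": "200-500",
    #     "pains": ["manual invoice processing"],
    #     "notes": ["met at trade show, wants a demo"]
    #   }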

    def is_initialized(self):
        """Check whether the store holds at least one vector"""
        return self.index is not None and self.index.ntotal > 0
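

# Minimal usage sketch (illustrative): build the index from seed data if
# needed, then run a semantic query. Assumes vector.embeddings exposes
# get_embedding_model(), as rebuild_index above already relies on.
if __name__ == "__main__":
    from vector.embeddings import get_embedding_model

    store = VectorStore()
    if not store.is_initialized():
        store.rebuild_index()

    model = get_embedding_model()
    query = model.encode(["Which companies struggle with invoicing?"])[0]
    for hit in store.search(np.asarray(query), k=3):
        print(f"{hit['score']:.3f}  {hit['type']:<12} {hit['text']}")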