""" Binary Hyperdimensional Vector (Binary HDV) Core ================================================= Phase 3.0 implementation of binary VSA operations. Based on Kanerva's Hyperdimensional Computing theory (2009). Uses standard mathematical operations (XOR, Hamming distance, majority bundling) that are fundamental VSA primitives — not derived from any proprietary implementation. Key design choices: - D = 16,384 bits (2^14) — configurable via config.yaml - Storage: packed as np.uint8 arrays (D/8 bytes = 2,048 bytes per vector) - Similarity: Hamming distance (popcount of XOR result) - Binding: element-wise XOR (self-inverse, commutative) - Bundling: element-wise majority vote (thresholded sum) - Sequence: circular bit-shift (permutation) All batch operations are NumPy-vectorized (no Python loops for distance computation). """ import hashlib from typing import List, Optional, Tuple import numpy as np import re # Cached lookup table for popcount (bits set per byte value 0-255) _POPCOUNT_TABLE: Optional[np.ndarray] = None def _build_popcount_table() -> np.ndarray: """Build or return cached popcount lookup table for bytes (0-255).""" global _POPCOUNT_TABLE if _POPCOUNT_TABLE is None: _POPCOUNT_TABLE = np.array( [bin(i).count("1") for i in range(256)], dtype=np.int32 ) return _POPCOUNT_TABLE class BinaryHDV: """ A binary hyperdimensional vector stored as a packed uint8 array. The vector has `dimension` logical bits, stored in `dimension // 8` bytes. Each byte holds 8 bits in big-endian bit order (MSB first within each byte). Attributes: data: np.ndarray of dtype uint8, shape (dimension // 8,) dimension: int, number of logical bits """ __slots__ = ("data", "dimension") def __init__(self, data: np.ndarray, dimension: int): """ Args: data: Packed uint8 array of shape (dimension // 8,). dimension: Number of logical bits. """ assert data.dtype == np.uint8, f"Expected uint8, got {data.dtype}" assert data.shape == (dimension // 8,), ( f"Shape mismatch: expected ({dimension // 8},), got {data.shape}" ) self.data = data self.dimension = dimension # ------------------------------------------------------------------ # Factory methods # ------------------------------------------------------------------ @classmethod def random(cls, dimension: int = 16384) -> "BinaryHDV": """Generate a random binary vector (uniform i.i.d. bits).""" assert dimension % 8 == 0, "Dimension must be multiple of 8" n_bytes = dimension // 8 data = np.random.randint(0, 256, size=n_bytes, dtype=np.uint8) return cls(data=data, dimension=dimension) @classmethod def zeros(cls, dimension: int = 16384) -> "BinaryHDV": """All-zero vector.""" n_bytes = dimension // 8 return cls(data=np.zeros(n_bytes, dtype=np.uint8), dimension=dimension) @classmethod def ones(cls, dimension: int = 16384) -> "BinaryHDV": """All-one vector (every bit set).""" n_bytes = dimension // 8 return cls( data=np.full(n_bytes, 0xFF, dtype=np.uint8), dimension=dimension ) @classmethod def from_seed(cls, seed: str, dimension: int = 16384) -> "BinaryHDV": """ Deterministic vector from a string seed. Uses SHA-3 (SHAKE-256) for high-performance deterministic expansion. """ n_bytes = dimension // 8 # SHAKE-256 can generate arbitrary length digests in one pass digest = hashlib.shake_256(seed.encode()).digest(n_bytes) data = np.frombuffer(digest, dtype=np.uint8).copy() return cls(data=data, dimension=dimension) # ------------------------------------------------------------------ # Core VSA operations # ------------------------------------------------------------------ def xor_bind(self, other: "BinaryHDV") -> "BinaryHDV": """ Binding via element-wise XOR. Properties: - Self-inverse: a ⊕ a = 0 - Commutative: a ⊕ b = b ⊕ a - Associative: (a ⊕ b) ⊕ c = a ⊕ (b ⊕ c) - Preserves distance: hamming(a⊕c, b⊕c) = hamming(a, b) """ assert self.dimension == other.dimension return BinaryHDV( data=np.bitwise_xor(self.data, other.data), dimension=self.dimension, ) def permute(self, shift: int = 1) -> "BinaryHDV": """ Circular bit-shift for sequence/role encoding. Shifts all bits by `shift` positions to the right (with wrap-around). Works at the byte level with bit carry for efficiency. """ if shift == 0: return BinaryHDV(data=self.data.copy(), dimension=self.dimension) # Normalize shift to positive value within dimension shift = shift % self.dimension bits = np.unpackbits(self.data) bits = np.roll(bits, shift) return BinaryHDV( data=np.packbits(bits), dimension=self.dimension ) def invert(self) -> "BinaryHDV": """Bitwise NOT — produces the maximally distant vector.""" return BinaryHDV( data=np.bitwise_not(self.data), dimension=self.dimension ) def hamming_distance(self, other: "BinaryHDV") -> int: """ Hamming distance: count of differing bits. Uses lookup table for speed (replacing unpackbits). Range: [0, dimension]. """ assert self.dimension == other.dimension xor_result = np.bitwise_xor(self.data, other.data) # Optimized: use precomputed popcount table instead of unpacking bits return int(_build_popcount_table()[xor_result].sum()) def normalized_distance(self, other: "BinaryHDV") -> float: """Hamming distance normalized to [0.0, 1.0].""" return self.hamming_distance(other) / self.dimension def similarity(self, other: "BinaryHDV") -> float: """ Similarity score in [0.0, 1.0]. 1.0 = identical, 0.0 = maximally different. 0.5 = random/orthogonal (expected for unrelated vectors). """ return 1.0 - self.normalized_distance(other) # ------------------------------------------------------------------ # Compatibility shims for legacy HDV API # ------------------------------------------------------------------ def bind(self, other: "BinaryHDV") -> "BinaryHDV": """ Alias for xor_bind(). Compatibility shim for legacy HDV API. Deprecated: Use xor_bind() directly for new code. """ return self.xor_bind(other) def unbind(self, other: "BinaryHDV") -> "BinaryHDV": """ Alias for xor_bind(). Since XOR is self-inverse, unbind = bind. Compatibility shim for legacy HDV API. """ return self.xor_bind(other) def cosine_similarity(self, other: "BinaryHDV") -> float: """ Alias for similarity(). Compatibility shim for legacy HDV API. Note: For binary vectors, this returns Hamming-based similarity, not true cosine similarity. The values are comparable for most use cases. """ return self.similarity(other) def normalize(self) -> "BinaryHDV": """ No-op for binary vectors. Compatibility shim for legacy HDV API. Binary vectors are already "normalized" in the sense that they consist only of 0s and 1s. Returns a copy of the vector. """ return BinaryHDV(data=self.data.copy(), dimension=self.dimension) def __xor__(self, other: "BinaryHDV") -> "BinaryHDV": """Alias for xor_bind(). Enables v1 ^ v2 syntax.""" return self.xor_bind(other) def to_bytes(self) -> bytes: """Serialize to raw bytes (for storage).""" return self.data.tobytes() @classmethod def from_bytes(cls, raw: bytes, dimension: int = 16384) -> "BinaryHDV": """Deserialize from raw bytes.""" data = np.frombuffer(raw, dtype=np.uint8).copy() return cls(data=data, dimension=dimension) def __repr__(self) -> str: # Optimized: use precomputed popcount table popcount = int(_build_popcount_table()[self.data].sum()) return f"BinaryHDV(dim={self.dimension}, popcount={popcount}/{self.dimension})" def __eq__(self, other: object) -> bool: if not isinstance(other, BinaryHDV): return NotImplemented return self.dimension == other.dimension and np.array_equal( self.data, other.data ) # ====================================================================== # Batch operations (NumPy-vectorized, no Python loops) # ====================================================================== def batch_hamming_distance( query: BinaryHDV, database: np.ndarray ) -> np.ndarray: """ Compute Hamming distance between a query vector and all vectors in a database. Args: query: Single BinaryHDV query vector. database: 2D array of shape (N, D//8) with dtype uint8, where each row is a packed binary vector. Returns: 1D array of shape (N,) with Hamming distances (int). """ # XOR query with all database vectors: (N, D//8) xor_result = np.bitwise_xor(database, query.data) # Popcount via lookup table — count bits set in each byte # This is the fastest pure-NumPy approach for packed binary vectors popcount_table = _build_popcount_table() bit_counts = popcount_table[xor_result] # (N, D//8) # Sum across bytes to get total Hamming distance per vector return bit_counts.sum(axis=1) def batch_hamming_distance_matrix( database: np.ndarray, ) -> np.ndarray: """ Compute the full pairwise Hamming distance matrix for a database. Args: database: 2D array of shape (N, D//8) with dtype uint8. Returns: 2D array of shape (N, N) with Hamming distances. """ N = database.shape[0] popcount_table = _build_popcount_table() distances = np.zeros((N, N), dtype=np.int32) for i in range(N): xor_result = np.bitwise_xor(database[i], database[i + 1 :]) bit_counts = popcount_table[xor_result].sum(axis=1) distances[i, i + 1 :] = bit_counts distances[i + 1 :, i] = bit_counts return distances def majority_bundle( vectors: List[BinaryHDV], randomize_ties: bool = False ) -> BinaryHDV: """ Bundle multiple vectors via element-wise majority vote. For each bit position, the result bit is 1 if more than half of the input vectors have a 1 at that position. Args: vectors: List of BinaryHDV vectors to bundle. randomize_ties: If True, break ties randomly. If False (default), ties default to 0 for deterministic results. This is the standard VSA bundling operation (superposition). """ assert len(vectors) > 0, "Cannot bundle empty list" dimension = vectors[0].dimension # Unpack all vectors to bits # Optimization: Stack packed data first, then unpack all at once # This avoids K calls to unpackbits and list comprehension overhead packed_data = np.stack([v.data for v in vectors], axis=0) # (K, D//8) all_bits = np.unpackbits(packed_data, axis=1) # (K, D) # Sum along vectors axis: count of 1-bits per position sums = all_bits.sum(axis=0) # (D,) # Majority vote: > half means 1 threshold = len(vectors) / 2.0 result_bits = np.zeros(dimension, dtype=np.uint8) result_bits[sums > threshold] = 1 # Handle ties if randomize_ties: ties = sums == threshold if ties.any(): result_bits[ties] = np.random.randint( 0, 2, size=ties.sum(), dtype=np.uint8 ) return BinaryHDV(data=np.packbits(result_bits), dimension=dimension) def top_k_nearest( query: BinaryHDV, database: np.ndarray, k: int = 10 ) -> List[Tuple[int, int]]: """ Find k nearest neighbors by Hamming distance. Args: query: Query vector. database: 2D array of shape (N, D//8) packed binary vectors. k: Number of nearest neighbors. Returns: List of (index, distance) tuples, sorted by distance ascending. """ distances = batch_hamming_distance(query, database) k = min(k, len(distances)) # argpartition is O(N) vs O(N log N) for full sort — much faster for large N indices = np.argpartition(distances, k)[:k] selected_distances = distances[indices] # Sort the k results by distance sort_order = np.argsort(selected_distances) sorted_indices = indices[sort_order] sorted_distances = selected_distances[sort_order] return [(int(idx), int(dist)) for idx, dist in zip(sorted_indices, sorted_distances)] # ====================================================================== # Text encoding pipeline # ====================================================================== class TextEncoder: """ Encode text to binary HDV using token-level random vectors with position-permutation binding. Method: For text "hello world", we compute: HDV = bundle(token("hello") ⊕ permute(pos, 0), token("world") ⊕ permute(pos, 1)) Token vectors are deterministic (seeded from the token string), ensuring the same word always maps to the same base vector. """ def __init__(self, dimension: int = 16384): self.dimension = dimension self._token_cache: dict[str, BinaryHDV] = {} def get_token_vector(self, token: str) -> BinaryHDV: """Get or create a deterministic vector for a token.""" if token not in self._token_cache: self._token_cache[token] = BinaryHDV.from_seed(token, self.dimension) return self._token_cache[token] def encode(self, text: str) -> BinaryHDV: """ Encode a text string to a binary HDV. Tokenization: simple whitespace split after normalization. Each token is bound with its position via XOR(token, permute(position_marker, i)). All position-bound tokens are bundled via majority vote. """ # Improved Tokenization: consistent alphanumeric extraction tokens = re.findall(r'\b\w+\b', text.lower()) if not tokens: return BinaryHDV.random(self.dimension) if len(tokens) == 1: return self.get_token_vector(tokens[0]) # Build position-bound token vectors (#27) # Optimized: Batch process data instead of multiple object instantiations token_hdvs = [self.get_token_vector(t) for t in tokens] packed_data = np.stack([v.data for v in token_hdvs], axis=0) all_bits = np.unpackbits(packed_data, axis=1) # Apply position-based permutations (roll) for i in range(len(tokens)): if i > 0: all_bits[i] = np.roll(all_bits[i], i) # Vectorized majority vote (equivalent to majority_bundle) sums = all_bits.sum(axis=0) threshold = len(tokens) / 2.0 result_bits = np.zeros(self.dimension, dtype=np.uint8) result_bits[sums > threshold] = 1 return BinaryHDV(data=np.packbits(result_bits), dimension=self.dimension) def encode_with_context( self, text: str, context_hdv: BinaryHDV ) -> BinaryHDV: """ Encode text and bind it with a context vector. Result = encode(text) ⊕ context This creates an association between the content and its context. """ content_hdv = self.encode(text) return content_hdv.xor_bind(context_hdv)