LSH: Locality Sensitive Hashing
Locality Sensitive Hashing (LSH) is a probabilistic technique that uses special hash functions to map similar vectors to the same hash buckets with high probability, enabling sub-linear time similarity search.
The Core Idea
Unlike traditional hash functions that avoid collisions, LSH deliberately causes collisions for similar items:
The probability that two items hash to the same bucket grows with their similarity.
LSH Families
1. Random Projection (Cosine Similarity)
For cosine similarity, use random hyperplanes:
```python
import numpy as np

def random_projection_hash(vector, hyperplanes):
    """Hash using random hyperplanes for cosine similarity."""
    hash_code = 0
    for i, hyperplane in enumerate(hyperplanes):
        # Set bit i if the vector lies on the positive side of hyperplane i
        if np.dot(vector, hyperplane) >= 0:
            hash_code |= (1 << i)
    return hash_code
```
Collision probability:

$$P[h(x) = h(y)] = 1 - \frac{\theta(x, y)}{\pi}$$

where θ(x, y) is the angle between the vectors.
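A minimal empirical check of this formula (plain NumPy, illustrative only), estimating the collision probability of a single random-hyperplane hash for two vectors at a known angle:

```python
import numpy as np

rng = np.random.default_rng(0)
d, num_trials = 64, 10_000

def collision_rate(angle):
    """Fraction of random hyperplanes that hash two vectors at the
    given angle to the same side (i.e., the same hash bit)."""
    x = np.zeros(d); x[0] = 1.0
    y = np.zeros(d); y[0], y[1] = np.cos(angle), np.sin(angle)
    planes = rng.standard_normal((num_trials, d))
    return np.mean((planes @ x >= 0) == (planes @ y >= 0))

for angle in (0.1, 0.5, 1.0, 2.0):
    # Theory for random projections: P[collision] = 1 - angle / pi
    print(f"angle={angle:.1f}  empirical={collision_rate(angle):.3f}  "
          f"theory={1 - angle / np.pi:.3f}")
```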
2. MinHash (Jaccard Similarity)
For set similarity, use min-wise independent permutations:
```python
def minhash(set_items, num_hashes):
    """MinHash signature for Jaccard similarity."""
    signature = []
    for i in range(num_hashes):
        # The seed i turns Python's built-in hash into the i-th hash function
        min_hash = float('inf')
        for item in set_items:
            hash_val = hash((item, i))
            min_hash = min(min_hash, hash_val)
        signature.append(min_hash)
    return signature
```
Collision probability equals the Jaccard similarity:

$$P[h_{\min}(A) = h_{\min}(B)] = J(A, B) = \frac{|A \cap B|}{|A \cup B|}$$
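A quick check (illustrative; reuses the `minhash` function above): the fraction of positions where two signatures agree estimates the Jaccard similarity of the underlying sets.

```python
A = {"apple", "banana", "cherry", "date"}
B = {"banana", "cherry", "date", "fig"}

sig_a = minhash(A, num_hashes=256)
sig_b = minhash(B, num_hashes=256)

# Fraction of agreeing signature positions approximates J(A, B)
estimate = sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
exact = len(A & B) / len(A | B)   # 3 / 5 = 0.6
print(f"estimated={estimate:.2f}  exact={exact:.2f}")
```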
3. p-Stable Distributions (Euclidean Distance)
For L2 distance, use Gaussian random projections:
```python
def p_stable_hash(vector, a, b, w):
    """Hash using p-stable distributions for Lp distance.

    a: random vector drawn from a Gaussian (2-stable for L2) distribution
    b: random offset drawn uniformly from [0, w]
    w: bucket width
    """
    projection = np.dot(vector, a) + b
    # floor (not truncation) so every bucket has uniform width
    return int(np.floor(projection / w))
```
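A short usage sketch (random data, parameters chosen purely for illustration): points that are close in L2 usually share a bucket when the bucket width w exceeds their distance, while distant points usually do not.

```python
import numpy as np

rng = np.random.default_rng(42)
d, w = 32, 4.0
a = rng.standard_normal(d)     # 2-stable (Gaussian) projection vector
b = rng.uniform(0, w)          # random offset

x = rng.standard_normal(d)
x_near = x + 0.01 * rng.standard_normal(d)   # tiny perturbation
x_far = x + 10.0 * rng.standard_normal(d)    # large perturbation

# The near point almost always shares x's bucket; the far point rarely does
print(p_stable_hash(x, a, b, w),
      p_stable_hash(x_near, a, b, w),
      p_stable_hash(x_far, a, b, w))
```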
Amplification Techniques
AND Amplification (Increase Precision)
Combine r hash functions and declare a match only if all r agree. This drops the collision probability from $p$ to $p^r$:
- Reduces false positives
- Increases false negatives
- Makes matching stricter
OR Amplification (Increase Recall)
Use b independent hash tables and declare a match if any table agrees. This raises the collision probability from $p$ to $1 - (1 - p)^b$:
- Increases true positives
- Increases false positives
- Makes matching more lenient
AND-OR Amplification
Combine both: b tables, each concatenating r hash functions. For items with base collision probability $p$:

$$P[\text{match}] = 1 - (1 - p^r)^b$$

This creates an S-curve that sharpens the similarity threshold.
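A minimal sketch tabulating this S-curve for a few illustrative (r, b) settings; setting b = 1 recovers pure AND amplification, and r = 1 pure OR:

```python
def match_probability(p, r, b):
    """AND-OR amplification: b OR'd tables, each AND-ing r hash functions."""
    return 1 - (1 - p**r) ** b

# p is the base collision probability of one hash function, which tracks
# similarity (e.g., 1 - theta/pi for random projections).
for p in (0.2, 0.4, 0.6, 0.8):
    print(f"p={p:.1f}  r=4,b=8 -> {match_probability(p, 4, 8):.3f}  "
          f"r=8,b=32 -> {match_probability(p, 8, 32):.3f}")
```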
Implementation
Basic LSH Index
```python
import numpy as np

class LSHIndex:
    def __init__(self, d, num_tables=5, hash_size=10):
        self.num_tables = num_tables
        self.hash_size = hash_size
        self.tables = [{} for _ in range(num_tables)]

        # Generate random hyperplanes for each table
        self.hyperplanes = []
        for _ in range(num_tables):
            planes = np.random.randn(hash_size, d)
            planes /= np.linalg.norm(planes, axis=1, keepdims=True)
            self.hyperplanes.append(planes)

    def _hash(self, vector, table_id):
        """Generate the hash for a vector in a specific table."""
        planes = self.hyperplanes[table_id]
        projections = np.dot(planes, vector)
        return tuple(projections >= 0)

    def insert(self, vector, item_id):
        """Insert a vector into all hash tables."""
        for table_id in range(self.num_tables):
            hash_key = self._hash(vector, table_id)
            if hash_key not in self.tables[table_id]:
                self.tables[table_id][hash_key] = []
            self.tables[table_id][hash_key].append(item_id)

    def query(self, vector, max_results=10):
        """Collect candidate similar vectors from all tables."""
        candidates = set()
        for table_id in range(self.num_tables):
            hash_key = self._hash(vector, table_id)
            if hash_key in self.tables[table_id]:
                candidates.update(self.tables[table_id][hash_key])
        return list(candidates)[:max_results]
```
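A short usage sketch (random data, illustrative): in practice the returned candidates would then be re-ranked by exact distance.

```python
rng = np.random.default_rng(0)
data = rng.standard_normal((1000, 64))

index = LSHIndex(d=64, num_tables=8, hash_size=12)
for item_id, vec in enumerate(data):
    index.insert(vec, item_id)

query_vec = data[0] + 0.05 * rng.standard_normal(64)  # noisy copy of item 0
candidates = index.query(query_vec, max_results=50)
print(0 in candidates, len(candidates))  # item 0 is very likely a candidate
```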
Multi-Probe LSH
Improve recall by checking nearby buckets:
```python
def multi_probe_query(self, vector, num_probes=3):
    """Query with multi-probe to increase recall."""
    candidates = set()
    for table_id in range(self.num_tables):
        base_hash = self._hash(vector, table_id)
        # Probe the exact bucket plus nearby buckets
        for probe in generate_probes(base_hash, num_probes):
            if probe in self.tables[table_id]:
                candidates.update(self.tables[table_id][probe])
    return candidates

def generate_probes(hash_code, num_probes):
    """Generate nearby hash codes by flipping single bits."""
    probes = [hash_code]
    hash_list = list(hash_code)
    # Single-bit flips of the first num_probes positions
    for i in range(min(num_probes, len(hash_list))):
        probe = hash_list.copy()
        probe[i] = not probe[i]
        probes.append(tuple(probe))
    return probes
```
Performance Analysis
Time Complexity
| Operation | Exact Search | LSH |
|---|---|---|
| Build | O(N) | O(N · L · k) |
| Query | O(N · d) | O(L · k · d + C · d) |
| Space | O(N · d) | O(N · L) |
Where:
- L = number of hash tables
- k = hash signature length (hashes per table)
- d = vector dimension
- N = number of indexed vectors
- C = number of candidates that must be verified
Probability Guarantees
A family of hash functions is (r, cr, p1, p2)-sensitive if, for all points x and y:

- If $d(x, y) \le r$, then $P[h(x) = h(y)] \ge p_1$
- If $d(x, y) \ge cr$, then $P[h(x) = h(y)] \le p_2$

with $p_1 > p_2$ and $c > 1$.
Parameter Selection
Given n points and an $(r, cr, p_1, p_2)$-sensitive family, the standard choices are:

$$k = \frac{\log n}{\log(1/p_2)}, \qquad L = n^{\rho}, \qquad \text{where } \rho = \frac{\log p_1}{\log p_2}$$

This yields query time $O(n^{\rho})$ with $\rho < 1$.
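A minimal sketch computing these quantities for a hypothetical family with p1 = 0.9 and p2 = 0.5 (numbers chosen purely for illustration):

```python
import math

def lsh_parameters(n, p1, p2):
    """Theoretically motivated signature length k and table count L."""
    rho = math.log(p1) / math.log(p2)
    k = math.ceil(math.log(n) / math.log(1 / p2))
    L = math.ceil(n ** rho)
    return k, L, rho

k, L, rho = lsh_parameters(n=1_000_000, p1=0.9, p2=0.5)
print(f"k={k}  L={L}  rho={rho:.3f}")   # k=20, L=9, rho~0.152
```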
Advanced Techniques
1. Cross-Polytope LSH
Better theoretical guarantees than random projection:
```python
class CrossPolytopeLSH:
    def __init__(self, d, seed=0):
        # Random rotation: an orthogonal matrix from a QR decomposition
        # (production implementations use fast pseudo-random rotations)
        rng = np.random.default_rng(seed)
        q, _ = np.linalg.qr(rng.standard_normal((d, d)))
        self.rotation = q

    def hash(self, vector):
        # Rotate the vector, then snap to the nearest signed basis vector
        rotated = self.rotation @ vector
        max_coord = int(np.argmax(np.abs(rotated)))
        sign = int(np.sign(rotated[max_coord]))
        return (max_coord, sign)
```
2. Data-Dependent LSH
Learn hash functions from data:
```python
from sklearn.decomposition import PCA

class LearnedLSH:
    def __init__(self, training_data):
        # Learn projection directions from the data itself (here: PCA)
        self.pca = PCA(n_components=32).fit(training_data)

    def hash(self, vector):
        # Project onto the learned subspace (sklearn expects a 2-D array)
        projected = self.pca.transform(vector.reshape(1, -1))[0]
        # Threshold at the median to get a balanced binary code
        return tuple(projected > np.median(projected))
```
3. Distributed LSH
Scale to billions of vectors:
```python
import asyncio

class DistributedLSH:
    def __init__(self, num_nodes):
        self.nodes = num_nodes
        # RemoteHashTable is a placeholder for an RPC client to one shard
        self.tables = [RemoteHashTable(i) for i in range(num_nodes)]

    async def insert(self, vector, item_id):
        # Shard by item id so each vector lives on exactly one node
        node_id = hash(item_id) % self.nodes
        await self.tables[node_id].insert(vector, item_id)

    async def query(self, vector):
        # Fan out to all shards in parallel, then merge candidate lists
        tasks = [table.query(vector) for table in self.tables]
        results = await asyncio.gather(*tasks)
        return merge_results(results)  # placeholder: dedupe and re-rank
```
Comparison with Other Methods
LSH vs HNSW vs IVF-PQ
| Feature | LSH | HNSW | IVF-PQ |
|---|---|---|---|
| Build Speed | Very Fast | Slow | Fast |
| Query Speed | Fast | Very Fast | Fast |
| Memory | Low | High | Very Low |
| Recall | Moderate | Excellent | Good |
| Updates | Instant | Incremental | Batch |
| Theory | Strong | Limited | Limited |
When to Use LSH
✅ Use LSH when:
- Streaming data (instant updates)
- Need theoretical guarantees
- Very high dimensions (>1000)
- Simple implementation required
- Memory is limited
❌ Avoid LSH when:
- Need very high recall (>95%)
- Small dataset (<10K vectors)
- Can afford complex preprocessing
- Latency is absolutely critical
Real-World Applications
1. Near-Duplicate Detection
Finding similar documents at web scale:
```python
from datasketch import MinHash, MinHashLSH

def detect_near_duplicates(documents, threshold=0.8):
    lsh = MinHashLSH(threshold=threshold, num_perm=128)
    duplicates = []
    for doc_id, content in enumerate(documents):
        # Create character 3-shingles from the document
        shingles = {content[i:i + 3] for i in range(len(content) - 2)}
        # Compute the MinHash signature of the shingle set
        signature = MinHash(num_perm=128)
        for shingle in shingles:
            signature.update(shingle.encode('utf8'))
        # Anything already indexed with estimated Jaccard >= threshold
        similar = lsh.query(signature)
        if similar:
            duplicates.append((doc_id, similar))
        else:
            lsh.insert(doc_id, signature)
    return duplicates
```
2. Recommendation Systems
Real-time similar item recommendations:
```python
class LSHRecommender:
    def __init__(self, num_recommendations=10):
        self.lsh = LSHIndex(d=100, num_tables=10)
        self.num_recs = num_recommendations
        self.item_embeddings = None

    def fit(self, item_embeddings):
        self.item_embeddings = item_embeddings  # kept for exact re-ranking
        for item_id, embedding in enumerate(item_embeddings):
            self.lsh.insert(embedding, item_id)

    def recommend(self, user_embedding):
        # Stage 1: cheap candidate generation via LSH
        candidates = self.lsh.query(user_embedding, max_results=100)
        # Stage 2: exact cosine re-ranking of the candidates
        scores = []
        for item_id in candidates:
            item = self.item_embeddings[item_id]
            similarity = np.dot(user_embedding, item) / (
                np.linalg.norm(user_embedding) * np.linalg.norm(item))
            scores.append((item_id, similarity))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:self.num_recs]
```
3. Genomic Sequence Matching
Finding similar DNA sequences:
```python
def sequence_lsh(sequence, k=10):
    """MinHash-style LSH signature for a genomic sequence."""
    # Extract overlapping k-mers
    kmers = [sequence[i:i + k] for i in range(len(sequence) - k + 1)]
    # One minimum per seeded hash function
    signature = []
    for seed in range(20):  # 20 hash functions
        min_hash = min(hash((kmer, seed)) for kmer in kmers)
        signature.append(min_hash)
    return signature
```
Implementation Tips
1. Hash Function Design
```python
def create_hash_family(similarity_type):
    # Each class stands in for the corresponding family described above
    if similarity_type == 'cosine':
        # Random hyperplanes
        return RandomProjectionHash(num_projections=10)
    elif similarity_type == 'jaccard':
        # MinHash
        return MinHashFamily(num_permutations=128)
    elif similarity_type == 'euclidean':
        # p-stable distributions
        return PStableHash(bucket_width=2.0)
    raise ValueError(f"unsupported similarity type: {similarity_type}")
```
2. Optimal Table Configuration
```python
def estimate_parameters(n, target_recall=0.9):
    """Estimate LSH parameters for a target recall (empirical heuristic)."""
    # Assumes each table independently finds a true neighbor with p ~ 0.5;
    # ceil so the achieved recall meets (not undershoots) the target
    L = int(np.ceil(np.log(1 - target_recall) / np.log(0.5)))
    k = int(np.log2(n))
    return {
        'num_tables': L,
        'hash_size': k,
        'expected_recall': 1 - 0.5 ** L,
    }
```
3. Dynamic Rehashing
```python
class AdaptiveLSH:
    def __init__(self):
        self.tables = []
        self.threshold = 100  # max items per bucket before rehashing

    def insert(self, vector, item_id):
        # Insert into every existing table
        for table in self.tables:
            table.insert(vector, item_id)
        # Rehash when any bucket grows too crowded
        if self.max_bucket_size() > self.threshold:
            self.rehash()

    def rehash(self):
        """Add a new table with fresh hash functions and redistribute."""
        new_table = create_hash_table()  # placeholder table factory
        for table in self.tables:
            for bucket in table.heavy_buckets():  # placeholder iterator
                for item in bucket:
                    new_table.insert(item)
        self.tables.append(new_table)
```
Related Concepts
- HNSW Search - Graph-based approach
- IVF-PQ - Clustering with compression
- Dense Embeddings - Vector representations
- ANN Comparison - Compare all methods
References
- Indyk, P. & Motwani, R. "Approximate Nearest Neighbors: Towards Removing the Curse of Dimensionality." STOC 1998.
- Andoni, A. & Indyk, P. "Near-Optimal Hashing Algorithms for Approximate Nearest Neighbor in High Dimensions." Communications of the ACM 51(1), 2008.
- Bawa, M., Condie, T. & Ganesan, P. "LSH Forest: Self-Tuning Indexes for Similarity Search." WWW 2005.
- datasketch: MinHash LSH library, https://github.com/ekzhu/datasketch
