from typing import List, Tuple

import numpy as np
from annoy import AnnoyIndex
class ANNOYWrapper:
def __init__(self, dimension: int, n_trees: int = 10, metric: str = 'angular'):
self.dimension = dimension
self.n_trees = n_trees
self.metric = metric
self.index = AnnoyIndex(dimension, metric)
self.built = False
    def add_vectors(self, vectors: np.ndarray):
        """Add vectors and build the index (Annoy is immutable once built)"""
        if self.built:
            raise RuntimeError("Annoy indexes cannot be modified after build()")
        for i, vector in enumerate(vectors):
            self.index.add_item(i, vector)
        self.index.build(self.n_trees)
        self.built = True
def search(self, query_vector: np.ndarray, k: int = 10) -> Tuple[List[int], List[float]]:
"""Search for nearest neighbors"""
if not self.built:
raise RuntimeError("Index must be built before searching")
indices, distances = self.index.get_nns_by_vector(
query_vector, k, include_distances=True
)
return indices, distances
def memory_usage(self) -> dict:
"""Estimate memory usage"""
        # Annoy stores roughly 4 * dimension bytes per item for the vectors,
        # plus per-tree node overhead; multiplying by n_trees gives a
        # deliberately conservative upper bound
        estimated_size = 4 * self.dimension * self.index.get_n_items() * self.n_trees
return {
'estimated_mb': estimated_size / (1024 * 1024),
'n_trees': self.n_trees,
'n_items': self.index.get_n_items()
}
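A quick usage sketch of the wrapper (the dimension, tree count, and k below are arbitrary placeholder values):
# Usage sketch -- sizes are placeholders
data = np.random.random((10000, 128)).astype(np.float32)
ann = ANNOYWrapper(dimension=128, n_trees=20)
ann.add_vectors(data)
indices, distances = ann.search(data[0], k=5)
print(indices, ann.memory_usage())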
ScaNN (Scalable Nearest Neighbors)
Google's ScaNN library combines several optimization techniques, including tree partitioning, anisotropic vector quantization, and exact reordering of top candidates:
import numpy as np
import scann
class SCANNIndex:
def __init__(self, vectors: np.ndarray, num_leaves: int = 1000,
num_leaves_to_search: int = 100, training_sample_size: int = 100000):
self.vectors = vectors.astype(np.float32)
self.num_leaves = num_leaves
self.num_leaves_to_search = num_leaves_to_search
        # Build the ScaNN searcher; the second builder argument (10) is the
        # default number of neighbors, and "dot_product" is the scoring metric
        builder = scann.scann_ops_pybind.builder(self.vectors, 10, "dot_product")
self.searcher = (builder
.tree(num_leaves=num_leaves,
num_leaves_to_search=num_leaves_to_search,
training_sample_size=min(training_sample_size, len(vectors)))
.score_ah(2, anisotropic_quantization_threshold=0.2)
.reorder(100)
.build())
    def search(self, query: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Search with ScaNN; returns (neighbors, distances)"""
        neighbors, distances = self.searcher.search(query.astype(np.float32), final_num_neighbors=k)
        return neighbors, distances
    def batch_search(self, queries: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Batch search for better throughput; returns (neighbors, distances)"""
        neighbors, distances = self.searcher.search_batched(queries.astype(np.float32), final_num_neighbors=k)
        return neighbors, distances
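A usage sketch, assuming embeddings are unit-normalized so dot-product scoring behaves like cosine similarity (the shapes and parameters below are placeholders):
# Usage sketch -- data shape and k are placeholder values
vectors = np.random.random((50000, 128)).astype(np.float32)
vectors /= np.linalg.norm(vectors, axis=1, keepdims=True)  # normalize for dot product
scann_index = SCANNIndex(vectors, num_leaves=500, num_leaves_to_search=50)
neighbors, distances = scann_index.search(vectors[0], k=10)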
Index Selection Strategy Framework
Choosing the optimal index requires analyzing multiple factors:
class IndexSelector:
def __init__(self):
        # Illustrative scores on a 0-1 scale (higher is better); calibrate
        # them against benchmarks on your own data
        self.index_profiles = {
            'IVF_FLAT': {
                'memory_efficiency': 0.3,   # stores full uncompressed vectors
                'search_speed': 0.7,
                'accuracy': 0.9,
                'build_time': 0.8,
                'update_friendly': 0.9
            },
            'IVF_PQ': {
                'memory_efficiency': 1.0,   # product quantization compresses heavily
                'search_speed': 0.6,
                'accuracy': 0.7,
                'build_time': 0.7,
                'update_friendly': 0.8
            },
            'HNSW': {
                'memory_efficiency': 0.5,   # graph links add per-vector overhead
                'search_speed': 0.9,
                'accuracy': 0.95,
                'build_time': 0.6,
                'update_friendly': 0.4
            },
            'ANNOY': {
                'memory_efficiency': 0.8,   # memory-mapped, but trees add overhead
                'search_speed': 0.8,
                'accuracy': 0.8,
                'build_time': 0.9,
                'update_friendly': 0.2
            }
        }
def recommend_index(self, requirements: dict) -> str:
"""Recommend index based on requirements"""
weights = {
'memory_efficiency': requirements.get('memory_weight', 0.2),
'search_speed': requirements.get('speed_weight', 0.3),
'accuracy': requirements.get('accuracy_weight', 0.3),
'build_time': requirements.get('build_weight', 0.1),
'update_friendly': requirements.get('update_weight', 0.1)
}
scores = {}
for index_type, profile in self.index_profiles.items():
score = sum(profile[metric] * weight for metric, weight in weights.items())
scores[index_type] = score
return max(scores, key=scores.get)
def get_recommendations(self, dataset_size: int, dimension: int,
memory_limit_gb: float, qps_target: int) -> dict:
"""Get specific recommendations based on constraints"""
recommendations = {}
# Memory-constrained scenarios
if dataset_size * dimension * 4 / (1024**3) > memory_limit_gb:
recommendations['memory_optimized'] = 'IVF_PQ'
# High QPS requirements
if qps_target > 1000:
recommendations['high_throughput'] = 'HNSW'
# High accuracy requirements
recommendations['high_accuracy'] = 'IVF_FLAT'
# Balanced approach
requirements = {
'memory_weight': 0.25,
'speed_weight': 0.25,
'accuracy_weight': 0.25,
'build_weight': 0.125,
'update_weight': 0.125
}
recommendations['balanced'] = self.recommend_index(requirements)
return recommendations
# Usage example
selector = IndexSelector()
recommendations = selector.get_recommendations(
dataset_size=1000000,
dimension=768,
memory_limit_gb=16,
qps_target=500
)
print(recommendations)
Memory vs Accuracy Trade-offs Analysis
Understanding the trade-offs between memory usage and search accuracy is crucial for production deployments:
import time

import faiss
import numpy as np

class TradeoffAnalyzer:
def __init__(self):
self.results = {}
def analyze_ivf_variants(self, vectors: np.ndarray, test_queries: np.ndarray,
ground_truth: np.ndarray) -> dict:
"""Compare IVF variants across different parameters"""
configurations = [
{'type': 'IVF_FLAT', 'nlist': 1024},
{'type': 'IVF_PQ', 'nlist': 1024, 'm': 8, 'nbits': 8},
{'type': 'IVF_PQ', 'nlist': 1024, 'm': 16, 'nbits': 8},
{'type': 'IVF_PQ', 'nlist': 2048, 'm': 8, 'nbits': 8},
]
results = {}
for config in configurations:
config_name = f"{config['type']}_nlist{config['nlist']}"
if 'm' in config:
config_name += f"_m{config['m']}_nbits{config['nbits']}"
# Build index
if config['type'] == 'IVF_FLAT':
index = self._build_ivf_flat(vectors, config['nlist'])
else:
index = self._build_ivf_pq(vectors, config['nlist'],
config['m'], config['nbits'])
# Measure performance
memory_usage = self._measure_memory(index)
recall_at_k = self._measure_recall(index, test_queries, ground_truth)
search_time = self._measure_search_time(index, test_queries)
results[config_name] = {
'memory_mb': memory_usage,
'recall@10': recall_at_k,
'avg_search_time_ms': search_time,
'memory_per_vector_bytes': memory_usage * 1024 * 1024 / len(vectors)
}
return results
def plot_pareto_frontier(self, results: dict):
"""Plot memory vs accuracy Pareto frontier"""
import matplotlib.pyplot as plt
memory_values = [r['memory_mb'] for r in results.values()]
recall_values = [r['recall@10'] for r in results.values()]
labels = list(results.keys())
plt.figure(figsize=(10, 6))
plt.scatter(memory_values, recall_values, s=100)
for i, label in enumerate(labels):
plt.annotate(label, (memory_values[i], recall_values[i]),
xytext=(5, 5), textcoords='offset points')
plt.xlabel('Memory Usage (MB)')
plt.ylabel('Recall@10')
plt.title('Memory vs Accuracy Trade-off')
plt.grid(True, alpha=0.3)
plt.show()
def _build_ivf_flat(self, vectors: np.ndarray, nlist: int):
"""Build IVF_FLAT index"""
dimension = vectors.shape[1]
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        index.train(vectors[:min(len(vectors), nlist * 39)])  # faiss wants >=39 points per centroid
index.add(vectors)
return index
def _build_ivf_pq(self, vectors: np.ndarray, nlist: int, m: int, nbits: int):
"""Build IVF_PQ index"""
dimension = vectors.shape[1]
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)
        index.train(vectors[:min(len(vectors), nlist * 39)])  # faiss wants >=39 points per centroid
index.add(vectors)
return index
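The analyzer references three measurement helpers it never defines. The following is a minimal sketch of them, meant to live inside TradeoffAnalyzer, with serialized index size standing in as a proxy for resident memory:
    def _measure_memory(self, index) -> float:
        """Approximate index memory by its serialized size, in MB"""
        writer = faiss.VectorIOWriter()
        faiss.write_index(index, writer)
        return writer.data.size() / (1024 * 1024)
    def _measure_recall(self, index, queries: np.ndarray, ground_truth: np.ndarray, k: int = 10) -> float:
        """Average fraction of true top-k neighbors recovered"""
        _, found = index.search(queries, k)
        hits = sum(len(set(found[i]) & set(ground_truth[i][:k])) for i in range(len(queries)))
        return hits / (len(queries) * k)
    def _measure_search_time(self, index, queries: np.ndarray, k: int = 10) -> float:
        """Average per-query latency in milliseconds"""
        start = time.time()
        index.search(queries, k)
        return (time.time() - start) * 1000 / len(queries)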
Custom Index Implementation Guide
Building custom indexes for specialized use cases:
class CustomLSHIndex:
    """Random-hyperplane LSH for cosine similarity: each hash bit is the sign
    of the query's projection onto a random hyperplane"""
def __init__(self, dimension: int, num_tables: int = 10, hash_size: int = 10):
self.dimension = dimension
self.num_tables = num_tables
self.hash_size = hash_size
self.hash_tables = []
self.vectors = {}
# Initialize random projection matrices
for _ in range(num_tables):
random_matrix = np.random.randn(hash_size, dimension)
self.hash_tables.append({
'matrix': random_matrix,
'buckets': {}
})
def _hash_vector(self, vector: np.ndarray, table_idx: int) -> str:
"""Hash vector using random projection"""
projection = np.dot(self.hash_tables[table_idx]['matrix'], vector)
binary_hash = (projection > 0).astype(int)
return ''.join(map(str, binary_hash))
def add_vector(self, vector_id: int, vector: np.ndarray):
"""Add vector to all hash tables"""
self.vectors[vector_id] = vector
for table_idx in range(self.num_tables):
hash_key = self._hash_vector(vector, table_idx)
if hash_key not in self.hash_tables[table_idx]['buckets']:
self.hash_tables[table_idx]['buckets'][hash_key] = []
self.hash_tables[table_idx]['buckets'][hash_key].append(vector_id)
def search(self, query_vector: np.ndarray, k: int = 10) -> List[Tuple[int, float]]:
"""Search for similar vectors"""
candidates = set()
# Collect candidates from all hash tables
for table_idx in range(self.num_tables):
hash_key = self._hash_vector(query_vector, table_idx)
bucket = self.hash_tables[table_idx]['buckets'].get(hash_key, [])
candidates.update(bucket)
# Calculate exact distances for candidates
distances = []
for candidate_id in candidates:
candidate_vector = self.vectors[candidate_id]
distance = np.linalg.norm(query_vector - candidate_vector)
distances.append((candidate_id, distance))
# Return top-k results
distances.sort(key=lambda x: x[1])
return distances[:k]
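# Usage sketch for CustomLSHIndex -- sizes below are placeholders; more tables
# or longer hashes trade memory and candidate count against recall
lsh = CustomLSHIndex(dimension=128, num_tables=10, hash_size=12)
for vid, vec in enumerate(np.random.randn(1000, 128)):
    lsh.add_vector(vid, vec)
nearest = lsh.search(np.random.randn(128), k=5)  # [(vector_id, distance), ...]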
class HierarchicalIndex:
"""Multi-level hierarchical index for very large datasets"""
def __init__(self, dimension: int, levels: int = 3):
self.dimension = dimension
self.levels = levels
self.level_indices = []
self.level_centroids = []
        # Initialize indices for each level; coarser levels index the previous
        # level's centroids, so they need exponentially *fewer* clusters
        for level in range(levels):
            nlist = max(16, 1024 // (4 ** level))
            quantizer = faiss.IndexFlatL2(dimension)
            index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
            self.level_indices.append(index)
def train_and_add(self, vectors: np.ndarray):
"""Train hierarchical structure"""
current_vectors = vectors
for level in range(self.levels):
# Train current level
self.level_indices[level].train(current_vectors)
self.level_indices[level].add(current_vectors)
# Extract centroids for next level
if level < self.levels - 1:
centroids = self.level_indices[level].quantizer.reconstruct_n(
0, self.level_indices[level].nlist
)
self.level_centroids.append(centroids)
current_vectors = centroids
def search(self, query_vector: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
"""Hierarchical search from coarse to fine"""
current_query = query_vector.reshape(1, -1)
# Start from the coarsest level
for level in range(self.levels - 1, -1, -1):
# Search at current level
nprobe = min(10, self.level_indices[level].nlist)
self.level_indices[level].nprobe = nprobe
if level == 0:
# Final level - return actual results
distances, indices = self.level_indices[level].search(current_query, k)
return distances, indices
            else:
                # Intermediate level: snap the query to its nearest coarse
                # centroid, which becomes the query for the next-finer level
                _, cluster_indices = self.level_indices[level].search(current_query, 1)
                current_query = self.level_centroids[level-1][cluster_indices[0]].reshape(1, -1)
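A usage sketch for the hierarchical index (dataset size and dimension are placeholders; the finest level needs enough vectors to train its clusters):
hier = HierarchicalIndex(dimension=128, levels=3)
hier.train_and_add(np.random.random((500000, 128)).astype(np.float32))
distances, indices = hier.search(np.random.random(128).astype(np.float32), k=10)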
Comprehensive Benchmark Framework
A robust benchmarking system for comparing index performance:
import time

import faiss
import numpy as np

class IndexBenchmark:
def __init__(self):
self.results = {}
self.datasets = {}
def load_dataset(self, name: str, vectors: np.ndarray, queries: np.ndarray,
ground_truth: np.ndarray):
"""Load benchmark dataset"""
self.datasets[name] = {
'vectors': vectors,
'queries': queries,
'ground_truth': ground_truth
}
def benchmark_index(self, index_name: str, index_config: dict,
dataset_name: str, search_params: dict = None) -> dict:
"""Comprehensive index benchmarking"""
if dataset_name not in self.datasets:
raise ValueError(f"Dataset {dataset_name} not loaded")
dataset = self.datasets[dataset_name]
vectors = dataset['vectors']
queries = dataset['queries']
ground_truth = dataset['ground_truth']
# Build index
build_start = time.time()
index = self._build_index(index_config, vectors)
build_time = time.time() - build_start
# Memory usage
memory_usage = self._measure_memory_usage(index)
# Search performance
search_results = self._benchmark_search(index, queries, ground_truth, search_params)
results = {
'index_name': index_name,
'dataset': dataset_name,
'build_time_seconds': build_time,
'memory_usage_mb': memory_usage,
'index_size_mb': self._get_index_size(index),
**search_results
}
self.results[f"{index_name}_{dataset_name}"] = results
return results
def _benchmark_search(self, index, queries: np.ndarray, ground_truth: np.ndarray,
search_params: dict) -> dict:
"""Benchmark search performance with different parameters"""
results = {}
param_sets = self._generate_param_sets(search_params)
for param_name, params in param_sets.items():
# Configure search parameters
self._configure_search_params(index, params)
# Measure search time
start_time = time.time()
distances, indices = self._search_index(index, queries, k=10)
search_time = time.time() - start_time
# Calculate metrics
recall_at_1 = self._calculate_recall(indices[:, :1], ground_truth[:, :1])
recall_at_10 = self._calculate_recall(indices, ground_truth)
qps = len(queries) / search_time
avg_latency = (search_time / len(queries)) * 1000 # ms
results[param_name] = {
'recall@1': recall_at_1,
'recall@10': recall_at_10,
'qps': qps,
'avg_latency_ms': avg_latency,
'params': params
}
return results
def generate_report(self, output_file: str = None) -> str:
"""Generate comprehensive benchmark report"""
report = "# Vector Index Benchmark Report\n\n"
# Summary table
report += "## Performance Summary\n\n"
report += "| Index | Dataset | Build Time (s) | Memory (MB) | Recall@10 | QPS | Latency (ms) |\n"
report += "|-------|---------|----------------|-------------|-----------|-----|-------------|\n"
for key, result in self.results.items():
best_config = max(result.items(), key=lambda x: x[1].get('recall@10', 0) if isinstance(x[1], dict) else 0)
if isinstance(best_config[1], dict):
report += f"| {result['index_name']} | {result['dataset']} | "
report += f"{result['build_time_seconds']:.2f} | {result['memory_usage_mb']:.1f} | "
report += f"{best_config[1]['recall@10']:.3f} | {best_config[1]['qps']:.0f} | "
report += f"{best_config[1]['avg_latency_ms']:.2f} |\n"
# Detailed analysis
report += "\n## Detailed Analysis\n\n"
for key, result in self.results.items():
report += f"### {result['index_name']} on {result['dataset']}\n\n"
report += f"- **Build Time**: {result['build_time_seconds']:.2f} seconds\n"
report += f"- **Memory Usage**: {result['memory_usage_mb']:.1f} MB\n"
report += f"- **Index Size**: {result['index_size_mb']:.1f} MB\n\n"
# Parameter sweep results
for param_name, metrics in result.items():
if isinstance(metrics, dict) and 'recall@10' in metrics:
report += f"**{param_name}**: Recall@10={metrics['recall@10']:.3f}, "
report += f"QPS={metrics['qps']:.0f}, Latency={metrics['avg_latency_ms']:.2f}ms\n"
if output_file:
with open(output_file, 'w') as f:
f.write(report)
return report
def plot_performance_comparison(self):
"""Generate performance comparison plots"""
import matplotlib.pyplot as plt
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# Extract data for plotting
index_names = []
recall_values = []
qps_values = []
memory_values = []
latency_values = []
        for key, result in self.results.items():
            # Get best performing configuration; only record entries that
            # have at least one parameter-sweep result so the lists stay aligned
            best_config = max(result.items(),
                              key=lambda x: x[1].get('recall@10', 0) if isinstance(x[1], dict) else 0)
            if isinstance(best_config[1], dict):
                index_names.append(result['index_name'])
                memory_values.append(result['memory_usage_mb'])
                recall_values.append(best_config[1]['recall@10'])
                qps_values.append(best_config[1]['qps'])
                latency_values.append(best_config[1]['avg_latency_ms'])
# Recall comparison
ax1.bar(index_names, recall_values)
ax1.set_title('Recall@10 Comparison')
ax1.set_ylabel('Recall@10')
ax1.tick_params(axis='x', rotation=45)
# QPS comparison
ax2.bar(index_names, qps_values)
ax2.set_title('Queries Per Second')
ax2.set_ylabel('QPS')
ax2.tick_params(axis='x', rotation=45)
# Memory usage
ax3.bar(index_names, memory_values)
ax3.set_title('Memory Usage')
ax3.set_ylabel('Memory (MB)')
ax3.tick_params(axis='x', rotation=45)
# Latency comparison
ax4.bar(index_names, latency_values)
ax4.set_title('Average Latency')
ax4.set_ylabel('Latency (ms)')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
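    # The private helpers referenced above are not defined in the original
    # post; what follows is a minimal faiss-based sketch, assuming the
    # Milvus-style config dicts used in the example below.
    def _build_index(self, config: dict, vectors: np.ndarray):
        d, params = vectors.shape[1], config['params']
        if config['index_type'] == 'IVF_FLAT':
            index = faiss.IndexIVFFlat(faiss.IndexFlatL2(d), d, params['nlist'])
            index.train(vectors)
        elif config['index_type'] == 'HNSW':
            index = faiss.IndexHNSWFlat(d, params['M'])
            index.hnsw.efConstruction = params['efConstruction']
        else:
            raise ValueError(f"Unsupported index type: {config['index_type']}")
        index.add(vectors)
        return index
    def _search_index(self, index, queries: np.ndarray, k: int):
        return index.search(queries, k)
    def _calculate_recall(self, found: np.ndarray, truth: np.ndarray) -> float:
        hits = sum(len(set(found[i]) & set(truth[i])) for i in range(len(found)))
        return hits / truth.size
    def _generate_param_sets(self, search_params: dict) -> dict:
        # Fall back to a single default sweep point when none are given
        return search_params or {'default': {}}
    def _configure_search_params(self, index, params: dict):
        if 'nprobe' in params and hasattr(index, 'nprobe'):
            index.nprobe = params['nprobe']
        if 'ef' in params and hasattr(index, 'hnsw'):
            index.hnsw.efSearch = params['ef']
    def _measure_memory_usage(self, index) -> float:
        # Serialized size as a proxy for resident memory
        writer = faiss.VectorIOWriter()
        faiss.write_index(index, writer)
        return writer.data.size() / (1024 * 1024)
    def _get_index_size(self, index) -> float:
        return self._measure_memory_usage(index)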
# Usage example
benchmark = IndexBenchmark()
# Load test dataset (note: random ground truth makes the recall numbers
# meaningless here; in practice compute it with an exact IndexFlatL2 search)
vectors = np.random.random((100000, 128)).astype(np.float32)
queries = np.random.random((1000, 128)).astype(np.float32)
ground_truth = np.random.randint(0, 100000, (1000, 10))
benchmark.load_dataset('synthetic_128d', vectors, queries, ground_truth)
# Benchmark different indices
ivf_config = {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 1024}}
hnsw_config = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 16, "efConstruction": 200}}
benchmark.benchmark_index('IVF_FLAT', ivf_config, 'synthetic_128d')
benchmark.benchmark_index('HNSW', hnsw_config, 'synthetic_128d')
# Generate report
report = benchmark.generate_report('benchmark_report.md')
benchmark.plot_performance_comparison()
GPU Acceleration for Index Building
Modern GPUs dramatically accelerate index construction and search operations:
import time
from typing import List, Tuple

import faiss  # requires the faiss-gpu build
import numpy as np
class GPUAcceleratedIndexing:
def __init__(self, gpu_id: int = 0):
self.gpu_id = gpu_id
self.gpu_resource = faiss.StandardGpuResources()
# Configure GPU memory
self.gpu_resource.setTempMemory(1024 * 1024 * 1024) # 1GB temp memory
def build_gpu_ivf_flat(self, vectors: np.ndarray, nlist: int = 1024) -> faiss.Index:
"""Build IVF_FLAT index on GPU"""
dimension = vectors.shape[1]
# Create CPU index first
cpu_quantizer = faiss.IndexFlatL2(dimension)
cpu_index = faiss.IndexIVFFlat(cpu_quantizer, dimension, nlist)
# Move to GPU
gpu_index = faiss.index_cpu_to_gpu(self.gpu_resource, self.gpu_id, cpu_index)
# Train and add vectors
gpu_index.train(vectors)
gpu_index.add(vectors)
return gpu_index
def build_gpu_ivf_pq(self, vectors: np.ndarray, nlist: int = 1024,
m: int = 8, nbits: int = 8) -> faiss.Index:
"""Build IVF_PQ index on GPU"""
dimension = vectors.shape[1]
cpu_quantizer = faiss.IndexFlatL2(dimension)
cpu_index = faiss.IndexIVFPQ(cpu_quantizer, dimension, nlist, m, nbits)
        # GPU configuration for PQ; the target device is set on the config,
        # and the GPU index is constructed from the trained CPU index
        config = faiss.GpuIndexIVFPQConfig()
        config.device = self.gpu_id
        config.useFloat16LookupTables = True  # half-precision lookup tables save memory
        config.usePrecomputedTables = True    # precompute distance tables for faster search
        gpu_index = faiss.GpuIndexIVFPQ(self.gpu_resource, cpu_index, config)
gpu_index.train(vectors)
gpu_index.add(vectors)
return gpu_index
def benchmark_gpu_vs_cpu(self, vectors: np.ndarray, queries: np.ndarray) -> dict:
"""Compare GPU vs CPU performance"""
results = {}
# CPU benchmark
cpu_start = time.time()
cpu_index = self._build_cpu_index(vectors)
cpu_build_time = time.time() - cpu_start
cpu_search_start = time.time()
cpu_distances, cpu_indices = cpu_index.search(queries, 10)
cpu_search_time = time.time() - cpu_search_start
# GPU benchmark
gpu_start = time.time()
gpu_index = self.build_gpu_ivf_flat(vectors)
gpu_build_time = time.time() - gpu_start
gpu_search_start = time.time()
gpu_distances, gpu_indices = gpu_index.search(queries, 10)
gpu_search_time = time.time() - gpu_search_start
results = {
'cpu': {
'build_time': cpu_build_time,
'search_time': cpu_search_time,
'qps': len(queries) / cpu_search_time
},
'gpu': {
'build_time': gpu_build_time,
'search_time': gpu_search_time,
'qps': len(queries) / gpu_search_time
},
'speedup': {
'build': cpu_build_time / gpu_build_time,
'search': cpu_search_time / gpu_search_time
}
}
return results
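    def _build_cpu_index(self, vectors: np.ndarray, nlist: int = 1024) -> faiss.Index:
        # Referenced by benchmark_gpu_vs_cpu but missing from the original;
        # assumed to be the CPU counterpart of build_gpu_ivf_flat
        dimension = vectors.shape[1]
        quantizer = faiss.IndexFlatL2(dimension)
        index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        index.train(vectors)
        index.add(vectors)
        return index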
def optimize_gpu_memory(self, vectors: np.ndarray, target_memory_gb: float) -> dict:
"""Optimize index parameters for GPU memory constraints"""
vector_memory_gb = vectors.nbytes / (1024**3)
available_memory_gb = target_memory_gb - vector_memory_gb
recommendations = {}
if available_memory_gb < 1:
# Very limited memory - use aggressive compression
recommendations['index_type'] = 'IVF_PQ'
recommendations['params'] = {
'nlist': min(1024, len(vectors) // 100),
'm': 16, # More subquantizers for better compression
'nbits': 4 # Fewer bits per subquantizer
}
elif available_memory_gb < 4:
# Moderate memory - balanced approach
recommendations['index_type'] = 'IVF_PQ'
recommendations['params'] = {
'nlist': min(2048, len(vectors) // 50),
'm': 8,
'nbits': 8
}
else:
# Ample memory - prioritize accuracy
recommendations['index_type'] = 'IVF_FLAT'
recommendations['params'] = {
'nlist': min(4096, len(vectors) // 39)
}
return recommendations
# Multi-GPU scaling
class MultiGPUIndexing:
def __init__(self, gpu_ids: List[int]):
self.gpu_ids = gpu_ids
self.gpu_resources = []
for gpu_id in gpu_ids:
resource = faiss.StandardGpuResources()
resource.setTempMemory(1024 * 1024 * 1024)
self.gpu_resources.append(resource)
def build_distributed_index(self, vectors: np.ndarray, nlist: int = 1024) -> faiss.Index:
"""Build index distributed across multiple GPUs"""
dimension = vectors.shape[1]
# Create sharded index
cpu_quantizer = faiss.IndexFlatL2(dimension)
cpu_index = faiss.IndexIVFFlat(cpu_quantizer, dimension, nlist)
        # Create multi-GPU index, sharded/replicated across the listed devices
        gpu_index = faiss.index_cpu_to_gpu_multiple_py(
            self.gpu_resources, cpu_index, gpus=self.gpu_ids
        )
# Train and add vectors
gpu_index.train(vectors)
gpu_index.add(vectors)
return gpu_index
    def parallel_search(self, index: faiss.Index, queries: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Chunked search; the multi-GPU index parallelizes each call internally"""
        # Split queries into one chunk per GPU
        queries_per_gpu = len(queries) // len(self.gpu_ids)
results = []
for i, gpu_id in enumerate(self.gpu_ids):
start_idx = i * queries_per_gpu
end_idx = start_idx + queries_per_gpu if i < len(self.gpu_ids) - 1 else len(queries)
gpu_queries = queries[start_idx:end_idx]
distances, indices = index.search(gpu_queries, k)
results.append((distances, indices))
# Combine results
all_distances = np.vstack([r[0] for r in results])
all_indices = np.vstack([r[1] for r in results])
return all_distances, all_indices
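A usage sketch for the multi-GPU wrapper, assuming a machine with two visible GPUs and reusing the vectors and queries arrays from the benchmark example above:
multi = MultiGPUIndexing(gpu_ids=[0, 1])
sharded = multi.build_distributed_index(vectors, nlist=2048)
distances, indices = multi.parallel_search(sharded, queries, k=10)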
Production Deployment Considerations
Index Configuration for Different Scales
class ProductionIndexConfig:
@staticmethod
def get_config_for_scale(num_vectors: int, dimension: int,
memory_budget_gb: float, qps_target: int) -> dict:
"""Get production-ready index configuration"""
if num_vectors < 100000:
# Small scale - prioritize simplicity
return {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": min(1024, num_vectors // 39)}
}
elif num_vectors < 10000000:
# Medium scale - balance performance and memory
estimated_memory = num_vectors * dimension * 4 / (1024**3)
if estimated_memory > memory_budget_gb:
return {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {
"nlist": min(4096, num_vectors // 39),
"m": 8,
"nbits": 8
}
}
else:
return {
"index_type": "HNSW",
"metric_type": "L2",
"params": {
"M": 16,
"efConstruction": 200
}
}
else:
# Large scale - prioritize memory efficiency
return {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {
"nlist": min(8192, num_vectors // 39),
"m": 16,
"nbits": 4
}
}
@staticmethod
def get_search_params(index_type: str, recall_target: float = 0.9) -> dict:
"""Get search parameters for target recall"""
        if index_type in ("IVF_FLAT", "IVF_PQ"):
if recall_target >= 0.95:
return {"nprobe": 128}
elif recall_target >= 0.9:
return {"nprobe": 64}
else:
return {"nprobe": 32}
elif index_type == "HNSW":
if recall_target >= 0.95:
return {"ef": 200}
elif recall_target >= 0.9:
return {"ef": 100}
else:
return {"ef": 50}
return {}
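A usage sketch tying the two helpers together (the workload numbers below are placeholders):
config = ProductionIndexConfig.get_config_for_scale(
    num_vectors=5_000_000, dimension=768, memory_budget_gb=32, qps_target=2000
)
search_params = ProductionIndexConfig.get_search_params(config["index_type"], recall_target=0.95)
print(config, search_params)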
Conclusion
Vector indexing in Milvus offers a rich ecosystem of algorithms, each optimized for specific use cases and constraints. The choice between IVF variants, HNSW, and other algorithms depends on your specific requirements for memory usage, search accuracy, build time, and query performance.
Key takeaways for production deployments:
- IVF_FLAT excels when accuracy matters and memory is not the binding constraint
- IVF_PQ provides excellent memory efficiency for large-scale deployments
- HNSW delivers superior query latency, at the cost of extra per-vector graph memory
- GPU acceleration can provide 5-10x speedups for both index building and search operations
- Custom implementations enable specialized optimizations for domain-specific requirements
The benchmarking framework and analysis tools provided in this guide enable data-driven decisions for index selection and parameter tuning. As vector databases continue to evolve, understanding these fundamental algorithms and their trade-offs remains crucial for building high-performance AI applications.
Regular benchmarking with your specific data and query patterns is essential, as theoretical performance characteristics may vary significantly with real-world workloads. The investment in proper index selection and tuning pays substantial dividends in application performance and operational efficiency.