2026-04-04 Sharded Vector Search¶
- Status: implemented
- Date: 2026-04-04
- Baseline repo commit before this change series: c90ba94
- Implementation commits:
  - 9ffdfe9 feat: add sharded vector search (write HNSW indices, search across shards)
  - fcd5288 fix: add VectorReaderHealth export, lazy adapter imports, remove faiss placeholder
  - 3b2efcc feat: add CEL sharding support to vector writer (types, config, assignment)
Summary¶
This change adds a shardyfusion.vector subpackage for sharded approximate nearest neighbor (ANN) vector search. It provides a Python iterator-based writer that builds per-shard HNSW indices and uploads them to S3, and a reader that lazily downloads shard indices and fans out searches with top-k result merging.
The design reuses the existing shardyfusion manifest and S3 infrastructure but introduces a separate sharding model (cluster, LSH, explicit) suited to vector similarity workloads rather than the existing key-hash model.
1. What problem is being solved or functionality being added by the changes?¶
shardyfusion supports sharded KV lookups but has no support for vector similarity search. Users with embedding workloads need to:
- Build per-shard HNSW indices from vector collections
- Upload indices to S3 alongside a manifest
- Search across shards with multi-probe routing and top-k merge
- Support multiple distance metrics (cosine, L2, dot product)
The existing KV sharding model (xxh3_64 hash or CEL expression) is not suitable for vector workloads because it distributes vectors without regard for their geometric proximity. Vector search benefits from locality-aware sharding (cluster or LSH) that places similar vectors on the same shard, reducing the number of shards that need to be probed at query time.
2. What design decisions were considered with their pros and cons and trade offs?¶
Decision 1: How to shard vectors across indices¶
Option A: Reuse the existing key-hash sharding¶
Pros:
- no new sharding code
- consistent with KV writer behavior
- uniform shard sizes
Cons:
- vectors with similar embeddings end up on different shards
- every search must query every shard (no routing optimization)
- num_probes optimization is impossible because assignment has no geometric meaning
- defeats the purpose of sharded ANN search
Option B: K-means cluster-based sharding¶
Pros:
- similar vectors land on the same shard by construction
- at query time, only the nearest centroid shards need to be probed
- well-understood IVF-style partitioning from the ANN literature
- centroid training can be done from a sample of the data
Cons:
- requires a training step or user-provided centroids
- uneven shard sizes if cluster sizes vary
- centroids must be persisted for the reader
Option C: Locality-sensitive hashing (LSH)¶
Pros:
- no training step required
- deterministic assignment from random hyperplanes
- supports multi-probe search by flipping hash bits
- hyperplanes are cheap to generate and persist
Cons:
- less precise locality than k-means
- hash collisions can group dissimilar vectors
- multi-probe quality degrades as num_dbs increases relative to the hash bit count
Option D: Explicit user-assigned shards¶
Pros:
- maximum flexibility for users with domain-specific partitioning
- trivial implementation
- no training or random state
Cons:
- puts all routing decisions on the user
- no built-in multi-probe at query time
Option E: CEL expression-based sharding¶
Pros:
- reuses the existing shardyfusion.cel module — compile, evaluate, route
- enables attribute-based partitioning (e.g., route by region column) without geometric locality
- at query time, routing_context resolves to a single shard — no multi-probe needed
- categorical mode (routing_values) maps discrete tokens to dense shard IDs automatically
- composable with point lookups — same CEL expression routes both KV get() and vector search() to the same shard (unified mode, future work)
Cons:
- no geometric locality — similar vectors may land on different shards if their routing attributes differ
- requires routing_context on every VectorRecord at write time and on every search() call at read time
- requires the cel extra (cel-expr-python)
- num_dbs must be inferred from routing_values length (categorical) or provided explicitly (direct integer)
Chosen approach: All four (B, C, D, E) are implemented as VectorShardingStrategy.CLUSTER, LSH, EXPLICIT, and CEL. Option A was rejected because it provides no geometric locality. The four strategies cover complementary use cases: CLUSTER for best search quality, LSH for training-free deployment, EXPLICIT for user-managed partitioning, and CEL for attribute-based routing (e.g., partition by region, tenant, or any discrete dimension). CEL is particularly useful when vector search will later be unified with KV point lookups on the same shards.
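To make the routing semantics concrete, here is a small numpy sketch of the cluster and LSH paths. The function names and the bit-flip probing order are illustrative, not the actual route_vector_to_shards implementation:

```python
import numpy as np

def route_cluster(query: np.ndarray, centroids: np.ndarray, num_probes: int) -> list:
    """Route to the shards whose centroids are nearest to the query."""
    dists = np.linalg.norm(centroids - query, axis=1)
    return np.argsort(dists)[:num_probes].tolist()

def route_lsh(query: np.ndarray, hyperplanes: np.ndarray, num_dbs: int, num_probes: int) -> list:
    """Hash the query against random hyperplanes; multi-probe by flipping bits."""
    bits = (hyperplanes @ query > 0).astype(int)      # sign pattern -> bit vector
    code = int("".join(map(str, bits)), 2)
    probes = [code % num_dbs]
    for i in range(len(bits)):                        # flip one bit per extra probe
        if len(probes) >= num_probes:
            break
        cand = (code ^ (1 << i)) % num_dbs
        if cand not in probes:
            probes.append(cand)
    return probes
```

With cluster sharding, num_probes directly bounds how many shards a query touches; with LSH, flipped-bit neighbors approximate "nearby" buckets.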
Decision 2: How to store vector metadata in the manifest¶
Option A: Extend RequiredBuildMeta with vector-specific fields¶
Pros:
- vector metadata is first-class in the manifest schema
- type-checked by Pydantic validation
Cons:
- requires modifying the core manifest model, which all readers parse
- introduces vector-specific fields (dim, metric, centroids_ref) into a model shared with KV writers
- risks manifest format compatibility issues with non-vector readers
- increases coupling between vector and KV code paths
Option B: Use the existing custom_manifest_fields dict¶
Pros:
- no changes to core manifest models or manifest format version
- vector metadata is isolated in a "vector" key within the custom dict
- non-vector readers ignore the custom fields entirely
- avoids manifest schema migration for all existing deployments
Cons:
- vector metadata is untyped at the manifest level (a dict, not a Pydantic model)
- reader must manually parse and validate the custom dict
- no schema enforcement at publish time
Option C: Create a separate vector manifest alongside the standard manifest¶
Pros:
- complete isolation from the KV manifest
- can evolve independently
Cons:
- duplicates manifest infrastructure (publish, load, list, set_current)
- two manifest stores to configure and manage
- complex coordination if both KV and vector share a prefix
Chosen approach: Option B. The custom fields dict already exists for user-defined metadata. Storing vector config there avoids touching the core manifest schema while keeping the publish/load/refresh infrastructure entirely shared. The reader parses the "vector" key from manifest.custom and reconstructs the index config and sharding metadata. Centroids and hyperplanes are stored as separate .npy files in S3 and referenced by URL in the custom dict.
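Because the custom dict is untyped, the reader has to validate it by hand. A minimal sketch of what that parsing might look like (the function name and validation logic are hypothetical; the field names follow the manifest example in this document):

```python
def parse_vector_meta(custom: dict) -> dict:
    """Extract and sanity-check the 'vector' block from manifest custom fields."""
    vec = custom.get("vector")
    if vec is None:
        raise ValueError("manifest has no 'vector' key; not a vector snapshot")
    missing = [k for k in ("dim", "metric", "sharding_strategy") if k not in vec]
    if missing:
        raise ValueError(f"vector metadata missing required fields: {missing}")
    return vec
```

This is the cost of Option B's con ("no schema enforcement at publish time"): validation happens at load time in the reader instead.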
Decision 3: How to structure the per-shard index adapter¶
Option A: Build HNSW graph in Python, serialize custom format¶
Pros:
- full control over the graph format
- no external library dependency
Cons:
- implementing HNSW correctly in pure Python is error-prone and slow
- a custom format requires custom deserialization
- no established tooling for inspection or debugging
Option B: Use lancedb as the index engine with a pluggable adapter protocol¶
Pros:
- lancedb is a mature, fast HNSW implementation
- supports multiple distance metrics and quantization (fp16, i8)
- save/load to file is built in
- deferred import means no hard dependency at package level
- the adapter protocol allows swapping in FAISS or other engines later
Cons:
- adds an optional C++ dependency (lancedb)
- the lancedb API may change between versions
- payload storage is not built into lancedb (requires a sidecar)
Option C: Use FAISS as the index engine¶
Pros:
- widely used, well-tested
- supports IVF, PQ, and other advanced index types
Cons:
- heavier dependency (faiss-cpu or faiss-gpu)
- API is more complex than lancedb for simple HNSW
- no clear advantage for the HNSW-only use case
Chosen approach: Option B with VectorIndexWriter and VectorShardReader protocols. The lancedb adapter is the default implementation, imported lazily. Payloads are stored in a SQLite sidecar database alongside each shard's index file. The protocol design allows adding a FAISS adapter later without changing the writer or reader. The pre-existing HnswGraphBuilder / HnswGraph / HnswNode types in types.py were kept for potential future graph inspection use but are not used by the current adapter — lancedb manages the HNSW graph internally.
Decision 4: How to handle reader shard lifecycle¶
Option A: Preload all shard readers on initialization¶
Pros:
- first search has no cold-start latency
- simple implementation
Cons:
- downloads all N shard indices from S3 at startup, even if only a few are queried
- high memory usage for large shard counts
- slow startup time proportional to num_dbs
Option B: Lazy loading with LRU eviction¶
Pros:
- only downloads shards that are actually queried
- bounded memory via max_cached_shards
- amortized cold start across queries
- double-checked locking prevents duplicate downloads under concurrency
Cons:
- first query to a shard has download latency
- LRU eviction may cause re-downloads for access patterns that cycle through many shards
Option C: Lazy loading with TTL-based eviction¶
Pros:
- time-bounded cache freshness
Cons:
- adds timer complexity
- TTL does not bound memory usage
- less useful than LRU for shard access patterns
Chosen approach: Option B with optional preload. The reader uses an OrderedDict as an LRU cache with per-shard locks for concurrent download safety. preload_shards=True is supported for users who want eager loading. max_cached_shards controls the eviction threshold.
Decision 5: How to merge results across shards¶
Option A: Sort all results, take top-k¶
Pros:
- simple implementation
Cons:
- O(N log N) where N is the total number of results across all shards
- wasteful when N >> top_k
Option B: Heap-based top-k selection¶
Pros:
- O(N log k) time complexity
- uses Python's built-in heapq.nsmallest / nlargest
- naturally handles metric-dependent ordering (lower-is-better for L2/cosine, higher-is-better for dot product)
Cons:
- marginal complexity over sort for small result sets
Chosen approach: Option B. heapq.nsmallest for L2 and cosine (lower score = better), heapq.nlargest for dot product (higher score = better). The implementation is in _merge.py.
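The metric-aware merge can be sketched as follows, representing results as (score, id) tuples for brevity (the real SearchResult type carries more fields):

```python
import heapq

def merge_results(per_shard, top_k, metric):
    """Merge per-shard result lists into a global top-k.
    Lower score wins for 'l2'/'cosine'; higher score wins for 'dot'."""
    all_results = [r for shard in per_shard for r in shard]
    if metric == "dot":
        return heapq.nlargest(top_k, all_results, key=lambda r: r[0])
    return heapq.nsmallest(top_k, all_results, key=lambda r: r[0])
```

heapq.nsmallest/nlargest keep only k candidates in the heap, which is where the O(N log k) bound comes from.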
3. What implementation was chosen and why?¶
The implementation creates a shardyfusion/vector/ subpackage with the following structure:
Module Layout¶
- types.py — Enums (DistanceMetric, VectorShardingStrategy), data classes (VectorRecord, SearchResult, VectorSearchResponse, VectorShardDetail, VectorSnapshotInfo), and protocols (VectorIndexWriter, VectorIndexWriterFactory, VectorShardReader, VectorShardReaderFactory).
- config.py — VectorIndexConfig (dim, metric, HNSW params, quantization), VectorShardingSpec (strategy, num_probes, centroids/hyperplanes), VectorShardedWriteConfig (top-level config mirroring the WriteConfig structure).
- sharding.py — Pure-numpy sharding implementations: k-means++ training (train_centroids_kmeans), cluster assignment and probing, LSH hyperplane generation, LSH hashing and probing, CEL expression-based routing (delegates to shardyfusion.cel), and a unified route_vector_to_shards dispatcher.
- _merge.py — Heap-based top-k merge across per-shard result lists, metric-aware.
- adapters/lancedb_adapter.py — LanceDBWriter (builds a lancedb.Index + SQLite payloads locally, uploads to S3 on close), LanceDBShardReader (downloads from S3, loads index and payloads), and corresponding factories. lancedb is imported inside _import_lancedb() to keep it optional.
- writer.py — write_sharded() function with a single-process implementation. Buffers records by shard, flushes in batches to the adapter, handles centroid training and hyperplane generation, uploads sharding metadata to S3, and publishes a manifest with vector config in custom fields.
- reader.py — ShardedVectorReader class with lazy shard loading, LRU eviction, thread-pool fan-out for multi-shard search, manifest refresh, health monitoring, and shard inspection methods.
- __init__.py — Public API exports (18 names).
- adapters/__init__.py — Lazy __getattr__ exports for the LanceDB adapter types.
Write Pipeline¶
- Validate config (dim > 0, s3_prefix set, sharding spec consistent).
- Resolve sharding: train centroids if requested, generate hyperplanes for LSH, validate explicit shard IDs.
- Create S3 client from the credential provider.
- Iterate records: assign each to a shard via _assign_shard(), buffer in _ShardState.
- When a batch reaches config.batch_size, flush to the adapter via add_batch().
- After iteration: flush remaining batches, checkpoint each adapter.
- Upload centroids/hyperplanes as .npy files to {s3_prefix}/vector_meta/.
- Build RequiredBuildMeta (using KeyEncoding.RAW and ShardingStrategy.HASH as placeholders) and the RequiredShardMeta list from shard states.
- Publish the manifest via ManifestStore.publish() with vector metadata in custom["vector"].
- Return BuildResult.
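The per-shard buffering and flush steps above can be sketched as a self-contained loop. The shard-assignment function and adapter interface below are simplified stand-ins, not the real writer code:

```python
from collections import defaultdict

def write_records(records, assign_shard, adapters, batch_size):
    """Buffer records per shard; flush a shard's buffer to its adapter
    whenever it reaches batch_size, then flush remainders at the end."""
    buffers = defaultdict(list)                 # shard_id -> pending records
    for rec in records:
        sid = assign_shard(rec)
        buffers[sid].append(rec)
        if len(buffers[sid]) >= batch_size:
            adapters[sid].add_batch(buffers[sid])   # flush a full batch
            buffers[sid].clear()
    for sid, pending in buffers.items():        # flush partial batches after iteration
        if pending:
            adapters[sid].add_batch(pending)
```

Buffering per shard keeps adapter calls batched even when the input iterator interleaves shards arbitrarily.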
Read Pipeline¶
- Load the manifest via ManifestStore.load_current() + load_manifest().
- Parse manifest.custom["vector"] to reconstruct VectorIndexConfig, the sharding strategy, and num_probes.
- Download centroids/hyperplanes from the S3 refs in the custom dict.
- On search(query, top_k): route the query to shard IDs via route_vector_to_shards(), fan out to shards (thread pool or sequential), and merge results via _merge.merge_results().
- Shard readers are loaded lazily on first access, cached in an OrderedDict LRU, and evicted when max_cached_shards is exceeded.
- refresh() reloads the manifest, closes old shard readers, and rebuilds state.
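The fan-out step of search() can be sketched as follows (simplified: shard results are assumed to be (score, id) tuples with lower scores better, and get_reader stands in for the lazy shard cache):

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

def fan_out_search(query, shard_ids, get_reader, top_k):
    """Probe each routed shard in a thread pool, then merge to a global top-k."""
    with ThreadPoolExecutor(max_workers=len(shard_ids)) as pool:
        per_shard = list(pool.map(lambda sid: get_reader(sid).search(query, top_k), shard_ids))
    return heapq.nsmallest(top_k, (r for shard in per_shard for r in shard), key=lambda r: r[0])
```

Each shard is asked for top_k candidates so that the global top-k is correct even if all winners come from one shard.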
Manifest Integration¶
Vector metadata stored in manifest.custom["vector"]:
{
"dim": 128,
"metric": "cosine",
"index_type": "hnsw",
"quantization": null,
"total_vectors": 100000,
"sharding_strategy": "cluster",
"num_probes": 2,
"centroids_ref": "s3://bucket/prefix/vector_meta/centroids.npy",
"num_hash_bits": 8,
"hyperplanes_ref": "s3://bucket/prefix/vector_meta/hyperplanes.npy"
}
For CEL sharding, the custom dict additionally contains:
{
"sharding_strategy": "cel",
"cel_expr": "region",
"cel_columns": {"region": "string"},
"routing_values": ["us", "eu", "asia"]
}
This avoids any changes to the core manifest schema.
Adapter Protocol¶
The VectorIndexWriter protocol mirrors the existing DbAdapter pattern:
- add_batch(ids, vectors, payloads) — add vectors to the index
- flush() — persist current state to local files
- checkpoint() -> str | None — return a content hash for the manifest
- close() — upload local files to S3
- Context manager support via __enter__/__exit__
The VectorShardReader protocol:
- search(query, top_k) -> list[SearchResult] — search the shard
- close() — release resources
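Both protocols can be expressed as typing.Protocol classes. The sketch below paraphrases the signatures listed above; the payload and vector parameter types are left loose, and the actual definitions live in types.py:

```python
from typing import Optional, Protocol, runtime_checkable

@runtime_checkable
class VectorIndexWriter(Protocol):
    """Writer-side adapter contract, mirroring the DbAdapter pattern."""
    def add_batch(self, ids, vectors, payloads) -> None: ...
    def flush(self) -> None: ...
    def checkpoint(self) -> Optional[str]: ...
    def close(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc, tb): ...

@runtime_checkable
class VectorShardReader(Protocol):
    """Reader-side adapter contract for a single shard."""
    def search(self, query, top_k): ...
    def close(self) -> None: ...
```

Because these are structural protocols, a FAISS adapter needs no inheritance; any class with matching methods satisfies them.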
Payload Storage¶
The LanceDB adapter stores payloads in a SQLite sidecar (payloads.db) alongside the index file (index.lancedb). Each shard has two files uploaded to S3:
- {db_url}/index.lancedb — the lancedb HNSW index
- {db_url}/payloads.db — SQLite with an (id TEXT PRIMARY KEY, payload TEXT) table
The reader downloads both files, loads the index, and joins search results with payloads via SQL lookups.
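The payload join might look like the following sketch. The table schema is from this document; the function name and the assumption that payloads are JSON-encoded text are illustrative:

```python
import json
import sqlite3

def join_payloads(db_path, hits):
    """Given (id, score) hits from the index, attach payloads from the
    SQLite sidecar. Returns (id, score, payload_or_None) tuples."""
    con = sqlite3.connect(db_path)
    try:
        out = []
        for vid, score in hits:
            row = con.execute(
                "SELECT payload FROM payloads WHERE id = ?", (vid,)
            ).fetchone()
            out.append((vid, score, json.loads(row[0]) if row else None))
        return out
    finally:
        con.close()
```

A primary-key lookup per hit is cheap here since top_k is small relative to shard size.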
Known Limitations¶
Not Yet Implemented¶
- Async reader: No AsyncShardedVectorReader yet. The sync reader with thread-pool fan-out covers the initial use case.
- FAISS adapter: The adapter protocol supports it, but no implementation exists.
- Dask/Ray/Spark writers: Only the Python iterator-based writer is implemented. Framework writers would follow the same pattern as the existing KV framework writers.
Design Trade-offs Accepted¶
- Placeholder sharding in manifest: RequiredBuildMeta.sharding is set to ShardingStrategy.HASH as a placeholder, since the core ManifestShardingSpec does not support vector sharding strategies. The actual vector sharding config lives in custom["vector"]. This is fine because non-vector readers never interpret the sharding field for vector manifests.
- Payload sidecar: Storing payloads in a separate SQLite file adds a second S3 GET per shard load. An alternative would be embedding payloads in the index file format, but lancedb does not support arbitrary payload storage, and the SQLite approach keeps payloads queryable.
- No graph inspection: The HnswGraph/HnswNode types exist in types.py but are not populated by the lancedb adapter. lancedb manages the graph internally and does not expose node-level structure. These types remain available for potential future adapters that do expose graph internals.