Reader Side¶
Overview¶
The reader side provides three classes for looking up keys across sharded snapshots, regardless of whether the shard database is SlateDB or SQLite:
- `ShardedReader` — non-thread-safe, for single-threaded services
- `ConcurrentShardedReader` — thread-safe with lock or pool concurrency modes
- `AsyncShardedReader` — for asyncio services (FastAPI, aiohttp, etc.)
All reader variants follow the same lifecycle:
1. Load the `_CURRENT` pointer from S3
2. Dereference the manifest JSON it points to
3. Open one shard reader per `db_id`
4. Build a `SnapshotRouter` that maps keys to shard IDs (mirroring write-time sharding)
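A minimal sketch of what deterministic routing means here, assuming a hash-modulo scheme (the real `SnapshotRouter` follows the sharding spec recorded in the manifest, which may differ):

```python
import hashlib

def route_key(key: int, num_shards: int) -> int:
    """Illustrative stable router: hash the key's bytes, take modulo.

    This is NOT the library's actual scheme -- the real SnapshotRouter is
    driven by the manifest's sharding spec. The point is that routing must
    be deterministic and identical to the write-time sharding.
    """
    digest = hashlib.sha256(str(key).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards

# The same key always routes to the same shard:
assert route_key(42, 8) == route_key(42, 8)
```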
ShardedReader vs ConcurrentShardedReader¶
| Feature | ShardedReader | ConcurrentShardedReader |
|---|---|---|
| Thread-safe | No | Yes |
| Locking overhead | None | Per-shard lock or pool checkout |
| Reference counting | No | Yes (safe refresh under load) |
| Best for | Single-threaded services, scripts | Multi-threaded web servers, gRPC services, CLI |
Use ShardedReader when your application is single-threaded or you manage concurrency externally. Use ConcurrentShardedReader when multiple threads may call get() / multi_get() concurrently.
Basic Usage¶
```python
from shardyfusion import ShardedReader

reader = ShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
)

value = reader.get(123)
values = reader.multi_get([1, 2, 3])
changed = reader.refresh()
reader.close()
```
Both readers support the context manager protocol:
```python
with ShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
) as reader:
    value = reader.get(123)
```
Thread-Safety Modes¶
ConcurrentShardedReader supports two concurrency modes via the thread_safety parameter:
Lock Mode (default)¶
One reader instance per shard, protected by a threading.Lock. Concurrent threads serialize on the lock for each shard.
```python
from shardyfusion import ConcurrentShardedReader

reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
    thread_safety="lock",
    max_workers=4,  # for multi_get parallelism
)
```
Best for: moderate concurrency where read latency is low relative to lock contention.
Pool Mode¶
Multiple reader instances per shard (max_workers copies), managed via a checkout pool. Concurrent threads get their own reader — no lock contention.
```python
from datetime import timedelta

reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
    thread_safety="pool",
    max_workers=8,  # 8 reader copies per shard
    pool_checkout_timeout=timedelta(seconds=30.0),  # duration to wait for an available reader
)
```
Best for: high concurrency or when individual reads have non-trivial latency. Trades memory for throughput.
When all max_workers reader copies for a shard are checked out, the next thread blocks for up to pool_checkout_timeout (default 30s). If the timeout expires, PoolExhaustedError is raised. Tune this value based on your expected read latency and concurrency level.
Guidance on max_workers: In lock mode, max_workers controls the ThreadPoolExecutor used by multi_get to read from multiple shards in parallel. In pool mode, it also controls how many reader copies are created per shard. Start with 4 and tune based on observed contention.
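The checkout semantics can be sketched with a plain `queue.Queue` (an illustrative stand-in, not the library's pool; `PoolExhausted` here stands in for `PoolExhaustedError`):

```python
import queue
from datetime import timedelta

class PoolExhausted(Exception):
    """Stand-in for the library's PoolExhaustedError."""

class ReaderPool:
    """Minimal sketch of pool-mode checkout semantics (not the real class).

    max_workers reader copies sit in a queue; a checkout blocks for up to
    pool_checkout_timeout, then fails -- mirroring PoolExhaustedError.
    """
    def __init__(self, readers, checkout_timeout: timedelta):
        self._q = queue.Queue()
        for r in readers:
            self._q.put(r)
        self._timeout = checkout_timeout.total_seconds()

    def checkout(self):
        try:
            return self._q.get(timeout=self._timeout)
        except queue.Empty:
            raise PoolExhausted("no reader available within timeout") from None

    def checkin(self, reader):
        self._q.put(reader)

pool = ReaderPool(["r1", "r2"], timedelta(seconds=0.05))
a = pool.checkout()
b = pool.checkout()
try:
    pool.checkout()      # both copies checked out -> times out
except PoolExhausted:
    pool.checkin(a)      # returning a copy makes checkout succeed again
assert pool.checkout() == a
```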
Parameter constraints¶
Both reader classes validate their parameters at construction time:
- `max_workers` must be a positive integer (`>= 1`), or `None` (default)
- `pool_checkout_timeout` must be `> 0` (only used in pool mode)
Invalid values raise ValueError immediately.
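A sketch of what these construction-time checks might look like (the helper `validate_reader_params` is hypothetical, not part of the API):

```python
from datetime import timedelta

def validate_reader_params(max_workers, pool_checkout_timeout) -> None:
    """Illustrative construction-time validation, as described above."""
    if max_workers is not None:
        if not isinstance(max_workers, int) or max_workers < 1:
            raise ValueError("max_workers must be a positive integer or None")
    if pool_checkout_timeout is not None and pool_checkout_timeout <= timedelta(0):
        raise ValueError("pool_checkout_timeout must be > 0")

validate_reader_params(4, timedelta(seconds=30))  # ok
try:
    validate_reader_params(0, timedelta(seconds=30))
except ValueError:
    pass  # max_workers=0 is rejected immediately
```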
Refresh Semantics¶
Both readers support refresh() to pick up new snapshots without restarting:
```python
changed = reader.refresh()  # True if a newer snapshot was loaded
```
How refresh works¶
1. Load the `_CURRENT` pointer from S3
2. Compare the `manifest_ref` to the current state
3. If unchanged, return `False` (no-op)
4. Load the new manifest and open new shard readers
5. Swap in the new state and close old readers
Concurrent refresh safety¶
ConcurrentShardedReader uses reference counting to ensure safe refresh under concurrent reads:
```mermaid
sequenceDiagram
    participant T1 as Thread 1 (get)
    participant R as Reader
    participant T2 as Thread 2 (refresh)
    T1->>R: acquire_state (refcount=1)
    T1->>R: read from shard
    T2->>R: refresh() — swap state
    Note over R: old state marked retired, refcount=1
    T1->>R: release_state (refcount=0)
    Note over R: refcount=0 + retired → close old readers
    T2->>R: new state active
```
In-flight reads always complete against the state they acquired. Old readers are only closed when the last reference is released (refcount drops to 0).
Custom Reader Factory¶
By default, synchronous readers use SlateDbReaderFactory, which creates SlateDBReader instances. You can also pass a SQLite reader factory from shardyfusion.sqlite_adapter, or provide a custom factory for testing or alternative storage:
```python
from pathlib import Path

from shardyfusion import ShardedReader

def my_factory(*, db_url: str, local_dir: Path, checkpoint_id: str | None):
    return MyCustomReader(db_url, local_dir, checkpoint_id)

reader = ShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    reader_factory=my_factory,
)
```
The factory must conform to the ShardReaderFactory protocol — a callable accepting db_url, local_dir, and checkpoint_id keyword arguments, returning an object with get(key: bytes) -> bytes | None and close() methods.
Custom Manifest Store¶
For manifests not stored in the default S3 JSON format, provide a ManifestStore:
```python
reader = ShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    manifest_store=my_custom_store,  # implements ManifestStore protocol
)
```
Snapshot Info¶
Both readers expose metadata about the current snapshot:
```python
info = reader.snapshot_info()
print(info.run_id, info.num_dbs, info.sharding, info.manifest_ref)
```
Shard Metadata and Routing¶
Both readers provide methods to inspect shard metadata and routing without performing database reads:
```python
# Per-shard summary (returns list[ShardDetail])
for shard in reader.shard_details():
    print(shard.db_id, shard.row_count, shard.min_key, shard.max_key, shard.db_url)

# Which shard does a key route to?
db_id = reader.route_key(42)

# Full shard metadata for a key (returns RequiredShardMeta)
meta = reader.shard_for_key(42)
print(meta.db_id, meta.db_url, meta.row_count)

# Batch: mapping of keys to their shard metadata
mapping = reader.shards_for_keys([1, 42, 100])
```
Direct Shard Access¶
For advanced use cases, you can borrow a raw read handle for a specific shard.
The returned ShardReaderHandle delegates to the underlying shard database
but does not close the database when you call close() — it only releases
the borrow (and decrements the refcount / borrow count).
ShardReaderHandle supports the context manager protocol for automatic cleanup:
```python
with reader.reader_for_key(42) as handle:
    raw = handle.get(key_bytes)         # single raw bytes lookup
    batch = handle.multi_get([k1, k2])  # batch lookup on one shard

# Batch variant: one handle per unique key
handles = reader.readers_for_keys([1, 42, 100])
try:
    for key, h in handles.items():
        print(key, h.get(encode(key)))
finally:
    for h in handles.values():
        h.close()
```
Warning
Always close borrowed handles (or use with). For both reader variants,
each handle holds a borrow/refcount increment — unclosed handles prevent
cleanup of old state after refresh().
Reader Health¶
All readers expose a health() method that returns a diagnostic snapshot:
```python
health = reader.health()
print(health.status)        # "healthy", "degraded", or "unhealthy"
print(health.manifest_ref)  # current manifest reference
print(health.manifest_age)  # timedelta since manifest was published
print(health.num_shards)    # number of active shards
print(health.is_closed)     # whether the reader has been closed
```
ReaderHealth fields:
| Field | Type | Description |
|---|---|---|
| `status` | `Literal["healthy", "degraded", "unhealthy"]` | Overall reader health |
| `manifest_ref` | `str` | Reference to the currently loaded manifest |
| `manifest_age` | `timedelta` | Age of the manifest |
| `num_shards` | `int` | Number of shards in the current snapshot |
| `is_closed` | `bool` | Whether the reader has been closed |
Use staleness_threshold to control when a manifest is considered stale (which affects the status field):
```python
from datetime import timedelta

health = reader.health(staleness_threshold=timedelta(minutes=5))
```
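One plausible reading of how the threshold feeds into `status` (the actual rules are library-defined; this mapping is an assumption for illustration):

```python
from datetime import timedelta

def status_from_age(manifest_age: timedelta,
                    staleness_threshold: timedelta,
                    is_closed: bool) -> str:
    """Hypothetical mapping from manifest age to a health status.

    Assumption: a reader is "degraded" once the manifest is older than the
    staleness threshold, and "unhealthy" once closed.
    """
    if is_closed:
        return "unhealthy"
    if manifest_age > staleness_threshold:
        return "degraded"
    return "healthy"

assert status_from_age(timedelta(minutes=2), timedelta(minutes=5), False) == "healthy"
assert status_from_age(timedelta(minutes=7), timedelta(minutes=5), False) == "degraded"
```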
Health Check Integration¶
```python
from datetime import timedelta

from fastapi import FastAPI
from fastapi.responses import JSONResponse

from shardyfusion import ConcurrentShardedReader

app = FastAPI()
reader = ConcurrentShardedReader(...)

@app.get("/health")
def health_check():
    h = reader.health(staleness_threshold=timedelta(minutes=10))
    status_code = 200 if h.status == "healthy" else 503
    # FastAPI ignores a (body, status) tuple; return a Response to set the code.
    return JSONResponse(
        status_code=status_code,
        content={"status": h.status, "manifest_age_s": h.manifest_age.total_seconds()},
    )
```
Warning
Calling health() on a closed reader raises ReaderStateError.
Cold-Start Fallback¶
If the latest manifest is malformed (e.g., partial write, schema migration in progress), the reader can fall back to a previous valid manifest instead of failing outright.
```mermaid
flowchart TD
    A["load_current()"] --> B["load_manifest(ref)"]
    B -->|Success| C["Open shard readers"]
    B -->|ManifestParseError| D{"max_fallback_attempts > 0?"}
    D -->|No| E["Raise ManifestParseError"]
    D -->|Yes| F["list_manifests(limit=N+1)"]
    F --> G["Try each manifest\n(newest first, skip current)"]
    G -->|Found valid| C
    G -->|All invalid| E
```
On Cold Start¶
When a reader is first constructed and load_manifest() raises ManifestParseError, the reader walks backward through list_manifests() to find a valid manifest:
```python
reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    max_fallback_attempts=3,  # default: try up to 3 previous manifests
)
```
Set max_fallback_attempts=0 to disable fallback and fail immediately on a malformed manifest.
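The fallback walk can be sketched as follows, assuming newest-first raw manifests and a stand-in `ManifestParseError` (illustrative, not the real code):

```python
import json

class ManifestParseError(Exception):
    """Stand-in for the library's ManifestParseError."""

def load_with_fallback(manifests: list[str], max_fallback_attempts: int = 3) -> dict:
    """Try the current manifest, then up to max_fallback_attempts previous
    ones (newest first); re-raise if all candidates are invalid."""
    def parse(raw: str) -> dict:
        try:
            return json.loads(raw)
        except json.JSONDecodeError as e:
            raise ManifestParseError(str(e)) from e

    try:
        return parse(manifests[0])
    except ManifestParseError:
        if max_fallback_attempts <= 0:
            raise
        for raw in manifests[1 : 1 + max_fallback_attempts]:
            try:
                return parse(raw)
            except ManifestParseError:
                continue
        raise

# Newest manifest is a partial write; fallback finds the previous good one.
assert load_with_fallback(['{"run_id": ', '{"run_id": "r41"}']) == {"run_id": "r41"}
```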
On Refresh¶
During refresh(), if the new manifest is malformed, the reader silently skips it and keeps its current good state. refresh() returns False in this case — no error is raised.
```python
changed = reader.refresh()  # False if new manifest was malformed → kept old state
```
This applies to all reader variants: ShardedReader, ConcurrentShardedReader, and AsyncShardedReader.
Reader-Side Rate Limiting¶
All reader constructors accept a rate_limiter parameter to throttle read operations:
```python
from shardyfusion import ConcurrentShardedReader, TokenBucket

limiter = TokenBucket(rate=1000.0)  # 1000 reads/sec

reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    rate_limiter=limiter,
)

# Each get() and multi_get() call acquires tokens before reading
value = reader.get(42)  # blocks if rate limit exceeded
```
Async Reader Rate Limiting¶
AsyncShardedReader uses acquire_async() (based on asyncio.sleep) — it never blocks the event loop:
```python
from shardyfusion import AsyncShardedReader, TokenBucket

limiter = TokenBucket(rate=1000.0)

reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    rate_limiter=limiter,
)

value = await reader.get(42)  # uses asyncio.sleep if throttled
```
Note
The rate limiter uses try_acquire() (pure arithmetic, guaranteed non-blocking) paired with asyncio.sleep for the wait — no asyncio.to_thread() delegation required.
Metrics¶
Pass a MetricsCollector to observe reader lifecycle events:
```python
reader = ShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    metrics_collector=my_collector,
)
```
Events emitted: READER_INITIALIZED, READER_GET, READER_MULTI_GET, READER_REFRESHED, READER_CLOSED.
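A minimal counting collector for smoke tests, assuming the collector is invoked with an event name per lifecycle event (the real `MetricsCollector` hook signature may differ):

```python
from collections import Counter

class CountingCollector:
    """Hypothetical MetricsCollector: counts how often each reader
    lifecycle event fires. The record() signature is an assumption."""
    def __init__(self):
        self.counts = Counter()

    def record(self, event: str, **labels) -> None:
        self.counts[event] += 1

collector = CountingCollector()
collector.record("READER_GET")
collector.record("READER_GET")
collector.record("READER_REFRESHED")
assert collector.counts["READER_GET"] == 2
```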
AsyncShardedReader¶
For asyncio-based services (FastAPI, aiohttp, etc.), AsyncShardedReader provides a fully async interface that avoids blocking the event loop.
Installation¶
```shell
# Default async reader (SlateDB backend + native async S3)
uv sync --extra read-async

# Async SQLite reader wrappers (use with AsyncSqliteReaderFactory)
uv sync --extra sqlite-async

# Async SQLite range reads (adds APSW for AsyncSqliteRangeReaderFactory)
uv sync --extra sqlite-async --extra read-sqlite-range
```
AsyncShardedReader defaults to AsyncS3ManifestStore, so aiobotocore is required unless you pass a custom AsyncManifestStore. The read-async extra installs the default SlateDB async stack. The sqlite-async extra installs AsyncSqliteReaderFactory; add read-sqlite-range as well if you need the APSW-backed AsyncSqliteRangeReaderFactory.
Basic Usage¶
```python
from shardyfusion import AsyncShardedReader

# Use the async factory (since __init__ can't do async I/O)
reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
)

value = await reader.get(123)
values = await reader.multi_get([1, 2, 3])
changed = await reader.refresh()
await reader.close()
```
Async Context Manager¶
```python
async with await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
) as reader:
    value = await reader.get(123)
```
Borrow Safety During Reads¶
get() and multi_get() hold borrow count increments on the underlying reader state for the duration of each read. This prevents refresh() or close() from closing shard readers while a read is in progress. The borrow is automatically released when the read completes (or raises).
Concurrency Control¶
multi_get fans out reads across shards using asyncio.TaskGroup. To limit the number of concurrent shard reads, pass max_concurrency:
```python
reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/shardyfusion-reader",
    max_concurrency=4,  # at most 4 concurrent shard reads
)
```
Sync Metadata Methods¶
Routing and metadata methods are synchronous (no I/O involved):
```python
info = reader.snapshot_info()
db_id = reader.route_key(42)
meta = reader.shard_for_key(42)
details = reader.shard_details()
```
Direct Shard Access¶
Borrow async handles for direct shard access:
```python
async with reader.reader_for_key(42) as handle:
    raw = await handle.get(key_bytes)
    batch = await handle.multi_get([k1, k2])
```
Custom Async Reader Factory¶
```python
reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    reader_factory=my_async_factory,  # implements AsyncShardReaderFactory
)
```
The factory must be an async callable returning an object with async def get(key: bytes) -> bytes | None and async def close() -> None.