API Reference¶
Config¶
Configuration models for sharded snapshot writes.
ManifestOptions
dataclass
¶
Manifest build and publish settings.
OutputOptions
dataclass
¶
Output path/layout settings for shard database writes.
WriteConfig
dataclass
¶
Top-level configuration for sharded snapshot writes.
Framework-specific parameters (key_col, value_spec,
sort_within_partitions) live on the writer function signature,
not here.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `num_dbs` | `int \| None` | Number of shard databases to create. Must be > 0 for explicit HASH sharding. Set to … | `None` |
| `s3_prefix` | `str` | S3 location for shard databases and manifests (e.g. …). | `''` |
| `key_encoding` | `KeyEncoding` | How keys are serialized to bytes. Default `U64BE`. | `U64BE` |
| `batch_size` | `int` | Number of key-value pairs per write batch. Default 50,000. | `50000` |
| `adapter_factory` | `DbAdapterFactory \| None` | Factory for creating shard database adapters. Default: … | `None` |
| `sharding` | `ShardingSpec` | Sharding strategy configuration (hash, range, or custom). | `ShardingSpec()` |
| `output` | `OutputOptions` | Output path/layout settings. | `OutputOptions()` |
| `manifest` | `ManifestOptions` | Manifest build and publish settings. | `ManifestOptions()` |
| `metrics_collector` | `MetricsCollector \| None` | Optional observer for write lifecycle events. | `None` |
| `shard_retry` | `RetryConfig \| None` | Optional retry configuration. Used for per-shard retry in Dask/Ray sharded writes, whole-database retry in Spark/Dask/Ray … | `None` |
| `credential_provider` | `CredentialProvider \| None` | Credential provider for S3 access. When also set on … | `None` |
| `s3_connection_options` | `S3ConnectionOptions \| None` | S3 transport/connection overrides. When also set on … | `None` |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If any parameter fails validation. |
Errors¶
Custom exception hierarchy for shardyfusion.
Failure classification¶
Each exception carries a retryable flag so callers can decide whether
retrying the operation may help:
- Retryable errors (e.g. `PublishManifestError`, `PublishCurrentError`) may succeed on retry — typically transient infrastructure failures.
- Non-retryable errors (e.g. `ConfigValidationError`, `ManifestParseError`) indicate programmer/data errors that require a fix, not a retry.
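The flag lends itself to a generic caller-side retry loop. The sketch below uses minimal stand-in classes that mirror the documented hierarchy (the real ones live in the package's errors module); the `with_retries` helper and its backoff parameters are illustrative, not part of shardyfusion.

```python
import time

# Stand-ins mirroring the documented hierarchy and its retryable flag.
class ShardyfusionError(Exception):
    retryable = False

class PublishManifestError(ShardyfusionError):
    retryable = True  # transient S3 failure: worth retrying

class ConfigValidationError(ShardyfusionError):
    retryable = False  # programmer error: retrying cannot help

def with_retries(op, attempts=3, backoff_s=0.5):
    """Run op(), retrying only when the raised error says retry may help."""
    for attempt in range(1, attempts + 1):
        try:
            return op()
        except ShardyfusionError as exc:
            if not exc.retryable or attempt == attempts:
                raise  # non-retryable, or out of attempts
            time.sleep(backoff_s * attempt)  # linear backoff sketch
```

Note that the flag classifies the *failure*, not the operation: `with_retries` re-raises a `ConfigValidationError` on the first attempt.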
ConfigValidationError
¶
Bases: ShardyfusionError
Configuration failed validation.
Raised during WriteConfig.__post_init__ or writer startup when
parameters are invalid (e.g. num_dbs <= 0, malformed s3_prefix,
unsupported sharding strategy for a given writer backend).
DbAdapterError
¶
Bases: ShardyfusionError
Database adapter binding was unavailable or incompatible.
Also raised when shard reader operations fail (e.g. during close).
ManifestBuildError
¶
Bases: ShardyfusionError
Manifest artifact creation failed.
Raised when the ManifestBuilder raises during build().
ManifestParseError
¶
Bases: ShardyfusionError
Manifest or CURRENT pointer payload could not be parsed or is structurally invalid.
Raised when JSON is malformed, required fields are missing, or structural invariants are violated (e.g. shard count mismatch).
ManifestStoreError
¶
Bases: ShardyfusionError
Transient manifest store failure (DB connection, query timeout, etc.).
Raised by database-backed ManifestStore implementations when the
underlying connection or query fails. Typically retryable.
PoolExhaustedError
¶
Bases: ShardyfusionError
All readers in the pool are checked out and checkout timed out.
Typically transient — a brief spike in concurrent reads or a slow shard query can exhaust the pool temporarily.
PublishCurrentError
¶
Bases: ShardyfusionError
CURRENT pointer publish operation failed.
The manifest has already been published when this error is raised.
Use manifest_ref to locate the published manifest for recovery
(e.g. manually updating the CURRENT pointer).
PublishManifestError
¶
Bases: ShardyfusionError
Manifest publish operation failed.
The manifest bytes could not be uploaded to S3. This is retryable because the failure is typically a transient S3 error.
ReaderStateError
¶
Bases: ShardyfusionError
Reader operation attempted in an invalid lifecycle state (closed, missing CURRENT).
Raised when calling get(), multi_get(), or refresh()
on a closed reader, or when the CURRENT pointer is not found.
S3TransientError
¶
Bases: ShardyfusionError
Transient S3 error that may succeed on retry (throttle, 500, 503, timeout).
Used internally by the storage layer's exponential-backoff retry logic.
ShardAssignmentError
¶
Bases: ShardyfusionError
Rows could not be assigned to valid shard IDs.
Raised when routing verification detects a mismatch between framework-assigned shard IDs and Python routing.
ShardCoverageError
¶
Bases: ShardyfusionError
Partition results did not cover all expected shard IDs.
Raised after shard writes complete if the set of shard IDs in
results doesn't match range(num_dbs).
ShardWriteError
¶
Bases: ShardyfusionError
Shard write operation failed with a potentially transient error.
Raised when adapter operations (write_batch, flush, checkpoint) fail with errors that may succeed on retry (e.g. underlying S3 I/O failures in the storage adapter).
ShardyfusionError
¶
Bases: Exception
Base exception for all package-specific errors.
Metrics¶
Configurable metrics collection for observability.
Backward-compatible: from shardyfusion.metrics import MetricEvent works
exactly as before the module-to-package promotion.
MetricEvent
¶
Bases: str, Enum
Metric events emitted during write and read operations.
Events are grouped by subsystem. Implementations should silently ignore any events they do not recognise so that new events can be added without breaking existing collectors.
MetricsCollector
¶
Bases: Protocol
Protocol for receiving metric events.
Implementations must be thread-safe. The emit() method is called
synchronously; buffer internally if blocking is a concern.
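A minimal collector satisfying these constraints might look as follows. The single-argument `emit(event)` form, the lock-per-update design, and the event names are assumptions for illustration; the real protocol may carry extra payload fields.

```python
import threading
from collections import Counter
from typing import Protocol

class MetricsCollector(Protocol):
    # Stand-in for the documented protocol; signature is an assumption.
    def emit(self, event: str) -> None: ...

class CountingCollector:
    """Thread-safe collector: emit() is called synchronously by the writer,
    so it only increments a counter under a lock, and it silently ignores
    events it does not recognise (per the MetricEvent guidance above)."""

    KNOWN = {"write.batch", "write.flush"}  # hypothetical event names

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.counts: Counter = Counter()

    def emit(self, event: str) -> None:
        if event not in self.KNOWN:
            return  # unknown events must not break the collector
        with self._lock:
            self.counts[event] += 1
```

Anything slower than a counter bump (network export, disk flush) should be buffered here and drained on a background thread, since `emit()` runs on the write path.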
Type Definitions¶
Shared type aliases and protocols used across the package.
AsyncShardReader
¶
Bases: Protocol
Async counterpart of ShardReader for use with AsyncShardedReader.
AsyncShardReaderFactory
¶
Bases: Protocol
Factory for opening one async shard reader.
RetryConfig
dataclass
¶
Configurable retry parameters for S3 operations.
Default values preserve current hardcoded behavior.
S3ConnectionOptions
¶
Bases: TypedDict
Transport/connection overrides for boto3 S3 client construction.
Identity fields (access key, secret, session token) are handled
separately by :class:`~shardyfusion.credentials.CredentialProvider`.
ShardReader
¶
Credentials¶
Credential provider abstractions for S3 authentication.
Separates identity (who you are) from transport (how you connect). All dataclasses are frozen for immutability and picklability across Spark/Dask/Ray/multiprocessing boundaries.
CredentialProvider
¶
Bases: Protocol
Pull-based credential resolver.
Implementations return an S3Credentials snapshot. The caller
decides when to resolve (e.g. once at construction, or before
each request for future rotation support).
EnvCredentialProvider
dataclass
¶
Credential provider that reads S3_* / AWS_* environment variables.
Env var precedence per field (first non-empty wins):
- S3_ACCESS_KEY_ID / AWS_ACCESS_KEY_ID
- S3_SECRET_ACCESS_KEY / AWS_SECRET_ACCESS_KEY
- S3_SESSION_TOKEN / AWS_SESSION_TOKEN
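The first-non-empty-wins precedence can be sketched with a small stdlib helper; the function names here are illustrative, not the provider's real internals.

```python
import os
from typing import Optional

def _first_nonempty(*names: str) -> Optional[str]:
    """Return the first environment variable that is set and non-empty."""
    for name in names:
        value = os.environ.get(name, "")
        if value:
            return value
    return None

def resolve_identity() -> dict:
    # S3_* takes precedence over AWS_*, per the documented order.
    return {
        "access_key_id": _first_nonempty("S3_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID"),
        "secret_access_key": _first_nonempty("S3_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY"),
        "session_token": _first_nonempty("S3_SESSION_TOKEN", "AWS_SESSION_TOKEN"),
    }
```

An empty `S3_*` value falls through to the `AWS_*` counterpart rather than masking it.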
S3Credentials
dataclass
¶
Resolved S3 identity — access key, secret, and optional session token.
StaticCredentialProvider
dataclass
¶
Credential provider with inline identity fields.
Flat fields (not composition) for ergonomic construction::
    provider = StaticCredentialProvider(
        access_key_id="AKIA...",
        secret_access_key="wJal...",
    )
materialize_env_file(creds)
¶
Write credentials to a temporary .env file for SlateDB's object-store.
Returns the file path. The file is created with 0o600 permissions
so only the current user can read it. Caller is responsible for
calling :func:`remove_env_file` after use.
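The 0o600 temp-file pattern described here can be sketched with the stdlib. The variable names written into the file and the `_sketch` function names are assumptions, not shardyfusion's actual file format.

```python
import os
import tempfile

def materialize_env_file_sketch(access_key: str, secret_key: str) -> str:
    """Write credentials to a temp .env file readable only by the owner."""
    fd, path = tempfile.mkstemp(suffix=".env")
    try:
        os.fchmod(fd, 0o600)  # owner read/write only (mkstemp default, made explicit)
        with os.fdopen(fd, "w") as fh:
            fh.write(f"AWS_ACCESS_KEY_ID={access_key}\n")
            fh.write(f"AWS_SECRET_ACCESS_KEY={secret_key}\n")
    except BaseException:
        os.unlink(path)  # do not leave secrets behind on failure
        raise
    return path

def remove_env_file_sketch(path: str) -> None:
    """Best-effort delete; already-removed files are not an error."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass
```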
remove_env_file(path)
¶
Safely delete a credentials env file created by :func:materialize_env_file.
resolve_env_file(env_file, credential_provider)
¶
Create a context manager that resolves an env file for SlateDB.
Sharding Types¶
Shared sharding types used by both writer and reader paths.
KeyEncoding
¶
Bases: str, Enum
Supported key serialization encodings.
from_value(value)
classmethod
¶
Parse an encoding value from enum or string input.
ShardingSpec
dataclass
¶
Configuration for mapping rows to shard database ids.
Two strategies are supported:
HASH (default): Uniform distribution via xxh3_64(canonical_bytes(key)) % num_dbs.
Supports int, str, and bytes keys. Requires num_dbs > 0 (explicit or computed
from max_keys_per_shard).
CEL: Flexible shard assignment via a CEL expression. The expression may return
a dense integer shard id directly, or a categorical token resolved by exact match
against routing_values. A built-in shard_hash() function (wrapping xxh3_64)
is available in CEL expressions. num_dbs is always derived from routing
metadata or discovered from data; it must not be provided explicitly.
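The HASH invariant above can be sketched end to end. The real implementation hashes with xxh3_64; sha256 below is a stdlib stand-in, so this sketch will NOT reproduce shardyfusion's actual shard assignments — it only illustrates the `canonical_bytes` → hash → modulo pipeline and the U64BE integer encoding.

```python
import hashlib
import struct

def canonical_bytes(key) -> bytes:
    """Canonical byte form of an int, str, or bytes key."""
    if isinstance(key, bytes):
        return key
    if isinstance(key, str):
        return key.encode("utf-8")
    return struct.pack(">Q", key)  # U64BE key encoding for integers

def route_key(key, num_dbs: int) -> int:
    """Deterministic shard id in range(num_dbs) — stand-in for xxh3_64 routing."""
    digest = hashlib.sha256(canonical_bytes(key)).digest()
    h = int.from_bytes(digest[:8], "big")
    return h % num_dbs
```

Writer and reader must share one routing function so every key is looked up in the shard it was written to.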
to_manifest_dict()
¶
Return manifest-safe representation (Spark callables omitted).
ShardingStrategy
¶
Bases: str, Enum
Supported sharding strategies.
from_value(value)
classmethod
¶
Parse a strategy value from enum or string input.
validate_routing_values(routing_values, *, require_unique=True)
¶
Validate categorical routing values are unique and type-consistent.
CEL Helpers¶
CEL (Common Expression Language) integration for flexible sharding.
Provides compile/evaluate/routing functions shared by all writer backends
and the reader routing layer. Requires the `cel` extra:
`pip install shardyfusion[cel]`.
CEL routing supports two resolver modes:
- Direct integer mode: the expression returns the dense internal shard id.
- Categorical mode: the expression returns a discrete token and `routing_values` resolves the shard id by exact match.

A built-in shard_hash() function wrapping xxh3_64 is registered
automatically.
Uses cel-expr-python (Google's C++ CEL wrapper) via:
from cel_expr_python.cel import NewEnv, Type
CelColumn
dataclass
¶
Column specification for :func:`cel_sharding_by_columns`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Column name as it appears in the DataFrame / record. | required |
| `type` | `CelType` | CEL type of the column (default :attr:`CelType.STRING`). | `STRING` |
CelType
¶
Bases: str, Enum
Supported CEL column types for :func:`cel_sharding_by_columns`.
CompiledCel
¶
Wrapper around a compiled CEL expression with its environment.
evaluate(context)
¶
Evaluate the CEL expression with the given variable bindings.
Uses cel_expr_python's Expression.eval(data=...) method
(a sandboxed CEL evaluator, NOT Python's built-in eval).
UnknownRoutingTokenError
¶
Bases: ValueError
Raised when categorical CEL routing produces a token not in the snapshot.
build_categorical_routing_lookup(routing_values)
¶
Build exact-match lookup for categorical CEL routing.
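The lookup is a plain token-to-dense-id map. This sketch shows the exact-match semantics, including the duplicate check and the unknown-token failure mirroring `UnknownRoutingTokenError` (the real function lives in the CEL helpers module; names here are illustrative).

```python
def build_lookup(routing_values):
    """Map each distinct routing token to a dense 0-based shard id."""
    lookup = {}
    for db_id, token in enumerate(routing_values):
        if token in lookup:
            raise ValueError(f"duplicate routing value: {token!r}")
        lookup[token] = db_id
    return lookup

def resolve(token, lookup):
    """Exact-match resolution; unknown tokens are an error, never a fallback."""
    try:
        return lookup[token]
    except KeyError:
        # Mirrors UnknownRoutingTokenError: token absent from the snapshot.
        raise ValueError(f"unknown routing token: {token!r}") from None
```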
cel_sharding(expr, columns, *, routing_values=None)
¶
Build a :class:`ShardingSpec` for an arbitrary CEL expression.
This is the general-purpose companion to :func:`cel_sharding_by_columns`.
Use it when you want raw CEL control but do not want to construct
`ShardingSpec(strategy=ShardingStrategy.CEL, ...)` manually.
Direct routing example::

    cel_sharding("key % 4", {"key": "int"})

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `expr` | `str` | CEL expression to evaluate for routing. | required |
| `columns` | `dict[str, str \| CelType]` | Mapping of column name to CEL type string or :class:`CelType`. | required |
| `routing_values` | `Sequence[RoutingValue] \| None` | Optional categorical routing values for exact-match mode. | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
|  | `ShardingSpec` | A :class:`ShardingSpec` … |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If no columns are given or an unsupported CEL type is declared. |
cel_sharding_by_columns(*columns, num_shards=None, separator=':')
¶
Build a :class:`ShardingSpec` for CEL-based partitioning by column values.
Each column is either a bare name (str, defaults to
:attr:`CelType.STRING`) or a :class:`CelColumn` instance. The generated
CEL expression either hashes the column value(s) and takes modulo
num_shards to produce a 0-based shard ID, or, when num_shards is
omitted, returns a categorical routing token whose distinct values are
discovered from data at write time.
Single column example::

    cel_sharding_by_columns("region", num_shards=10)
    # → cel_expr="shard_hash(region) % 10u"
    #   cel_columns={"region": "string"}

Typed column example::

    cel_sharding_by_columns(CelColumn("tier", CelType.INT), num_shards=4)
    # → cel_expr="shard_hash(tier) % 4u"
    #   cel_columns={"tier": "int"}

Multiple columns example::

    cel_sharding_by_columns("region", CelColumn("tier", CelType.INT), num_shards=8)
    # → cel_expr='shard_hash(region + ":" + string(tier)) % 8u'
    #   cel_columns={"region": "string", "tier": "int"}
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `*columns` | `str \| CelColumn` | One or more column specifications. | `()` |
| `num_shards` | `int \| None` | Target number of shards (embedded as …). | `None` |
| `separator` | `str` | Delimiter for multi-column concatenation (default `':'`). | `':'` |

Returns:

| Name | Type | Description |
|---|---|---|
|  | `ShardingSpec` | A :class:`ShardingSpec` … |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If no columns given or … |
compile_cel(expr, columns)
¶
Compile a CEL expression with declared column types.
The shard_hash() custom function is automatically registered,
providing overloads for int, uint, string, and bytes arguments.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `expr` | `str` | CEL expression string (e.g., …). | required |
| `columns` | `dict[str, str]` | Map of variable name -> type string (e.g., …). | required |

Returns:

| Type | Description |
|---|---|
| `CompiledCel` | A compiled expression object that can be evaluated with variable bindings. |

Raises:

| Type | Description |
|---|---|
| `ImportError` | If the `cel` extra is not installed. |
| `ConfigValidationError` | If the expression fails to compile. |
evaluate_cel_arrow_batch(compiled, batch)
¶
Evaluate CEL on each row of a PyArrow RecordBatch/Table.
Used by the Spark writer via mapInArrow (Arrow batches are
the natural format). Dask and Ray writers route through
route_key() → route_cel() per row instead.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `compiled` | `CompiledCel` | Compiled CEL expression. | required |
| `batch` | `Any` | PyArrow RecordBatch or Table. | required |

Returns:

| Type | Description |
|---|---|
| `list[int \| str \| bytes]` | List of routing key values, one per row. |
pandas_rows_to_contexts(pdf, cel_columns)
¶
Convert pandas DataFrame rows to CEL context dicts.
Handles numpy scalar coercion. Shared by Spark and Dask writers.
resolve_cel_routing_key(routing_key, *, routing_values=None, lookup=None)
¶
Resolve a CEL routing key to a dense internal shard id.
route_cel(compiled, context, routing_values=None, lookup=None)
¶
Evaluate CEL expression and route to a shard.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `compiled` | `CompiledCel` | Compiled CEL expression. | required |
| `context` | `dict[str, Any]` | Variable bindings for the expression. | required |
| `routing_values` | `Sequence[RoutingValue] \| None` | Optional categorical routing values. If provided, exact-match lookup gives the shard ID. | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | Shard db_id (0-based). |
route_cel_batch(compiled, batch, routing_values=None)
¶
Evaluate CEL and route each row to a shard.
If routing_values is provided, uses categorical exact-match lookup.
Otherwise, the CEL output is used directly as the shard ID.
validate_cel_columns(arrow_schema, cel_columns)
¶
Validate that declared CEL columns exist in an Arrow schema.
Sharding (Spark)¶
Sharding (Dask)¶
Dask-native sharding helpers.
add_db_id_column(ddf, *, key_col, num_dbs, sharding, key_encoding)
¶
Add deterministic _shard_id column via Python routing function.
Uses the same route_key() as the reader, guaranteeing the
sharding invariant without reimplementation.
For CEL sharding, builds per-row routing contexts from the CEL columns so that expressions referencing non-key columns evaluate correctly.
Sharding (Ray)¶
Ray Data-native sharding helpers.
add_db_id_column(ds, *, key_col, num_dbs, sharding, key_encoding)
¶
Add deterministic _shard_id column via Python routing function.
Uses the same route_key() as the reader, guaranteeing the
sharding invariant without reimplementation.
Uses Arrow batch format to avoid Arrow->pandas->Arrow round-trip.
For CEL sharding, delegates to route_cel_batch() which evaluates
the CEL expression with the full row context (supporting non-key columns).
Writer (Spark)¶
Public sharded snapshot writer entrypoint.
verify_routing_agreement(df_with_db_id, *, key_col, num_dbs, resolved_sharding, key_encoding, sample_size=20)
¶
Sample rows and verify Spark-computed db_id matches Python routing.
Since all strategies now use Python-based mapInArrow, this is a safety net verifying the mapInArrow output against the reader's route_key() function.
write_one_shard_partition(db_id, rows_iter, runtime)
¶
Write exactly one shard from one partition and yield one ShardAttemptResult.
write_sharded(df, config, *, key_col, value_spec, sort_within_partitions=False, spark_conf_overrides=None, cache_input=False, storage_level=None, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)
¶
Write a DataFrame into N independent sharded databases and publish manifest metadata.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | PySpark DataFrame containing at least the key column and value column(s). | required |
| `config` | `WriteConfig` | Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). | required |
| `key_col` | `str` | Name of the integer key column used for shard routing. | required |
| `value_spec` | `ValueSpec` | Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder). | required |
| `sort_within_partitions` | `bool` | If True, sort rows by key within each partition before writing. Useful for range-scan workloads. | `False` |
| `spark_conf_overrides` | `dict[str, str] \| None` | Optional Spark config overrides applied for the duration of the write (restored after completion). | `None` |
| `cache_input` | `bool` | If True, cache the input DataFrame before processing. | `False` |
| `storage_level` | `StorageLevel \| None` | Spark storage level for caching (default: MEMORY_AND_DISK). | `None` |
| `max_writes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write ops/sec. | `None` |
| `max_write_bytes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write bytes/sec. | `None` |
| `verify_routing` | `bool` | If True (default), spot-check that Spark-assigned shard IDs match Python routing on a sample of written rows. | `True` |

Returns:

| Type | Description |
|---|---|
| `BuildResult` | BuildResult with manifest reference, shard metadata, and build statistics. |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If configuration is invalid. |
| `ShardAssignmentError` | If rows cannot be assigned to valid shard IDs. |
| `ShardCoverageError` | If partition results don't cover all expected shards. |
| `PublishManifestError` | If manifest upload to S3 fails. |
| `PublishCurrentError` | If CURRENT pointer upload fails (manifest already published). |
Writer (Dask)¶
Dask-based sharded writer (no Spark/Java dependency).
write_sharded(ddf, config, *, key_col, value_spec, sort_within_partitions=False, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)
¶
Write a Dask DataFrame into N independent sharded databases and publish manifest.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ddf` | `DataFrame` | Dask DataFrame containing at least the key column and value column(s). | required |
| `config` | `WriteConfig` | Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported. | required |
| `key_col` | `str` | Name of the key column used for shard routing. | required |
| `value_spec` | `ValueSpec` | Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder). | required |
| `sort_within_partitions` | `bool` | If True, sort rows by key within each partition. | `False` |
| `max_writes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write ops/sec. | `None` |
| `max_write_bytes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write bytes/sec. | `None` |
| `verify_routing` | `bool` | If True (default), spot-check that Dask-assigned shard IDs match Python routing on a sample of written rows. | `True` |

Returns:

| Type | Description |
|---|---|
| `BuildResult` | BuildResult with manifest reference, shard metadata, and build statistics. |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If configuration is invalid. |
| `ShardAssignmentError` | If rows cannot be assigned to valid shard IDs. |
| `ShardCoverageError` | If partition results don't cover all expected shards. |
| `PublishManifestError` | If manifest upload to S3 fails. |
| `PublishCurrentError` | If CURRENT pointer upload fails (manifest already published). |
Writer (Ray)¶
Ray Data-based sharded writer (no Spark/Java dependency).
write_sharded(ds, config, *, key_col, value_spec, sort_within_partitions=False, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)
¶
Write a Ray Dataset into N independent sharded databases and publish manifest.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ds` | `Dataset` | Ray Dataset containing at least the key column and value column(s). | required |
| `config` | `WriteConfig` | Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported. | required |
| `key_col` | `str` | Name of the key column used for shard routing. | required |
| `value_spec` | `ValueSpec` | Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder). | required |
| `sort_within_partitions` | `bool` | If True, sort rows by key within each partition. | `False` |
| `max_writes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write ops/sec. | `None` |
| `max_write_bytes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write bytes/sec. | `None` |
| `verify_routing` | `bool` | If True (default), spot-check that Ray-assigned shard IDs match Python routing on a sample of written rows. | `True` |

Returns:

| Type | Description |
|---|---|
| `BuildResult` | BuildResult with manifest reference, shard metadata, and build statistics. |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If configuration is invalid. |
| `ShardAssignmentError` | If rows cannot be assigned to valid shard IDs. |
| `ShardCoverageError` | If partition results don't cover all expected shards. |
| `PublishManifestError` | If manifest upload to S3 fails. |
| `PublishCurrentError` | If CURRENT pointer upload fails (manifest already published). |
Writer (Python)¶
Pure-Python iterator-based sharded writer (no Spark dependency).
write_sharded(records, config, *, key_fn, value_fn, columns_fn=None, parallel=False, max_queue_size=100, max_parallel_shared_memory_bytes=_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES, max_parallel_shared_memory_bytes_per_worker=_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES_PER_WORKER, max_writes_per_second=None, max_write_bytes_per_second=None, max_total_batched_items=None, max_total_batched_bytes=None)
¶
Write an iterable of records into N sharded databases.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `records` | `Iterable[T]` | Iterable of records to write. Each record is passed to `key_fn` and `value_fn`. | required |
| `config` | `WriteConfig` | Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported. | required |
| `key_fn` | `Callable[[T], KeyInput]` | Callable that extracts the routing key from each record. | required |
| `value_fn` | `Callable[[T], bytes]` | Callable that serializes each record to value bytes. | required |
| `parallel` | `bool` | If True, use one subprocess per shard (…). | `False` |
| `max_queue_size` | `int` | Maximum per-shard queue depth in parallel mode. In shared-memory parallel mode this bounds control messages, not payload bytes. | `100` |
| `max_parallel_shared_memory_bytes` | `int \| None` | Global cap on outstanding shared-memory payload bytes in parallel mode. When exceeded, the parent blocks until workers reclaim segments. | `_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES` |
| `max_parallel_shared_memory_bytes_per_worker` | `int \| None` | Per-worker cap on outstanding shared-memory payload bytes in parallel mode. When exceeded, the parent blocks until that worker reclaims segments. | `_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES_PER_WORKER` |
| `max_writes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write ops/sec. | `None` |
| `max_write_bytes_per_second` | `float \| None` | Optional rate limit (token-bucket) for write bytes/sec. | `None` |
| `max_total_batched_items` | `int \| None` | Global cap on total buffered items across all shard batches (single-process mode only). When exceeded, the shard with the largest batch is flushed. Prevents OOM with many shards. | `None` |
| `max_total_batched_bytes` | `int \| None` | Global cap on total buffered bytes across all shard batches (single-process mode only). When exceeded, the shard with the largest byte footprint is flushed. | `None` |

Returns:

| Type | Description |
|---|---|
| `BuildResult` | BuildResult with manifest reference, shard metadata, and build statistics. |

Raises:

| Type | Description |
|---|---|
| `ConfigValidationError` | If configuration is invalid. |
| `ShardCoverageError` | If partition results don't cover all expected shards. |
| `PublishManifestError` | If manifest upload to S3 fails. |
| `PublishCurrentError` | If CURRENT pointer upload fails (manifest already published). |
| `ShardyfusionError` | If a worker process fails in parallel mode. |
Manifest Models and Builders¶
Manifest models and extensibility protocols.
BuildDurations
dataclass
¶
Measured phase durations in milliseconds.
BuildResult
dataclass
¶
Result from write_sharded.
BuildStats
dataclass
¶
Deterministic build statistics with fixed schema.
CurrentPointer
¶
Bases: BaseModel
JSON pointer to the latest published manifest.
ManifestArtifact
dataclass
¶
Serializable artifact to publish for manifests or pointers.
ManifestRef
dataclass
¶
Backend-agnostic pointer to a published manifest.
ManifestShardingSpec
¶
Bases: BaseModel
Manifest-safe sharding specification (no Callable fields).
This is the Pydantic counterpart of :class:`ShardingSpec` for
serialization and deserialization in manifests.
ParsedManifest
¶
Bases: BaseModel
Typed representation of a parsed manifest payload.
RequiredBuildMeta
¶
Bases: BaseModel
Library-owned metadata required for all manifests.
RequiredShardMeta
¶
Bases: BaseModel
Winner shard metadata emitted by writer partitions.
SqliteManifestBuilder
¶
Manifest builder emitting a SQLite database.
The database contains two tables:
- `build_meta` — single row with :class:`RequiredBuildMeta` fields. Nested `sharding` spec and `custom` fields stored as JSON TEXT.
- `shards` — one row per non-empty shard, indexed by `db_id`. `min_key`/`max_key` stored as JSON to preserve type info. `writer_info` stored as JSON TEXT.
The serialized database is returned as the payload of a
:class:`ManifestArtifact` with content type `application/x-sqlite3`.
Uses :meth:`sqlite3.Connection.serialize` (Python 3.11+) for
zero-copy in-memory serialization.
WriterInfo
dataclass
¶
Per-attempt metadata from the writer framework.
Manifest Store and Storage¶
Unified manifest persistence protocol and implementations.
ManifestStore replaces the previous ManifestPublisher + ManifestReader pair with a single interface that accepts Pydantic models directly. S3ManifestStore implements two-phase publish internally; database backends can use a single atomic transaction.
InMemoryManifestStore
¶
In-memory manifest store for tests — replaces InMemoryPublisher.
ManifestStore
¶
Bases: Protocol
Unified protocol for persisting and loading manifests.
list_manifests(*, limit=10)
¶
Return up to limit manifests in reverse chronological order.
load_current()
¶
Return the current manifest pointer, or None if not published.
load_manifest(ref)
¶
Load and parse a manifest by reference.
publish(*, run_id, required_build, shards, custom)
¶
Persist manifest + update current pointer. Return a manifest reference.
set_current(ref)
¶
Update the current pointer to the given manifest reference.
S3ManifestStore
¶
Manifest store backed by S3 — merges publish and read in one class.
Publish is two-phase: manifest file first, then _CURRENT pointer.
Manifest keys are timestamp-prefixed for chronological listing.
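The two-phase ordering can be demonstrated with an in-memory stand-in: the manifest must be durable before the CURRENT pointer flips to it, so a reader never follows a pointer to a missing manifest. The class name, key layout, and ref format below are illustrative, not S3ManifestStore's actual schema.

```python
import time
from typing import Optional

class TwoPhaseStore:
    """In-memory sketch of two-phase publish: manifest first, pointer second."""

    def __init__(self) -> None:
        self.objects = {}  # key -> bytes, standing in for S3 objects

    def publish(self, run_id: str, manifest: bytes) -> str:
        # Timestamp prefix keeps manifest keys in chronological list order.
        ref = f"{time.strftime('%Y%m%dT%H%M%S')}-{run_id}"
        self.objects[f"manifests/{ref}"] = manifest   # phase 1: manifest
        self.objects["_CURRENT"] = ref.encode()       # phase 2: pointer flip
        return ref

    def load_current(self) -> Optional[bytes]:
        ref = self.objects.get("_CURRENT")
        if ref is None:
            return None  # nothing published yet
        return self.objects[f"manifests/{ref.decode()}"]
```

If phase 2 fails after phase 1 succeeds, the published manifest is intact and addressable by its ref — which is exactly the recovery path `PublishCurrentError` describes above.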
SqliteShardLookup
¶
Lazy shard metadata provider backed by a SQLite manifest connection.
Satisfies the :class:`~shardyfusion.routing.ShardLookup` protocol.
Queries the `shards` table by `db_id` (B-tree index) and caches
results in an LRU dict with bounded size.
Parameters¶
con:
An open sqlite3.Connection to a SQLite manifest database.
The caller owns the connection lifetime; this class does not
close it.
num_dbs:
Total number of shards (from RequiredBuildMeta.num_dbs).
cache_size:
Maximum number of shard entries to cache. 0 disables caching.
get_shard(db_id)
¶
Return shard metadata for db_id, or a synthetic empty entry.
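The query-then-cache pattern can be sketched with stdlib `sqlite3` and an `OrderedDict` LRU. The `shards` table and column names follow the SqliteManifestBuilder description above; the class name and tuple return shape are illustrative.

```python
import sqlite3
from collections import OrderedDict

class SqliteShardLookupSketch:
    """Lazy shard lookup over a SQLite manifest with a bounded LRU cache."""

    def __init__(self, con: sqlite3.Connection, num_dbs: int, cache_size: int = 128):
        self._con = con          # caller owns the connection lifetime
        self._num_dbs = num_dbs
        self._cache_size = cache_size
        self._cache: OrderedDict = OrderedDict()

    def get_shard(self, db_id: int):
        if db_id in self._cache:
            self._cache.move_to_end(db_id)  # mark as recently used
            return self._cache[db_id]
        row = self._con.execute(
            "SELECT db_url, row_count FROM shards WHERE db_id = ?", (db_id,)
        ).fetchone()
        # Synthetic empty entry for shards with no data, per the protocol.
        shard = row if row is not None else (None, 0)
        if self._cache_size > 0:
            self._cache[db_id] = shard
            if len(self._cache) > self._cache_size:
                self._cache.popitem(last=False)  # evict least recently used
        return shard
```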
load_sqlite_build_meta(con)
¶
Read RequiredBuildMeta and custom fields from an open SQLite manifest.
Unlike :func:`parse_sqlite_manifest` (which takes raw bytes and loads
all shards), this reads only the `build_meta` row — a single small
query, ideal for lazy-router initialization.
Returns (required_build, custom_fields).
parse_current_pointer_to_ref(payload)
¶
Parse a raw _CURRENT JSON payload into a ManifestRef.
parse_manifest_dir_entry(dir_name, s3_prefix, manifest_name)
¶
Parse a timestamp-prefixed directory name into a ManifestRef, or None.
parse_manifest_payload(payload)
¶
Parse a manifest payload.
The payload must be a serialized SQLite database (starts with the
16-byte SQLite magic header). Raises :class:`ManifestParseError`
for unrecognised formats.
parse_sqlite_manifest(payload)
¶
Parse a SQLite manifest payload into typed ParsedManifest.
Opens the serialized SQLite database in memory via
:meth:`sqlite3.Connection.deserialize`, reads `build_meta` and
`shards` tables, and validates identically to :func:`parse_manifest`.
Async manifest store protocol and implementations.
AsyncManifestStore is the read-only async counterpart of
ManifestStore. AsyncS3ManifestStore uses aiobotocore for
native async S3 I/O.
AsyncManifestStore
¶
Bases: Protocol
Read-only async manifest loading (no publish — that's writer-side).
AsyncS3ManifestStore
¶
Native async S3 manifest store using aiobotocore.
Each load_current() / load_manifest() call creates a short-lived
S3 client via the session context manager. These calls are infrequent
(init + refresh only), so per-call clients are simple and safe.
Database-backed ManifestStore implementation (PostgreSQL).
Stores manifests in normalized _builds + _shards tables and tracks
the current pointer via an append-only _pointer table.
No external dependencies beyond a DB-API 2 driver (user-provided psycopg2).
PostgresManifestStore
¶
PostgreSQL manifest store with normalized builds + shards tables.
S3 small-object helpers used by the default publisher.
create_s3_client(credentials=None, connection_options=None)
¶
Create boto3 S3 client with optional explicit credentials and connection options.
delete_prefix(prefix_url, *, s3_client=None, metrics_collector=None, retry_config=None)
¶
Delete all objects under an S3 prefix. Returns the number of objects deleted.
When retry_config is None (the default), failures are logged but
not raised (best-effort). When a retry_config is provided, each
list+delete page is wrapped in _retry_s3_operation() and transient
errors are retried; non-transient errors propagate.
get_bytes(url, *, s3_client=None, metrics_collector=None, retry_config=None)
¶
Read object bytes from S3 URL with automatic retry on transient errors.
join_s3(base, *parts)
¶
Join S3 URL path segments, stripping redundant slashes.
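A minimal sketch of what slash-stripping segment joining looks like (an illustrative re-implementation, not the library's actual body):

```python
def join_s3(base: str, *parts: str) -> str:
    """Join S3 URL path segments, dropping redundant slashes
    between segments (illustrative re-implementation)."""
    segments = [base.rstrip("/")]
    # Strip surrounding slashes from each part and skip empty segments.
    segments.extend(p.strip("/") for p in parts if p.strip("/"))
    return "/".join(segments)
```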
list_prefixes(prefix_url, *, s3_client=None, retry_config=None)
¶
List immediate child prefixes under an S3 prefix (CommonPrefixes).
Returns full S3 URLs for each child prefix, sorted lexicographically. Uses manual pagination with per-page retry for resilience against transient S3 errors mid-pagination.
parse_s3_url(url)
¶
Parse s3://bucket/key into (bucket, key).
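The bucket/key split can be sketched with the standard library's URL parser; this is an illustrative re-implementation, and the real function's error type may differ:

```python
from urllib.parse import urlsplit


def parse_s3_url(url: str) -> tuple[str, str]:
    """Split an s3://bucket/key URL into (bucket, key) — illustrative."""
    parts = urlsplit(url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"not an s3:// URL: {url!r}")
    # netloc is the bucket; the path (minus its leading slash) is the key.
    return parts.netloc, parts.path.lstrip("/")
```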
put_bytes(url, payload, content_type, headers=None, *, s3_client=None, metrics_collector=None, retry_config=None)
¶
PUT bytes to S3 URL with automatic retry on transient S3 errors.
try_get_bytes(url, *, s3_client=None, metrics_collector=None, retry_config=None)
¶
Read object bytes and return None when object is not found.
Transient S3 errors (throttling, timeouts) are retried automatically.
Routing and Reader¶
Snapshot routing helpers for sharded manifests.
ShardLookup
¶
Bases: Protocol
Protocol for shard metadata access — eager (list) or lazy (SQLite).
get_shard(db_id)
¶
Return metadata for a shard by db_id.
Must return a synthetic empty-shard entry (db_url=None,
row_count=0) for db_ids that have no data.
SnapshotRouter
¶
Route point lookups to a shard database id using manifest sharding metadata.
Operates in two modes:
- Eager (default __init__): preloads all shards into a list. Best for small-to-medium shard counts (up to ~10K).
- Lazy (:meth:from_build_meta): delegates shard lookups to a :class:ShardLookup provider (e.g. backed by SQLite with range reads). Constant memory regardless of shard count.
Both modes expose :meth:get_shard for uniform shard access.
is_lazy
property
¶
True when this router uses lazy shard lookup (no preloaded list).
from_build_meta(required_build, *, shard_lookup)
classmethod
¶
Create a router with lazy shard lookup (no preloaded shard list).
Use this when the manifest has too many shards to load into memory.
The shard_lookup provider supplies :class:RequiredShardMeta on
demand (e.g. via indexed SQLite queries).
Note: router.shards is an empty list in lazy mode. Code that iterates over all shards must use the lookup provider directly or switch to :meth:get_shard.
get_shard(db_id)
¶
Look up shard metadata by db_id.
Works in both eager (list) and lazy (lookup) mode. Returns a synthetic empty-shard entry for db_ids with no data.
group_keys(keys, *, routing_context=None)
¶
Group keys by routed db id while preserving order within each shard bucket.
group_keys_allow_missing(keys, *, routing_context=None)
¶
Group keys by routed db id, returning unroutable categorical keys separately.
route(key, *, routing_context=None)
¶
Route a key, using routing_context for CEL multi-column mode if provided.
route_with_context(routing_context)
¶
Route using CEL with explicit routing context.
In multi-column mode, the CEL expression evaluates over multiple columns from the routing context, not just the key.
canonical_bytes(key)
¶
Convert a key to its canonical byte representation for hashing.
- int → 8-byte signed little-endian (range [-2^63, 2^63-1])
- str → UTF-8 encoded bytes
- bytes/bytearray → passed through as-is
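The three rules above can be sketched as a standalone re-implementation (illustrative only; the real canonical_bytes may differ in edge cases such as bool keys or overflow handling):

```python
import struct


def canonical_bytes(key: object) -> bytes:
    """Illustrative re-implementation of the canonicalization rules."""
    if isinstance(key, int):
        # 8-byte signed little-endian; struct raises for values
        # outside [-2^63, 2^63-1], matching the stated range.
        return struct.pack("<q", key)
    if isinstance(key, str):
        return key.encode("utf-8")
    if isinstance(key, (bytes, bytearray)):
        return bytes(key)
    raise TypeError(f"unsupported key type: {type(key).__name__}")
```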
xxh3_db_id(key, num_dbs)
¶
Route a key to a shard db_id using xxh3_64.
Computed as xxh3_digest(key) % num_dbs.
xxh3_digest(key)
¶
Compute the xxh3_64 digest of a key's canonical bytes.
This is the single authoritative hash step used by both HASH routing
(xxh3_db_id) and the CEL shard_hash() function.
Service-side sharded reader — non-thread-safe variant.
ShardedReader
¶
Bases: _BaseShardedReader
Non-thread-safe sharded reader for single-threaded services.
Provides the same get / multi_get / refresh API as
ConcurrentShardedReader but without locks or reference counting.
health(*, staleness_threshold=None)
¶
Return a diagnostic snapshot of reader state.
reader_for_key(key)
¶
Return a borrowed read handle for the shard a key routes to.
readers_for_keys(keys)
¶
Return a mapping of keys to borrowed read handles.
route_key(key, *, routing_context=None)
¶
Return the shard db_id a key would route to.
shard_details()
¶
Return per-shard metadata from the current manifest.
shard_for_key(key, *, routing_context=None)
¶
Return shard metadata for the shard a key routes to.
shards_for_keys(keys)
¶
Return a mapping of keys to their shard metadata.
Async sharded reader for asyncio-based services (FastAPI, aiohttp, etc.).
AsyncShardReaderHandle
¶
Borrowed async reference to a single shard's reader.
Supports async with and explicit close(). Closing releases the
borrow (decrements refcount) but does not close the underlying shard.
close()
¶
Release the borrow. Safe to call multiple times.
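The borrow/release behaviour described above can be sketched as an async context manager whose close() is idempotent. The FakeShard class and its refcount attribute are stand-ins for illustration only, not the library's internals:

```python
import asyncio


class BorrowedHandle:
    """Illustrative borrow: closing decrements a refcount but leaves
    the underlying shard reader open."""

    def __init__(self, shard):
        self._shard = shard
        self._closed = False
        shard.refcount += 1  # take the borrow

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self.close()

    async def close(self):
        if self._closed:  # safe to call multiple times
            return
        self._closed = True
        self._shard.refcount -= 1  # release the borrow


class FakeShard:
    """Stand-in shard object with a refcount, for the sketch only."""
    refcount = 0


async def demo() -> int:
    shard = FakeShard()
    async with BorrowedHandle(shard) as handle:
        assert shard.refcount == 1  # borrow held inside the block
    await handle.close()  # second close is a no-op
    return shard.refcount
```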
AsyncShardedReader
¶
Async sharded reader for single-threaded asyncio services.
Uses asyncio.Lock for refresh serialization and native async
SlateDB calls on the read path.
health(*, staleness_threshold=None)
¶
Return a diagnostic snapshot of reader state.
open(*, s3_prefix, local_root, manifest_store=None, current_pointer_key='_CURRENT', reader_factory=None, slate_env_file=None, credential_provider=None, s3_connection_options=None, max_concurrency=None, max_fallback_attempts=3, metrics_collector=None, rate_limiter=None)
async
classmethod
¶
Create and initialize an async sharded reader.
AsyncSlateDbReaderFactory
dataclass
¶
Default async factory using SlateDBReader.open_async().
Serde¶
Key/value serialization utilities.
KeyEncoder = Callable[[object], bytes]
module-attribute
¶
Callable that encodes a key into bytes for SlateDB.
ValueSpec
dataclass
¶
Value serialization strategy for a Spark row.
make_key_encoder(encoding)
¶
Return a key encoder for the given encoding.
Resolves the encoding branch once at setup time so hot loops can call the returned function without per-key dispatch.
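The resolve-once pattern can be sketched as follows. The branch names loosely mirror the U64BE default mentioned in this reference, but the function body and the string-valued encoding argument are illustrative assumptions:

```python
import struct
from typing import Callable


def make_key_encoder(encoding: str) -> Callable[[object], bytes]:
    """Illustrative: pick the encoding branch once, return a
    specialized closure for the hot loop."""
    if encoding == "u64be":
        packer = struct.Struct(">Q")  # resolved once, not per key
        return lambda key: packer.pack(key)
    if encoding == "utf8":
        return lambda key: str(key).encode("utf-8")
    raise ValueError(f"unknown key encoding: {encoding!r}")


# The hot loop calls the returned closure directly, with no
# per-key dispatch on the encoding name.
encode = make_key_encoder("u64be")
```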