API Reference

Config

Configuration models for sharded snapshot writes.

ManifestOptions dataclass

Manifest build and publish settings.

OutputOptions dataclass

Output path/layout settings for shard database writes.

WriteConfig dataclass

Top-level configuration for sharded snapshot writes.

Framework-specific parameters (key_col, value_spec, sort_within_partitions) live on the writer function signature, not here.

Parameters:

Name Type Description Default
num_dbs int | None

Number of shard databases to create. Must be > 0 for explicit HASH sharding. Set to None (default) when using CEL sharding or max_keys_per_shard (auto-discovered at write time).

None
s3_prefix str

S3 location for shard databases and manifests (e.g. s3://bucket/prefix).

''
key_encoding KeyEncoding

How keys are serialized to bytes. Default u64be (8-byte big-endian). Use u32be for 4-byte keys.

U64BE
batch_size int

Number of key-value pairs per write batch. Default 50,000.

50000
adapter_factory DbAdapterFactory | None

Factory for creating shard database adapters. Default: SlateDbFactory().

None
sharding ShardingSpec

Sharding strategy configuration (hash, range, or custom).

ShardingSpec()
output OutputOptions

Output path/layout settings.

OutputOptions()
manifest ManifestOptions

Manifest build and publish settings.

ManifestOptions()
metrics_collector MetricsCollector | None

Optional observer for write lifecycle events.

None
shard_retry RetryConfig | None

Optional retry configuration. Used for per-shard retry in Dask/Ray sharded writes, whole-database retry in Spark/Dask/Ray write_single_db(), and retry-enabled Python parallel writes.

None
credential_provider CredentialProvider | None

Credential provider for S3 access. When also set on manifest, the manifest-level provider takes precedence.

None
s3_connection_options S3ConnectionOptions | None

S3 transport/connection overrides. When also set on manifest, the manifest-level options take precedence.

None

Raises:

Type Description
ConfigValidationError

If any parameter fails validation.
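The key_encoding field above maps integer keys to fixed-width big-endian bytes. A minimal stdlib sketch of the two documented encodings (illustrative only; the library's own serialization may differ in detail):

```python
import struct

def encode_key_u64be(key: int) -> bytes:
    """8-byte big-endian encoding (the documented u64be default)."""
    return struct.pack(">Q", key)

def encode_key_u32be(key: int) -> bytes:
    """4-byte big-endian encoding (the documented u32be option)."""
    return struct.pack(">I", key)

# u64be pads the same key out to 8 bytes, u32be to 4:
# encode_key_u64be(1) -> b"\x00\x00\x00\x00\x00\x00\x00\x01"
# encode_key_u32be(1) -> b"\x00\x00\x00\x01"
```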

Errors

Custom exception hierarchy for shardyfusion.

Failure classification

Each exception carries a retryable flag so callers can decide whether retrying the operation may help:

  • Retryable errors (e.g. PublishManifestError, PublishCurrentError) may succeed on retry — typically transient infrastructure failures.
  • Non-retryable errors (e.g. ConfigValidationError, ManifestParseError) indicate programmer/data errors that require a fix, not a retry.
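Because every exception carries a retryable flag, callers can branch on the attribute rather than on concrete exception types. A sketch with a stand-in exception class (only the retryable attribute is taken from the documentation above; the stand-in hierarchy is not the library's):

```python
class StandInError(Exception):
    """Stand-in for ShardyfusionError carrying the documented retryable flag."""
    def __init__(self, msg: str, *, retryable: bool):
        super().__init__(msg)
        self.retryable = retryable

def run_with_retry(operation, attempts: int = 3):
    """Retry only when the raised error says retrying may help."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except StandInError as exc:
            if not exc.retryable or attempt == attempts:
                raise
    raise AssertionError("unreachable")
```

Non-retryable errors (the ConfigValidationError / ManifestParseError class of failures) propagate immediately; retryable ones are re-attempted up to the limit.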

ConfigValidationError

Bases: ShardyfusionError

Configuration failed validation.

Raised during WriteConfig.__post_init__ or writer startup when parameters are invalid (e.g. num_dbs <= 0, malformed s3_prefix, unsupported sharding strategy for a given writer backend).

DbAdapterError

Bases: ShardyfusionError

Database adapter binding was unavailable or incompatible.

Also raised when shard reader operations fail (e.g. during close).

ManifestBuildError

Bases: ShardyfusionError

Manifest artifact creation failed.

Raised when the ManifestBuilder raises during build().

ManifestParseError

Bases: ShardyfusionError

Manifest or CURRENT pointer payload could not be parsed or is structurally invalid.

Raised when JSON is malformed, required fields are missing, or structural invariants are violated (e.g. shard count mismatch).

ManifestStoreError

Bases: ShardyfusionError

Transient manifest store failure (DB connection, query timeout, etc.).

Raised by database-backed ManifestStore implementations when the underlying connection or query fails. Typically retryable.

PoolExhaustedError

Bases: ShardyfusionError

All readers in the pool are checked out and checkout timed out.

Typically transient — a brief spike in concurrent reads or a slow shard query can exhaust the pool temporarily.

PublishCurrentError

Bases: ShardyfusionError

CURRENT pointer publish operation failed.

The manifest has already been published when this error is raised. Use manifest_ref to locate the published manifest for recovery (e.g. manually updating the CURRENT pointer).

PublishManifestError

Bases: ShardyfusionError

Manifest publish operation failed.

The manifest bytes could not be uploaded to S3. This is retryable because the failure is typically a transient S3 error.

ReaderStateError

Bases: ShardyfusionError

Reader operation attempted in an invalid lifecycle state (closed, missing CURRENT).

Raised when calling get(), multi_get(), or refresh() on a closed reader, or when the CURRENT pointer is not found.

S3TransientError

Bases: ShardyfusionError

Transient S3 error that may succeed on retry (throttle, 500, 503, timeout).

Used internally by the storage layer's exponential-backoff retry logic.

ShardAssignmentError

Bases: ShardyfusionError

Rows could not be assigned to valid shard IDs.

Raised when routing verification detects a mismatch between framework-assigned shard IDs and Python routing.

ShardCoverageError

Bases: ShardyfusionError

Partition results did not cover all expected shard IDs.

Raised after shard writes complete if the set of shard IDs in results doesn't match range(num_dbs).

ShardWriteError

Bases: ShardyfusionError

Shard write operation failed with a potentially transient error.

Raised when adapter operations (write_batch, flush, checkpoint) fail with errors that may succeed on retry (e.g. underlying S3 I/O failures in the storage adapter).

ShardyfusionError

Bases: Exception

Base exception for all package-specific errors.

Metrics

Configurable metrics collection for observability.

Backward-compatible: from shardyfusion.metrics import MetricEvent works exactly as before the module-to-package promotion.

MetricEvent

Bases: str, Enum

Metric events emitted during write and read operations.

Events are grouped by subsystem. Implementations should silently ignore any events they do not recognise so that new events can be added without breaking existing collectors.

MetricsCollector

Bases: Protocol

Protocol for receiving metric events.

Implementations must be thread-safe. The emit() method is called synchronously; buffer internally if blocking is a concern.
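Since MetricsCollector is a Protocol, any object exposing a matching emit() method satisfies it structurally. A thread-safe counting collector sketch — the emit(event, value) signature shown here is an assumption for illustration, not taken from the library:

```python
import threading
from collections import Counter

class CountingCollector:
    """Thread-safe collector: tallies events, silently accepts unknown ones."""
    def __init__(self):
        self._lock = threading.Lock()
        self._counts = Counter()

    def emit(self, event, value=1):  # signature assumed for illustration
        # emit() is called synchronously, so keep the critical section tiny.
        with self._lock:
            self._counts[str(event)] += value

    def snapshot(self):
        with self._lock:
            return dict(self._counts)
```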

Type Definitions

Shared type aliases and protocols used across the package.

AsyncShardReader

Bases: Protocol

Async counterpart of ShardReader for use with AsyncShardedReader.

AsyncShardReaderFactory

Bases: Protocol

Factory for opening one async shard reader.

RetryConfig dataclass

Configurable retry parameters for S3 operations.

Default values preserve current hardcoded behavior.

S3ConnectionOptions

Bases: TypedDict

Transport/connection overrides for boto3 S3 client construction.

Identity fields (access key, secret, session token) are handled separately by :class:~shardyfusion.credentials.CredentialProvider.

ShardReader

Bases: Protocol

Minimal shard reader shape used by the sharded reader service.

close()

Release reader resources.

get(key)

Return value bytes for key, or None when absent.

ShardReaderFactory

Bases: Protocol

Factory for opening one shard reader.

__call__(*, db_url, local_dir, checkpoint_id)

Construct an opened reader instance.

Credentials

Credential provider abstractions for S3 authentication.

Separates identity (who you are) from transport (how you connect). All dataclasses are frozen for immutability and picklability across Spark/Dask/Ray/multiprocessing boundaries.

CredentialProvider

Bases: Protocol

Pull-based credential resolver.

Implementations return an S3Credentials snapshot. The caller decides when to resolve (e.g. once at construction, or before each request for future rotation support).

EnvCredentialProvider dataclass

Credential provider that reads S3_* / AWS_* environment variables.

Env var precedence per field (first non-empty wins):

  • S3_ACCESS_KEY_ID / AWS_ACCESS_KEY_ID
  • S3_SECRET_ACCESS_KEY / AWS_SECRET_ACCESS_KEY
  • S3_SESSION_TOKEN / AWS_SESSION_TOKEN
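The first-non-empty-wins precedence can be sketched with a small resolver (illustrative; the dataclass performs the equivalent lookup internally):

```python
import os
from typing import Optional

def resolve_field(*names: str) -> Optional[str]:
    """Return the first non-empty environment variable among names, in order."""
    for name in names:
        value = os.environ.get(name, "")
        if value:
            return value
    return None

# e.g. resolve_field("S3_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID")
```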

S3Credentials dataclass

Resolved S3 identity — access key, secret, and optional session token.

StaticCredentialProvider dataclass

Credential provider with inline identity fields.

Flat fields (not composition) for ergonomic construction::

provider = StaticCredentialProvider(
    access_key_id="AKIA...",
    secret_access_key="wJal...",
)

materialize_env_file(creds)

Write credentials to a temporary .env file for SlateDB's object-store.

Returns the file path. The file is created with 0o600 permissions so only the current user can read it. Caller is responsible for calling :func:remove_env_file after use.

remove_env_file(path)

Safely delete a credentials env file created by :func:materialize_env_file.

resolve_env_file(env_file, credential_provider)

Create a context manager that resolves an env file for SlateDB.

Sharding Types

Shared sharding types used by both writer and reader paths.

KeyEncoding

Bases: str, Enum

Supported key serialization encodings.

from_value(value) classmethod

Parse an encoding value from enum or string input.

ShardingSpec dataclass

Configuration for mapping rows to shard database ids.

Two strategies are supported:

HASH (default): Uniform distribution via xxh3_64(canonical_bytes(key)) % num_dbs. Supports int, str, and bytes keys. Requires num_dbs > 0 (explicit or computed from max_keys_per_shard).

CEL: Flexible shard assignment via a CEL expression. The expression may return a dense integer shard id directly, or a categorical token resolved by exact match against routing_values. A built-in shard_hash() function (wrapping xxh3_64) is available in CEL expressions. num_dbs is always derived from routing metadata or discovered from data; it must not be provided explicitly.
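The HASH rule above is a plain modulo over a 64-bit hash of the canonical key bytes. The sketch below substitutes a stdlib hash for xxh3_64 (so the shard ids it produces will NOT match the library's) purely to show the shape of the computation; canonical_bytes is also a stand-in:

```python
import hashlib
import struct

def canonical_bytes_stub(key) -> bytes:
    """Stand-in canonicalizer: int -> 8-byte big-endian, str -> UTF-8, bytes as-is."""
    if isinstance(key, bool) or not isinstance(key, (int, str, bytes)):
        raise TypeError(f"unsupported key type: {type(key).__name__}")
    if isinstance(key, int):
        return struct.pack(">Q", key)
    if isinstance(key, str):
        return key.encode("utf-8")
    return key

def route_hash_stub(key, num_dbs: int) -> int:
    """hash(canonical_bytes(key)) % num_dbs, with blake2b standing in for xxh3_64."""
    digest = hashlib.blake2b(canonical_bytes_stub(key), digest_size=8).digest()
    return int.from_bytes(digest, "big") % num_dbs
```

The same routing function on writer and reader sides is what guarantees the sharding invariant: a key always resolves to the shard it was written to.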

to_manifest_dict()

Return manifest-safe representation (Spark callables omitted).

ShardingStrategy

Bases: str, Enum

Supported sharding strategies.

from_value(value) classmethod

Parse a strategy value from enum or string input.

validate_routing_values(routing_values, *, require_unique=True)

Validate categorical routing values are unique and type-consistent.

CEL Helpers

CEL (Common Expression Language) integration for flexible sharding.

Provides compile/evaluate/routing functions shared by all writer backends and the reader routing layer. Requires the cel extra: pip install shardyfusion[cel].

CEL routing supports two resolver modes:

  • direct integer mode: the expression returns the dense internal shard id
  • categorical mode: the expression returns a discrete token and routing_values resolves the shard id by exact match

A built-in shard_hash() function wrapping xxh3_64 is registered automatically.

Uses cel-expr-python (Google's C++ CEL wrapper) via: from cel_expr_python.cel import NewEnv, Type

CelColumn dataclass

Column specification for :func:cel_sharding_by_columns.

Parameters:

Name Type Description Default
name str

Column name as it appears in the DataFrame / record.

required
type CelType

CEL type of the column (default :attr:CelType.STRING).

STRING

CelType

Bases: str, Enum

Supported CEL column types for :func:cel_sharding_by_columns.

CompiledCel

Wrapper around a compiled CEL expression with its environment.

evaluate(context)

Evaluate the CEL expression with the given variable bindings.

Uses cel_expr_python's Expression.eval(data=...) method (a sandboxed CEL evaluator, NOT Python's built-in eval).

UnknownRoutingTokenError

Bases: ValueError

Raised when categorical CEL routing produces a token not in the snapshot.

build_categorical_routing_lookup(routing_values)

Build exact-match lookup for categorical CEL routing.
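Exact-match categorical routing amounts to a token-to-index dictionary over the snapshot's routing_values. A sketch of that lookup (the library's builder may validate beyond this; the stub error stands in for UnknownRoutingTokenError):

```python
def build_lookup_stub(routing_values):
    """Map each routing token to its dense 0-based shard id."""
    lookup = {}
    for db_id, token in enumerate(routing_values):
        if token in lookup:
            raise ValueError(f"duplicate routing value: {token!r}")
        lookup[token] = db_id
    return lookup

def resolve_stub(lookup, token):
    """Resolve a token; unknown tokens mirror the UnknownRoutingTokenError case."""
    try:
        return lookup[token]
    except KeyError:
        raise ValueError(f"token not in snapshot: {token!r}") from None
```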

cel_sharding(expr, columns, *, routing_values=None)

Build a :class:ShardingSpec for an arbitrary CEL expression.

This is the general-purpose companion to :func:cel_sharding_by_columns. Use it when you want raw CEL control but do not want to construct ShardingSpec(strategy=ShardingStrategy.CEL, ...) manually.

Direct routing example::

cel_sharding("key % 4", {"key": "int"})

Parameters:

Name Type Description Default
expr str

CEL expression to evaluate for routing.

required
columns dict[str, str | CelType]

Mapping of column name to CEL type string or :class:CelType.

required
routing_values Sequence[RoutingValue] | None

Optional categorical routing values for exact-match mode.

None

Returns:

Name Type Description
A ShardingSpec

class:ShardingSpec with strategy CEL.

Raises:

Type Description
ConfigValidationError

If no columns are given or an unsupported CEL type is declared.

cel_sharding_by_columns(*columns, num_shards=None, separator=':')

Build a :class:ShardingSpec for CEL-based partitioning by column values.

Each column is either a bare name (str, defaults to :attr:CelType.STRING) or a :class:CelColumn instance. The generated CEL expression either hashes the column value(s) and takes modulo num_shards to produce a 0-based shard ID, or when num_shards is omitted, returns a categorical routing token whose distinct values are discovered from data at write time.

Single column example::

cel_sharding_by_columns("region", num_shards=10)
# → cel_expr="shard_hash(region) % 10u"
#   cel_columns={"region": "string"}

Typed column example::

cel_sharding_by_columns(CelColumn("tier", CelType.INT), num_shards=4)
# → cel_expr="shard_hash(tier) % 4u"
#   cel_columns={"tier": "int"}

Multiple columns example::

cel_sharding_by_columns("region", CelColumn("tier", CelType.INT), num_shards=8)
# → cel_expr='shard_hash(region + ":" + string(tier)) % 8u'
#   cel_columns={"region": "string", "tier": "int"}

Parameters:

Name Type Description Default
*columns str | CelColumn

One or more column specifications.

()
num_shards int | None

Target number of shards (embedded as % <N>u). If omitted, categorical routing values are inferred from data at write time.

None
separator str

Delimiter for multi-column concatenation (default ":").

':'

Returns:

Name Type Description
A ShardingSpec

class:ShardingSpec with strategy CEL.

Raises:

Type Description
ConfigValidationError

If no columns are given or num_shards < 1.

compile_cel(expr, columns)

Compile a CEL expression with declared column types.

The shard_hash() custom function is automatically registered, providing overloads for int, uint, string, and bytes arguments.

Parameters:

Name Type Description Default
expr str

CEL expression string (e.g., "shard_hash(key) % 100u").

required
columns dict[str, str]

Map of variable name -> type string (e.g., {"key": "int", "region": "string"}).

required

Returns:

Type Description
CompiledCel

A compiled expression object that can be evaluated with variable bindings.

Raises:

Type Description
ImportError

If cel-expr-python is not installed.

ConfigValidationError

If the expression fails to compile.

evaluate_cel_arrow_batch(compiled, batch)

Evaluate CEL on each row of a PyArrow RecordBatch/Table.

Used by the Spark writer via mapInArrow (Arrow batches are the natural format). Dask and Ray writers route through route_key() / route_cel() per row instead.

Parameters:

Name Type Description Default
compiled CompiledCel

Compiled CEL expression.

required
batch Any

PyArrow RecordBatch or Table.

required

Returns:

Type Description
list[int | str | bytes]

List of routing key values, one per row.

pandas_rows_to_contexts(pdf, cel_columns)

Convert pandas DataFrame rows to CEL context dicts.

Handles numpy scalar coercion. Shared by Spark and Dask writers.

resolve_cel_routing_key(routing_key, *, routing_values=None, lookup=None)

Resolve a CEL routing key to a dense internal shard id.

route_cel(compiled, context, routing_values=None, lookup=None)

Evaluate CEL expression and route to a shard.

Parameters:

Name Type Description Default
compiled CompiledCel

Compiled CEL expression.

required
context dict[str, Any]

Variable bindings for the expression.

required
routing_values Sequence[RoutingValue] | None

Optional categorical routing values. If provided, exact-match lookup gives the shard ID.

None

Returns:

Type Description
int

Shard db_id (0-based).

route_cel_batch(compiled, batch, routing_values=None)

Evaluate CEL and route each row to a shard.

If routing_values is provided, uses categorical exact-match lookup. Otherwise, the CEL output is used directly as the shard ID.

validate_cel_columns(arrow_schema, cel_columns)

Validate that declared CEL columns exist in an Arrow schema.

Sharding (Spark)

Sharding specs and Spark sharding helpers.

add_db_id_column(df, *, key_col, num_dbs, sharding)

Add deterministic db id column and return resolved sharding spec.

prepare_partitioned_rdd(df_with_db_id, *, num_dbs, key_col, sort_within_partitions)

Return pair RDD partitioned so partition index matches db id.

Sharding (Dask)

Dask-native sharding helpers.

add_db_id_column(ddf, *, key_col, num_dbs, sharding, key_encoding)

Add deterministic _shard_id column via Python routing function.

Uses the same route_key() as the reader, guaranteeing the sharding invariant without reimplementation.

For CEL sharding, builds per-row routing contexts from the CEL columns so that expressions referencing non-key columns evaluate correctly.

Sharding (Ray)

Ray Data-native sharding helpers.

add_db_id_column(ds, *, key_col, num_dbs, sharding, key_encoding)

Add deterministic _shard_id column via Python routing function.

Uses the same route_key() as the reader, guaranteeing the sharding invariant without reimplementation.

Uses Arrow batch format to avoid Arrow->pandas->Arrow round-trip. For CEL sharding, delegates to route_cel_batch() which evaluates the CEL expression with the full row context (supporting non-key columns).

Writer (Spark)

Public sharded snapshot writer entrypoint.

verify_routing_agreement(df_with_db_id, *, key_col, num_dbs, resolved_sharding, key_encoding, sample_size=20)

Sample rows and verify Spark-computed db_id matches Python routing.

Since all strategies now use Python-based mapInArrow, this is a safety net verifying the mapInArrow output against the reader's route_key() function.

write_one_shard_partition(db_id, rows_iter, runtime)

Write exactly one shard from one partition and yield one ShardAttemptResult.

write_sharded(df, config, *, key_col, value_spec, sort_within_partitions=False, spark_conf_overrides=None, cache_input=False, storage_level=None, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)

Write a DataFrame into N independent sharded databases and publish manifest metadata.

Parameters:

Name Type Description Default
df DataFrame

PySpark DataFrame containing at least the key column and value column(s).

required
config WriteConfig

Write configuration (num_dbs, s3_prefix, sharding strategy, etc.).

required
key_col str

Name of the integer key column used for shard routing.

required
value_spec ValueSpec

Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder).

required
sort_within_partitions bool

If True, sort rows by key within each partition before writing. Useful for range-scan workloads.

False
spark_conf_overrides dict[str, str] | None

Optional Spark config overrides applied for the duration of the write (restored after completion).

None
cache_input bool

If True, cache the input DataFrame before processing.

False
storage_level StorageLevel | None

Spark storage level for caching (default: MEMORY_AND_DISK).

None
max_writes_per_second float | None

Optional rate limit (token-bucket) for write ops/sec.

None
max_write_bytes_per_second float | None

Optional rate limit (token-bucket) for write bytes/sec.

None
verify_routing bool

If True (default), spot-check that Spark-assigned shard IDs match Python routing on a sample of written rows.

True

Returns:

Type Description
BuildResult

BuildResult with manifest reference, shard metadata, and build statistics.

Raises:

Type Description
ConfigValidationError

If configuration is invalid.

ShardAssignmentError

If rows cannot be assigned to valid shard IDs.

ShardCoverageError

If partition results don't cover all expected shards.

PublishManifestError

If manifest upload to S3 fails.

PublishCurrentError

If CURRENT pointer upload fails (manifest already published).

Writer (Dask)

Dask-based sharded writer (no Spark/Java dependency).

write_sharded(ddf, config, *, key_col, value_spec, sort_within_partitions=False, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)

Write a Dask DataFrame into N independent sharded databases and publish manifest.

Parameters:

Name Type Description Default
ddf DataFrame

Dask DataFrame containing at least the key column and value column(s).

required
config WriteConfig

Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported.

required
key_col str

Name of the key column used for shard routing.

required
value_spec ValueSpec

Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder).

required
sort_within_partitions bool

If True, sort rows by key within each partition.

False
max_writes_per_second float | None

Optional rate limit (token-bucket) for write ops/sec.

None
max_write_bytes_per_second float | None

Optional rate limit (token-bucket) for write bytes/sec.

None
verify_routing bool

If True (default), spot-check that Dask-assigned shard IDs match Python routing on a sample of written rows.

True

Returns:

Type Description
BuildResult

BuildResult with manifest reference, shard metadata, and build statistics.

Raises:

Type Description
ConfigValidationError

If configuration is invalid.

ShardAssignmentError

If rows cannot be assigned to valid shard IDs.

ShardCoverageError

If partition results don't cover all expected shards.

PublishManifestError

If manifest upload to S3 fails.

PublishCurrentError

If CURRENT pointer upload fails (manifest already published).

Writer (Ray)

Ray Data-based sharded writer (no Spark/Java dependency).

write_sharded(ds, config, *, key_col, value_spec, sort_within_partitions=False, max_writes_per_second=None, max_write_bytes_per_second=None, verify_routing=True)

Write a Ray Dataset into N independent sharded databases and publish manifest.

Parameters:

Name Type Description Default
ds Dataset

Ray Dataset containing at least the key column and value column(s).

required
config WriteConfig

Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported.

required
key_col str

Name of the key column used for shard routing.

required
value_spec ValueSpec

Specifies how DataFrame rows are serialized to bytes (binary_col, json_cols, or a callable encoder).

required
sort_within_partitions bool

If True, sort rows by key within each partition.

False
max_writes_per_second float | None

Optional rate limit (token-bucket) for write ops/sec.

None
max_write_bytes_per_second float | None

Optional rate limit (token-bucket) for write bytes/sec.

None
verify_routing bool

If True (default), spot-check that Ray-assigned shard IDs match Python routing on a sample of written rows.

True

Returns:

Type Description
BuildResult

BuildResult with manifest reference, shard metadata, and build statistics.

Raises:

Type Description
ConfigValidationError

If configuration is invalid.

ShardAssignmentError

If rows cannot be assigned to valid shard IDs.

ShardCoverageError

If partition results don't cover all expected shards.

PublishManifestError

If manifest upload to S3 fails.

PublishCurrentError

If CURRENT pointer upload fails (manifest already published).

Writer (Python)

Pure-Python iterator-based sharded writer (no Spark dependency).

write_sharded(records, config, *, key_fn, value_fn, columns_fn=None, parallel=False, max_queue_size=100, max_parallel_shared_memory_bytes=_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES, max_parallel_shared_memory_bytes_per_worker=_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES_PER_WORKER, max_writes_per_second=None, max_write_bytes_per_second=None, max_total_batched_items=None, max_total_batched_bytes=None)

Write an iterable of records into N sharded databases.

Parameters:

Name Type Description Default
records Iterable[T]

Iterable of records to write. Each record is passed to key_fn and value_fn to extract the key and value bytes.

required
config WriteConfig

Write configuration (num_dbs, s3_prefix, sharding strategy, etc.). HASH and CEL sharding strategies are supported.

required
key_fn Callable[[T], KeyInput]

Callable that extracts the routing key from each record.

required
value_fn Callable[[T], bytes]

Callable that serializes each record to value bytes.

required
parallel bool

If True, use one subprocess per shard (multiprocessing.spawn). If False (default), all shard adapters are open simultaneously in a single process.

False
max_queue_size int

Maximum per-shard queue depth in parallel mode. In shared-memory parallel mode this bounds control messages, not payload bytes.

100
max_parallel_shared_memory_bytes int | None

Global cap on outstanding shared-memory payload bytes in parallel mode. When exceeded, the parent blocks until workers reclaim segments. None disables the global cap.

_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES
max_parallel_shared_memory_bytes_per_worker int | None

Per-worker cap on outstanding shared-memory payload bytes in parallel mode. When exceeded, the parent blocks until that worker reclaims segments. None disables the per-worker cap.

_DEFAULT_MAX_PARALLEL_SHARED_MEMORY_BYTES_PER_WORKER
max_writes_per_second float | None

Optional rate limit (token-bucket) for write ops/sec.

None
max_write_bytes_per_second float | None

Optional rate limit (token-bucket) for write bytes/sec.

None
max_total_batched_items int | None

Global cap on total buffered items across all shard batches (single-process mode only). When exceeded, the shard with the largest batch is flushed. Prevents OOM with many shards.

None
max_total_batched_bytes int | None

Global cap on total buffered bytes across all shard batches (single-process mode only). When exceeded, the shard with the largest byte footprint is flushed.

None

Returns:

Type Description
BuildResult

BuildResult with manifest reference, shard metadata, and build statistics.

Raises:

Type Description
ConfigValidationError

If configuration is invalid.

ShardCoverageError

If partition results don't cover all expected shards.

PublishManifestError

If manifest upload to S3 fails.

PublishCurrentError

If CURRENT pointer upload fails (manifest already published).

ShardyfusionError

If a worker process fails in parallel mode.
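The global batching caps above flush the shard with the largest buffer once the cap is exceeded. A sketch of that single-process flush policy (illustrative, not the library's implementation):

```python
from collections import defaultdict

class BatchBufferStub:
    """Buffer per-shard batches; flush the largest shard when the item cap is hit."""
    def __init__(self, max_total_items: int, flush_fn):
        self.max_total_items = max_total_items
        self.flush_fn = flush_fn  # called as flush_fn(shard_id, items)
        self.batches = defaultdict(list)
        self.total = 0

    def add(self, shard_id: int, item) -> None:
        self.batches[shard_id].append(item)
        self.total += 1
        if self.total > self.max_total_items:
            # Evict the shard holding the most buffered items.
            biggest = max(self.batches, key=lambda s: len(self.batches[s]))
            self.flush_fn(biggest, self.batches.pop(biggest))
            self.total = sum(len(b) for b in self.batches.values())
```

Flushing the largest batch first bounds memory with many shards while keeping per-shard write batches as large as possible.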

Manifest Models and Builders

Manifest models and extensibility protocols.

BuildDurations dataclass

Measured phase durations in milliseconds.

BuildResult dataclass

Result from write_sharded.

BuildStats dataclass

Deterministic build statistics with fixed schema.

CurrentPointer

Bases: BaseModel

JSON pointer to the latest published manifest.

ManifestArtifact dataclass

Serializable artifact to publish for manifests or pointers.

ManifestRef dataclass

Backend-agnostic pointer to a published manifest.

ManifestShardingSpec

Bases: BaseModel

Manifest-safe sharding specification (no Callable fields).

This is the Pydantic counterpart of :class:ShardingSpec for serialization and deserialization in manifests.

ParsedManifest

Bases: BaseModel

Typed representation of a parsed manifest payload.

RequiredBuildMeta

Bases: BaseModel

Library-owned metadata required for all manifests.

RequiredShardMeta

Bases: BaseModel

Winner shard metadata emitted by writer partitions.

SqliteManifestBuilder

Manifest builder emitting a SQLite database.

The database contains two tables:

  • build_meta — single row with :class:RequiredBuildMeta fields. Nested sharding spec and custom fields stored as JSON TEXT.
  • shards — one row per non-empty shard, indexed by db_id. min_key/max_key stored as JSON to preserve type info. writer_info stored as JSON TEXT.

The serialized database is returned as the payload of a :class:ManifestArtifact with content type application/x-sqlite3. Uses :meth:sqlite3.Connection.serialize (Python 3.11+) for zero-copy in-memory serialization.

WriterInfo dataclass

Per-attempt metadata from the writer framework.

Manifest Store and Storage

Unified manifest persistence protocol and implementations.

ManifestStore replaces the previous ManifestPublisher + ManifestReader pair with a single interface that accepts Pydantic models directly. S3ManifestStore implements two-phase publish internally; database backends can use a single atomic transaction.

InMemoryManifestStore

In-memory manifest store for tests — replaces InMemoryPublisher.

ManifestStore

Bases: Protocol

Unified protocol for persisting and loading manifests.

list_manifests(*, limit=10)

Return up to limit manifests in reverse chronological order.

load_current()

Return the current manifest pointer, or None if not published.

load_manifest(ref)

Load and parse a manifest by reference.

publish(*, run_id, required_build, shards, custom)

Persist manifest + update current pointer. Return a manifest reference.

set_current(ref)

Update the current pointer to the given manifest reference.

S3ManifestStore

Manifest store backed by S3 — merges publish and read in one class.

Publish is two-phase: manifest file first, then _CURRENT pointer. Manifest keys are timestamp-prefixed for chronological listing.
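The two-phase ordering matters for crash safety: a reader following _CURRENT can never see a pointer to a manifest that was not fully uploaded. An in-memory sketch of the ordering (a stand-in, not the S3 implementation):

```python
class TwoPhaseStoreStub:
    """In-memory stand-in showing publish order: manifest first, pointer second."""
    def __init__(self):
        self.objects = {}

    def publish(self, manifest_key: str, payload: bytes) -> None:
        # Phase 1: write the immutable, timestamp-prefixed manifest object.
        self.objects[manifest_key] = payload
        # Phase 2: only then flip the _CURRENT pointer to reference it.
        self.objects["_CURRENT"] = manifest_key.encode()

    def load_current(self):
        key = self.objects.get("_CURRENT")
        return None if key is None else self.objects[key.decode()]
```

A crash between the two phases leaves an orphaned manifest but a valid _CURRENT — the failure mode PublishCurrentError describes, recoverable via manifest_ref.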

SqliteShardLookup

Lazy shard metadata provider backed by a SQLite manifest connection.

Satisfies the :class:~shardyfusion.routing.ShardLookup protocol. Queries the shards table by db_id (B-tree index) and caches results in an LRU dict with bounded size.

Parameters

con: An open sqlite3.Connection to a SQLite manifest database. The caller owns the connection lifetime; this class does not close it.
num_dbs: Total number of shards (from RequiredBuildMeta.num_dbs).
cache_size: Maximum number of shard entries to cache. 0 disables caching.

get_shard(db_id)

Return shard metadata for db_id, or a synthetic empty entry.
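The bounded LRU behavior described above can be sketched with OrderedDict (illustrative only — the real class queries the shards table by db_id on a miss):

```python
from collections import OrderedDict

class LruCacheStub:
    """Bounded LRU: move hits to the end, evict the oldest entry on overflow."""
    def __init__(self, cache_size: int):
        self.cache_size = cache_size
        self._entries = OrderedDict()

    def get(self, db_id, load_fn):
        if self.cache_size == 0:        # documented: 0 disables caching
            return load_fn(db_id)
        if db_id in self._entries:
            self._entries.move_to_end(db_id)   # refresh recency on hit
            return self._entries[db_id]
        value = load_fn(db_id)                 # miss: load (e.g. SQL query)
        self._entries[db_id] = value
        if len(self._entries) > self.cache_size:
            self._entries.popitem(last=False)  # evict least recently used
        return value
```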

load_sqlite_build_meta(con)

Read RequiredBuildMeta and custom fields from an open SQLite manifest.

Unlike :func:parse_sqlite_manifest (which takes raw bytes and loads all shards), this reads only the build_meta row — a single small query, ideal for lazy-router initialization.

Returns (required_build, custom_fields).

parse_current_pointer_to_ref(payload)

Parse a raw _CURRENT JSON payload into a ManifestRef.

parse_manifest_dir_entry(dir_name, s3_prefix, manifest_name)

Parse a timestamp-prefixed directory name into a ManifestRef, or None.

parse_manifest_payload(payload)

Parse a manifest payload.

The payload must be a serialized SQLite database (starts with the 16-byte SQLite magic header). Raises :class:ManifestParseError for unrecognised formats.

parse_sqlite_manifest(payload)

Parse a SQLite manifest payload into typed ParsedManifest.

Opens the serialized SQLite database in memory via :meth:sqlite3.Connection.deserialize, reads build_meta and shards tables, and validates identically to :func:parse_manifest.

Async manifest store protocol and implementations.

AsyncManifestStore is the read-only async counterpart of ManifestStore. AsyncS3ManifestStore uses aiobotocore for native async S3 I/O.

AsyncManifestStore

Bases: Protocol

Read-only async manifest loading (no publish — that's writer-side).

AsyncS3ManifestStore

Native async S3 manifest store using aiobotocore.

Each load_current() / load_manifest() call creates a short-lived S3 client via the session context manager. These calls are infrequent (init + refresh only), so per-call clients are simple and safe.

Database-backed ManifestStore implementation (PostgreSQL).

Stores manifests in normalized _builds + _shards tables and tracks the current pointer via an append-only _pointer table.

No external dependencies beyond a DB-API 2 driver (user-provided psycopg2).

PostgresManifestStore

PostgreSQL manifest store with normalized builds + shards tables.

S3 small-object helpers used by the default publisher.

create_s3_client(credentials=None, connection_options=None)

Create boto3 S3 client with optional explicit credentials and connection options.

delete_prefix(prefix_url, *, s3_client=None, metrics_collector=None, retry_config=None)

Delete all objects under an S3 prefix. Returns the number of objects deleted.

When retry_config is None (the default), failures are logged but not raised (best-effort). When a retry_config is provided, each list+delete page is wrapped in _retry_s3_operation() and transient errors are retried; non-transient errors propagate.

get_bytes(url, *, s3_client=None, metrics_collector=None, retry_config=None)

Read object bytes from S3 URL with automatic retry on transient errors.

join_s3(base, *parts)

Join S3 URL path segments, stripping redundant slashes.
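A minimal sketch of the documented joining behavior (the real implementation may handle more edge cases):

```python
def join_s3(base: str, *parts: str) -> str:
    # Strip the trailing slash from the base and surrounding slashes
    # from each part, then rejoin with single slashes.
    segments = [base.rstrip("/")] + [p.strip("/") for p in parts if p]
    return "/".join(segments)
```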

list_prefixes(prefix_url, *, s3_client=None, retry_config=None)

List immediate child prefixes under an S3 prefix (CommonPrefixes).

Returns full S3 URLs for each child prefix, sorted lexicographically. Uses manual pagination with per-page retry for resilience against transient S3 errors mid-pagination.

parse_s3_url(url)

Parse s3://bucket/key into (bucket, key).
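The split is straightforward; a sketch of the documented behavior:

```python
def parse_s3_url(url: str) -> tuple[str, str]:
    # "s3://bucket/prefix/key" -> ("bucket", "prefix/key")
    if not url.startswith("s3://"):
        raise ValueError(f"not an S3 URL: {url!r}")
    bucket, _, key = url[len("s3://"):].partition("/")
    return bucket, key
```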

put_bytes(url, payload, content_type, headers=None, *, s3_client=None, metrics_collector=None, retry_config=None)

PUT bytes to S3 URL with automatic retry on transient S3 errors.

try_get_bytes(url, *, s3_client=None, metrics_collector=None, retry_config=None)

Read object bytes and return None when object is not found.

Transient S3 errors (throttling, timeouts) are retried automatically.

Routing and Reader

Snapshot routing helpers for sharded manifests.

ShardLookup

Bases: Protocol

Protocol for shard metadata access — eager (list) or lazy (SQLite).

get_shard(db_id)

Return metadata for a shard by db_id.

Must return a synthetic empty-shard entry (db_url=None, row_count=0) for db_ids that have no data.

SnapshotRouter

Route point lookups to a shard database id using manifest sharding metadata.

Operates in two modes:

  • Eager (default __init__): preloads all shards into a list. Best for small-to-medium shard counts (up to ~10K).
  • Lazy (from_build_meta): delegates shard lookups to a ShardLookup provider (e.g. backed by SQLite with range reads). Constant memory regardless of shard count.

Both modes expose get_shard for uniform shard access.
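The eager/lazy dispatch pattern can be sketched with stand-in names (the real SnapshotRouter and ShardLookup signatures differ; this only illustrates the uniform get_shard access across both modes):

```python
class Router:
    """Sketch: one get_shard entry point over two storage strategies."""

    def __init__(self, shards=None, lookup=None):
        self._shards = shards  # eager mode: preloaded list of shard metadata
        self._lookup = lookup  # lazy mode: on-demand provider

    @property
    def is_lazy(self) -> bool:
        return self._lookup is not None

    def get_shard(self, db_id: int):
        if self._lookup is not None:
            return self._lookup.get_shard(db_id)  # lazy: delegate to provider
        return self._shards[db_id]                # eager: plain list index
```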

is_lazy property

True when this router uses lazy shard lookup (no preloaded list).

from_build_meta(required_build, *, shard_lookup) classmethod

Create a router with lazy shard lookup (no preloaded shard list).

Use this when the manifest has too many shards to load into memory. The shard_lookup provider supplies RequiredShardMeta on demand (e.g. via indexed SQLite queries).

Note: router.shards is an empty list in lazy mode. Code that iterates over all shards must use the lookup provider directly or switch to get_shard.

get_shard(db_id)

Look up shard metadata by db_id.

Works in both eager (list) and lazy (lookup) mode. Returns a synthetic empty-shard entry for db_ids with no data.

group_keys(keys, *, routing_context=None)

Group keys by routed db id while preserving order within each shard bucket.
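The order-preserving grouping can be sketched in a few lines; the `route` callable here stands in for the router's route() method:

```python
from collections import defaultdict


def group_keys(keys, route):
    # Bucket each key under the db_id that `route` assigns it.
    # Appending in input order preserves order within each bucket.
    groups: dict[int, list] = defaultdict(list)
    for key in keys:
        groups[route(key)].append(key)
    return dict(groups)
```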

group_keys_allow_missing(keys, *, routing_context=None)

Group keys by routed db id, returning unroutable categorical keys separately.

route(key, *, routing_context=None)

Route a key, using routing_context for CEL multi-column mode if provided.

route_with_context(routing_context)

Route using CEL with explicit routing context.

In multi-column mode, the CEL expression evaluates over multiple columns from the routing context, not just the key.

canonical_bytes(key)

Convert a key to its canonical byte representation for hashing.

  • int → 8-byte signed little-endian (range [-2^63, 2^63-1])
  • str → UTF-8 encoded bytes
  • bytes / bytearray → passed through as-is
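The rules above can be sketched directly with struct from the standard library:

```python
import struct


def canonical_bytes(key) -> bytes:
    # Sketch of the documented canonicalization rules.
    if isinstance(key, int):
        return struct.pack("<q", key)   # 8-byte signed little-endian
    if isinstance(key, str):
        return key.encode("utf-8")      # UTF-8 bytes
    if isinstance(key, (bytes, bytearray)):
        return bytes(key)               # passed through as-is
    raise TypeError(f"unsupported key type: {type(key).__name__}")
```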

xxh3_db_id(key, num_dbs)

Route a key to a shard db_id using xxh3_64.

xxh3_digest(key) % num_dbs

xxh3_digest(key)

Compute the xxh3_64 digest of a key's canonical bytes.

This is the single authoritative hash step used by both HASH routing (xxh3_db_id) and the CEL shard_hash() function.

Service-side sharded reader — non-thread-safe variant.

ShardedReader

Bases: _BaseShardedReader

Non-thread-safe sharded reader for single-threaded services.

Provides the same get / multi_get / refresh API as ConcurrentShardedReader but without locks or reference counting.

health(*, staleness_threshold=None)

Return a diagnostic snapshot of reader state.

reader_for_key(key)

Return a borrowed read handle for the shard a key routes to.

readers_for_keys(keys)

Return a mapping of keys to borrowed read handles.

route_key(key, *, routing_context=None)

Return the shard db_id a key would route to.

shard_details()

Return per-shard metadata from the current manifest.

shard_for_key(key, *, routing_context=None)

Return shard metadata for the shard a key routes to.

shards_for_keys(keys)

Return a mapping of keys to their shard metadata.

Async sharded reader for asyncio-based services (FastAPI, aiohttp, etc.).

AsyncShardReaderHandle

Borrowed async reference to a single shard's reader.

Supports async with and explicit close(). Closing releases the borrow (decrements refcount) but does not close the underlying shard.

close()

Release the borrow. Safe to call multiple times.

AsyncShardedReader

Async sharded reader for single-threaded asyncio services.

Uses asyncio.Lock for refresh serialization and native async SlateDB calls on the read path.

health(*, staleness_threshold=None)

Return a diagnostic snapshot of reader state.

open(*, s3_prefix, local_root, manifest_store=None, current_pointer_key='_CURRENT', reader_factory=None, slate_env_file=None, credential_provider=None, s3_connection_options=None, max_concurrency=None, max_fallback_attempts=3, metrics_collector=None, rate_limiter=None) async classmethod

Create and initialize an async sharded reader.

AsyncSlateDbReaderFactory dataclass

Default async factory using SlateDBReader.open_async().

Serde

Key/value serialization utilities.

KeyEncoder = Callable[[object], bytes] module-attribute

Callable that encodes a key into bytes for SlateDB.

ValueSpec dataclass

Value serialization strategy for a Spark row.

binary_col(col_name) classmethod

Use one column directly as bytes payload.

callable_encoder(fn) classmethod

Use a custom callable encoder.

encode(row)

Encode one row into value bytes.

json_cols(cols=None) classmethod

Encode selected columns (or full row) as UTF-8 JSON.
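The column-selection semantics can be sketched as a plain function over a dict-like row (the real method operates on a Spark row, and compact separators here are an assumption):

```python
import json


def encode_json_cols(row: dict, cols=None) -> bytes:
    # Serialize selected columns (or the full row when cols is None)
    # as UTF-8 JSON bytes.
    payload = row if cols is None else {c: row[c] for c in cols}
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")
```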

make_key_encoder(encoding)

Return a key encoder for the given encoding.

Resolves the encoding branch once at setup time so hot loops can call the returned function without per-key dispatch.
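The resolve-once pattern can be sketched as follows, using the documented u64be/u32be encodings as string names (the library uses a KeyEncoding enum, assumed away here):

```python
import struct
from typing import Callable


def make_key_encoder(encoding: str) -> Callable[[int], bytes]:
    # Branch on the encoding once, at setup time, and return a
    # specialized closure so hot loops pay no per-key dispatch cost.
    if encoding == "u64be":
        return lambda key: struct.pack(">Q", key)  # 8-byte big-endian
    if encoding == "u32be":
        return lambda key: struct.pack(">I", key)  # 4-byte big-endian
    raise ValueError(f"unknown key encoding: {encoding}")
```

Callers hold the returned function and call it directly in the write loop, e.g. `encode = make_key_encoder("u64be")` followed by `encode(key)` per row.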