Skip to content

Manifest Stores

Manifest stores handle the persistence and retrieval of snapshot manifests — the JSON metadata that describes which shards exist, where they live, and how to route keys to them.

Overview

Every write pipeline ends with two publish steps: (1) persist the manifest payload, (2) update the current pointer. Every reader starts by loading the current pointer and dereferencing the manifest it points to. The ManifestStore protocol abstracts this lifecycle so you can swap between S3, a relational database, or an in-memory store for testing.

flowchart LR
    W["Writer"] -->|"publish()"| MS["ManifestStore"]
    MS -->|"load_current()"| R["Reader"]
    MS -->|"load_manifest(ref)"| R
    MS -->|"list_manifests()"| R
    MS -->|"set_current(ref)"| CLI["CLI (rollback)"]

ManifestStore Protocol

The full read/write protocol used by writers and the CLI:

class ManifestStore(Protocol):
    def publish(
        self,
        *,
        run_id: str,
        required_build: RequiredBuildMeta,
        shards: list[RequiredShardMeta],
        custom: dict[str, Any],
    ) -> str:
        """Persist manifest + update current pointer. Returns a manifest reference."""

    def load_current(self) -> ManifestRef | None:
        """Return the current manifest pointer, or None if not published."""

    def load_manifest(self, ref: str) -> ParsedManifest:
        """Load and parse a manifest by reference."""

    def list_manifests(self, *, limit: int = 10) -> list[ManifestRef]:
        """Return up to *limit* manifests in reverse chronological order."""

    def set_current(self, ref: str) -> None:
        """Update the current pointer to the given manifest reference."""

ManifestRef is the backend-agnostic handle returned by load_current() and list_manifests():

@dataclass(slots=True, frozen=True)
class ManifestRef:
    ref: str              # backend-specific reference (S3 key, DB row ID, etc.)
    run_id: str           # the run_id that produced this manifest
    published_at: datetime

AsyncManifestStore Protocol

A read-only async subset for use with AsyncShardedReader:

class AsyncManifestStore(Protocol):
    async def load_current(self) -> ManifestRef | None: ...
    async def load_manifest(self, ref: str) -> ParsedManifest: ...
    async def list_manifests(self, *, limit: int = 10) -> list[ManifestRef]: ...

No publish() or set_current() — manifest publishing is always a writer-side (synchronous) operation.

S3ManifestStore

The default implementation. Manifests are stored as SQLite database files in S3; the current pointer is a separate _CURRENT JSON object.

S3 Object Layout

s3://bucket/prefix/
├── manifests/
│   ├── 2026-03-14T10:30:00.000000Z_run_id=abc123/
│   │   └── manifest          ← manifest SQLite payload
│   ├── 2026-03-13T08:00:00.000000Z_run_id=def456/
│   │   └── manifest
│   └── ...
└── _CURRENT                   ← current pointer JSON

Manifest S3 keys are timestamp-prefixed for chronological listing via S3 CommonPrefixes.

Constructor

from shardyfusion import S3ManifestStore, RetryConfig

store = S3ManifestStore(
    s3_prefix="s3://bucket/prefix",
    manifest_name="manifest",                      # default
    current_name="_CURRENT",                       # default
    credential_provider=None,                      # optional CredentialProvider
    s3_connection_options=None,                    # optional S3ConnectionOptions
    metrics_collector=None,                        # optional MetricsCollector
    retry_config=RetryConfig(                      # optional, controls S3 retries
        max_retries=3,                             # default
        initial_backoff=timedelta(seconds=1.0),    # default
        backoff_multiplier=2.0,                    # default
    ),
)

Retry Behavior

S3 operations use exponential backoff for transient errors. The default retry sequence is 1s → 2s → 4s (3 attempts). Customize via RetryConfig:

store = S3ManifestStore(
    s3_prefix="s3://bucket/prefix",
    retry_config=RetryConfig(max_retries=5, initial_backoff=timedelta(seconds=0.5)),
)

Manifest Format

Manifests are serialized as SQLite databases (application/x-sqlite3) with two tables:

  • build_meta — single row with run metadata (run_id, num_dbs, s3_prefix, sharding as JSON, etc.)
  • shards — one row per non-empty shard (db_id, db_url, attempt, row_count, min_key/max_key as JSON, writer_info as JSON)

Use parse_manifest_payload() or parse_sqlite_manifest() to deserialize into a ParsedManifest object.

CURRENT Pointer Format

{
  "manifest_ref": "s3://bucket/prefix/manifests/2026-03-14T10:30:00.000000Z_run_id=abc123/manifest",
  "manifest_content_type": "application/x-sqlite3",
  "run_id": "abc123",
  "updated_at": "2026-03-14T10:30:01Z",
  "format_version": 1
}

AsyncS3ManifestStore

Native async S3 loading via aiobotocore. Used automatically by AsyncShardedReader when the read-async extra is installed.

from shardyfusion import AsyncS3ManifestStore

store = AsyncS3ManifestStore(
    s3_prefix="s3://bucket/prefix",
    manifest_name="manifest",           # default
    current_name="_CURRENT",            # default
    credential_provider=None,           # optional CredentialProvider
    s3_connection_options=None,         # optional S3ConnectionOptions
    metrics_collector=None,             # optional MetricsCollector
    retry_config=None,                  # optional RetryConfig
)

Creates a short-lived S3 client per operation (via aiobotocore session context manager). This is appropriate because manifest operations are infrequent — only at initialization and refresh.

from shardyfusion import AsyncShardedReader

# AsyncShardedReader uses AsyncS3ManifestStore by default
reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
)

# Or provide a custom async store
reader = await AsyncShardedReader.open(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    manifest_store=my_async_store,  # must implement AsyncManifestStore
)

Note

AsyncShardedReader only accepts AsyncManifestStore — not the sync ManifestStore. When manifest_store=None (default), it creates an AsyncS3ManifestStore internally.

Database Manifest Stores

For environments where S3 is not the primary metadata store, two database-backed implementations are available: PostgresManifestStore and Comdb2ManifestStore.

How They Work

Both use two tables:

erDiagram
    shardyfusion_manifests {
        text run_id PK
        jsonb payload
        text content_type
        timestamptz created_at
    }
    shardyfusion_pointer {
        timestamptz updated_at
        text manifest_ref
    }
    shardyfusion_pointer }o--|| shardyfusion_manifests : "references"
  • shardyfusion_manifests — stores manifest JSON payloads, one row per write
  • shardyfusion_pointer — append-only current-pointer tracking (one INSERT per publish() or set_current() call)

load_current() reads the newest pointer row. If the pointer table is empty, it falls back to the newest manifest row.

PostgresManifestStore

import psycopg2
from shardyfusion.db_manifest_store import PostgresManifestStore

store = PostgresManifestStore(
    connection_factory=lambda: psycopg2.connect("dbname=mydb"),
    table_name="shardyfusion_manifests",        # default
    pointer_table_name="shardyfusion_pointer",   # default
    ensure_table=True,                           # default: auto-create tables
)

When ensure_table=True (the default), tables are created on construction using this DDL:

-- Manifest table
CREATE TABLE IF NOT EXISTS shardyfusion_manifests (
    run_id TEXT PRIMARY KEY,
    payload JSONB NOT NULL,
    content_type TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Pointer table (append-only)
CREATE TABLE IF NOT EXISTS shardyfusion_pointer (
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    manifest_ref TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_shardyfusion_pointer_updated_at
    ON shardyfusion_pointer (updated_at DESC);

Comdb2ManifestStore

Same interface, adapted for Comdb2's SQL dialect (uses TEXT instead of JSONB, CURRENT_TIMESTAMP instead of NOW()):

from shardyfusion.db_manifest_store import Comdb2ManifestStore

store = Comdb2ManifestStore(
    connection_factory=lambda: comdb2_connect("mydb"),
    ensure_table=True,
)

Usage with Writers and Readers

Pass a DB manifest store via ManifestOptions:

from shardyfusion import WriteConfig, ManifestOptions

config = WriteConfig(
    num_dbs=8,
    s3_prefix="s3://bucket/prefix",
    manifest=ManifestOptions(store=store),
)

# Reader
from shardyfusion import ConcurrentShardedReader

reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    manifest_store=store,
)

InMemoryManifestStore

A simple in-memory implementation for testing. No S3 or database required.

from shardyfusion import InMemoryManifestStore

store = InMemoryManifestStore()

# Use in tests
reader = ShardedReader(
    s3_prefix="s3://test/prefix",
    local_root="/tmp/test-reader",
    manifest_store=store,
)

Choosing a Backend

Feature S3ManifestStore DB (Postgres/Comdb2) InMemoryManifestStore
Production-ready Yes Yes No (testing only)
Async variant AsyncS3ManifestStore
Transactional writes No (eventual consistency) Yes (ACID) N/A
History/rollback Via S3 key listing Via pointer table queries Via in-memory list
Auto-DDL N/A ensure_table=True N/A
Retry support RetryConfig Connection-level N/A
Best for Most deployments Environments needing ACID metadata Unit tests