Adding a writer¶
shardyfusion ships four writer flavors: Spark, Dask, Ray, and Python (single-process / multi-process). Each is a thin orchestration layer over _writer_core. This page describes the pattern so you can add a fifth (e.g. Beam, Flink, plain concurrent.futures).
Before starting, read:
- `architecture/writer-core.md` — the shared shard-attempt pipeline.
- `architecture/sharding.md` — how rows are routed.
- `architecture/manifest-and-current.md` — two-phase publish.
What "adding a writer" means¶
A writer is responsible for:
1. **Distributing records to shards** — using the chosen `ShardingStrategy`.
2. **Driving `_writer_core` per shard attempt** — open adapter, write batches, checkpoint, report `ShardAttemptResult`.
3. **Selecting winners** — call `select_winners` once all attempts are reported.
4. **Building and publishing the manifest** — call `assemble_build_result` then `publish_to_store`.
5. **Cleaning up losers** — call `cleanup_losers`.
6. **Run-registry lifecycle** — the `RunRecordLifecycle.start(...)` context manager bookends the whole thing.
Steps 2–6 are framework-agnostic and live in _writer_core. Step 1 is what makes each writer different.
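The division of labor above can be sketched as a single driver function. This is an illustrative skeleton, not shardyfusion code: only step 1 (distribution) is made concrete, and the `drive_write`/`route_fn` names are hypothetical; steps 2–6 are shown as comments because they delegate to `_writer_core`.

```python
from collections import defaultdict

def drive_write(records, num_shards, route_fn):
    # Step 1: route each record to a shard (the framework-specific part).
    shards = defaultdict(list)
    for rec in records:
        shards[route_fn(rec, num_shards)].append(rec)
    attempts = []
    for db_id, rows in sorted(shards.items()):
        # Steps 2-3: open adapter, write batches, report ShardAttemptResult.
        attempts.append({"db_id": db_id, "rows": len(rows)})
    # Steps 4-6: select_winners, assemble + publish manifest, cleanup_losers,
    # all bookended by the RunRecordLifecycle context manager.
    return attempts

attempts = drive_write(range(10), 4, lambda rec, n: rec % n)
```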
Anatomy¶
```text
shardyfusion/writer/<flavor>/
├── __init__.py
├── writer.py      # public entry points: write_hash_sharded() / write_cel_sharded()
├── sharding.py    # framework-specific row distribution
└── (helpers)
```
Read shardyfusion/writer/python/writer.py first — it's the simplest and uses no framework primitives.
Pattern¶
1. Public entry point¶
The signature should be familiar to users of the framework. Provide split entry points by sharding strategy — one for HASH and one for CEL — each accepting the corresponding concrete config type. Framework-specific knobs live on the function signature.
```python
def write_hash_sharded(
    records: <framework-native collection>,
    config: HashShardedWriteConfig,
    *,
    key_fn: Callable[[T], KeyInput],
    value_fn: Callable[[T], bytes],
    columns_fn: Callable[[T], dict[str, Any]] | None = None,
    vector_fn: Callable[[T], tuple[int | str, Any, dict[str, Any] | None]] | None = None,
    # framework-specific knobs ...
) -> BuildResult: ...

def write_cel_sharded(
    records: <framework-native collection>,
    config: CelShardedWriteConfig,
    *,
    key_fn: Callable[[T], KeyInput],
    value_fn: Callable[[T], bytes],
    columns_fn: Callable[[T], dict[str, Any]] | None = None,
    vector_fn: Callable[[T], tuple[int | str, Any, dict[str, Any] | None]] | None = None,
    # framework-specific knobs ...
) -> BuildResult: ...
```
Spark uses a DataFrame; Dask uses a Bag/DataFrame; Ray uses Datasets; Python uses an iterable. The internal pipeline is the same.
2. Routing¶
Use route_hash(key, num_dbs=num_dbs, hash_algorithm=sharding.hash_algorithm) (for HashShardingSpec) or route_cel(key, cel_expr=..., cel_columns=..., routing_values=..., routing_context=...) (for CelShardingSpec) from _writer_core for the per-row routing decision. Don't duplicate the HASH/CEL logic.
For frameworks that pre-partition (Spark, Dask), partition by the routing function then hand each partition to a worker that runs the per-shard write loop. For frameworks without pre-partitioning (Ray Datasets, plain Python), groupby on the computed shard id.
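For the no-pre-partitioning case, a minimal groupby sketch follows. The hash here is a deliberate stand-in so the example is self-contained; a real writer must call `_writer_core.route_hash` (as stated above) so routing stays identical across flavors, and `toy_route`/`group_by_shard` are hypothetical names.

```python
import hashlib
from collections import defaultdict

def toy_route(key: bytes, num_dbs: int) -> int:
    # Stand-in hash only: real writers must delegate to _writer_core.route_hash
    # so the routing contract is byte-identical across all writer flavors.
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % num_dbs

def group_by_shard(records, num_dbs, key_fn):
    # Compute the shard id per record, then group in memory before handing
    # each group to the per-shard write loop.
    groups = defaultdict(list)
    for rec in records:
        groups[toy_route(key_fn(rec), num_dbs)].append(rec)
    return dict(groups)
```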
3. Per-shard write loop¶
For each shard attempt:
- Construct the adapter via `config.adapter_factory(db_url=..., local_dir=...)`.
- Buffer records into batches of `config.batch_size`; call `adapter.write_batch(pairs)` per batch.
- After all records: `adapter.flush()`, then `adapter.seal()`.
- Generate the shard's `checkpoint_id` via `shardyfusion._checkpoint_id.generate_checkpoint_id()` (an opaque `uuid4().hex`). Adapters do not return a checkpoint id; the writer stamps one and stores it on `RequiredShardMeta.checkpoint_id`.
- Report a `ShardAttemptResult` (db_id, attempt index, row count, byte count, min/max keys, db_url, generated `checkpoint_id`).
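The batching and sealing portion of that loop can be sketched as below. `FakeAdapter` and `write_shard` are illustrative stand-ins, not shardyfusion APIs; only the `write_batch`/`flush`/`seal` call order and the writer-stamped checkpoint id are taken from the steps above.

```python
import uuid

class FakeAdapter:
    """Stand-in for the adapter this page assumes; the real one comes
    from config.adapter_factory(...)."""
    def __init__(self):
        self.batches, self.sealed = [], False
    def write_batch(self, pairs):
        self.batches.append(list(pairs))
    def flush(self):
        pass
    def seal(self):
        self.sealed = True

def write_shard(pairs, batch_size, adapter):
    # Buffer into batches of batch_size, write each batch, then flush + seal.
    buf = []
    for pair in pairs:
        buf.append(pair)
        if len(buf) == batch_size:
            adapter.write_batch(buf)
            buf = []
    if buf:
        adapter.write_batch(buf)
    adapter.flush()
    adapter.seal()
    # The writer, never the adapter, stamps the opaque checkpoint id.
    checkpoint_id = uuid.uuid4().hex
    rows = sum(len(b) for b in adapter.batches)
    return rows, checkpoint_id  # feeds row_count / checkpoint_id on ShardAttemptResult
```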
4. Winner selection + publish¶
Once all attempts have reported (success or failure):
```python
winners = select_winners(attempts)
build = assemble_build_result(...)
manifest_ref = publish_to_store(build, manifest_options)
cleanup_losers(attempts, winners, ...)
```
Wrap everything in with RunRecordLifecycle.start(config=..., run_id=..., writer_type=...) as run_record: so the run-registry lifecycle (start → set_manifest_ref → mark_succeeded/mark_failed → close) is correctly emitted.
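A toy stand-in for that bookending behavior, to make the event ordering concrete: `fake_run_record` is hypothetical and persists nothing (the `set_manifest_ref` step is elided); only the start → succeeded/failed → close ordering mirrors the lifecycle described above.

```python
from contextlib import contextmanager

@contextmanager
def fake_run_record():
    # Mimics the documented ordering: start, then mark_succeeded on clean
    # exit or mark_failed on an exception, and always close at the end.
    record = {"events": ["start"]}
    try:
        yield record
        record["events"].append("mark_succeeded")
    except Exception:
        record["events"].append("mark_failed")
        raise
    finally:
        record["events"].append("close")
```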
5. Retries¶
Per-shard retry is opt-in via config.shard_retry. Implement it as: catch exceptions inside the per-shard worker, increment the attempt counter, write under a fresh db_url, report the new attempt. Winner selection naturally picks the latest successful attempt because the sort key is (attempt, ...).
The Python parallel writer file-spool pattern (_parallel_writer.py:1405) is a reference implementation.
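Shape-wise, the retry loop looks roughly like this. It is a sketch: `run_with_retries` is a hypothetical name, the `db_url` layout is purely illustrative, and real attempts would report full `ShardAttemptResult`s rather than tuples.

```python
def run_with_retries(write_attempt, max_retries):
    # Each failure bumps the attempt index and retries under a fresh db_url,
    # so winner selection can pick the latest successful attempt.
    attempts = []
    for attempt in range(max_retries + 1):
        db_url = f"s3://bucket/shard-0/attempt-{attempt}"  # illustrative layout
        try:
            rows = write_attempt(db_url)
        except Exception:
            attempts.append((attempt, "failed"))
            continue
        attempts.append((attempt, "ok", rows))
        break
    return attempts
```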
Sharding extras¶
If the framework needs framework-specific sharding helpers (e.g. Spark repartitionByRange), put them in writer/<flavor>/sharding.py. The Spark writer's sharding.py is the canonical example.
Optional dependency gating¶
The framework dependency goes behind an extra:
```toml
# pyproject.toml
[project.optional-dependencies]
writer-myframework = [
    "shardyfusion[slatedb]",
    "myframework>=2.0",
]
writer-myframework-sqlite = [
    "shardyfusion[sqlite]",
    "myframework>=2.0",
]

[dependency-groups]
cap-writer-myframework = ["myframework>=2.0"]
```
Import the framework lazily inside the writer module — never at the top of shardyfusion/__init__.py's import graph.
Tests¶
| Layer | What to test |
|---|---|
| Unit | Routing decisions on synthetic input; uses FakeSlateDbAdapter. |
| Unit | Backpressure / rate-limit knobs work as documented. |
| Integration | End-to-end against moto S3 with real adapters. |
| E2E | Garage S3 via tests/e2e/writer/. |
| Property | tests/unit/writer/core/test_routing_contract.py must still pass. |
Add py{311,312,313}-myframework-slatedb-{unit,integration,e2e} envs to tox.ini.
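The tox additions might look like the following; the env factors match the names above, but the paths and commands are illustrative:

```ini
# tox.ini (illustrative)
[testenv:py{311,312,313}-myframework-slatedb-{unit,integration,e2e}]
extras = writer-myframework
commands =
    unit: pytest tests/unit/writer/myframework
    integration: pytest tests/integration/writer/myframework
    e2e: pytest tests/e2e/writer/myframework
```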
Documentation¶
Per adding-a-use-case.md, add at minimum:
- `docs/use-cases/kv-storage/build/myframework.md` (covering both SlateDB and SQLite backends in one page)
Update architecture/writer-core.md if you introduce a new shared primitive.
Common mistakes¶
- **Re-implementing `route_hash` or `route_cel`.** Always reuse `_writer_core.route_hash` and `_writer_core.route_cel`.
- **Forgetting `RunRecordLifecycle.start(...)`.** Run records won't be written, and loser cleanup deferred from a previous run can't progress.
- **Top-level framework import.** Breaks the base install.
- **Adding scalar extraction parameters to public writer functions.** Keep public signatures to `(data, config, input, options=None)` and put extraction details in `PythonRecordInput`, `ColumnWriteInput`, or `VectorColumnInput`.
- **Adding new fields to `HashShardedWriteConfig` or `CelShardedWriteConfig` for one framework.** Framework knobs go in the matching `*WriteOptions`; shared fields go in the nested public config groups.
See also¶
- `architecture/writer-core.md`
- `architecture/retry-and-cleanup.md`
- `adding-an-adapter.md` — the related "add a backend" workflow.
- `history/design-decisions/adr-004-consistent-writer-retry.md` (after Phase 6).