Observability
shardyfusion provides two observability channels: metrics (structured events emitted at lifecycle milestones) and structured logging (log lines with contextual fields, optionally formatted as JSON).
```mermaid
flowchart LR
    subgraph Pipeline
        W["Writer"]
        R["Reader"]
        S3["S3 / Storage"]
    end
    W -->|"MetricEvent"| MC["MetricsCollector"]
    R -->|"MetricEvent"| MC
    S3 -->|"S3_RETRY"| MC
    MC --> Prom["PrometheusCollector"]
    MC --> OTel["OtelCollector"]
    MC --> Custom["Custom Collector"]
    W -->|"log_event / log_failure"| Log["shardyfusion.* loggers"]
    R -->|"log_event / log_failure"| Log
    Log --> JF["JsonFormatter"]
    Log --> SF["Standard Formatter"]
```
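MetricsCollector Protocol
Implement the `MetricsCollector` protocol to receive structured metric events:
```python
from shardyfusion.metrics import MetricEvent, MetricsCollector


class MyCollector:
    def emit(self, event: MetricEvent, payload: dict) -> None:
        ...  # handle the event


# Structural typing: MyCollector satisfies the protocol without subclassing it.
collector: MetricsCollector = MyCollector()
```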
Pass the collector to a writer or a reader:
```python
from shardyfusion import ShardedReader, WriteConfig  # top-level import path assumed

# Writer: via WriteConfig (or as a parameter)
config = WriteConfig(num_dbs=8, s3_prefix="...", metrics_collector=collector)

# Reader
reader = ShardedReader(
    s3_prefix="...",
    local_root="/tmp/reader",
    metrics_collector=collector,
)
```
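Implementation requirements:
- Must be thread-safe: `emit()` is called from multiple threads when the collector is used with `ConcurrentShardedReader`.
- `emit()` is called synchronously on the caller's thread, so buffer internally if handling an event could block.
- Should silently ignore unknown events, so the collector keeps working when new events are added.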
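A collector can meet all three requirements without blocking the caller by handing events off to a background thread. The sketch below uses only the standard library and is illustrative, not part of shardyfusion. `BufferedCollector`, its `handler` callback and the drop-when-full policy are all hypothetical. It also assumes `MetricEvent` members expose an Enum-style `.name` that matches the catalog, so the demo uses a stand-in `FakeEvent` enum.

```python
import queue
import threading
from enum import Enum
from typing import Any, Callable


class BufferedCollector:
    """Hypothetical collector: emit() only enqueues; a daemon thread does the work."""

    def __init__(self, handler: Callable[[str, dict], None], maxsize: int = 10_000) -> None:
        self._handler = handler
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._lock = threading.Lock()
        self.dropped = 0  # events discarded because the buffer was full
        threading.Thread(target=self._drain, daemon=True).start()

    def emit(self, event: Any, payload: dict) -> None:
        # Assumes Enum-style events; falls back to str() otherwise.
        name = getattr(event, "name", str(event))
        try:
            # queue.Queue is thread-safe, so many reader threads may call this at once.
            self._queue.put_nowait((name, dict(payload)))
        except queue.Full:
            with self._lock:
                self.dropped += 1  # drop rather than block the read/write path

    def flush(self) -> None:
        """Block until every queued event has been handled (handy in tests)."""
        self._queue.join()

    def _drain(self) -> None:
        while True:
            name, payload = self._queue.get()
            try:
                self._handler(name, payload)
            except Exception:
                pass  # a failing handler must not kill the drain thread
            finally:
                self._queue.task_done()


# Demo handler: reacts to READER_GET and silently ignores everything else.
seen: list = []


def on_event(name: str, payload: dict) -> None:
    if name == "READER_GET":
        seen.append((name, payload["duration_ms"]))


class FakeEvent(Enum):  # stand-in for shardyfusion.metrics.MetricEvent
    READER_GET = "reader_get"
    SOME_FUTURE_EVENT = "some_future_event"


collector = BufferedCollector(on_event)
collector.emit(FakeEvent.READER_GET, {"duration_ms": 1.5, "found": True})
collector.emit(FakeEvent.SOME_FUTURE_EVENT, {})
collector.flush()
```

Dropping events when the buffer is full trades completeness for latency. A collector that must never lose events would need a different policy.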
MetricEvent Catalog
Writer Lifecycle

| Event | Payload | Description |
|---|---|---|
| `WRITE_STARTED` | `elapsed_ms` | Write pipeline initiated |
| `SHARDING_COMPLETED` | `elapsed_ms`, `duration_ms` | Shard assignment phase complete |
| `SHARD_WRITE_STARTED` | `elapsed_ms`, `db_id` | Individual shard write begins |
| `SHARD_WRITE_COMPLETED` | `elapsed_ms`, `db_id`, `row_count`, `duration_ms` | Individual shard write finishes |
| `SHARD_WRITES_COMPLETED` | `elapsed_ms`, `duration_ms`, `rows_written` | All shards written |
| `BATCH_WRITTEN` | `elapsed_ms`, `db_id`, `batch_size` | Single batch flushed to adapter |
| `MANIFEST_PUBLISHED` | `elapsed_ms` | Manifest uploaded |
| `CURRENT_PUBLISHED` | `elapsed_ms` | `CURRENT` pointer updated |
| `WRITE_COMPLETED` | `elapsed_ms`, `rows_written` | Entire write pipeline done |
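To illustrate the writer payloads, this standard-library sketch keeps the per-shard `row_count` and `duration_ms` from `SHARD_WRITE_COMPLETED`, so that slow shards stand out. `ShardTimingCollector` and `slowest()` are hypothetical names. Matching on `event.name` assumes `MetricEvent` is an Enum whose member names match the table above, and `FakeEvent` is a stand-in used only for the demo.

```python
import threading
from enum import Enum
from typing import Any


class ShardTimingCollector:
    """Hypothetical collector that records rows and seconds per shard."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.shards: dict[int, tuple[int, float]] = {}  # db_id -> (rows, seconds)

    def emit(self, event: Any, payload: dict) -> None:
        if getattr(event, "name", None) != "SHARD_WRITE_COMPLETED":
            return  # every other event is ignored
        with self._lock:
            self.shards[payload["db_id"]] = (
                payload["row_count"],
                payload["duration_ms"] / 1000.0,  # ms -> s
            )

    def slowest(self, n: int = 3) -> list[tuple[int, float]]:
        """Return the n slowest shards as (db_id, seconds), slowest first."""
        with self._lock:
            ranked = sorted(self.shards.items(), key=lambda kv: kv[1][1], reverse=True)
        return [(db_id, secs) for db_id, (_rows, secs) in ranked[:n]]


class FakeEvent(Enum):  # stand-in for shardyfusion.metrics.MetricEvent
    SHARD_WRITE_COMPLETED = "shard_write_completed"
    BATCH_WRITTEN = "batch_written"


c = ShardTimingCollector()
c.emit(FakeEvent.SHARD_WRITE_COMPLETED,
       {"elapsed_ms": 900, "db_id": 0, "row_count": 1000, "duration_ms": 250})
c.emit(FakeEvent.SHARD_WRITE_COMPLETED,
       {"elapsed_ms": 1500, "db_id": 1, "row_count": 4000, "duration_ms": 1200})
c.emit(FakeEvent.BATCH_WRITTEN, {"elapsed_ms": 100, "db_id": 0, "batch_size": 500})
```

The collector keeps `row_count` next to the duration, so a slow shard can be checked for an unusually large row count before other causes are suspected.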
Reader Lifecycle

| Event | Payload | Description |
|---|---|---|
| `READER_INITIALIZED` | (none) | Reader constructed and state loaded |
| `READER_GET` | `duration_ms`, `found` | Single-key lookup completed |
| `READER_MULTI_GET` | `duration_ms`, `num_keys` | Multi-key lookup completed |
| `READER_REFRESHED` | `changed` | Refresh attempted (`changed=True` if a new snapshot was loaded) |
| `READER_CLOSED` | `num_handles` | Reader closed |
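The reader payloads are enough to derive a cache-style hit ratio and to count refreshes that actually switched snapshots. The sketch below is a hypothetical collector (`ReaderStats` is not a shardyfusion class). It assumes `found` and `changed` are booleans and that `MetricEvent` exposes Enum-style `.name` values, and it uses a stand-in `FakeEvent` enum for the demo.

```python
import threading
from enum import Enum
from typing import Any


class ReaderStats:
    """Hypothetical collector deriving hit ratio and refresh counts from reader events."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # emit() may run on many reader threads
        self.gets = 0
        self.hits = 0
        self.multi_get_keys = 0
        self.refreshes = 0
        self.snapshot_changes = 0

    def emit(self, event: Any, payload: dict) -> None:
        name = getattr(event, "name", None)  # assumes Enum-style MetricEvent
        with self._lock:
            if name == "READER_GET":
                self.gets += 1
                self.hits += int(bool(payload.get("found")))
            elif name == "READER_MULTI_GET":
                self.multi_get_keys += payload.get("num_keys", 0)
            elif name == "READER_REFRESHED":
                self.refreshes += 1
                self.snapshot_changes += int(bool(payload.get("changed")))
            # Any other event (including future ones) is ignored.

    def hit_ratio(self) -> float:
        with self._lock:
            return self.hits / self.gets if self.gets else 0.0


class FakeEvent(Enum):  # stand-in for shardyfusion.metrics.MetricEvent
    READER_GET = "reader_get"
    READER_MULTI_GET = "reader_multi_get"
    READER_REFRESHED = "reader_refreshed"
    READER_CLOSED = "reader_closed"


stats = ReaderStats()
for found in (True, True, False):
    stats.emit(FakeEvent.READER_GET, {"duration_ms": 0.8, "found": found})
stats.emit(FakeEvent.READER_MULTI_GET, {"duration_ms": 3.1, "num_keys": 5})
stats.emit(FakeEvent.READER_REFRESHED, {"changed": False})
stats.emit(FakeEvent.READER_REFRESHED, {"changed": True})
stats.emit(FakeEvent.READER_CLOSED, {"num_handles": 8})  # not tracked here
```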
Writer Retry

| Event | Payload | Description |
|---|---|---|
| `SHARD_WRITE_RETRIED` | `elapsed_ms`, `db_id`, `attempt`, `backoff_s` | Shard write failed with a retryable error; retrying after backoff |
| `SHARD_WRITE_RETRY_EXHAUSTED` | `elapsed_ms`, `db_id`, `attempts` | All shard write retry attempts failed |
**Note:** Writer retry events are emitted only when `WriteConfig.shard_retry` is configured. This currently covers:

- Dask and Ray sharded writes.
- `write_single_db()` on Spark, Dask and Ray.
- Python parallel writes.

Spark sharded writes rely on Spark's own task retry and speculation instead. The built-in Prometheus and OTel collectors do not yet instrument these events, but custom collectors can handle them through the standard `emit()` protocol.
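The built-in collectors skip these events, so a custom collector is currently the way to observe them. The sketch below aggregates retries, total backoff and exhausted shards per `db_id`. `RetryTracker` is hypothetical. It assumes Enum-style `MetricEvent` names and uses a stand-in `FakeEvent` enum for the demo.

```python
import threading
from collections import Counter
from enum import Enum
from typing import Any


class RetryTracker:
    """Hypothetical collector that aggregates shard write retries per db_id."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.retries: Counter = Counter()  # db_id -> number of retries
        self.backoff_s: Counter = Counter()  # db_id -> total backoff in seconds
        self.exhausted: set = set()  # db_ids whose retries ran out

    def emit(self, event: Any, payload: dict) -> None:
        name = getattr(event, "name", None)  # assumes Enum-style MetricEvent
        with self._lock:
            if name == "SHARD_WRITE_RETRIED":
                self.retries[payload["db_id"]] += 1
                self.backoff_s[payload["db_id"]] += payload["backoff_s"]
            elif name == "SHARD_WRITE_RETRY_EXHAUSTED":
                self.exhausted.add(payload["db_id"])
            # All other events are ignored.


class FakeEvent(Enum):  # stand-in for shardyfusion.metrics.MetricEvent
    SHARD_WRITE_RETRIED = "shard_write_retried"
    SHARD_WRITE_RETRY_EXHAUSTED = "shard_write_retry_exhausted"


tracker = RetryTracker()
tracker.emit(FakeEvent.SHARD_WRITE_RETRIED,
             {"elapsed_ms": 10, "db_id": 2, "attempt": 1, "backoff_s": 0.5})
tracker.emit(FakeEvent.SHARD_WRITE_RETRIED,
             {"elapsed_ms": 600, "db_id": 2, "attempt": 2, "backoff_s": 1.0})
tracker.emit(FakeEvent.SHARD_WRITE_RETRY_EXHAUSTED,
             {"elapsed_ms": 1700, "db_id": 2, "attempts": 3})
tracker.emit(FakeEvent.SHARD_WRITE_RETRIED,
             {"elapsed_ms": 20, "db_id": 5, "attempt": 1, "backoff_s": 0.5})
```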
Infrastructure

| Event | Payload | Description |
|---|---|---|
| `S3_RETRY` | `attempt`, `max_retries`, `delay_s` | S3 operation being retried |
| `S3_RETRY_EXHAUSTED` | `attempts` | All S3 retry attempts failed |
| `RATE_LIMITER_THROTTLED` | `wait_seconds` | Rate limiter imposed a wait before acquiring tokens |
| `RATE_LIMITER_DENIED` | `tokens_requested` | Non-blocking `try_acquire()` could not satisfy the request |
Built-in Collectors
PrometheusCollector
Maps `MetricEvent`s to Prometheus counters and histograms.
Install:
```shell
pip install "shardyfusion[metrics-prometheus]"
# or: uv sync --extra metrics-prometheus
```
Usage:
```python
from shardyfusion.metrics.prometheus import PrometheusCollector

# Uses the default global registry
collector = PrometheusCollector()

# Or use a dedicated registry (recommended for tests)
from prometheus_client import CollectorRegistry

registry = CollectorRegistry()
collector = PrometheusCollector(registry=registry, prefix="shardyfusion_")
```
Constructor parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `registry` | `CollectorRegistry \| None` | `None` (global `REGISTRY`) | Prometheus registry to register instruments with |
| `prefix` | `str` | `"shardyfusion_"` | Prefix for all metric names |
Instruments (names shown with the default `shardyfusion_` prefix):

| Prometheus Name | Type | Source Event | Description |
|---|---|---|---|
| `shardyfusion_writes_started_total` | Counter | `WRITE_STARTED` | Write pipelines initiated |
| `shardyfusion_writes_completed_total` | Counter | `WRITE_COMPLETED` | Write pipelines completed |
| `shardyfusion_write_duration_seconds` | Histogram | `WRITE_COMPLETED` | Total write pipeline duration |
| `shardyfusion_shard_writes_completed_total` | Counter | `SHARD_WRITE_COMPLETED` | Individual shard writes completed |
| `shardyfusion_shard_write_duration_seconds` | Histogram | `SHARD_WRITE_COMPLETED` | Per-shard write duration |
| `shardyfusion_batches_written_total` | Counter | `BATCH_WRITTEN` | Batches flushed to adapters |
| `shardyfusion_reader_gets_total` | Counter | `READER_GET` | Single-key lookups |
| `shardyfusion_reader_get_duration_seconds` | Histogram | `READER_GET` | Single-key lookup duration |
| `shardyfusion_reader_multi_gets_total` | Counter | `READER_MULTI_GET` | Multi-key lookups |
| `shardyfusion_reader_multi_get_duration_seconds` | Histogram | `READER_MULTI_GET` | Multi-key lookup duration |
| `shardyfusion_s3_retries_total` | Counter | `S3_RETRY` | S3 retries |
| `shardyfusion_s3_retries_exhausted_total` | Counter | `S3_RETRY_EXHAUSTED` | S3 retry exhaustions |
| `shardyfusion_rate_limiter_throttled_total` | Counter | `RATE_LIMITER_THROTTLED` | Rate limiter throttle events |
| `shardyfusion_rate_limiter_wait_seconds` | Histogram | `RATE_LIMITER_THROTTLED` | Rate limiter wait duration |
| `shardyfusion_rate_limiter_denied_total` | Counter | `RATE_LIMITER_DENIED` | Non-blocking acquire denials |
Payload fields in milliseconds (`*_ms`) are converted to seconds (`ms / 1000.0`) before being recorded. `wait_seconds` is already in seconds.
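**Test isolation:** Always pass a dedicated `CollectorRegistry` in tests. Registering the same instruments with the global registry more than once in a process (for example, by creating a `PrometheusCollector()` in several tests) raises `Duplicated timeseries` errors and pollutes the global registry.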
OtelCollector
Maps `MetricEvent`s to OpenTelemetry counters and histograms.
Install:
```shell
pip install "shardyfusion[metrics-otel]"
# or: uv sync --extra metrics-otel
```
Usage:
```python
from shardyfusion.metrics.otel import OtelCollector

# Uses the global MeterProvider
collector = OtelCollector()

# Or use a dedicated MeterProvider (recommended for tests)
from opentelemetry.sdk.metrics import MeterProvider

provider = MeterProvider()
collector = OtelCollector(meter_provider=provider, meter_name="shardyfusion")
```
Constructor parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `meter_provider` | `MeterProvider \| None` | `None` (global provider) | OTel `MeterProvider` used to create instruments |
| `meter_name` | `str` | `"shardyfusion"` | Meter name used to group instruments |
Instruments:

| OTel Name | Type | Unit | Source Event |
|---|---|---|---|
| `shardyfusion.writes_started` | Counter | (none) | `WRITE_STARTED` |
| `shardyfusion.writes_completed` | Counter | (none) | `WRITE_COMPLETED` |
| `shardyfusion.write_duration` | Histogram | `s` | `WRITE_COMPLETED` |
| `shardyfusion.shard_writes_completed` | Counter | (none) | `SHARD_WRITE_COMPLETED` |
| `shardyfusion.shard_write_duration` | Histogram | `s` | `SHARD_WRITE_COMPLETED` |
| `shardyfusion.batches_written` | Counter | (none) | `BATCH_WRITTEN` |
| `shardyfusion.reader_gets` | Counter | (none) | `READER_GET` |
| `shardyfusion.reader_get_duration` | Histogram | `s` | `READER_GET` |
| `shardyfusion.reader_multi_gets` | Counter | (none) | `READER_MULTI_GET` |
| `shardyfusion.reader_multi_get_duration` | Histogram | `s` | `READER_MULTI_GET` |
| `shardyfusion.s3_retries` | Counter | (none) | `S3_RETRY` |
| `shardyfusion.s3_retries_exhausted` | Counter | (none) | `S3_RETRY_EXHAUSTED` |
| `shardyfusion.rate_limiter_throttled` | Counter | (none) | `RATE_LIMITER_THROTTLED` |
| `shardyfusion.rate_limiter_wait` | Histogram | `s` | `RATE_LIMITER_THROTTLED` |
| `shardyfusion.rate_limiter_denied` | Counter | (none) | `RATE_LIMITER_DENIED` |
OTel uses `.add()` for counters and `.record()` for histograms. As with Prometheus, `*_ms` payload fields are converted to seconds, and `wait_seconds` is already in seconds.
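**Test isolation:** Pass a dedicated `MeterProvider` in tests to avoid polluting the global provider. Building it as `MeterProvider(metric_readers=[reader])` with an `InMemoryMetricReader` (from `opentelemetry.sdk.metrics.export`) lets tests assert on the recorded metrics.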
Structured Logging
shardyfusion uses Python's standard `logging` module under the `shardyfusion` logger namespace. A set of helpers provides structured, machine-parseable log output.
Logger Hierarchy
```python
from shardyfusion.logging import get_logger

logger = get_logger("mymodule")  # → logging.getLogger("shardyfusion.mymodule")
```
All shardyfusion loggers live under the `shardyfusion.*` hierarchy, so you can control verbosity at the root:
```python
import logging

logging.getLogger("shardyfusion").setLevel(logging.DEBUG)
```
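Setting the level at the root works because of standard `logging` level inheritance. A child logger with no level of its own uses the level of its nearest configured ancestor. The standard-library snippet below shows that inheritance and how to quiet one subtree independently. The child logger names (`shardyfusion.writer`, `shardyfusion.reader`) are assumptions for illustration.

```python
import logging

# Set the level once on the package root...
logging.getLogger("shardyfusion").setLevel(logging.DEBUG)

# ...and children without their own level inherit it.
writer_level = logging.getLogger("shardyfusion.writer").getEffectiveLevel()

# One noisy subtree can still be quieted on its own (logger name assumed).
logging.getLogger("shardyfusion.reader").setLevel(logging.WARNING)
reader_level = logging.getLogger("shardyfusion.reader").getEffectiveLevel()
```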
Structured Event Emission
Two functions emit structured log lines and attach contextual fields via `extra={"shardyfusion": fields}`:
```python
from shardyfusion.logging import FailureSeverity, log_event, log_failure

# Informational event
log_event("manifest_published", run_id="abc123", num_shards=8)

# Failure with severity-based log level
try:
    ...  # e.g. an S3 upload
except Exception as exc:
    log_failure(
        "s3_put_failed",
        severity=FailureSeverity.TRANSIENT,  # → WARNING level
        error=exc,
        attempt=2,
        max_retries=3,
    )
```
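The standard library copies each `extra` key onto the `LogRecord` as an attribute. Any handler or filter can therefore read these fields back from `record.shardyfusion`, which is useful in tests. The sketch below uses plain `logger.info` / `logger.warning` calls with the same `extra=` shape as stand-ins for `log_event()` and `log_failure()`. `FieldCapture` and the `shardyfusion.demo` logger name are hypothetical.

```python
import logging


class FieldCapture(logging.Handler):
    """Hypothetical handler that collects the structured fields of each record."""

    def __init__(self) -> None:
        super().__init__()
        self.captured: list = []

    def emit(self, record: logging.LogRecord) -> None:
        fields = getattr(record, "shardyfusion", None)
        if isinstance(fields, dict):
            self.captured.append({"level": record.levelname, **fields})


logger = logging.getLogger("shardyfusion.demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo out of any root handlers
capture = FieldCapture()
logger.addHandler(capture)

# Stand-in for log_event(): a plain stdlib call with the same extra= shape.
logger.info("manifest_published", extra={"shardyfusion": {"run_id": "abc123", "num_shards": 8}})
# Stand-in for log_failure(..., severity=TRANSIENT), which logs at WARNING.
logger.warning("s3_put_failed", extra={"shardyfusion": {"attempt": 2, "max_retries": 3}})
```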
FailureSeverity
Controls the log level used by `log_failure()`:

| Severity | Log Level | Use Case |
|---|---|---|
| `TRANSIENT` | `WARNING` | Retryable errors (S3 timeouts, rate limits) |
| `ERROR` | `ERROR` | Non-retryable errors (bad config, parse failures) |
| `CRITICAL` | `CRITICAL` | Unrecoverable errors (data corruption) |
LogContext
A context manager that binds fields to every `log_event()` / `log_failure()` call within its scope. It is built on `contextvars`, so it works correctly with both threading and `asyncio.create_task()`.
```python
from shardyfusion.logging import LogContext, log_event

with LogContext(run_id="abc123", pipeline="daily"):
    log_event("write_started")  # includes run_id and pipeline
    with LogContext(db_id=3):
        log_event("shard_write_started")  # includes run_id, pipeline, and db_id
    log_event("write_completed")  # includes run_id and pipeline (db_id gone)
```
Nesting is supported — inner contexts merge with outer ones. Fields from inner contexts take precedence on key collision.
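These nesting and precedence rules follow naturally from a `contextvars.ContextVar` that holds a dict. The dict is replaced with a merged copy on entry and restored with the saved token on exit. The sketch below is a hypothetical stand-in (`BoundFields`, `current_fields`), not shardyfusion's implementation. It also shows that `asyncio.create_task()` copies the current context, so a task created inside a block keeps that block's fields.

```python
import asyncio
import contextvars
from typing import Any

# Holds the merged fields for the current context (thread or asyncio task).
_fields = contextvars.ContextVar("log_fields")


def current_fields() -> dict:
    return dict(_fields.get({}))


class BoundFields:
    """Hypothetical re-creation of LogContext's documented semantics."""

    def __init__(self, **fields: Any) -> None:
        self._new = fields
        self._token = None

    def __enter__(self) -> "BoundFields":
        merged = {**_fields.get({}), **self._new}  # inner keys win on collision
        self._token = _fields.set(merged)
        return self

    def __exit__(self, *exc: object) -> None:
        _fields.reset(self._token)  # restore exactly what the outer scope had


with BoundFields(run_id="abc123", pipeline="daily"):
    outer = current_fields()
    with BoundFields(db_id=3, pipeline="backfill"):
        inner = current_fields()
    after_inner = current_fields()
outside = current_fields()


async def _read_fields() -> dict:
    return current_fields()


async def main() -> dict:
    with BoundFields(request_id="r1"):
        # create_task() copies the current context at creation time.
        task = asyncio.create_task(_read_fields())
    return await task


task_fields = asyncio.run(main())
```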
JsonFormatter
Formats log records as single-line JSON, suitable for log aggregation systems (ELK, Datadog, CloudWatch). The simplest way to enable it is through `configure_logging()`:
```python
from shardyfusion.logging import configure_logging

configure_logging(json_format=True)
```
Output format:
```json
{"timestamp": "2026-03-14 10:30:00,000", "level": "INFO", "logger": "shardyfusion.writer", "event": "manifest_published", "run_id": "abc123", "num_shards": 8}
```
The formatter flattens extra["shardyfusion"] fields into the top-level JSON object alongside standard fields (timestamp, level, logger, event).
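If you wire up your own handlers, you can sketch a formatter with the same output shape on top of `logging.Formatter`. `FlatJsonFormatter` is hypothetical and is not shardyfusion's `JsonFormatter`. It assumes the event name is the log message. The `exc_info` key, and letting the standard fields win over colliding structured fields, are choices made for this sketch. With its default format, `formatTime()` produces timestamps like `2026-03-14 10:30:00,000`, matching the output example.

```python
import io
import json
import logging


class FlatJsonFormatter(logging.Formatter):
    """Hypothetical single-line JSON formatter that flattens record.shardyfusion."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {
            "timestamp": self.formatTime(record),  # e.g. "2026-03-14 10:30:00,000"
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),  # assumes the message is the event name
        }
        # Flatten structured fields; the standard keys above win on collision.
        for key, value in (getattr(record, "shardyfusion", None) or {}).items():
            doc.setdefault(key, value)
        if record.exc_info:
            doc["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(doc, default=str)  # default=str keeps odd values serializable


buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(FlatJsonFormatter())

log = logging.getLogger("shardyfusion.writer")
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output out of any root handlers
log.addHandler(handler)

log.info("manifest_published", extra={"shardyfusion": {"run_id": "abc123", "num_shards": 8}})
line = json.loads(buf.getvalue())
```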
configure_logging
Convenience function that sets up the shardyfusion logger hierarchy:
```python
import logging
import sys

from shardyfusion.logging import configure_logging

# Pick one of the following.

# Standard text format
configure_logging(level=logging.DEBUG)

# JSON format for production
configure_logging(level=logging.INFO, json_format=True)

# Custom stream (the default is stderr)
configure_logging(level=logging.INFO, json_format=True, stream=sys.stdout)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `level` | `int` | `logging.INFO` | Log level for the `shardyfusion` hierarchy |
| `json_format` | `bool` | `False` | Use `JsonFormatter` instead of the standard format |
| `stream` | stream (file-like) | `None` (stderr) | Output stream for the handler |
Full Example: Production Service
```python
import logging

from shardyfusion import ConcurrentShardedReader
from shardyfusion.logging import LogContext, configure_logging
from shardyfusion.metrics.prometheus import PrometheusCollector

# Set up structured JSON logging
configure_logging(level=logging.INFO, json_format=True)

# Set up Prometheus metrics (default global registry)
collector = PrometheusCollector()

# Create a reader that reports metrics
reader = ConcurrentShardedReader(
    s3_prefix="s3://bucket/prefix",
    local_root="/tmp/reader",
    metrics_collector=collector,
)


# Use LogContext to tag all log lines with request context
def handle_request(request_id: str, key: int):
    with LogContext(request_id=request_id):
        return reader.get(key)
```