Proposal: Quiver - Arrow-Based Persistence for OTAP Dataflow
⚠️ This document has moved to https://github.com/open-telemetry/otel-arrow/blob/main/rust/experimental/quiver/ARCHITECTURE.md. Please refer to that location for the latest updates. ⚠️
⚠️ Issue text below is no longer being updated or revised. ⚠️
=================
Problem Statement
otap-dataflow currently operates purely in-memory. We anticipate users needing
durability: the ability to survive process crashes and network outages
without data loss.
Proposed Solution: Quiver
We propose building Quiver: a standalone, embeddable Arrow-based segment store
packaged as a reusable Rust crate. Quiver does not exist yet; this document
defines its initial design, scope, and open questions. While it will be
developed first for otap-dataflow, we intend to keep it decoupled so it can
integrate into other telemetry pipelines or streaming systems that need durable
buffering around Apache Arrow.
Throughout the proposal we use RecordBundle to describe the logical unit
Quiver persists. In OTAP terms this corresponds to an OtapArrowRecords
value: a fixed set of payload slots (Logs, LogAttrs, ScopeAttrs,
ResourceAttrs, etc.) that may or may not be populated for a given RecordBundle.
Core Concepts
Segment Store: Immutable Arrow IPC files containing batches of telemetry. Each segment:
- Groups multiple RecordBundle arrivals (8-64 MB target size) and persists the per-slot Arrow streams they reference.
- Supports many payload types and evolving schemas inside the same segment via a stream directory + batch manifest.
- Contains metadata: time ranges, signal type (via adapter), schema fingerprints, checksum, and per-stream statistics.
- Supports zero-copy memory-mapped reads.
Write-Ahead Log (WAL): Append-only log for crash recovery
- Records batches before segment finalization
- Configurable flush behavior (durability vs. latency trade-off)
- Truncated after segments are persisted
Open Segment Buffer (In-Memory): Bounded in-memory accumulation
- Each incoming RecordBundle is appended to the WAL for durability and also to the current open segment's per-stream accumulators.
- Buffer size is capped by segment.target_size (and optionally segment.max_open_duration for time slicing).
- On a finalize trigger (size, duration, shutdown, or retention pressure), Quiver flushes each accumulator to its Arrow IPC stream slice and writes the segment directory + manifest.
- After the segment file + metadata are durable, the corresponding WAL region becomes eligible for truncation.
- Early finalize under pressure reduces memory footprint; metrics expose open segment bytes.
Write path overview:
- Append the RecordBundle to the WAL (optionally flush/fdatasync per policy).
- Send an Ack to the upstream producer.
- Encode the bundle's payload slots into their stream accumulators (in-memory Arrow streaming writers).
- Finalize: seal builders, assign segment_seq, write the segment file + metadata.
- Notify subscribers for each finalized RecordBundle; begin the per-bundle Ack/Nack lifecycle.
WAL Truncation Safety: “Safe” means the WAL entries you are removing are no longer needed for crash recovery of segment data. Concretely:
- All batches up to the boundary of one or more finalized segments have been serialized into a segment file AND that file plus its metadata have been made durable (written and flushed per the configured flush/fsync policy).
- Those WAL entries are not part of the current open (still accumulating) segment.
- The segment finalization was successful (no partial/corrupt write detected).
Subscriber ACKs are not required for truncation—the segment file is the durable source for replay until retention deletes it. Truncation never removes WAL entries belonging to:
- The active open segment.
- A segment whose file/metadata durability has not been confirmed.
WAL File Format & Rotation
- Single append-only file per core: each persistence instance writes to wal/quiver.wal, rolling into numbered siblings (.1, .2, …) only when rotation triggers as described below. The steady-state design keeps writes sequential for simplicity and page-cache locality.
- File header: fixed-width preamble (b"QUIVER\0WAL"), version, and the target segment configuration hash. Concretely the header is {magic: [u8;10], version: u16, reserved: u16, segment_cfg_hash: [u8;16]} and is always little-endian. reserved must be zero today; we bump version for any incompatible encoding change so older binaries can decisively reject files they cannot interpret and newer binaries can interpret the older file format. segment_cfg_hash is a 128-bit digest (e.g., truncated BLAKE3) over the adapter-owned layout contract: slot id → payload mappings, per-slot ordering, checksum policy toggles, and any other settings that a particular adapter (OTAP today) uses to interpret RecordBundle payloads. We treat operational knobs such as segment.target_size, flush cadence, or retention caps as out of scope for the hash so mundane tuning never invalidates an otherwise healthy WAL. On replay a hash mismatch is treated the same as an unknown format version: Quiver refuses to ingest the WAL and asks operators either to finish draining with the previous adapter build or to migrate the data (e.g., replay through the old binary and regenerate segments) before rolling forward.
- Framed entries: every RecordBundle append writes a length-prefixed record:
  - 4-byte little-endian length of the entire entry (header + payloads).
  - Entry header (u8 entry_type, currently 0 = RecordBundle).
  - Ingestion timestamp (i64 nanos UTC) and per-core sequence number (u64, monotonically increasing per WAL writer).
  - Slot bitmap (currently a single u64) followed by slot_count metadata blocks. Each block contains payload_type_id: u16, schema_fingerprint: [u8;32], row_count: u32, and payload_len: u32.
  - The slot bitmap tracks which logical RecordBundle slots are populated; bit n corresponds to slot n. Metadata blocks are emitted in ascending slot order for every set bit so the reader can deterministically associate each block with its slot without storing redundant ids.
  - Pseudo-layout for a RecordBundle entry, showing the precise write order (the length and CRC fields are not covered by the checksum, as noted below):

        // Little-endian, sequential on disk
        u32 entry_len;                      // prefix (covers header..payloads)
        EntryHeader {
            u8  entry_type;                 // 0 = RecordBundle
            i64 ingestion_ts_nanos;
            u64 per_core_sequence;
            u64 slot_bitmap;                // current encoding fits <= 64 slots
        }
        for slot in slots_present(slot_bitmap) {
            SlotMeta {
                u16     payload_type_id;
                [u8;32] schema_fingerprint;
                u32     row_count;
                u32     payload_len;        // bytes of following Arrow IPC stream
            }
            [u8;payload_len] arrow_payload; // streaming IPC bytes for the slot
        }
        u32 crc32c;                         // trailer; covers EntryHeader..payloads
  - Arrow payload blobs are serialized in the streaming IPC format. For each populated slot we write exactly one contiguous IPC stream containing the chunk(s) belonging to that Option<RecordBatch> value. The blob for a slot immediately follows its metadata block, so the serialized representation of a single RecordBundle is [entry header][slot bitmap][metadata₀][payload₀][metadata₁][payload₁]…. Absent slots (bitmap bit cleared) contribute neither metadata nor bytes; they are implicitly None when reconstructing the bundle.
  - Every entry ends with a 4-byte little-endian CRC32C checksum that covers the entry header, bitmap, metadata blocks, and payload bytes (everything except the leading length field and the checksum itself). Replay verifies the CRC before decoding Arrow IPC bytes; a mismatch marks the WAL as corrupted and triggers truncation back to the last known-good offset. A framing and verification sketch follows this list.
  - Because the metadata describes the length of every blob, unknown entry types (future versions) can be skipped using the recorded length.
  - Each WAL append encodes the bundle's payload chunks once; crash replay decodes those IPC bytes back into RecordBatches before reinserting them into the in-memory segment accumulators.
- Replay model: on startup we scan sequentially, validating the WAL header (preamble bytes + version), then iterating length -> entry until EOF. Each entry is parsed, checksum-verified, and dispatched by entry_type to rebuild open-segment state. If we encounter a partial entry (e.g., due to a crash mid-write) we truncate the file back to the last successful offset before continuing. Replay stops at the tail once the last partial segment is reconstituted.
- Versioning / evolvability: because the header encodes a version, we can introduce new entry types (e.g., periodic checkpoints) or swap serialization without breaking older data; unknown entry types are skipped using the recorded length. A checkpoint entry (entry_type = 1) would embed the current open-segment manifest, truncate_offset, and high-water sequence numbers so recovery can jump directly to the latest checkpoint instead of replaying the entire log.
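To make the framing concrete, here is a minimal sketch of encoding and verifying one entry. It assumes the crc32c crate for the checksum, and `body` stands for the already-encoded entry header, slot metadata blocks, and payload bytes; this is illustrative rather than the final implementation.

```rust
// Hedged sketch: frame one WAL entry and verify it on replay. Assumes the
// `crc32c` crate; `body` = EntryHeader + slot metadata blocks + payloads.
fn frame_entry(body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + body.len() + 4);
    out.extend_from_slice(&(body.len() as u32).to_le_bytes()); // entry_len prefix
    out.extend_from_slice(body);
    out.extend_from_slice(&crc32c::crc32c(body).to_le_bytes()); // trailer CRC32C
    out
}

/// Returns the verified body, or None on a truncated or corrupt entry, in
/// which case replay truncates the WAL back to the last known-good offset.
fn read_entry(buf: &[u8]) -> Option<&[u8]> {
    let len = u32::from_le_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    let body = buf.get(4..4 + len)?;
    let crc = u32::from_le_bytes(buf.get(4 + len..4 + len + 4)?.try_into().ok()?);
    (crc32c::crc32c(body) == crc).then_some(body)
}
```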
Truncation & rotation mechanics
- Track truncate progress: After a segment finalizes and its metadata + file are flushed, we advance a truncate_offset pointer to the first byte belonging to the next entry in the open segment. Think of truncate_offset as "the earliest WAL byte still needed for crash recovery." We persist that u64 (plus a monotonically increasing rotation generation) into a tiny sidecar file (e.g., wal/truncate.offset) immediately after advancing it and fsync the sidecar so crash recovery can seek straight to that logical offset without rescanning finalized entries.
- Truncate sidecar format: The sidecar is a fixed 32-byte struct written in little-endian order:

      TruncateSidecar {
          [u8; 8] magic = b"QUIVER\0T";   // distinguishes from WAL proper
          u16     version = 1;            // bump if layout changes
          u16     reserved = 0;
          u64     truncate_offset;        // first byte still needed in wal/quiver.wal
          u64     rotation_generation;    // increments each WAL rotation
          u32     crc32c;                 // covers magic..rotation_generation
      }

  We write updates via truncate.offset.tmp: encode the struct, compute the CRC, pwrite + fdatasync, then renameat over the live file so readers see either the old or new offset (a sketch of this sequence follows this list). On startup we verify the magic, version, and checksum before trusting the recorded offsets; failure falls back to scanning from the beginning of quiver.wal.
- Probe prefix reclamation: On startup we test whether the active filesystem supports punching holes out of the current WAL file. Linux builds attempt fallocate(FALLOC_FL_PUNCH_HOLE) against a temporary WAL stub while Windows builds issue FSCTL_SET_ZERO_DATA on a sparse scratch file. If the probe fails with EOPNOTSUPP / ERROR_INVALID_FUNCTION we mark the capability as disabled and fall back to rewriting until the process restarts.
- Drop reclaimed prefixes: When support exists, or when the pointer crosses a configurable threshold, we invoke fallocate(FALLOC_FL_PUNCH_HOLE) in place to discard bytes [header_len, truncate_offset), leaving the fixed header (30 bytes: magic + version + reserved + cfg hash) intact. On filesystems where hole punching is not supported, we instead perform a rewrite: copy every byte from truncate_offset through the current end-of-file into quiver.wal.new (using copy_file_range / CopyFile2 where available) while the writer stays paused, then reopen the new file and stream the fixed header back to the front so the layout matches a freshly created WAL. Once the copy completes we fsync the new file, atomically rename it over the original, and resume appends at offset (header_len) + (old_len - truncate_offset). Windows builds rely on CopyFile2 plus SetFileInformationByHandle for the same sequence. This rewrite path preserves the header bytes verbatim, so replay cannot distinguish a rewritten WAL from one that was never hole-punched.
- Rotate on size: wal.max_size caps the aggregate footprint of the active WAL plus every still-referenced rotated sibling. We keep a running total of the active file and the byte spans tracked for quiver.wal.N; when the next append would push the aggregate over the configured cap we rotate immediately: shift older suffixes up, close and rename wal/quiver.wal to quiver.wal.1, and reopen a fresh quiver.wal. Each rotation records the byte span covered by the retired chunk so cleanup can later delete quiver.wal.N only after the persisted truncate_offset exceeds that span's upper bound. We never hole-punch rotated files; they are deleted wholesale once fully covered by the durability pointer, avoiding rewrites of large historical blobs while keeping the total WAL footprint bounded by wal.max_size. To keep rename churn and per-core directory fan-out predictable we retain at most wal.max_chunks files (default 10, counting the active WAL plus rotated siblings). Operators can override the default. Hitting the chunk cap is treated like hitting the byte cap: the next append that would require a rotation instead trips backpressure (or drop_oldest, if selected) until either truncation reclaims an older chunk or the limit is raised. We never create an eleventh file in the background because doing so would undermine the predictable bound the knob is meant to provide.
- Durability-only dependency: Because WAL truncation depends solely on segment durability, exporter ACK lag never blocks WAL cleanup; segments themselves remain on disk until subscribers advance, but the WAL only needs to cover the currently open segment.
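For reference, the sidecar update sequence (temp file, flush, atomic rename) could look roughly like the following. The sketch assumes the crc32c crate, uses buffered std::fs writes in place of pwrite, and leaves out error recovery and the Windows-specific calls.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

// Hedged sketch of the sidecar update; not the final implementation.
fn write_truncate_sidecar(
    wal_dir: &Path,
    truncate_offset: u64,
    rotation_generation: u64,
) -> std::io::Result<()> {
    // Encode the fixed 32-byte little-endian TruncateSidecar struct.
    let mut buf = Vec::with_capacity(32);
    buf.extend_from_slice(b"QUIVER\0T");                        // magic (8 bytes)
    buf.extend_from_slice(&1u16.to_le_bytes());                 // version
    buf.extend_from_slice(&0u16.to_le_bytes());                 // reserved
    buf.extend_from_slice(&truncate_offset.to_le_bytes());
    buf.extend_from_slice(&rotation_generation.to_le_bytes());
    let crc = crc32c::crc32c(&buf);                             // covers magic..rotation_generation
    buf.extend_from_slice(&crc.to_le_bytes());

    // Write to the temp file, flush it, then atomically rename over the live
    // sidecar so readers always observe either the old or the new offset.
    let tmp = wal_dir.join("truncate.offset.tmp");
    let live = wal_dir.join("truncate.offset");
    let mut f = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&tmp)?;
    f.write_all(&buf)?;
    f.sync_data()?; // fdatasync equivalent
    fs::rename(&tmp, &live)?;
    // Unix-style directory fsync so the rename itself survives a crash.
    File::open(wal_dir)?.sync_all()?;
    Ok(())
}
```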
Acknowledgement Log (ack.log): Shared append-only log for subscriber state
- Stores every per-bundle Ack/Nack as (segment_seq, bundle_index, subscriber_id, outcome)
- Replayed on startup to rebuild each subscriber's in-flight bundle set and advance segment high-water marks once all bundles are acknowledged
- Enables recovery without per-subscriber WALs
Dual Time & Ordering Semantics:
- Event Time: Timestamp embedded in telemetry (from source).
- Ingestion Timestamp (ingestion_time): Wall-clock timestamp (UTC) captured when data arrives; drives retention windows, metrics, and diagnostics.
- Ingestion Sequence (segment_seq): Monotonically increasing counter assigned per finalized segment; drives delivery ordering, high-water marks, and gap detection, independent of wall-clock adjustments.
Retention and queries can use event time (semantic) or ingestion timestamp (system). The ingestion sequence remains authoritative for stable replay ordering and tie-breaking. Indexing directly on event time is lower priority and will follow query feature implementation.
Pub/Sub Notifications: Subscribers (exporters) receive notifications when new segments are ready.
Multi-Subscriber Tracking: Each subscriber maintains per-segment bundle
progress; data is deleted only when every subscriber has acknowledged every
bundle (or retention policy overrides). A subscriber’s high-water mark is the
highest contiguous segment whose bundles all have Ack (or dropped) outcomes.
Out-of-order Acks land in a tiny gap set keyed by (segment_seq, bundle_index)
so the mark never jumps past a missing bundle. Each bundle Ack decrements the
segment’s outstanding bundle counter; once it reaches zero the segment becomes
eligible for deletion.
Ack/Nack events append to a shared log (single WAL keyed by subscriber ID) so
per-bundle state is replayed after crashes without per-subscriber files. The
shared log (ack.log) lives alongside the main WAL. Periodic checkpoints
persist each subscriber’s current high-water mark into the metadata index so
recovery can skip directly to the first gap, replaying the log only for recent
events; bundle gap sets are rebuilt on replay from the same log stream.
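To illustrate the bookkeeping, here is a hedged sketch of per-subscriber progress tracking. The type and method names are illustrative, segment_seq values are assumed contiguous per subscriber, and for simplicity it tracks the pending-bundle complement of the gap set described above.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Illustrative per-subscriber bookkeeping; names and shapes are not the final
// Quiver types. `outstanding` holds bundles still awaiting an Ack or dropped
// outcome, i.e. the complement of the gap set.
#[derive(Default)]
struct SubscriberProgress {
    outstanding: BTreeMap<u64, BTreeSet<u32>>, // segment_seq -> pending bundle indexes
    high_water_segment: Option<u64>,           // highest fully-completed contiguous segment
    highest_finalized: Option<u64>,            // highest segment delivered to this subscriber
}

impl SubscriberProgress {
    /// Called when a finalized segment with `bundle_count` bundles is delivered.
    fn on_segment_finalized(&mut self, segment_seq: u64, bundle_count: u32) {
        self.outstanding.insert(segment_seq, (0..bundle_count).collect());
        self.highest_finalized = Some(segment_seq);
    }

    /// Ack and synthetic `dropped` outcomes both clear the bundle; Nack does not.
    /// Returns true once the segment has no outstanding bundles left.
    fn on_ack_or_dropped(&mut self, segment_seq: u64, bundle_index: u32) -> bool {
        if let Some(pending) = self.outstanding.get_mut(&segment_seq) {
            pending.remove(&bundle_index);
            if pending.is_empty() {
                self.outstanding.remove(&segment_seq);
                self.advance_high_water();
                return true; // segment now deletion-eligible for this subscriber
            }
        }
        false
    }

    /// The mark never jumps past a segment that still has pending bundles.
    fn advance_high_water(&mut self) {
        let limit = match self.highest_finalized {
            Some(s) => s,
            None => return,
        };
        let mut next = self.high_water_segment.map_or(0, |s| s + 1);
        while next <= limit && !self.outstanding.contains_key(&next) {
            self.high_water_segment = Some(next);
            next += 1;
        }
    }
}
```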
Integration with OTAP Dataflow
Quiver can be placed at different points in the pipeline depending on use case:
Option A: Early Persistence (after receiver)
Receiver -> [Quiver Persistence] -> [Signal Processor] -> [Exporters]
v
[WAL + Segments]
Protects all raw data; useful during network outages before processing
Option B: Late Persistence (after signal processing)
Receiver -> [Signal Processor] -> [Quiver Persistence] -> [Exporters]
v
[WAL + Segments]
Protects processed data; smaller footprint, buffers during downstream outages
- Optional: Persistence is an optional component; disable for pure memory streaming.
- Per-Core: Each core has its own Quiver instance (no cross-core locking).
- Durability: When enabled, data is acknowledged after WAL flush; segment finalization happens after. (Acknowledgement refers to durability, not subscriber delivery; delivery notifications occur only after segment finalization. Memory use is bounded by the open segment's configured target size.)
Key Design Choices
- Standalone Crate: Separate Rust library with minimal dependencies; embeddable in any telemetry pipeline.
- Arrow-Native: Leverage Arrow IPC format for zero-copy, language-agnostic storage.
- Immutable Segments: Once finalized, segments never change.
- Single Writer: Each Quiver instance has one writer.
- Bounded Resources: Configurable caps on WAL size, segment count, retention window.
- Cross-Platform: Works on Linux, Windows, and macOS with minimal dependencies.
- Dual Persistence Path: WAL provides first durability; open segment builders hold data in memory until segment finalization. This avoids reading back WAL bytes to build Arrow structures, trading bounded memory for lower finalize latency.
- Default Strict Durability: backpressure is the default size-cap policy, guaranteeing no segment loss prior to acknowledgement; drop_oldest must be explicitly selected to allow controlled loss.
Terminology
- RecordBundle: The generic ingestion unit Quiver stores in a segment. Conceptually, it is a fixed-width array of optional payload type slots (e.g., slot 0 = root records, slot 1 = resource attributes). The type only exposes arrow::record_batch::RecordBatch values plus metadata needed to compute a schema fingerprint and row count.
- Payload Type Slot: A stable identifier within a RecordBundle. Each slot maps to exactly one logical payload kind for the embedding system. OTAP assigns slots to Logs, LogAttrs, ResourceAttrs, etc.; another integration can provide its own slot table.
- Stream: The ordered sequence of Arrow IPC messages Quiver writes for a (slot, schema_fingerprint) pairing inside a segment.
- Stream Directory: The header table that records every stream's id, slot, schema fingerprint, byte offset, byte length, and statistics.
- Batch Manifest: The ordered list of RecordBundle arrivals. Each entry lists the (stream_id, chunk_index) to read for every payload slot that was present in the bundle.
- Adapter: A thin crate-specific shim that converts between the embedding project's structs (for OTAP: OtapArrowRecords) and Quiver's generic RecordBundle interface.
OTAP RecordBundle Primer
- Each OTAP batch (an OtapArrowRecords value) maps to a RecordBundle through the OTAP adapter.
- Slots correspond to OTAP payload types (Logs, LogAttrs, ScopeAttrs, ResourceAttrs, etc.).
- During persistence, each slot's RecordBatch is encoded into a stream whenever it is present.
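For orientation, the logs mapping could be pictured roughly as below. The slot constants and helper are hypothetical; the real adapter owns the authoritative slot table.

```rust
use arrow::record_batch::RecordBatch;

// Hypothetical slot ids for the logs signal; illustrative only.
const LOGS: usize = 0;
const LOG_ATTRS: usize = 1;
const SCOPE_ATTRS: usize = 2;
const RESOURCE_ATTRS: usize = 3;

/// A logs RecordBundle is conceptually a fixed-width array of optional slots.
type LogsBundle = [Option<RecordBatch>; 4];

/// Iterate only the populated slots, e.g. to feed per-slot stream accumulators.
fn populated_slots(bundle: &LogsBundle) -> impl Iterator<Item = (usize, &RecordBatch)> {
    bundle
        .iter()
        .enumerate()
        .filter_map(|(slot, batch)| batch.as_ref().map(|b| (slot, b)))
}
```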
graph LR
subgraph Bundle0[RecordBundle 0]
B0L[Logs<br/>rows: 37<br/>schema L1]
B0A[LogAttrs<br/>rows: 37<br/>schema A1]
B0S[ScopeAttrs<br/>missing]
B0R[ResourceAttrs<br/>missing]
end
subgraph Bundle1[RecordBundle 1]
B1L[Logs<br/>rows: 54<br/>schema L1]
B1A[LogAttrs<br/>rows: 54<br/>schema A1]
B1S[ScopeAttrs<br/>rows: 2<br/>schema S1]
B1R[ResourceAttrs<br/>rows: 1<br/>schema R1]
end
subgraph Bundle2[RecordBundle 2]
B2L[Logs<br/>rows: 49<br/>schema L2<br/>+ column]
B2A[LogAttrs<br/>rows: 49<br/>schema A2]
B2S[ScopeAttrs<br/>missing]
B2R[ResourceAttrs<br/>rows: 1<br/>schema R1]
end
classDef missing fill:#eee,stroke:#bbb,stroke-dasharray:4 2
class B0S,B0R,B2S missing
classDef variant fill:#cfe3ff,stroke:#004488,stroke-width:2px
class B2L,B2A variant
The example highlights three consecutive RecordBundles.
Bundle 0 omits the ScopeAttrs and ResourceAttrs slots entirely,
so those payloads never emit a stream chunk.
Bundle 1 carries the full set of payloads;
each slot already has its own fingerprint (L1, A1, S1, R1)
even though none changed between bundle 0 and 1.
Bundle 2 shows schema drift for the Logs slot (schema L2 with an
additional column) and for LogAttrs (schema A2), while ResourceAttrs
remains on R1 and ScopeAttrs is still absent.
Quiver writes separate Arrow streams for every (slot, schema) combination
and uses the batch manifest to link each bundle back to the correct stream
chunks when reading segment files.
graph TD
subgraph Streams[Segment Streams]
SL1[Logs schema L1<br/>RB0 chunk0 rows 37<br/>RB1 chunk1 rows 54<br/>total rows 91]
SL2[Logs schema L2<br/>RB2 chunk0 rows 49<br/>total rows 49]
SA1[LogAttrs schema A1<br/>RB0 chunk0 rows 37<br/>RB1 chunk1 rows 54<br/>total rows 91]
SA2[LogAttrs schema A2<br/>RB2 chunk0 rows 49<br/>total rows 49]
SS1[ScopeAttrs schema S1<br/>RB1 chunk0 rows 2<br/>total rows 2]
SR1[ResourceAttrs schema R1<br/>RB1 chunk0 rows 1<br/>RB2 chunk1 rows 1<br/>total rows 2]
end
subgraph Manifest[Batch Manifest]
M0[RecordBundle 0<br/>SL1 chunk0<br/>SA1 chunk0]
M1[RecordBundle 1<br/>SL1 chunk1<br/>SA1 chunk1<br/>SS1 chunk0<br/>SR1 chunk0]
M2[RecordBundle 2<br/>SL2 chunk0<br/>SA2 chunk0<br/>SR1 chunk1]
end
M0 --> SL1
M0 --> SA1
M1 --> SL1
M1 --> SA1
M1 --> SS1
M1 --> SR1
M2 --> SL2
M2 --> SA2
M2 --> SR1
Each stream holds the serialized Arrow RecordBatch messages for a particular
(slot, schema) fingerprint. The manifest references those batches by stream
id and chunk index so the reader can reassemble the original RecordBundle
values.
Multi-Schema Segment Format
Quiver segments are containers around Arrow IPC streams plus a manifest that
describes how those streams reassemble back into the RecordBundle
abstraction used by the embedding pipeline.
Envelope Overview
- The segment header contains two primary sections:
  - stream_directory: one entry per (payload_type, schema_signature) pairing with stream id, payload kind (Logs, LogAttrs, ScopeAttrs, ResourceAttrs, etc.), schema fingerprint, byte offset, and byte length.
  - batch_manifest: ordered entries for every OtapArrowRecords that arrived while the segment was open. Each manifest row lists, per payload slot, stream_id + chunk_index pairs pointing back into the directory.
- Writers reuse a stream id whenever a payload arrives with the same schema fingerprint; schema evolution during the segment allocates a new stream id.
- No control records are serialized; the manifest fully describes the replay order without embedding markers inside the Arrow buffers.
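The two header sections could be modeled roughly as follows; the field names are illustrative and this is not the on-disk encoding.

```rust
// Illustrative shapes for the segment header sections (not the on-disk layout).
struct StreamDirectoryEntry {
    stream_id: u32,
    payload_slot: u16,            // RecordBundle slot / payload kind
    schema_fingerprint: [u8; 32],
    byte_offset: u64,             // start of the Arrow IPC slice within the segment file
    byte_len: u64,
    row_count: u64,               // per-stream statistics would sit alongside this
}

/// Points at one RecordBatch inside a stream.
struct ChunkRef {
    stream_id: u32,
    chunk_index: u32,
}

/// One entry per RecordBundle that arrived while the segment was open; absent
/// payload slots simply have no chunk reference.
struct BatchManifestEntry {
    slots: Vec<(u16, ChunkRef)>, // (payload_slot, chunk reference)
}
```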
graph TD
subgraph SegmentWriter
A[Incoming RecordBundles] --> B["StreamAccumulator (per payload/schema)"]
B --> |Arrow IPC| C[Segment Data Chunks]
B --> |Stream metadata| D[Stream Directory]
A --> |Batch manifest entries| E[Batch Manifest]
end
C --> F[Segment File]
D --> F
E --> F
F --> |mmap| G[Segment Reader]
Arrow IPC Encoding
- While a segment is open, Quiver appends messages to each stream using the Arrow streaming format so we can keep adding batches without rewriting footers.
- On finalize, each stream flushes any buffered messages, writes an Arrow file footer, and aligns the slice on an 8-byte boundary. The header stores the final offsets and lengths so readers can memory map the slice and hand it directly to arrow_ipc::FileReader.
- During replay, the reader consults the manifest to rebuild each RecordBundle, hydrating only the payloads the consumer requested.
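As a rough illustration of that lifecycle (append batches while a stream is open, write the file footer at finalize, then read the finalized slice back), here is a small example using the arrow crate's IPC writer and reader (the reader the proposal refers to as arrow_ipc::FileReader). In Quiver the reader would operate over an mmap'd byte range located via the stream directory rather than an in-memory buffer.

```rust
use std::io::Cursor;
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )?;

    // While the stream is "open", batches are appended one at a time; finish()
    // plays the role of segment finalization and writes the Arrow file footer.
    let mut slice = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut slice, &schema)?;
        writer.write(&batch)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Reading the finalized slice back; in Quiver this would be a memory-mapped
    // byte range described by the stream directory rather than a Vec.
    let reader = FileReader::try_new(Cursor::new(slice), None)?;
    for maybe_batch in reader {
        assert_eq!(maybe_batch?.num_rows(), 3);
    }
    Ok(())
}
```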
sequenceDiagram
participant M as Manifest Entry (RecordBundle)
participant SD as Stream Directory
participant S as Stream Chunks
participant R as Reassembled Bundle
M->>SD: lookup stream_id for payload slot
SD-->>M: schema fingerprint & chunk count
M->>S: fetch chunk N from stream_id
S-->>M: Arrow RecordBatch bytes
M->>R: insert payload batch into RecordBundle
R-->>M: RecordBundle returned to adapter
Dictionary Handling
- Each (slot, schema) stream keeps dictionary encoding intact. While bundles accumulate we capture the union of dictionary values per column. When finalizing the segment we rebuild those columns against a deterministic vocabulary and emit the Arrow IPC file with the canonical dictionary in the header. Readers reopen the slice via arrow_ipc::FileReader, which replays the seeded dictionaries before yielding the chunk referenced by the manifest.
- Dictionaries stay deterministic for the lifetime of a stream because the final vocabulary is chosen from the accumulated batches. If a stream would exceed configured cardinality limits we rotate to a fresh stream (resetting dictionary ids) rather than serializing delta messages. That mirrors the in-memory lifecycle in otap-dataflow and keeps chunks self-contained.
DataFusion Integration
- otap-dataflow will eventually ship a FileFormat that exposes one payload type at a time. infer_schema() returns the representative schema for that payload: the union of all columns observed in the segment (or across selected segments) with types promoted to the widest compatible Arrow type.
- During reads the DataFusion/quiver adapter reorders columns, inserts placeholder arrays for omitted fields, and casts as needed so DataFusion receives a stable schema even when telemetry batches vary.
- Representative schemas live alongside the stream directory, allowing a table provider to merge schemas across multiple segments without touching the underlying Arrow buffers.
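A hedged sketch of that read-side conforming step follows; it assumes the representative schema's fields are nullable (so placeholder columns are legal) and that every observed type can be cast to the promoted type.

```rust
use arrow::array::{new_null_array, ArrayRef};
use arrow::compute::cast;
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Conform one decoded batch to the representative schema: reorder columns,
/// insert all-null placeholders for omitted columns, and cast each column to
/// the promoted (widest) type.
fn conform(batch: &RecordBatch, representative: SchemaRef) -> Result<RecordBatch> {
    let columns: Vec<ArrayRef> = representative
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => cast(col, field.data_type()),
            None => Ok(new_null_array(field.data_type(), batch.num_rows())),
        })
        .collect::<Result<_>>()?;
    RecordBatch::try_new(representative, columns)
}
```

Whether this casting happens lazily per batch or eagerly when a segment is opened for query is an implementation detail left open here.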
OTAP Payload Representation in Quiver
- The otap-dataflow crate provides an adapter that implements Quiver's RecordBundle interface on top of OtapArrowRecords, mapping slot ids to the OTAP payload enum (Logs, LogAttrs, ScopeAttrs, etc.).
- Each OtapArrowRecords value is treated as a bundle of payload-specific RecordBatch values. Within a segment, the manifest records the ordering of payloads so replay reconstructs the familiar [Option<RecordBatch>; N] structure for logs, traces, or metrics before handing control back to the adapter.
- Optional payloads simply do not emit a stream for that batch. Optional columns drop from the schema signature; when the column reappears it yields a new stream id with the expanded schema.
- Per-segment dictionaries reset at segment boundaries, matching today’s in-memory lifetime: high-cardinality attributes force an early finalize and avoid unbounded growth.
- Segment metadata keeps per-stream statistics (row count, column omit bitmap) so readers can quickly decide which payload types to materialize.
- When many segments are queried together, the table provider unions their representative schemas per payload type, ensuring DataFusion sees a single, coherent schema without re-serializing the Arrow buffers.
Retention & Size Cap Policy
Quiver keeps disk usage low with two layers of cleanup:
- Steady-state cleanup (runs continuously): when a segment's outstanding count drops to zero, it is queued for deletion. Each core drains its queue during the next finalize or maintenance tick and deletes the oldest fully processed segments immediately. Optional knob retention.steady_state_headroom can reserve a tiny buffer (default near zero) if operators want a small on-disk spool even in healthy operation.
- Size-cap safety net (invoked when usage still exceeds the configured cap):
  - Evict the oldest fully processed segments until usage <= cap.
  - If still over cap:
    - backpressure (default): slow or reject ingestion until step 1 becomes possible (strict durability: no pre-ack loss).
    - drop_oldest: remove the oldest unprocessed data (prefer finalized; else WAL / open buffers) to preserve throughput under emergencies.
  - Optionally finalize a large open segment early so it becomes eligible for the next cleanup pass.
Only drop_oldest permits deleting finalized-but-unprocessed data; otherwise
Quiver preserves data and throttles intake until subscribers catch up.
Config key:
retention.size_cap_policy: backpressure | drop_oldest # default: backpressure
Metrics to observe:
quiver.gc.evictions_total{reason="size"}
quiver.retention.size_emergency_total # future
quiver.ingest.throttle_total # when backpressure triggers
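A simplified sketch of the size-cap decision described above; the names and shapes are hypothetical, not the Quiver API.

```rust
// Illustrative decision helper for the size-cap safety net.
#[derive(Clone, Copy)]
enum SizeCapPolicy {
    Backpressure,
    DropOldest,
}

enum CapAction {
    Nothing,
    EvictProcessed(u64),        // bytes of fully processed segments to evict
    Throttle,                   // backpressure: hold ingestion until eviction frees space
    DropOldestUnprocessed(u64), // emergency: delete oldest unprocessed bytes
}

fn size_cap_action(used: u64, cap: u64, reclaimable: u64, policy: SizeCapPolicy) -> CapAction {
    if used <= cap {
        return CapAction::Nothing;
    }
    let over = used - cap;
    if reclaimable > 0 {
        // Step 1: evict the oldest fully processed segments first.
        return CapAction::EvictProcessed(over.min(reclaimable));
    }
    match policy {
        SizeCapPolicy::Backpressure => CapAction::Throttle,
        SizeCapPolicy::DropOldest => CapAction::DropOldestUnprocessed(over),
    }
}
```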
Durability guarantee (default policy): With the default backpressure policy
Quiver guarantees zero segment loss prior to subscriber acknowledgement.
Segments are deleted only after all subscribers have acked every bundle within
them (or they age out per retention time windows). Choosing the optional
drop_oldest policy
explicitly authorizes controlled segment loss during size emergencies; each
evicted segment is recorded as a synthetic dropped outcome for affected
subscribers and surfaced via metrics.
Gap Set Interaction with drop_oldest
drop_oldest must not leave subscribers with permanent, unfillable gaps. If a
size emergency forces eviction of a finalized segment that still has unacked
bundles for any subscriber, Quiver records synthetic dropped outcomes for
each outstanding (segment_seq, bundle_index, subscriber_id) in ack.log
before deletion.
- dropped is treated like an ack for advancing the high-water mark (HWM) and immediately removes the bundle from the subscriber's gap set.
- HWM only advances across a contiguous run of bundles with ack or dropped; real gaps still block.
- Without this, deleting a gap bundle would freeze the subscriber's HWM forever and distort lag metrics.
Metrics:
quiver.subscriber.dropped_bundles_total # increments per subscriber per dropped bundle
quiver.segment.dropped_total # unique segments that lost bundles pre-ack
Operators that require strict delivery semantics (never convert missing data to
dropped) must select backpressure; that policy retains unprocessed segments
and throttles instead of synthesizing outcomes.
Multi-Core Coordination
otap-dataflow runs one persistence instance per core but all cores share a
single storage directory. Steady-state cleanup happens locally. Size-cap
enforcement will initially use a simple scheme that divides the cap evenly per core.
A more sophisticated algorithm (Phase 2) will require a small amount of coordination.
Phase 1: Per-Core Static Cap (Initial Implementation)
Initial approach: divide retention.size_cap evenly across N cores
(per_core_cap = total_cap / N). Each core enforces its slice exactly like a
single-core instance; no shared atomics or coordination. On hitting its cap a
core applies drop_oldest or backpressure locally.
Pros: zero cross-core contention, simplest code, fastest path to prototype.
Cons: global usage can temporarily exceed total cap by up to roughly
(N - 1) * segment_target_size; reclaimable fully processed segments may sit
on idle cores while a hot core throttles.
Metrics: quiver.core.bytes{core_id}, quiver.core.cap_fraction to spot
imbalance.
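A quick worked example with hypothetical numbers (not recommended defaults) to make the overshoot bound concrete:

```rust
fn main() {
    // Hypothetical deployment: 500 GiB total cap, 16 cores, 32 MiB target segments.
    let total_cap: u64 = 500 * 1024 * 1024 * 1024;
    let cores: u64 = 16;
    let segment_target: u64 = 32 * 1024 * 1024;

    let per_core_cap = total_cap / cores;                          // ~31.25 GiB per core
    let worst_transient_overshoot = (cores - 1) * segment_target;  // 15 * 32 MiB = 480 MiB

    println!("per-core cap: {per_core_cap} bytes, worst overshoot: {worst_transient_overshoot} bytes");
}
```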
Phase 2: Fair Eviction with Minimal Coordination (to be designed)
Phase 2 (TBD) will introduce a fair eviction approach instead of a naive per-core cap. Preliminary goals:
- Distribute eviction work (no single-core dependency).
- Accommodate fair eviction when some threads may be blocked on other work.
- Minimal coordination overhead (brief atomic claims, no long-lived tokens).
- Preserve ordering; escalate to unprocessed deletion only under drop_oldest.
- Limit transient overshoot to ~one segment per concurrently finalizing core.
Complete otap-dataflow Persistence Configuration Example
nodes:
  otlp_receiver:
    kind: receiver
    plugin_urn: "urn:otel:otlp:receiver"
    out_ports:
      out_port:
        destinations:
          - persistence
        dispatch_strategy: round_robin
    config:
      listening_addr: "127.0.0.1:4317"
      response_stream_channel_size: 256 # Required: channel buffer capacity (number of messages)
  otap_receiver:
    kind: receiver
    plugin_urn: "urn:otel:otap:receiver"
    out_ports:
      out_port:
        destinations:
          - persistence
        dispatch_strategy: round_robin
    config:
      listening_addr: "127.0.0.1:4318"
      response_stream_channel_size: 256 # Required: channel buffer capacity (number of messages)
  persistence:
    kind: processor
    plugin_urn: "urn:otap:processor:persistence"
    out_ports:
      out_port:
        destinations:
          - otap_exporter
          - otlp_exporter
        dispatch_strategy: round_robin
    config:
      path: ./quiver_data # Platform-appropriate persistent storage location
      segment:
        target_size: 32MB
      wal:
        max_size: 4GB
        flush_interval: 25ms
      retention:
        max_retain_after_ingestion_hours: 72
        size_cap: 500GB
        size_cap_policy: drop_oldest # or backpressure
  otap_exporter:
    kind: exporter
    plugin_urn: "urn:otel:otap:exporter"
    config:
      grpc_endpoint: "http://{{backend_hostname}}:1235"
      compression_method: zstd
      arrow:
        payload_compression: none
  otlp_exporter:
    kind: exporter
    plugin_urn: "urn:otel:otlp:exporter"
    config:
      grpc_endpoint: "http://127.0.0.1:4318"
      # timeout: "15s" # Optional: timeout for RPC requests
Example: Dual Exporters with Completion Tracking
Consider a single persistence node feeding both a Parquet exporter (local file
writer) and an OTLP exporter (remote endpoint). Each exporter is a Quiver
subscriber with its own cursor and participates in the OTAP Ack/Nack protocol.
Happy-path flow for segment seg-120 (4 MiB, 3 RecordBundles):
- Incoming batches append to the WAL and accumulate in the in-memory open segment until finalize triggers; then the data is written as seg-120.arrow.
- Quiver enqueues a notification for parquet_exporter and otlp_exporter.
- Each exporter drains the segment's three bundles in order and, after finishing each bundle, emits Ack(segment_seq, bundle_index) (or Nack) back to Quiver. The consumer-side cursor only advances to the next bundle once the acknowledgement for the current bundle is recorded.
- On every Ack/Nack Quiver appends a record to the shared acknowledgement log:
ts=2025-11-10T18:22:07Z segment=seg-120 bundle=0 subscriber=parquet_exporter ack
ts=2025-11-10T18:22:07Z segment=seg-120 bundle=0 subscriber=otlp_exporter ack
ts=2025-11-10T18:22:08Z segment=seg-120 bundle=1 subscriber=parquet_exporter ack
ts=2025-11-10T18:22:08Z segment=seg-120 bundle=1 subscriber=otlp_exporter ack
ts=2025-11-10T18:22:09Z segment=seg-120 bundle=2 subscriber=parquet_exporter ack
ts=2025-11-10T18:22:10Z segment=seg-120 bundle=2 subscriber=otlp_exporter ack
- Once every subscriber has acknowledged bundles 0..=2 for seg-120, the segment's outstanding bundle count drops to zero and it becomes eligible for eviction according to the retention policy.
- During crash recovery Quiver replays the acknowledgement log alongside the WAL to restore per-subscriber high-water marks; no per-subscriber WAL files are required.
Nack handling reuses the same log: Quiver records the nack, retries delivery for
the specific (segment_seq, bundle_index) according to OTAP policy, and keeps
the segment pinned until every subscriber eventually acks each bundle or the
operator opts to drop data via drop_oldest (which synthesizes dropped
entries for the missing bundles).
Future Enhancements
- Ability to query over persisted data (e.g., per-node aggregations, DataFusion, etc.).
- Indexing.
- Configurable policy for recovery prioritization (new arrivals vs. old data, forward vs. reverse replay, etc.).
- Consider support for storing finalized immutable Arrow IPC segments in an object store.
- Kubernetes support (review for any k8s specific requirements for configuring and/or supporting a quiver store).
FAQ
How does this differ from Kafka?: Quiver is an in-process Arrow segment
buffer embedded in otap-dataflow. It keeps data local, uses existing pipeline
channels and Arrow IPC, and has no broker cluster, network protocol, or
multi-topic replication. Kafka is a distributed log service. Quiver is a
lightweight durability layer for one collector.
Do I need to run extra services?: No. Quiver ships as a Rust crate inside the collector. Storage lives on the same node; coordination is via shared memory atomics, not external daemons.
What happens on crash recovery?: On restart we replay the data WAL to rebuild
segments, replay ack.log to restore subscriber high-water marks, and immediately
resume dispatch. Segments whose every bundle was acknowledged by all subscribers
before the crash are eligible for deletion as soon as the steady-state sweeper runs.
Success Criteria
- Zero data loss across process restarts and network outages
- Automatic retry/forwarding once downstream systems recover
- Minimal performance impact on streaming path (<5% overhead)
- Aggressive cleanup: data deleted as soon as all exporters confirm receipt
Next Steps
- Community feedback on this proposal
- Proof-of-concept: Basic WAL + segment creation + replay
- Benchmark: Measure overhead on streaming pipeline
- Iterate: Refine based on real-world usage patterns
Open Questions
- Subscriber lifecycle: how do we register/deregister subscribers and clean up state when exporters are removed permanently?
- Ack log scale: what rotation/checkpoint policy keeps replay time bounded under heavy churn?
- Observability: which metrics/logs expose ack log depth, gap set size, and eviction/backpressure activity so operators can react early?
- Policy interaction: how does time-based retention interact with the size-cap safety net and steady-state sweeper - should one take precedence?
- Failure handling: what safeguards do we need if ack.log or metadata become corrupted (checksums, repair tools, etc.)?
Feedback Welcome: Please comment on architecture choices, use cases to prioritize, or concerns about complexity / performance.
Overall, I'm a fan of the proposal. I think it addresses several well identified needs.
I do have a few questions:
- If I understand the design correctly, downstream components can only see data after a segment is finalized, not while it's still in the open segment buffer, which depending on the persistence processor's parameters could introduce significant latency. Could we allow OTAP batches to "pass through" (append to WAL but not to the in-memory open segment) in order to minimize the latency observed by downstream components? Could we receive by anticipation the corresponding Ack/NAck and apply a mechanic close to the one mentioned in this proposal?
- What does "safe" mean in "truncate WAL when safe"? I think the exact conditions should be specified.
- If drop_oldest removes a segment that's still in a subscriber's gap set, that subscriber will never converge. We should clarify how drop_oldest interacts with outstanding gap sets.
- "Drop oldest unprocessed data" contradicts the stated zero-loss success criterion. Either reframe the success criterion ("no loss unless explicitly configured") or make backpressure the default.
- What are the requirements for this to work in a Kubernetes deployment?
- Could we consider storing segments in an object store in the future?
@lquerel Great, thank you for taking the time to provide feedback!
If I understand the design correctly, downstream components can only see data after a segment is finalized, not while it's still in the open segment buffer, which depending on the persistence processor's parameters could introduce significant latency. Could we allow OTAP batches to "pass through" (append to WAL but not to the in-memory open segment) in order to minimize the latency observed by downstream components? Could we receive by anticipation the corresponding Ack/NAck and apply a mechanic close to the one mentioned in this proposal?
Yes, your understanding is correct. The proposal is suggesting "segment-level" forwarding to preserve atomic durability, compact ack tracking, reduced I/O, and predictable retention at the cost of some amount of latency while the segment is assembled and finalized. Per-batch pass-through introduces non-trivial complexity (ack explosion, replay ambiguity, eviction inconsistency) for a latency improvement which could mostly be achieved through config (smaller segments, shorter max open duration, idle finalize). I would suggest we implement, instrument and measure the latency with this proposal as conceived first before adding latency-optimized/per-batch/pre-finalized notifications.
What does "safe" mean in "truncate WAL when safe"? I think the exact conditions should be specified.
Adding a section to clarify.
If drop_oldest removes a segment that's still in a subscriber's gap set, that subscriber will never converge. We should clarify how drop_oldest interacts with outstanding gap sets.
Yes, good call out. I will add a section on this to clarify that dropping a segment necessitates a synthetic 'dropped' ack to allow removal of the segment from the gap set.
"Drop oldest unprocessed data" contradicts the stated zero-loss success criterion. Either reframe the success criterion ("no loss unless explicitly configured") or make backpressure the default.
Thank you for catching the inconsistency. Given the primary value proposition is durability to avoid data loss, I think it makes more sense to keep the success criterion and switch the default to backpressure.
What are the requirements for this to work in a Kubernetes deployment?
In brief, the user would need to configure a per-instance directory on a durable volume. Do you think we need to add a section about Kubernetes to this issue? I do think it will be important to document how to configure durability appropriately when running in a Kubernetes environment, but I currently don't know of specific design requirements from Kubernetes that would need to be added here.
Could we consider storing segments in an object store in the future?
Yes, I will add an item about considering this under the Future Enhancements section.
For the Kubernetes requirements, I don't think it's necessary to have a dedicated section at this stage. Maybe just list Kubernetes support among future developments (like object store support).
As for the rest, I completely share your approach.
Hey @AaronRM -- GREAT writeup. I think this will be a really great addition to the Arrow ecosystem and I'm thrilled to have it associated with this project.
I wanted to send some followup on the discussion we had on the SIG meeting today.
I'll start with some general background on OTAP.
OTAP batches, as they exist in memory, are a collection of arrow RecordBatchs, each representing a "Payload Type". For example, if we look at logs, which is the simplest of the signals we support, there are four payload types: Logs, LogAttrs, ScopeAttrs, ResourceAttrs. Other signal types (traces & metrics) have additional Payload Types.
So we could think of representing a logs OTAP Batch as an [Option<RecordBatch>; 4] in Rust (in fact, it actually is represented like this in our codebase).
As our dataflow pipelines are processing batches of OTAP data, they're more/less processing a sequence of batches in this representation (as opposed to say, a sequence of individual RecordBatchs).
Additionally, there are a few ways that the structure of the batches can change from one batch to the next:
- A given Payload Type may be optional -- if there are no LogAttrs for a given batch, the RecordBatch for this Payload Type would be absent in the OTAP Batch.
- Columns within a given RecordBatch may be optional -- consider for example the str column in the LogAttrs batch, which represents the value for attributes of type string. If there are no attribute values of this type in the batch (e.g. if the column would be all null) then the column is omitted from the RecordBatch. Sidenote: we also omit some columns if they're entirely default values (e.g. we may omit int columns that are all zero).
- The type of some columns may change -- for a given string column we may switch between the arrow types Dictionary<u8, utf8>, Dictionary<u16, utf8> and the native type (a regular StringArray) depending on the cardinality of the column.
All this is done to minimize the amount of memory used for each OTAP Batch.
Obviously, all this presents some interesting technical challenges for this Quiver solution (which we can work through :slightly_smiling_face:).
The technical challenge in particular that I was trying to call out was this: the Arrow IPC format expects each RecordBatch to have the same schema. For example, if we look at the spec https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format we see that an Arrow IPC stream contains a series of messages laid out like this
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: 0xFFFFFFFF 0x00000000>
And the IPC File format contains messages like this:
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT with EOS>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">
The consequences for us are:
- We wouldn't be able to store all the RecordBatchs for a given OTAP Batch in the same Arrow IPC File/Stream because each Payload Type will have a different schema.
- We might not be able to store subsequent RecordBatchs for a given Payload Type in the same Arrow IPC File/Stream because the schema might change from one OTAP batch to the next.
One decision we'll need to make is how flexible we want to be about the schema in a Quiver segment file. Deciding this may also inform whether we write multiple OTAP Payload Types into the Quiver segment file.
On one end of the spectrum, we are completely inflexible with whatever schema gets written into the Quiver segment file. Every OTAP Payload Type gets its own segment file, and even for a given Payload Type there are multiple segment files for the different schemas. This is simple from the Quiver segment file format perspective, as the quiver segment file can just be a regular Arrow IPC File. However, it might add a lot of complexity in managing the many Quiver segment files.
On the other end of the flexibility spectrum, we accept any RecordBatchs, and write an Arrow IPC Stream/File containing a single RecordBatch for each. The messages in the file might look like this if we wrote a new IPC stream for each record batch:
// begin record batch 0
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
<EOS [optional]: 0xFFFFFFFF 0x00000000>
<END RECORD BATCH 0>
// begin record batch 1
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
<EOS [optional]: 0xFFFFFFFF 0x00000000>
// ... and so on
This is simple and flexible, but the downside is a lot of overhead: schemas are repeated for every batch, there are no delta dictionaries, and when we read back the data we'd need to create a new ArrowIpcReader for each batch.
There might be a middleground that could work well. I'd need to think a bit more deeply about what is the right approach here. Now that I've (hopefully) explained the problem more clearly, interested to hear if others have thoughts as well (cc @lquerel & @jmacd).
One approach we could take would be to interleave multiple streams. Rather than creating a new stream for each record batch, we could have each record batch in the Quiver file associated with an identified IPC stream. For example, we could have a file containing messages as follows (I'll prefix the messages with either QUIVER or ARROW_IPC to distinguish what is part of the Quiver protocol from Arrow IPC):
// first record batch begins a new stream
<QUIVER: BEGIN RECORD BATCH 0>
<QUIVER: STREAM 0>
<ARROW_IPC: SCHEMA>
<ARROW_IPC: DICTIONARY 0>
...
<ARROW_IPC: DICTIONARY k - 1>
<ARROW_IPC: RECORD BATCH 0>
<QUIVER: END RECORD BATCH 0>
// a 2nd record batch with a different schema, we begin another new stream
<QUIVER: BEGIN RECORD BATCH 1>
<QUIVER: STREAM 1>
<ARROW_IPC: SCHEMA>
<ARROW_IPC: DICTIONARY 0>
...
<ARROW_IPC: DICTIONARY k - 1>
<ARROW_IPC: RECORD BATCH 0>
<QUIVER: END RECORD BATCH 1>
// a 3rd record batch with the same schema as record batch 1, we continue stream 0
<QUIVER: BEGIN RECORD BATCH 2>
<QUIVER: STREAM 0>
<ARROW_IPC: DICTIONARY x DELTA>
<ARROW_IPC: RECORD BATCH 1>
<QUIVER: END RECORD BATCH 2>
This is very similar to how the OTAP protocol works.
Another consideration related to the discussion above is how we can support reading the data back using DataFusion
A very typical way to support this is to implement the FileFormat trait. There are implementations of this for Parquet, Arrow IPC, Avro, CSV, etc.
However, one of the methods we'll need to implement is infer_schema which is used to fetch the Arrow Schema. The point here is that DataFusion would expect the data source to have a known schema and this is why we'll need to be a bit considered in our approach.
In my opinion, it only makes sense for the implementation to be able to return data to DataFusion for a single OTAP Payload Type given that each payload type has a completely different set of columns.
Also, within a given Payload Type, there can be slight variations to which columns are present and what are the types of those columns (the same column could be one of Dict<u8>, Dict<u16> or non-dict native array)
There are probably a combination of things we can do to address this.
Schema on read Say we had a Quiver segment file containing serialized record batches with multiple schemas. As we read the batches from each serialized IPC stream, the reader could make slight modifications to each RecordBatch to give them a common representative schema.
The common representative schema should have:
- The superset of all columns for the given payload type, sorted in a common order
- The types for each column should use the "biggest" type of any that were written (e.g. a non-dict array is bigger than Dict<u16>, which is bigger than Dict<u8>)
We could imagine the Quiver reader making slight adjustments to each RecordBatch to conform it to the representative schema by:
- sorting the columns
- adding placeholder columns for any that are missing
- casting each column to the correct arrow type
Note - this is kind of similar to how Parquet's Arrow Reader/Writer work..
The Parquet arrow writer stores a serialized arrow schema in the file metadata, and for a given column it accepts subsequent arrays that are logically the same (e.g. it will accept Dict<u8, utf8> or StringArray for the same column). The reader marshals the parquet data into arrow arrays of the correct type (it can also accept a custom schema at read time, which is something we could also consider).
However, Parquet is kind of different to how I'm imagining this would work with Quiver. The parquet writer does not re-order columns nor insert any placeholders. That's why we have to do some marshalling of the schema when we write (in the parquet exporter). However, we might not need to enforce the schema on write for Quiver if we design our reader correctly.
We might want to do something similar to the Parquet arrow writer, and store the representative schema in some metadata in the Quiver segment file.
Also note, this whole thing gets a bit more complicated when reading from multiple Quiver files as the common representative schema would need to be common across all files.
Multi Payload Type Files If we want to have multiple OTAP Payload Types written to the same file, then we'd need to give some metadata to each IPC stream to identify which Payload Type it is associated with. Then our reader could skip over any of these streams.
@albertlockett Thank you for the excellent comments above capturing the issues you raised during the SIG meeting last week!
I've made a few significant revisions to the proposal based on your feedback. Primary changes:
- Propose a new file format for the segment files which embeds multiple Arrow IPC File sections (one per schema) and allows for schema variation across OtapArrowRecords within a given OTAP Payload Type. This is similar to the middle ground 'interleaved' example above. The format I'm proposing maps well to OTAP, allows memory-mapping of the segment files, and still allows use of the standard Arrow IPC file reader over slices within the memory mapped 'Quiver segment' file.
- Add a section on Dictionary Handling for segments. Open to comments/suggestions for optimizations on this...just wanted to get something relatively simple for a first implementation.
- Switch ack/nacks to be per-RecordBundle (i.e. per OtapArrowRecord) instead of per-segment
Looks great @AaronRM ! I think what you've laid out solves the problems I was discussing in my previous comments.
Two small things I'd mention after re-reading:
Representative schemas live alongside the stream directory, allowing a table provider to merge schemas across multiple segments without touching the underlying Arrow buffers.
I'm not sure this is entirely necessary, so we could avoid doing this if it adds unneeded complexity. In the file format you've described, I think the representative schema could technically be computed by reading the footer of each of the serialized IPC files for the slots containing a given payload type.
Although that said, it could be cached in the stream directory, as you suggested, which could give a slight performance improvement when opening the file for reading by DataFusion, e.g. we could consider this a future optimization.
Consider support for storing finalized immutable Arrow IPC segments in an object store.
If we use the object_store crate when we implement this, it would be very straightforward to support moving to cloud object storage in the future. This is what we did in parquet-exporter, for example.
Also, do we have a plan yet for what is the actual data layout of the WAL?
If we use the object_store crate when we implement this, it would be very straightforward to support moving to cloud object storage in the future. This is what we did in parquet-exporter, for example.
Just mentioning a hazard on the Microsoft side with using the object_store crate - The crypto used is considered non-compliant internally. I got pretty far along to merging the ability to have pluggable crypto providers, but it stalled out: https://github.com/apache/arrow-rs-object-store/pull/462.
I'm not sure if/when I'll be able to get back around to it.
@albertlockett Thanks! Yes the intent was an optimization for integration with DataFusion so the full segment files don't need to get paged in during infer_schema(). We don't need to implement that up front.
Re: WAL, I added a detailed "WAL File Format & Rotation" section with a proposed format. FWIW, it may be easier to read in context of the full document which has now been merged here: https://github.com/open-telemetry/otel-arrow/blob/main/rust/experimental/quiver/ARCHITECTURE.md#wal-file-format--rotation