
Streaming v2

Open benjaminpetit opened this issue 2 months ago • 2 comments

Motivation

The current Orleans streaming infrastructure has several pain points that make it difficult to use and maintain:

Configuration Complexity: The existing streaming system requires extensive configuration across multiple providers, with non-obvious interactions between queue adapters, cache settings, and delivery guarantees. This complexity makes it difficult to get started and even harder to evolve as requirements change.

Inconsistent Provider Behavior: Different stream providers exhibit fundamentally different behaviors, making it difficult for developers to reason about their applications:

  • Azure Queue streams don't guarantee FIFO order in failure cases due to the underlying queue semantics
  • The Event Hub provider has different caching behavior
  • Each provider has different configuration options and defaults (FireAndForgetDelivery, QueueBalancerType, CacheSize)

This means developers must learn provider-specific semantics and can't easily switch providers without changing application logic. Testing with one provider for development doesn't guarantee the same behavior with another provider for production.

Cache Miss Exceptions - Including False Positives: Users frequently encounter QueueCacheMissException errors, which occur in two problematic scenarios:

  1. Legitimate cache misses: When consumers genuinely fall behind and events are evicted from cache based on DataMaxAgeInCache or DataMinTimeInCache settings
  2. False positive cache misses: Users encounter cache miss exceptions even when they didn't actually miss any events. This happens due to how the current caching implementation tracks sequence tokens. This is particularly frustrating because the consumer did nothing wrong, yet receives an exception.

These failures are difficult to reason about, require careful tuning of cache sizes and eviction policies, and can result in lost events or unexpected failures during normal operations like deployments.

Inefficient for Co-located Scenarios: In most Orleans applications, stream producers and consumers are grains running within the same cluster. The current architecture routes all events through external queues (Azure Queue, Event Hubs, Kafka), adding unnecessary latency and operational complexity for what should be simple in-cluster communication.

Goals: We need a new streaming infrastructure that:

  • Guarantees event delivery with no cache misses (legitimate or false positive)
  • Has consistent, predictable behavior regardless of storage backend choice
  • Is simple to configure and understand
  • Is optimized for same-cluster scenarios while still providing durability
  • Makes it easy to evolve and extend

High-Level Architecture

Core Principles

1. In-Memory First, Storage-Backed

  • Events flow through memory for low latency
  • All events are persisted to storage before delivery to guarantee durability
  • Storage is used only for recovery and catch-up, not regular delivery
  • This ensures fast delivery for active consumers while maintaining durability guarantees

2. Guaranteed Delivery, No Cache Misses

  • Every event is either in memory or recoverable from storage
  • No cache eviction exceptions - slow consumers automatically read from storage
  • No false positive cache misses - clear separation between in-memory buffer and persistent storage
  • Consumers can always catch up from any point within the retention window
  • Seamless transition between memory and storage reads

3. Consistent Behavior Across Storage Backends

  • Application semantics remain the same regardless of storage backend
  • Storage backends are truly pluggable - switching from blob storage to SQL doesn't change delivery guarantees or ordering
  • Developers write code once, deploy anywhere
  • Test with one backend, deploy with another with confidence

4. Simple Configuration

  • Minimal configuration: partition count, buffer size, storage backend
  • Sensible defaults that work for most scenarios
  • Clear, predictable behavior without provider-specific quirks

5. Pluggable Storage

  • Support for multiple backends: Azure Blob...
  • Simple interface: append, read, truncate operations (see the sketch below)
  • Storage choice is an operational decision, not an architectural one
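
To make this concrete, a possible shape for such a backend interface is sketched below. The names (IPartitionStorage, StoredEvent) and method signatures are illustrative only, not a committed API:

// One event as persisted in a partition segment (fields follow the segment format described later).
public sealed record StoredEvent(long PartitionSequence, string StreamId, long StreamSequence, byte[] Data);

public interface IPartitionStorage
{
    // Append a batch of events to the partition's current segment and
    // return the partition sequence number of the last event written.
    Task<long> AppendAsync(int partitionId, IReadOnlyList<StoredEvent> events, CancellationToken ct = default);

    // Read events with partition sequence >= fromSequence, up to maxCount (recovery and catch-up).
    Task<IReadOnlyList<StoredEvent>> ReadAsync(int partitionId, long fromSequence, int maxCount, CancellationToken ct = default);

    // Delete whole segments containing only events below upToSequenceExclusive (retention management).
    Task TruncateAsync(int partitionId, long upToSequenceExclusive, CancellationToken ct = default);
}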

Architecture Overview

Why Partitions?

In a streaming system, we potentially have millions of individual streams (one per payment, order, user session, etc.). We cannot:

  • Create a separate grain for each stream (would overwhelm the cluster)
  • Create separate storage containers for each stream (storage explosion)
  • Track each stream's metadata individually (management overhead)

The solution: Coalesce many logical streams into a smaller, fixed number of partitions. Each partition is a manageable unit that handles hundreds or thousands of streams together.

What is a Partition?

A partition is a physical unit of organization that groups multiple logical streams together. Think of it as a "bucket" that manages many streams:

  • Logical streams: payment-12345, payment-67890, order-abc, session-xyz (millions of these)
  • Physical partitions: Partition 0, Partition 1, ... Partition 31 (fixed count, e.g., 32)

Partition Assignment: Each logical stream is permanently assigned to one partition using a deterministic algorithm. Once assigned, a stream always maps to the same partition, ensuring consistent routing and ordering guarantees.

Default Algorithm (v1): Hash-based assignment

// Mask the sign bit instead of Math.Abs (Math.Abs(int.MinValue) would throw).
// Note: the hash must be stable across processes/silos; string.GetHashCode() on
// .NET Core is randomized per process, so a stable hash is required in practice.
int partitionId = (streamId.GetHashCode() & 0x7FFFFFFF) % partitionCount;
// "payment-12345" → Partition 7
// "payment-67890" → Partition 7
// "order-abc" → Partition 3

Extensibility: The assignment logic is abstracted behind an IPartitionAssignmentStrategy interface, allowing for future customization:

  • Custom hashing algorithms (e.g., consistent hashing for better distribution)
  • Range-based partitioning (e.g., assign streams by customer ID ranges)
  • Affinity-based strategies (e.g., route related streams to same partition for locality)
  • Load-aware assignment (distribute based on stream throughput)

The key requirement: whatever algorithm is used must be deterministic - the same streamId must always map to the same partition for the system to maintain ordering guarantees and routing consistency.
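
As an illustration, a minimal hash-based implementation of such a strategy could look like the following. The interface shape and the use of FNV-1a are assumptions; the essential point is that the hash is stable across processes, which string.GetHashCode() on .NET Core is not:

public interface IPartitionAssignmentStrategy
{
    int GetPartition(string streamId, int partitionCount);
}

public sealed class HashPartitionAssignmentStrategy : IPartitionAssignmentStrategy
{
    public int GetPartition(string streamId, int partitionCount)
    {
        // FNV-1a (32-bit): simple, fast, and stable across processes and platforms,
        // so every silo maps the same streamId to the same partition.
        uint hash = 2166136261;
        foreach (char c in streamId)
        {
            hash ^= c;
            hash *= 16777619;
        }
        return (int)(hash % (uint)partitionCount);
    }
}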

Partition Implementation in Orleans

Each partition is represented by a Partition Grain:

public interface IPartitionGrain : IGrainWithIntegerKey
{
    Task<PublishResult> AppendEventAsync(string streamId, byte[] data);
    Task<IStreamSubscription> SubscribeAsync(string streamId, StreamToken? token);
}

The partition ID serves as the grain key:

var partitionGrain = grainFactory.GetGrain<IPartitionGrain>(7); // grain key = partition ID
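
The producer-side publish path then amounts to resolving the partition and calling its grain. A rough sketch, reusing the assignment strategy sketched earlier (helper name and parameters are illustrative):

public static class StreamPublisher
{
    public static Task<PublishResult> PublishAsync(
        IGrainFactory grainFactory,
        IPartitionAssignmentStrategy strategy,
        string streamId,
        byte[] payload,
        int partitionCount = 32)
    {
        // Deterministically route the logical stream to its partition grain.
        int partitionId = strategy.GetPartition(streamId, partitionCount);
        var partition = grainFactory.GetGrain<IPartitionGrain>(partitionId);
        return partition.AppendEventAsync(streamId, payload);
    }
}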

Partition Grain Responsibilities:

  • Manages a monotonically increasing partition sequence counter (single-threaded grain execution, no locks needed)
  • Maintains an in-memory ring buffer with recent events from ALL streams in this partition
  • Coordinates writes to persistent storage for its partition
  • Tracks active consumer subscriptions across all streams in the partition
  • Delivers events to subscribed consumers

Why Grains?

  • Natural single-threaded execution model - perfect for sequence generation
  • Automatic distribution across silos via Orleans placement
  • Built-in lifecycle management (activation/deactivation)
  • Orleans messaging for communication
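
To illustrate how these responsibilities fit together, here is a heavily simplified sketch of a partition grain. Everything below (field names, ring-buffer size, the PublishResult shape, the IPartitionStorage dependency from the storage sketch above) is illustrative, and the subscription/delivery side is omitted:

public sealed record PublishResult(long PartitionSequence, long StreamSequence); // assumed shape

public sealed class PartitionGrain : Grain, IPartitionGrain
{
    private readonly IPartitionStorage _storage;                          // pluggable backend (append/read/truncate)
    private readonly Dictionary<string, long> _streamSequences = new();  // per-stream monotonic counters
    private readonly Queue<StoredEvent> _ringBuffer = new();             // recent events from ALL streams in this partition
    private const int RingBufferCapacity = 10_000;
    private long _partitionSequence = -1; // single-threaded grain execution, no locks needed
                                          // (a real implementation would restore this from storage on activation)

    public PartitionGrain(IPartitionStorage storage) => _storage = storage;

    public async Task<PublishResult> AppendEventAsync(string streamId, byte[] data)
    {
        long partSeq = ++_partitionSequence;
        long streamSeq = _streamSequences.TryGetValue(streamId, out var s) ? s + 1 : 0;
        _streamSequences[streamId] = streamSeq;

        var evt = new StoredEvent(partSeq, streamId, streamSeq, data);

        // Persist before delivery: durability never depends on the in-memory buffer.
        await _storage.AppendAsync((int)this.GetPrimaryKeyLong(), new[] { evt });

        _ringBuffer.Enqueue(evt);
        if (_ringBuffer.Count > RingBufferCapacity)
            _ringBuffer.Dequeue(); // slow consumers fall back to reading from storage, never an exception

        // ...deliver evt to active subscribers of streamId here...
        return new PublishResult(partSeq, streamSeq);
    }

    public Task<IStreamSubscription> SubscribeAsync(string streamId, StreamToken? token)
        => throw new NotImplementedException("Subscription tracking omitted from this sketch.");
}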

Partition Storage Organization

Each partition stores its events in segments:

/streams/
  partition-00/              ← PartitionGrain(0)
    segment-0000.dat         [partition events 0-999]
    segment-0001.dat         [partition events 1000-1999]
  partition-01/              ← PartitionGrain(1)
    segment-0000.dat
  ...
  partition-31/              ← PartitionGrain(31)
    segment-0000.dat

Inside a segment (partition 7, segment 0000):

[partSeq:0, streamId:"payment-12345", streamSeq:0, data:...]
[partSeq:1, streamId:"order-abc", streamSeq:0, data:...]
[partSeq:2, streamId:"payment-12345", streamSeq:1, data:...]
[partSeq:3, streamId:"payment-67890", streamSeq:0, data:...]

Note: Events from multiple streams are interleaved within a partition, ordered by partition sequence number. Each event also carries its stream-specific sequence number.
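
For example, replaying a single logical stream from this interleaved layout is just a filtered scan of the partition log; a fragment using the hypothetical IPartitionStorage sketched earlier:

// Read back only "payment-12345" from partition 7, starting from the beginning of the retention window.
var events = await storage.ReadAsync(partitionId: 7, fromSequence: 0, maxCount: 1000);
var paymentEvents = events
    .Where(e => e.StreamId == "payment-12345")
    .OrderBy(e => e.StreamSequence);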

Key Concepts

Partition Sequence: Monotonic counter within a partition (0, 1, 2, 3...) - provides total ordering within the partition

Stream Sequence: Monotonic counter within a specific stream (payment-12345: 0, 1, 2...) - provides ordering within that logical stream

Segmentation: Partitions split into segments (e.g., every 1000 events) for:

  • Faster recovery (only read relevant segments)
  • Efficient retention management (delete old segments)
  • Parallel reads during catch-up
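
With a fixed segment size, locating the right segment for a recovery read is simple arithmetic (assuming 1000 events per segment, matching the layout above):

const int EventsPerSegment = 1000;
long segmentIndex = partitionSequence / EventsPerSegment;   // e.g. partition sequence 1523 → segment 1
string segmentPath = $"/streams/partition-{partitionId:D2}/segment-{segmentIndex:D4}.dat";
// → "/streams/partition-07/segment-0001.dat" for partition 7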

benjaminpetit · Oct 29 '25

Great! I want to share my current usage scenarios of streams and pain points to ensure that these are taken into consideration.

A general note though; this sounds like streams.v2 can support a form of event sourcing - and it would be fantastic if Orleans came with a built-in way to do event sourcing simply and efficiently. Main requirements are:

  • The ability to retain the entire stream of events.
  • Being able to replay events from any position in the event stream.
  • Deduplication, i.e., given that each event has a unique message ID, if an event is added to the stream twice it is ignored (or etag support, e.g., only append if the last etag for the stream is X).
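
To illustrate the deduplication/etag idea, such an append API could look roughly like this (purely hypothetical; nothing like it exists in Orleans streams today):

public interface IDeduplicatingStreamWriter<TEvent>
{
    // Ignore the append if an event with the same messageId was already written to this stream.
    Task AppendAsync(string streamId, Guid messageId, TEvent evt);

    // Append only if the stream's current etag matches expectedEtag; return the new etag.
    Task<string> AppendIfMatchAsync(string streamId, string expectedEtag, TEvent evt);
}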

Current stream scenarios

Our current streaming scenarios are all based on implicit streams with Azure EventHub as the foundation/stream provider.

IoT scenario

Description: External (to solution) Azure EventHub that receives messages from 100,000s of IoT devices.

In my case, the IoT devices are EV charge points, and we create a stream for each charging session coming from the charge points.

The messages per stream are fairly constant (one message every 1-30 minutes), but the number of active charging sessions can vary from 10,000 to 200,000 over a few hours. There can also be spikes in the load, e.g., many sessions are started around the hour mark.

Challenges:

  • A spike in sessions means we have to overdimension our silo cluster to handle the load; otherwise we see thread pool starvation and out-of-memory exceptions due to the sheer number of events received. I have not figured out how to effectively configure the stream provider to deal with big spikes.

  • Cache misses can be mitigated for the most part as long as we remember the golden rule MetadataMinTimeInCache > StreamInactivityPeriod > GrainCollectionAge. However, if somebody changes one of these and forgets to change the others, problems show up.
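
One way to make that rule harder to break is to derive all three values from a single constant in one place, for example (illustrative fragment; Configure<GrainCollectionOptions> is the standard Orleans option, while the two stream values would be applied through the stream provider's cache-eviction and pulling-agent options):

// Keep MetadataMinTimeInCache > StreamInactivityPeriod > GrainCollectionAge by construction.
var grainCollectionAge     = TimeSpan.FromMinutes(30);
var streamInactivityPeriod = grainCollectionAge + TimeSpan.FromMinutes(5);
var metadataMinTimeInCache = streamInactivityPeriod + TimeSpan.FromMinutes(5);

siloBuilder.Configure<GrainCollectionOptions>(o => o.CollectionAge = grainCollectionAge);
// ...then pass streamInactivityPeriod and metadataMinTimeInCache to the EventHub stream provider's
// pulling-agent (StreamInactivityPeriod) and cache-eviction (MetadataMinTimeInCache) options.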

Infrequent large loads

Description: External (to solution) Azure EventHub that receives messages from a nightly or hourly batch job in another system. In our case, we use it to ingest updates to master data from other systems, which send them to their EventHub via batch jobs. Typically, we see 100,000s of messages, where every single message targets one or more logical streams (stream IDs).

Challenges: The spikes when receiving the messages are significant, both because a lot of grains are loaded into memory at once and because it causes us to run into throttling on storage providers. The solution has been to overdimension the cluster and switch to a premium Azure storage account.

External events with multiple target streams

Description: External (to solution) Azure EventHub that receives messages that should be sent to multiple logical streams.

Challenges: The current EventHubDataAdapter only supports returning a single StreamId from the GetStreamIdentity method, so an intermediate grain is needed to route messages to two or more StreamIds.
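
For reference, such an intermediate routing grain is conceptually something like the sketch below (Orleans 7+ style; the event type, provider name, and target namespaces are placeholders):

public sealed record MyEvent(string CustomerId, string DeviceId);

public interface IFanOutRouterGrain : IGrainWithGuidKey { }

[ImplicitStreamSubscription("external-events")]
public sealed class FanOutRouterGrain : Grain, IFanOutRouterGrain
{
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var provider = this.GetStreamProvider("EventHubProvider");
        var source = provider.GetStream<MyEvent>(StreamId.Create("external-events", this.GetPrimaryKey()));
        await source.SubscribeAsync(async (evt, token) =>
        {
            // Fan the same event out to two (or more) logical target streams.
            await provider.GetStream<MyEvent>(StreamId.Create("target-a", evt.CustomerId)).OnNextAsync(evt);
            await provider.GetStream<MyEvent>(StreamId.Create("target-b", evt.DeviceId)).OnNextAsync(evt);
        });
        await base.OnActivateAsync(cancellationToken);
    }
}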

One stream namespace - multiple providers

Description: We have grains with a single [ImplicitStreamSubscription("stream-ns-1")] attribute that multiple (Azure EventHub) stream providers publish messages to.

Challenges: Since different Azure EventHubs are the source of messages going to the same implicit stream, there is no single StreamSequenceToken that can be passed to StreamSubscriptionHandle<T>.ResumeAsync(observer, token) that makes sense for precise resuming.

egil · Nov 11 '25

Great thoughts!

Here's mine: 😎 I use Orleans for Event Sourcing (currently just experiments), but I want to use streams to publish all the events so that (persisted) projections can be implemented via implicit subscriptions updating some database record. I also have grains with explicit subscriptions that read all events from storage on activation and stay auto-updated while activated, serving as a cached in-memory projection; they unsubscribe when deactivated.

What I really miss is some kind of outbox from the Orleans.EventSourcing/ConfirmEvents to some Orleans stream.

Currently I use a custom storage provider (PostgreSQL, because I don't have really high throughput requirements) and every projection stores the current "event version" so it can read missing events, if needed. I plan to use PostgreSQL's logical replication feature for pushing back the events into Orleans streams.
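
To illustrate the kind of outbox being asked for: a hand-rolled version today amounts to forwarding events to a stream right after ConfirmEvents (JournaledGrain, RaiseEvent, and ConfirmEvents are real Orleans.EventSourcing APIs; the grain, event types, and provider/namespace names below are placeholders):

public interface IAccountGrain : IGrainWithGuidKey { Task Deposit(decimal amount); }

public abstract record AccountEvent;
public sealed record Deposited(decimal Amount) : AccountEvent;

public class AccountState
{
    public decimal Balance { get; set; }
    public void Apply(Deposited e) => Balance += e.Amount;   // applied by JournaledGrain's default transition
}

public sealed class AccountGrain : JournaledGrain<AccountState, AccountEvent>, IAccountGrain
{
    public async Task Deposit(decimal amount)
    {
        var evt = new Deposited(amount);
        RaiseEvent(evt);
        await ConfirmEvents();   // now durably persisted by the log-consistency provider

        // Hand-rolled "outbox": forward the confirmed event to a stream so projections
        // with implicit subscriptions can update their read models.
        // Note: this is not atomic with the event log, which is exactly the gap a built-in outbox would close.
        var stream = this.GetStreamProvider("Projections")
            .GetStream<AccountEvent>(StreamId.Create("account-events", this.GetPrimaryKey()));
        await stream.OnNextAsync(evt);
    }
}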

flensrocker avatar Nov 13 '25 12:11 flensrocker