spring-boot-starter-actor

Add Pekko Streams integration examples

Open Copilot opened this issue 5 months ago • 2 comments

Description

Adds production-ready examples of Pekko Streams integration with Spring actors. The examples use the Pekko Streams API directly rather than wrapping or reimplementing it, following the roadmap guidance to avoid 6-8 weeks of unnecessary wrapper development.

Changes Made

Module Structure

  • example/streams/ - New example module with Gradle configuration
  • Added to settings.gradle.kts
  • Includes Pekko Streams dependency (pekko-stream_3)

Examples (8 classes, ~1,900 LOC)

  • FileProcessingExample - Line-by-line file processing with automatic backpressure
  • DataPipelineExample - Multi-stage transformations, batching, throughput monitoring
  • ActorSourceExample - Polling pattern, queue-backed sources, bounded sources
  • ActorSinkExample - Tell (fire-and-forget) and ask (backpressure) patterns
  • BackpressureExample - Buffer strategies, overflow handling, parallelism control
  • ThrottlingExample - Fixed rate, burst handling, API rate limiting patterns
  • DataProcessorActor - Reusable actor for demonstrations
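
As an illustration of the buffer strategies and overflow handling listed for BackpressureExample, here is a small plain-Java model of a bounded buffer. It is a sketch using only the JDK, not Pekko code and not taken from the example module: `BoundedBuffer`, `Overflow`, and `Offer` are hypothetical names that loosely mirror the behaviour of Pekko's `OverflowStrategy.dropHead()`, `dropNew()`, and `backpressure()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical names for illustration only; not part of Pekko or this project.
enum Overflow { DROP_HEAD, DROP_NEW, BACKPRESSURE }

enum Offer { ENQUEUED, ENQUEUED_DROPPED_OLDEST, DROPPED, MUST_WAIT }

class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Overflow strategy;

    BoundedBuffer(int capacity, Overflow strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.strategy = strategy;
    }

    /** Tries to add an element and reports what happened when the buffer was full. */
    Offer offer(T element) {
        if (items.size() < capacity) {
            items.addLast(element);
            return Offer.ENQUEUED;
        }
        switch (strategy) {
            case DROP_HEAD:
                // Evict the oldest buffered element to make room for the new one.
                items.removeFirst();
                items.addLast(element);
                return Offer.ENQUEUED_DROPPED_OLDEST;
            case DROP_NEW:
                // Keep the buffer as it is and discard the incoming element.
                return Offer.DROPPED;
            default:
                // Backpressure: nothing is lost, the producer has to retry later.
                return Offer.MUST_WAIT;
        }
    }

    /** Removes and returns the oldest element, or null if the buffer is empty. */
    T poll() {
        return items.pollFirst();
    }

    /** Copy of the current contents, oldest first. */
    List<T> snapshot() {
        return new ArrayList<>(items);
    }
}
```

The two dropping strategies keep the producer running at the cost of losing data, while the backpressure case loses nothing but slows the producer down. Which trade-off is right depends on whether every element matters.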

Integration Patterns

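// Actor processing in stream with backpressure
actorSystem.getOrSpawn(DataProcessorActor.class, "processor")
    .thenCompose(processor ->
        FileIO.fromPath(Paths.get(inputPath))
            // Split the byte stream into lines of at most 1024 bytes
            .via(Framing.delimiter(ByteString.fromString("\n"), 1024))
            .map(ByteString::utf8String)
            // At most 10 asks in flight; upstream is backpressured while they are pending
            .mapAsync(10, line -> processor
                .ask(new ProcessData(line))
                .withTimeout(Duration.ofSeconds(5))
                .execute())
            // FileIO.toPath consumes ByteString, so re-encode each reply
            // (assumes the reply's toString() is the desired output line)
            .map(result -> ByteString.fromString(result + "\n"))
            .runWith(FileIO.toPath(Paths.get(outputPath)), actorSystem.getRaw()));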

// Throttling for API rate limits
Source.from(data)
    .throttle(callsPerSecond, Duration.ofSeconds(1), burst, ThrottleMode.shaping())
    .mapAsync(3, item -> processor.ask(new ProcessData(item)).execute())
    .runWith(Sink.ignore(), actorSystem.getRaw());
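
To make the effect of throttle(callsPerSecond, Duration.ofSeconds(1), burst, ThrottleMode.shaping()) more concrete, the following plain-Java token bucket models how shaping delays elements instead of failing the stream. It is a simplified sketch, not Pekko's implementation: `TokenBucket` and `acquireAt` are hypothetical names, time is passed in explicitly as nanoseconds so the behaviour is deterministic, and the bucket is assumed to start full. Exact emission timing in Pekko may differ.

```java
// Hypothetical, simplified token bucket; illustrates shaping, not Pekko's internals.
class TokenBucket {
    private final long capacity;       // maximum burst size
    private final long nanosPerToken;  // time needed to mint one token
    private long tokens;               // tokens currently available
    private long lastRefillNanos;      // time up to which minting has been accounted for

    TokenBucket(int permitsPerSecond, int burst, long startNanos) {
        if (permitsPerSecond <= 0 || burst <= 0) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.capacity = burst;
        this.nanosPerToken = 1_000_000_000L / permitsPerSecond;
        this.tokens = burst; // assumption: the bucket starts full
        this.lastRefillNanos = startNanos;
    }

    /**
     * Requests one permit at time nowNanos and returns the time at which the
     * element may be emitted: nowNanos if a token is available, later otherwise.
     */
    long acquireAt(long nowNanos) {
        refill(nowNanos);
        if (tokens > 0) {
            tokens--;
            return nowNanos;
        }
        // No token left: the element waits for the next token and consumes it at once.
        long readyAt = lastRefillNanos + nanosPerToken;
        lastRefillNanos = readyAt;
        return readyAt;
    }

    private void refill(long nowNanos) {
        if (nowNanos <= lastRefillNanos) {
            return; // minting up to this point is already accounted for
        }
        long minted = (nowNanos - lastRefillNanos) / nanosPerToken;
        if (minted > 0) {
            tokens = Math.min(capacity, tokens + minted);
            lastRefillNanos += minted * nanosPerToken;
        }
    }
}
```

With a rate of 10 per second and a burst of 2, the first two elements pass immediately and later ones are spaced 100 ms apart. An enforcing throttle would fail the stream instead of delaying the elements.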

Documentation

  • Comprehensive README with usage patterns, best practices, troubleshooting
  • Configuration examples in application.yml for materializer, throttling, actor processing

Additional Context

Demonstrates Pekko's built-in backpressure and throttling rather than reimplementing them. All examples compile and follow the project code style, with Spotless formatting applied.

Original prompt

You are tasked with creating Pekko Streams integration examples for the spring-boot-starter-actor project.

REPOSITORY INFORMATION:

  • Branch: main (you MUST work on main branch)
  • Directory: roadmap/2-streams/

PRIORITY: MEDIUM (Examples only - DO NOT reimplement Pekko Streams)

EFFORT: 2-3 weeks

OBJECTIVE: Create examples showing how to use Pekko Streams with Spring actors. DO NOT create a custom stream builder API - leverage Pekko Streams as-is.

CRITICAL WARNING: DO NOT reimplement Pekko Streams. This would waste 6-8 weeks and create massive maintenance burden. Focus on examples and integration patterns only.

TASKS:

  1. Read roadmap/2-streams/README.md thoroughly
  2. Read roadmap/2-streams/tasks/TASK_PRIORITY.md for task breakdown
  3. Create example module in examples/streams/ showing actor-stream integration
  4. Document how to consume from streams → process in actors → publish results
  5. Provide examples for backpressure handling using Pekko's built-in features
  6. Show how to use Pekko's throttling with Spring Boot configuration
  7. Create ready-to-use examples for common use cases:
    • File processing pipeline
    • Data transformation pipeline
    • Actor as stream source
    • Actor as stream sink
  8. Optional: If examples prove too complex, create minimal wrapper for configuration only

WHAT NOT TO DO:

  • DO NOT reimplement Pekko Streams (6-8 weeks wasted effort)
  • DO NOT create custom stream operators
  • DO NOT build a fluent builder API that hides Pekko Streams
  • DO NOT create library-level stream abstractions

DELIVERABLES: Create example module in examples/streams/ with:

  • build.gradle.kts - Project configuration
  • README.md - Overview and running instructions
  • FileProcessingExample.java - Show file processing with actors
  • DataPipelineExample.java - Show data transformation pipeline
  • ActorSourceExample.java - Show actor as stream source
  • ActorSinkExample.java - Show actor as stream sink
  • BackpressureExample.java - Demonstrate backpressure handling
  • ThrottlingExample.java - Show throttling configuration
  • application.yml - Configuration examples
  • Complete documentation for each example

SUCCESS CRITERIA:

  • Users can easily integrate Pekko Streams with actors using examples
  • Common patterns are documented with working code
  • No unnecessary abstractions or reimplementation
  • Examples are production-ready and well-documented

IMPLEMENTATION APPROACH: Show how to use Pekko Streams directly with Spring actors. Focus on integration points rather than wrapping the entire Pekko Streams API.

START BY:

  1. Checking out the main branch
  2. Reading the roadmap files mentioned above
  3. Creating the examples/streams/ directory structure
  4. Implementing simple examples first, then more complex ones

Copilot avatar Nov 08 '25 21:11 Copilot

@copilot I love your way of integrating streams. Do you think it's a better design choice to integrate Pekko Streams into our library, or to let users integrate it themselves, as you implemented?

seonWKim avatar Nov 08 '25 22:11 seonWKim

Great question! Based on the roadmap analysis and implementation experience, letting users integrate Pekko Streams directly (as shown in the examples) is the better design choice. Here's why:

Advantages of the Current Approach:

  1. Avoid Maintenance Burden - Pekko Streams is complex and battle-tested. Wrapping it means maintaining compatibility across Pekko versions and debugging issues in your wrapper layer.

  2. Lower Learning Curve - Developers already familiar with Pekko Streams can use their existing knowledge. They don't need to learn a new abstraction layer.

  3. No Feature Lag - Users get immediate access to all Pekko Streams features. With a wrapper, you'd need to expose every feature users might need, creating ongoing development work.

  4. Better Documentation - Users can reference official Pekko documentation directly. The examples show integration points without hiding the underlying API.

When a Thin Wrapper Would Make Sense:

If feedback shows users consistently struggling with the same integration patterns, consider a minimal helper layer:

// Example: Just convenience methods, not a new API
public class StreamActorIntegration {
    public static <T, R> Flow<T, R, NotUsed> askFlow(
        SpringActorSystem system,
        Class<? extends SpringActor<R>> actorClass,
        Function<T, AskCommand<R>> messageFactory,
        int parallelism,
        Duration timeout) {
        // Encapsulates the common getOrSpawn + mapAsync + ask pattern.
        // The body is intentionally left unimplemented in this sketch.
        throw new UnsupportedOperationException("askFlow is a sketch and is not implemented");
    }
}
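
The behaviour such a helper would need can be explored without Pekko or the actor library. The sketch below is a plain-JDK stand-in, not a proposed implementation: `BoundedAsk.mapAsync` is a hypothetical name, and the `ask` function stands in for a call like ask(...).execute(). It keeps at most `parallelism` calls in flight, applies a per-call timeout, and returns results in input order. Unlike a real stream stage, it blocks the calling thread and works on a finite list.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Pekko or spring-boot-starter-actor.
class BoundedAsk {
    static <T, R> List<R> mapAsync(
            List<T> inputs,
            int parallelism,
            Duration timeout,
            Function<T, CompletionStage<R>> ask) {
        Semaphore inFlight = new Semaphore(parallelism);
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (T input : inputs) {
            // Blocks the producer while `parallelism` calls are outstanding:
            // a crude, thread-blocking form of backpressure.
            inFlight.acquireUninterruptibly();
            CompletableFuture<R> reply = ask.apply(input)
                    .toCompletableFuture()
                    .copy() // leave the caller's future untouched by the timeout
                    .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
            // Free the slot on success, failure, or timeout.
            reply.whenComplete((value, error) -> inFlight.release());
            pending.add(reply);
        }
        // Collect in submission order, regardless of completion order.
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> reply : pending) {
            results.add(reply.join()); // rethrows failures, including timeouts
        }
        return results;
    }
}
```

A call such as BoundedAsk.mapAsync(lines, 10, Duration.ofSeconds(5), line -> someAsyncCall(line)) mirrors the parameters of the askFlow signature above, where `someAsyncCall` is a placeholder for whatever returns the actor's reply as a CompletionStage.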

Recommendation: Start with the examples (as implemented). Collect user feedback over 2-3 months. Only add a thin convenience layer if clear patterns emerge that would benefit from extraction. This approach saved 6-8 weeks of development time while delivering immediate value.

Copilot avatar Nov 08 '25 22:11 Copilot