Add Pekko Streams integration examples
Description
Adds production-ready examples showing Pekko Streams integration with Spring actors. Uses Pekko Streams API directly without reimplementation, following roadmap guidance to avoid 6-8 weeks of unnecessary wrapper development.
Changes Made
Module Structure
- example/streams/ - New example module with Gradle configuration
- Added to settings.gradle.kts
- Includes Pekko Streams dependency (pekko-stream_3)
Examples (8 classes, ~1,900 LOC)
- FileProcessingExample - Line-by-line file processing with automatic backpressure
- DataPipelineExample - Multi-stage transformations, batching, throughput monitoring
- ActorSourceExample - Polling pattern, queue-backed sources, bounded sources
- ActorSinkExample - Tell (fire-and-forget) and ask (backpressure) patterns
- BackpressureExample - Buffer strategies, overflow handling, parallelism control
- ThrottlingExample - Fixed rate, burst handling, API rate limiting patterns
- DataProcessorActor - Reusable actor for demonstrations
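The buffer strategies that BackpressureExample exercises differ mainly in which element is discarded when a bounded buffer is full. The plain-JDK sketch below models three of them (Pekko exposes these as OverflowStrategy.dropHead(), dropTail() and dropBuffer()). It is an explanatory model only, not code from this PR and not a substitute for Pekko's buffer stage; the class, enum and method names are hypothetical, and it assumes nothing drains the buffer while elements arrive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Explanatory model only; real code should use Pekko's buffer stage. */
final class OverflowModel {

    /** Hypothetical names mirroring three of Pekko's overflow strategies. */
    enum Strategy { DROP_HEAD, DROP_TAIL, DROP_BUFFER }

    /**
     * Offers every element to a bounded buffer that nothing drains and
     * returns what the buffer holds afterwards, oldest element first.
     */
    static List<Integer> offerAll(List<Integer> incoming, int capacity, Strategy strategy) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        Deque<Integer> buffer = new ArrayDeque<>();
        for (Integer element : incoming) {
            if (buffer.size() == capacity) {
                if (strategy == Strategy.DROP_HEAD) {
                    buffer.pollFirst();   // discard the oldest buffered element
                } else if (strategy == Strategy.DROP_TAIL) {
                    buffer.pollLast();    // discard the newest buffered element
                } else {
                    buffer.clear();       // discard everything buffered so far
                }
            }
            buffer.addLast(element);      // the incoming element is always admitted
        }
        return new ArrayList<>(buffer);
    }
}
```

Offering 1 through 5 to a buffer of capacity 3 leaves [3, 4, 5] with DROP_HEAD, [1, 2, 5] with DROP_TAIL, and [4, 5] with DROP_BUFFER. The backpressure strategy, by contrast, drops nothing and slows the upstream instead.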
Integration Patterns
// Actor processing in stream with backpressure
actorSystem.getOrSpawn(DataProcessorActor.class, "processor")
    .thenCompose(processor ->
        FileIO.fromPath(Paths.get(inputPath))
            .via(Framing.delimiter(ByteString.fromString("\n"), 1024))
            .map(ByteString::utf8String)
            .mapAsync(10, line -> processor
                .ask(new ProcessData(line))
                .withTimeout(Duration.ofSeconds(5))
                .execute())
            .runWith(FileIO.toPath(Paths.get(outputPath)), actorSystem.getRaw()));
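For readers new to mapAsync, the guarantee the snippet above relies on is that at most N asks are outstanding at once and results keep their input order. The plain-JDK sketch below imitates that contract with a semaphore. It is a mental model under stated assumptions, not how Pekko implements the stage: Pekko does not block a thread and emits results downstream as they become available. BoundedAskModel and mapAsyncOrdered are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Explanatory model of bounded, ordered asynchronous mapping. */
final class BoundedAskModel {

    static <T, R> List<R> mapAsyncOrdered(
            List<T> inputs, int parallelism, Function<T, CompletionStage<R>> ask) {
        Semaphore permits = new Semaphore(parallelism);
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (T input : inputs) {
            // The producer stalls here once `parallelism` asks are in flight.
            // This stall is the model's stand-in for backpressure.
            permits.acquireUninterruptibly();
            CompletableFuture<R> reply = ask.apply(input).toCompletableFuture();
            reply.whenComplete((result, error) -> permits.release());
            pending.add(reply);
        }
        // Collect in submission order, regardless of completion order.
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> reply : pending) {
            results.add(reply.join());
        }
        return results;
    }
}
```

Calling mapAsyncOrdered(List.of(1, 2, 3), 2, n -> CompletableFuture.supplyAsync(() -> n * 10)) returns [10, 20, 30] while never running more than two suppliers at the same time. In real code the mapAsync stage already provides this, so nothing like the class above needs to be written.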
// Throttling for API rate limits
Source.from(data)
    .throttle(callsPerSecond, Duration.ofSeconds(1), burst, ThrottleMode.shaping())
    .mapAsync(3, item -> processor.ask(new ProcessData(item)).execute())
    .runWith(Sink.ignore(), actorSystem.getRaw());
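How callsPerSecond and burst interact in the throttle call above is easiest to see with a simplified token-bucket model. The sketch below computes when each element would be released if every element were ready at time zero. It assumes the bucket starts full and a burst of at least 1, uses a simulated clock, and is not Pekko's implementation; ThrottleModel and emissionTimesMillis are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Explanatory token-bucket model of a shaping throttle. */
final class ThrottleModel {

    /**
     * Returns the release time, in milliseconds from stream start, of each of
     * `count` elements that are all available at time zero.
     */
    static List<Long> emissionTimesMillis(
            int count, int elementsPerPeriod, long periodMillis, int burst) {
        if (elementsPerPeriod < 1 || burst < 1) {
            throw new IllegalArgumentException("rate and burst must be at least 1");
        }
        long millisPerToken = periodMillis / elementsPerPeriod; // integer division
        long tokens = burst;   // assumption: the bucket starts full
        long now = 0;          // simulated clock
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (tokens > 0) {
                tokens--;                // spend a saved token, no delay
            } else {
                now += millisPerToken;   // wait for the next token, spend it at once
            }
            times.add(now);
        }
        return times;
    }
}
```

With a rate of 2 per second and a burst of 3, six elements are released at 0, 0, 0, 500, 1000 and 1500 ms: the burst passes immediately, and the remainder are spaced at the configured rate. In shaping mode the stream delays elements this way rather than failing, which is why it suits outbound API rate limits.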
Documentation
- Comprehensive README with usage patterns, best practices, troubleshooting
- Configuration examples in application.yml for materializer, throttling, actor processing
Additional Context
Demonstrates Pekko's built-in backpressure and throttling rather than reimplementing them. All examples compile and follow the project code style, with Spotless formatting applied.
Original prompt
You are tasked with creating Pekko Streams integration examples for the spring-boot-starter-actor project.
REPOSITORY INFORMATION:
- Branch: main (you MUST work on main branch)
- Directory: roadmap/2-streams/
PRIORITY: MEDIUM (Examples only - DO NOT reimplement Pekko Streams)
EFFORT: 2-3 weeks
OBJECTIVE: Create examples showing how to use Pekko Streams with Spring actors. DO NOT create a custom stream builder API - leverage Pekko Streams as-is.
CRITICAL WARNING: DO NOT reimplement Pekko Streams. This would waste 6-8 weeks and create massive maintenance burden. Focus on examples and integration patterns only.
TASKS:
- Read roadmap/2-streams/README.md thoroughly
- Read roadmap/2-streams/tasks/TASK_PRIORITY.md for task breakdown
- Create example module in examples/streams/ showing actor-stream integration
- Document how to consume from streams → process in actors → publish results
- Provide examples for backpressure handling using Pekko's built-in features
- Show how to use Pekko's throttling with Spring Boot configuration
- Create ready-to-use examples for common use cases:
  - File processing pipeline
  - Data transformation pipeline
  - Actor as stream source
  - Actor as stream sink
- Optional: If examples prove too complex, create minimal wrapper for configuration only
WHAT NOT TO DO:
- DO NOT reimplement Pekko Streams (6-8 weeks wasted effort)
- DO NOT create custom stream operators
- DO NOT build a fluent builder API that hides Pekko Streams
- DO NOT create library-level stream abstractions
DELIVERABLES: Create example module in examples/streams/ with:
- build.gradle.kts - Project configuration
- README.md - Overview and running instructions
- FileProcessingExample.java - Show file processing with actors
- DataPipelineExample.java - Show data transformation pipeline
- ActorSourceExample.java - Show actor as stream source
- ActorSinkExample.java - Show actor as stream sink
- BackpressureExample.java - Demonstrate backpressure handling
- ThrottlingExample.java - Show throttling configuration
- application.yml - Configuration examples
- Complete documentation for each example
SUCCESS CRITERIA:
- Users can easily integrate Pekko Streams with actors using examples
- Common patterns are documented with working code
- No unnecessary abstractions or reimplementation
- Examples are production-ready and well-documented
IMPLEMENTATION APPROACH: Show how to use Pekko Streams directly with Spring actors. Focus on integration points rather than wrapping the entire Pekko Streams API.
START BY:
- Checking out the main branch
- Reading the roadmap files mentioned above
- Creating the examples/streams/ directory structure
- Implementing simple examples first, then more complex ones
@copilot I love your way of integrating streams. Do you think it's a better design choice to integrate Pekko Streams into our library, or to let users integrate it themselves, just as you implemented?
Great question! Based on the roadmap analysis and implementation experience, letting users integrate Pekko Streams directly (as shown in the examples) is the better design choice. Here's why:
Advantages of the Current Approach:
- Avoid Maintenance Burden - Pekko Streams is complex and battle-tested. Wrapping it means maintaining compatibility across Pekko versions and debugging issues in your wrapper layer.
- Lower Learning Curve - Developers already familiar with Pekko Streams can use their existing knowledge. They don't need to learn a new abstraction layer.
- No Feature Lag - Users get immediate access to all Pekko Streams features. With a wrapper, you'd need to expose every feature users might need, creating ongoing development work.
- Better Documentation - Users can reference official Pekko documentation directly. The examples show integration points without hiding the underlying API.
When a Thin Wrapper Would Make Sense:
If feedback shows users consistently struggling with the same integration patterns, consider a minimal helper layer:
// Example: Just convenience methods, not a new API
public class StreamActorIntegration {
    public static <T, R> Flow<T, R, NotUsed> askFlow(
            SpringActorSystem system,
            Class<? extends SpringActor<R>> actorClass,
            Function<T, AskCommand<R>> messageFactory,
            int parallelism,
            Duration timeout) {
        // Encapsulates the common getOrSpawn + mapAsync + ask pattern.
        // The body is intentionally omitted in this design sketch.
        throw new UnsupportedOperationException("askFlow is a design sketch only");
    }
}
Recommendation: Start with the examples (as implemented). Collect user feedback over 2-3 months. Only add a thin convenience layer if clear patterns emerge that would benefit from extraction. This approach saved 6-8 weeks of development time while delivering immediate value.