Feature request: Add Source#create method
Motivation:
We already have Source#unfold and Source#queue, but both still differ somewhat from Reactor's Flux#create method.
When we want to control emission ourselves, the source should still be created lazily. This behavior can apparently be expressed by combining Source#lazySource and Source#queue.
I think the logic should not be hard to implement, and it would make users' lives much easier.
refs: https://projectreactor.io/docs/core/release/reference/coreFeatures/programmatically-creating-sequence.html#producing.create
```java
Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
        new MyEventListener<String>() {
            public void onDataChunk(List<String> chunk) {
                for (String s : chunk) {
                    sink.next(s);
                }
            }

            public void processComplete() {
                sink.complete();
            }
        });
});
```
```scala
Source.create[String](1024)(sink => {
  sink.offer("a")
  sink.offer("b")
  sink.complete()
})
  .toMat(TestSink.probe[String])(Keep.right)
  .run()
  .ensureSubscription()
  .request(2)
  .expectNext("a", "b")
  .expectComplete()
```
I implemented it with Source.queue. @pjfanning, what do you think? It would make the Java DSL much nicer, on par with Flux#create.
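For illustration only, here is a minimal sketch of how such a method might be layered on the existing Source#lazySource and Source#queue operators. The object name `SourceCreateSketch` and the exact `create` signature are assumptions made for this example, not an existing Pekko API and not necessarily the implementation referred to above.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.BoundedSourceQueue
import org.apache.pekko.stream.scaladsl.Source

// Hypothetical helper: not part of Pekko, only a sketch of the idea.
object SourceCreateSketch {

  // bufferSize bounds the queue; producer receives the queue as its "sink".
  def create[T](bufferSize: Int)(producer: BoundedSourceQueue[T] => Unit): Source[T, NotUsed] =
    Source
      .lazySource[T, NotUsed] { () =>
        // The inner source is only built once downstream signals demand.
        Source
          .queue[T](bufferSize)
          .mapMaterializedValue[NotUsed] { queue =>
            // Hand the materialized queue to the user callback.
            producer(queue)
            NotUsed
          }
      }
      // Drop the Future[NotUsed] that lazySource materializes.
      .mapMaterializedValue[NotUsed](_ => NotUsed)
}
```

With this sketch, `SourceCreateSketch.create[String](1024) { sink => sink.offer("a"); sink.offer("b"); sink.complete() }` is expected to behave like the proposed `Source.create` call shown earlier. Note that `offer` returns a `QueueOfferResult`, so a real implementation would need to decide how to surface dropped elements when the buffer is full.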