Feature request: Add Source#create method

Status: Open • He-Pin opened this issue 1 year ago • 1 comment

Motivation: We currently have Source#unfold and Source#queue, but both are still a bit different from Reactor's Flux#create method.

When we want to control the progress ourselves, we still want the source to be lazy. By the way, it seems this behavior can be expressed by combining Source#lazySource and Source#queue.

I don't think the logic would be hard to implement, and it would make users' lives much easier.
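To make the requested semantics concrete, here is a rough JDK-only model of them. It is not Pekko code and not the proposed implementation. `CreateSketch`, `Sink`, and this `create` signature are hypothetical names used only for illustration. The model assumes a single thread and uses `Supplier#get` as a stand-in for stream materialization. It shows two properties: the producer callback runs lazily, and the bounded buffer rejects offers once it is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, JDK-only model of the requested semantics. Not Pekko code.
final class CreateSketch {

    // Hypothetical sink handed to the producer callback.
    public static final class Sink<T> {
        private final ArrayBlockingQueue<T> buffer;
        private boolean completed = false;

        Sink(int bufferSize) {
            this.buffer = new ArrayBlockingQueue<>(bufferSize);
        }

        // Returns false if the buffer is full or the sink is already completed.
        public boolean offer(T element) {
            return !completed && buffer.offer(element);
        }

        public void complete() {
            completed = true;
        }
    }

    // Returns a lazy handle: the producer callback runs only when get() is
    // called, which stands in for materialization in this model.
    public static <T> Supplier<List<T>> create(int bufferSize, Consumer<Sink<T>> producer) {
        return () -> {
            Sink<T> sink = new Sink<>(bufferSize);
            producer.accept(sink);
            List<T> out = new ArrayList<>();
            sink.buffer.drainTo(out); // hand the buffered elements downstream
            return out;
        };
    }

    public static void main(String[] args) {
        Supplier<List<String>> source = create(2, sink -> {
            sink.offer("a");
            sink.offer("b");
            sink.offer("c"); // rejected: the buffer of size 2 is already full
            sink.complete();
        });
        System.out.println(source.get());
    }
}
```

Running `main` prints `[a, b]`. A real implementation would also need to handle asynchronous demand, failure signalling, and cancellation, none of which this model covers.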

refs: https://projectreactor.io/docs/core/release/reference/coreFeatures/programmatically-creating-sequence.html#producing.create

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
      new MyEventListener<String>() {

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s);
          }
        }

        public void processComplete() {
          sink.complete();
        }
    });
});

He-Pin • Dec 23 '24 03:12

Source.create[String](1024)(sink => {
  sink.offer("a")
  sink.offer("b")
  sink.complete()
})
  .toMat(TestSink.probe[String])(Keep.right)
  .run()
  .ensureSubscription()
  .request(2)
  .expectNext("a", "b")
  .expectComplete()

I implemented it on top of Source.queue. @pjfanning, what do you think? It would make the Java DSL much nicer, on par with Flux#create.

He-Pin • Apr 19 '25 16:04