Question: Is it possible to produce events using reactor?

Open Ehud-Lev-Forter opened this issue 3 years ago • 7 comments

Hi, I am working with ReactorProcessor and I would like to produce events. I tried the following code, but it did not write any new events to the output topic. Example code:

lateinit var pConsumer: ReactorProcessor<String, JsonObject>

    private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
        val producer: Producer<String, JsonObject> = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
        val options = ParallelConsumerOptions.builder<String, JsonObject>()
            .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
            .maxConcurrency(parallelConsumerConfig.maxConcurrency)
//            .batchSize(parallelConsumerConfig.batchSize)
            .consumer(buildConsumer(kafkaConsumerConfig))
            .producer(producer)
            .build()
        return ReactorProcessor(options)
    }

    pConsumer.react { context ->
        val event = context.singleConsumerRecord
        // do something with event
        val result = ProducerRecord<String, JsonObject>("output", event.key(),
            JsonObject(mapOf("someTest" to event.offset())))
        Mono.just(result)
    }

What am I missing?

Ehud-Lev-Forter avatar Nov 14 '22 09:11 Ehud-Lev-Forter

Hi and welcome to the project! The reactor module doesn't have the produce flow implemented. However, you can produce a record directly with the producer. You should consider whether you want to wait for a positive ack from the broker first. If you do a blocking send, you should run it on a thread pool from Reactor that is intended for blocking work.
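A minimal sketch of the "blocking send on a separate pool" idea, using only the JDK so it stands alone. `BlockingSender` and `OffloadedSender` are hypothetical names, not part of Kafka, Reactor, or parallel-consumer; `sendAndWait` stands in for something like `producer.send(record).get()`, and the pool size of 4 is arbitrary. In Reactor itself the equivalent would presumably be `Mono.fromCallable(...)` combined with `subscribeOn(Schedulers.boundedElastic())`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a blocking send such as producer.send(record).get().
interface BlockingSender<R, A> {
    A sendAndWait(R record) throws Exception;
}

// Hypothetical wrapper: runs the blocking send on its own pool and exposes a future.
class OffloadedSender<R, A> implements AutoCloseable {
    private final BlockingSender<R, A> sender;

    // Dedicated daemon-thread pool, so blocking waits never occupy the processing threads.
    private final ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "blocking-send");
        thread.setDaemon(true);
        return thread;
    });

    OffloadedSender(BlockingSender<R, A> sender) {
        this.sender = sender;
    }

    // Returns immediately; the future completes once the blocking send has returned or thrown.
    CompletableFuture<A> send(R record) {
        CompletableFuture<A> result = new CompletableFuture<>();
        blockingPool.execute(() -> {
            try {
                result.complete(sender.sendAndWait(record));
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void close() {
        blockingPool.shutdown();
    }
}
```

The returned future can then be handed to `Mono.fromFuture(...)`, so the caller decides whether to wait for the ack before signalling completion.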

To make this more obvious, I could have the reactor module check the return result of the user function, and either throw an exception or log a warning, if a ProducerRecord is returned from your React function. Thoughts?

With regard to sending, the core module does send in bulk, and waits in bulk for send acks. However, it still blocks while doing so. See this upcoming PR for fully async, non-blocking record acks in the core module:

  • #356

Curious - what sort of load do you have, that you want to use Reactor? Or is it just to be more efficient with resources?

astubbs avatar Nov 17 '22 11:11 astubbs

Hi, first of all, thanks for this great library. We are using Reactor because we write to S3 and RDS and want to do so asynchronously, without blocking, to make better use of resources. Currently, we are working on implementing the producer logic. Once our processor tasks finish asynchronously (we are not blocking), we produce events with callbacks that, on completion, complete CompletableFutures. At the end of the function we return a Mono.fromFuture(...) built from the CompletableFuture for those producer results. We assume that the library will only commit after the callback has finished successfully. Does that make sense? BTW the reason I opened this issue was that I saw the producer in the reactor example

Ehud-Lev-Forter avatar Nov 17 '22 14:11 Ehud-Lev-Forter

My pleasure! :)

Yup! Sounds great! And yes, that's right. You can see in the Reactor adapter, that the hooks for commit don't register until the Future completes.

You could probably do a PR to do basically that and have it inside pc.

If you could paste your code, I can take a look to see if it lines up with what I'm imagining.

BTW the reason I opened this issue was that I saw the producer in the reactor example

Link didn't work, but the reactor example - yeah it's not the greatest, but the item it returns is just a dummy string. It doesn't try to send a record back to PC. Am I missing something?

astubbs avatar Nov 17 '22 15:11 astubbs

A PR is too much for me right now. Our code is written in Kotlin and obviously I cannot share it, but I converted the relevant parts to Java. For what it's worth, here is an example of what we did:

    /**
     *
     * // Example usage
     parallelConsumer.react(context -> {
         var consumerRecord = context.getSingleRecord().getConsumerRecord();
         log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
         Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
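         Mono<List<ProducerRecord<String, String>>> originalResult = Mono.just(Arrays.asList(new ProducerRecord<>("topic", "key", "some value")));
         // flatMap (not map), so the Mono returned by produce() is subscribed to and awaited
         return originalResult.flatMap(batchProducer::produce);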
     });
     */
class BatchProducer<K, V> {
        Producer<K, V> producer;

        public BatchProducer(Producer<K, V> producer) {
            this.producer = producer;
        }

        public Mono<List<RecordMetadata>> produce(List<ProducerRecord<K, V>> messages) {
            List<CompletableFuture<RecordMetadata>> futures = messages.stream().map(message -> {
                CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<RecordMetadata>();
                Callback kafkaCallback = createCallback(completableFuture);
                producer.send(message, kafkaCallback);
                return completableFuture;
            }).toList();
            CompletableFuture<List<RecordMetadata>> oneResult = sequence(futures);
            return Mono.fromFuture(oneResult);
        }

        // From here: https://stackoverflow.com/questions/30025428/convert-from-listcompletablefuture-to-completablefuturelist
        static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
            return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                    .thenApply(v -> com.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList())
                    );
        }

        private Callback createCallback(CompletableFuture<RecordMetadata> completableFuture) {
            return new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        completableFuture.completeExceptionally(exception);
                    } else {
                        completableFuture.complete(metadata);
                    }
                }

            };
        }

        public void close() {
            producer.close();
        }

    }
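
A small JDK-only sketch of how the `sequence` combinator above behaves, since that determines when the returned Mono completes. The class name `SequenceSketch` and the string "acks" are illustrative assumptions; the combinator is copied from the code above so the sketch stands alone. The combined future finishes only once every send future has finished, keeps the input order, and fails if any single future fails.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class SequenceSketch {
    // Same combinator as BatchProducer.sequence, repeated here so the sketch is self-contained.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<List<String>> all = sequence(List.of(first, second));

        // One ack alone is not enough: the combined future is still pending.
        second.complete("ack-2");
        System.out.println(all.isDone()); // false

        // Once the last ack arrives, results come back in input order.
        first.complete("ack-1");
        System.out.println(all.join()); // [ack-1, ack-2]

        // A single failed send fails the combined future.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ack");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("send failed"));
        System.out.println(sequence(List.of(ok, failed)).isCompletedExceptionally()); // true
    }
}
```

Under the assumption stated earlier in the thread (commit hooks register only after the future completes), a failed send would therefore surface as an error from the Mono rather than as a committed offset.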

Ehud-Lev-Forter avatar Nov 17 '22 20:11 Ehud-Lev-Forter

Hi again. Following up on contributing a PR to the project: I tried to download and compile it locally, and the build failed. As far as I understand, it fails because some Maven repositories are not publicly available and some jars are not available in Maven Central. Is there a guide on how to install locally? Is there something I should do before running mvn clean compile? Is there a workaround?

Ehud-Lev-Forter avatar Dec 04 '22 05:12 Ehud-Lev-Forter

TG isn't in maven central, but it is in a public repo, and the repo is added in the pom. Should be fine - unless your corporate proxy setup blocks other repos?

It's on my list to add TG to central, but it's not a priority.

Is there a guide on how to install locally?

shouldn't be needed

Is there something I should do before running mvn clean compile?

no

Is there a workaround?

to what?

astubbs avatar Dec 05 '22 10:12 astubbs

FYI, I'm on the Confluent community Slack if you want to chat in real time.

astubbs avatar Dec 05 '22 10:12 astubbs