parallel-consumer
Question: Is it possible to produce events using reactor?
Hi, I am working with ReactorProcessor and I would like to produce events. I tried the following code, but it did not write any new events into output topic.
Example code:
lateinit var pConsumer: ReactorProcessor<String, JsonObject>

private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
    val producer: Producer<String, JsonObject> = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
    val options = ParallelConsumerOptions.builder<String, JsonObject>()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
        .maxConcurrency(parallelConsumerConfig.maxConcurrency)
        // .batchSize(parallelConsumerConfig.batchSize)
        .consumer(buildConsumer(kafkaConsumerConfig))
        .producer(producer)
        .build()
    return ReactorProcessor(options)
}

pConsumer.react { context ->
    val event = context.singleConsumerRecord
    // do something with event
    val result = ProducerRecord<String, JsonObject>(
        "output", event.key(), JsonObject(mapOf("someTest" to event.offset())))
    Mono.just(result)
}
What am I missing?
Hi and welcome to the project! The reactor module doesn't have the produce flow implemented, but you can produce a record directly with the producer. You should, however, consider whether you want to wait for a positive ack from the broker first or not. If you do a blocking send, run it on one of Reactor's blocking-capable thread pools (e.g. Schedulers.boundedElastic()).
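For reference, the non-blocking variant of that direct send comes down to bridging the producer's callback into a CompletableFuture, so no Reactor thread ever blocks on the broker ack. A minimal sketch of that bridge, using a made-up fakeSend as a stand-in for KafkaProducer.send(record, callback) so it runs without a broker (all names here are illustrative, not parallel-consumer API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class AckBridge {
    static final ScheduledExecutorService IO = Executors.newSingleThreadScheduledExecutor();

    // Stand-in for producer.send(record, callback): invokes the callback
    // asynchronously with either a result or an exception, like Kafka does.
    static void fakeSend(String record, BiConsumer<String, Exception> callback) {
        IO.schedule(() -> callback.accept("ack:" + record, null), 10, TimeUnit.MILLISECONDS);
    }

    // Bridge the callback style into a CompletableFuture; with Reactor on the
    // classpath you would hand this future to Mono.fromFuture(...).
    static CompletableFuture<String> sendAsync(String record) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakeSend(record, (metadata, exception) -> {
            if (exception != null) future.completeExceptionally(exception);
            else future.complete(metadata);
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Nothing blocks until we explicitly wait for the result here.
        String ack = sendAsync("hello").get(1, TimeUnit.SECONDS);
        System.out.println(ack);
        IO.shutdown();
    }
}
```

With the real producer, the Callback passed to send() would complete the future in onCompletion, which is exactly the pattern discussed later in this thread.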
To make this more obvious, I could have the reactor module check the return result of the user function, and either throw an exception or log a warning, if a ProducerRecord is returned from your React function. Thoughts?
With regards to sending, the core module does send in bulk, and wait in bulk for the send acks. However, it still blocks while doing so. See this upcoming PR for fully async, non-blocking record acks in the core module:
- #356
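For intuition, "send in bulk, wait in bulk" means firing all the sends first and only then blocking once for the whole batch, instead of awaiting each ack in turn. A toy sketch under that assumption, with a stdlib future standing in for a producer send (names are illustrative, not the core module's actual internals):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BulkSend {
    static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    // Stand-in for producer.send(record): returns a future ack.
    static CompletableFuture<String> send(String record) {
        return CompletableFuture.supplyAsync(() -> "ack:" + record, POOL);
    }

    public static void main(String[] args) {
        // Fire all sends without waiting in between...
        List<CompletableFuture<String>> inFlight = List.of("a", "b", "c").stream()
                .map(BulkSend::send)
                .collect(Collectors.toList());

        // ...then block once for the whole batch (this is the blocking step
        // that the linked PR aims to remove).
        List<String> acks = inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println(acks);
        POOL.shutdown();
    }
}
```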
Curious - what sort of load do you have, that you want to use Reactor? Or is it just to be more efficient with resources?
Hi,
First of all, thanks for this great library. We are using Reactor since we are writing to S3 and RDS and we want to do it in an async way without blocking, to achieve better usage of resources.
Currently, we are working on implementing the producer logic. Once our processor tasks have finished asynchronously (we are not blocking), we produce events with callbacks that, on completion, complete CompletableFutures.
At the end of the function we return a Mono.fromFuture of the combined CompletableFuture of those producer results. We assume that the library will only commit after the callback finishes successfully.
Does that make sense?
BTW the reason I opened this issue was that I saw the producer in the reactor example
My pleasure! :)
Yup! Sounds great! And yes, that's right. You can see in the Reactor adapter, that the hooks for commit don't register until the Future completes.
You could probably do a PR to do basically that and have it inside pc.
If you could paste your code, I can take a look to see if it lines up with what I'm imagining.
BTW the reason I opened this issue was that I saw the producer in the reactor example
Link didn't work, but the reactor example - yeah it's not the greatest, but the item it returns is just a dummy string. It doesn't try to send a record back to PC. Am I missing something?
A PR is too much for me right now. Our code is written in Kotlin and obviously I can't share it as-is, but I converted the relevant parts to Java. For what it's worth, here is an example of what we did:
/**
 * // Example usage
 * parallelConsumer.react(context -> {
 *     var consumerRecord = context.getSingleRecord().getConsumerRecord();
 *     log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
 *     Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
 *     Mono originalResult = Mono.just(Arrays.asList(new ProducerRecord("topic", "key", "some value")));
 *     return originalResult.map(batchProducer::produce);
 * });
 */
class BatchProducer<K, V> {

    Producer<K, V> producer;

    public BatchProducer(Producer<K, V> producer) {
        this.producer = producer;
    }

    public Mono<List<RecordMetadata>> produce(List<ProducerRecord<K, V>> messages) {
        List<CompletableFuture<RecordMetadata>> futures = messages.stream().map(message -> {
            CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();
            Callback kafkaCallback = createCallback(completableFuture);
            producer.send(message, kafkaCallback);
            return completableFuture;
        }).toList();
        CompletableFuture<List<RecordMetadata>> oneResult = sequence(futures);
        return Mono.fromFuture(oneResult);
    }

    // From here: https://stackoverflow.com/questions/30025428/convert-from-listcompletablefuture-to-completablefuturelist
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    private Callback createCallback(CompletableFuture<RecordMetadata> completableFuture) {
        return new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    completableFuture.completeExceptionally(exception);
                } else {
                    completableFuture.complete(metadata);
                }
            }
        };
    }

    public void close() {
        producer.close();
    }
}
Hi again,
Following up on contributing a PR to the project, I tried to download and compile it locally, and it failed. As far as I understand, it fails because of Maven repositories that are not publicly available, and also jars that are not available in Maven Central.
Is there a guide on how to install locally?
Is there something I should do before running mvn clean compile?
Is there a workaround?
TG isn't in Maven Central, but it is in a public repo, and that repo is added in the pom. It should be fine, unless your corporate proxy setup blocks other repos?
It's on my list to add TG to central, but it's not a priority.
Is there a guide on how to install locally?
shouldn't be needed
Is there something I should do before running mvn clean compile?
no
Is there a workaround?
to what?
I'm on the confluent community slack if you want to chat in real time FYI