
Example for Kafka Streams branching with Flux

Open benkeil opened this issue 4 years ago • 2 comments

Can you please add an example of how to use branching in a reactive way?

Predicate<String, Flux<Review>> testPredicate = (k, v) -> ???

@Bean
@SuppressWarnings("unchecked")
public Function<KStream<String, Value>, KStream<String, Review>[]> map() {
    return reviews -> reviews.branch(testPredicate);
}

benkeil, Feb 11 '21 13:02

@benkeil Not sure what you mean here. Could you elaborate on the use case? For one thing, you cannot use reactive types with Kafka Streams functions. For another, I don't see the need for reactive types in the code above, since you are operating on a KStream and calling its branch method. The branch method in KStream takes a predicate, but that predicate type is specific to the Kafka Streams API. See the details here.

sobychacko, Mar 01 '21 19:03

The predicate needs to return a boolean, and I don't know how to write it so that the Flux yields a boolean.

benkeil, Mar 06 '21 13:03
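
To illustrate the point made in the reply: a Kafka Streams Predicate is evaluated once per record and returns a plain boolean (its method is `boolean test(K key, V value)`), so the value it receives is a single Review rather than a Flux<Review>. The following is a minimal sketch in plain Java, not the Kafka Streams API itself. It uses java.util.function.BiPredicate as a stand-in for the Kafka Streams Predicate and models first-match branching over a list of records. The Review class, its rating field, and the BranchSketch class are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class BranchSketch {

    // Hypothetical stand-in for the Review type from the question.
    static final class Review {
        final String text;
        final int rating;

        Review(String text, int rating) {
            this.text = text;
            this.rating = rating;
        }
    }

    // Models first-match branching: each record goes to the branch of the
    // first predicate that accepts it; records matching no predicate are dropped.
    @SafeVarargs
    static <K, V> List<List<Map.Entry<K, V>>> branch(
            List<Map.Entry<K, V>> records,
            BiPredicate<? super K, ? super V>... predicates) {
        List<List<Map.Entry<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            for (int i = 0; i < predicates.length; i++) {
                // The predicate sees one key and one value and answers synchronously.
                if (predicates[i].test(record.getKey(), record.getValue())) {
                    branches.get(i).add(record);
                    break;
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        // Assumed rule for illustration: a rating of 4 or more counts as positive.
        BiPredicate<String, Review> positive = (key, review) -> review.rating >= 4;
        // Catch-all predicate so that no record is dropped.
        BiPredicate<String, Review> rest = (key, review) -> true;

        List<Map.Entry<String, Review>> input = List.of(
                Map.entry("a", new Review("great", 5)),
                Map.entry("b", new Review("poor", 1)),
                Map.entry("c", new Review("good", 4)));

        List<List<Map.Entry<String, Review>>> out = branch(input, positive, rest);
        // prints "2 positive, 1 other"
        System.out.println(out.get(0).size() + " positive, " + out.get(1).size() + " other");
    }
}
```

In the snippet from the question, the same idea would mean declaring the predicate over Review values (for example `(k, v) -> v.rating >= 4` under the assumed field) instead of over Flux<Review>.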