spring-cloud-stream-samples
Example for Kafka Streams branching with Flux
Can you please add an example of how to use branching in a reactive way?
Predicate<String, Flux<Review>> testPredicate = (k, v) -> ???
@Bean
@SuppressWarnings("unchecked")
public Function<KStream<String, Value>, KStream<String, Review>[]> map() {
    return reviews -> reviews.branch(testPredicate);
}
@benkeil Not sure what you mean here. Could you elaborate on the use case? For one thing, you cannot use reactive types with Kafka Streams functions. On the other hand, I don't see the need for reactive types in the code above, since you are operating on a KStream and calling its branch method. The branch method in KStream takes a predicate, but that predicate type is specific to the Kafka Streams API. See the details here.
The predicate needs to return a boolean, and I don't know how to write it so that a Flux yields a boolean.
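A rough sketch of the point made above, in case it helps: the predicate passed to branch is evaluated once per record, so it receives a single key and a single value (a Review, not a Flux<Review>) and returns a plain boolean. The code below is a standalone stand-in that uses only the JDK, so it runs without Kafka on the classpath. The Review fields, the rating threshold, and the branch helper are all hypothetical and only imitate the first-match behaviour of KStream#branch. In a real topology the predicates would be typed as org.apache.kafka.streams.kstream.Predicate<String, Review>, which has the same (key, value) -> boolean shape.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class BranchSketch {

    // Hypothetical value type; the original post does not show the fields of Review.
    static final class Review {
        final String text;
        final int rating;

        Review(String text, int rating) {
            this.text = text;
            this.rating = rating;
        }
    }

    // Per-record predicates: each one sees a single key and a single Review
    // and returns a plain boolean. No reactive type is involved.
    // Assumption: "rating >= 4" is an invented rule, used only for illustration.
    static final BiPredicate<String, Review> IS_POSITIVE = (key, review) -> review.rating >= 4;
    static final BiPredicate<String, Review> IS_OTHER = (key, review) -> true;

    // Stand-in for first-match branching: each record lands in the first
    // branch whose predicate returns true, or is dropped if none matches.
    @SafeVarargs
    static List<List<Review>> branch(Map<String, Review> records,
                                     BiPredicate<String, Review>... predicates) {
        List<List<Review>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<String, Review> entry : records.entrySet()) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(entry.getKey(), entry.getValue())) {
                    branches.get(i).add(entry.getValue());
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        Map<String, Review> records = new LinkedHashMap<>();
        records.put("k1", new Review("great", 5));
        records.put("k2", new Review("poor", 1));
        records.put("k3", new Review("good", 4));

        List<List<Review>> branches = branch(records, IS_POSITIVE, IS_OTHER);
        System.out.println("positive: " + branches.get(0).size());
        System.out.println("other: " + branches.get(1).size());
    }
}
```

With Kafka Streams itself, the same lambdas could be passed straight to reviews.branch(...) inside a Function<KStream<String, Review>, KStream<String, Review>[]> bean. Note that branch has been deprecated since Kafka 2.8 in favour of split(), so newer code may prefer that.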