spring-cloud-sleuth
Spring Cloud 2021.0.1 breaks the order of Kafka message consumption when the Spring Cloud Sleuth function instrumentation is enabled.
Describe the bug: Upgrading from Spring Cloud 2020.0.4 to Spring Cloud 2021.0.1 breaks the ordering of Kafka messages when using flatMapSequential. I think this is a Spring Cloud Sleuth issue because the problem goes away if I set spring.sleuth.function.enabled=false.
I have a sample detailed below which outputs offsets.
Spring Cloud 2021.0.1: out-of-order Kafka messages:
kafka_offset=0
kafka_offset=4
kafka_offset=65
kafka_offset=1
kafka_offset=2
kafka_offset=3
Spring Cloud 2020.0.4 OR spring.sleuth.function.enabled=false: in-order Kafka messages:
kafka_offset=0
kafka_offset=1
kafka_offset=2
kafka_offset=3
kafka_offset=4
kafka_offset=5
This is an issue when using flatMapSequential:
@Bean
public Function<Flux<Message<Map<String, BigDecimal>>>, Mono<Void>> fxRates() {
  return events -> events.flatMap(event -> {
    System.out.println("kafka_offset=" + event.getHeaders().get("kafka_offset"));
    return Mono
        .defer(() -> Mono.just("A thing"))
        .delayElement(Duration.ofMillis(10)) // The delay hands off to another thread, easy WebClient simulation
        .then();
  }, 1).then();
}
Sample https://github.com/davidmelia/spring-boot-webflux-sink-only
- Create the Kafka Topic: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- Run KafkaTopicPopulator which will populate this topic with 100 events
- Start the app
- You will see in the logs that the offsets are out of order
- Either downgrade Spring Cloud OR set spring.sleuth.function.enabled=false and the ordering is correct
Most likely we're messing up the Flux in Sleuth's stream Flux support. I will try to take a look at that when I have time, but feel free to debug this section of Sleuth in the meantime: https://github.com/spring-cloud/spring-cloud-sleuth/blob/3.1.x/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceFunctionAroundWrapper.java#L134-L137
@marcingrzejszczak this might be a terrible solution, adding flatMapSequential to the existing reactorFluxStream, but I can't see another way as Kafka is sequential:
private Object reactorFluxStream(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction,
    Publisher<Message> messagePublisher) {
  if (log.isDebugEnabled()) {
    log.debug("Will instrument a stream Flux function");
  }
  Flux<Message> flux = Flux.from(messagePublisher)
      // ensure there are no previous spans
      .doOnNext(m -> tracer.withSpan(null))
      .map(msg -> this.traceMessageHandler.wrapInputMessage(msg,
          inputDestination(targetFunction.getFunctionDefinition())))
      .flatMapSequential(msg -> Flux.deferContextual(ctx -> { // TODO: [AS] Bug fix
        MessageAndSpansAndScope messageAndSpansAndScope = ctx.get(MessageAndSpansAndScope.class);
        messageAndSpansAndScope.messageAndSpans = msg;
        messageAndSpansAndScope.span = msg.childSpan;
        setNameAndTag(targetFunction, msg.childSpan);
        messageAndSpansAndScope.scope = tracer.withSpan(msg.childSpan);
        return Mono.just(msg.msg);
      }), 1);
  if (targetFunction.isConsumer()) {
    return targetFunction.apply(reactorStreamConsumer(flux));
  }
  final Publisher<Message> function = ((Publisher<Message>) targetFunction.apply(flux));
  if (function instanceof Mono) {
    return messageMono(targetFunction, (Mono<Message>) function);
  }
  return messageFlux(targetFunction, (Flux<Message>) function);
}
I appreciate Spring has to cover other message systems too. Very tricky problem!!
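For readers following along, here is a minimal standalone sketch of the operator difference this hack relies on. It is not Sleuth code and does not reproduce the exact bug: it only assumes reactor-core is on the classpath, and the class name OrderingSketch, the method names, and the simulated delays are all made up for illustration. It compares an unbounded flatMap with flatMapSequential(…, 1) and concatMap when the inner publishers complete at different times.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Sleuth or the sample project.
public class OrderingSketch {

    static final int COUNT = 6;

    // Simulated async call: lower offsets are delayed longer, so they complete last.
    public static Mono<Integer> slowCall(int offset) {
        return Mono.just(offset).delayElement(Duration.ofMillis((COUNT - offset) * 20L));
    }

    // Default flatMap: inner publishers run concurrently and results are
    // emitted as they arrive, so source order is not guaranteed.
    public static List<Integer> withFlatMap() {
        return Flux.range(0, COUNT)
                .flatMap(OrderingSketch::slowCall)
                .collectList()
                .block();
    }

    // flatMapSequential with maxConcurrency 1: one inner publisher at a time,
    // results emitted in source order (the shape of the hack above).
    public static List<Integer> withFlatMapSequential() {
        return Flux.range(0, COUNT)
                .flatMapSequential(OrderingSketch::slowCall, 1)
                .collectList()
                .block();
    }

    // concatMap: subscribes to each inner publisher only after the previous
    // one completes, which also preserves source order.
    public static List<Integer> withConcatMap() {
        return Flux.range(0, COUNT)
                .concatMap(OrderingSketch::slowCall)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + withFlatMap());
        System.out.println("flatMapSequential: " + withFlatMapSequential());
        System.out.println("concatMap:         " + withConcatMap());
    }
}
```

The two ordered variants keep the offsets in source order at the cost of processing one element at a time, which is presumably why applying this to every binder is a concern.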
I wonder how we could approach this more globally. I understand that if we used flatMapSequential, we could break the non-sequential brokers?
@marcingrzejszczak we couldn't figure it out and had to copy your TraceFunctionPatchAutoConfiguration and augment it with the above flatMapSequential hack, as we are only using Kafka. I appreciate this is a poor solution, but it unblocked us moving to Boot 2.6.x.
Unless there is something in the context that identifies the messaging system, so that you can change the flow based on it, I can't see how the current change will ever work for Kafka. Tough problem.
I think I need to talk to @olegz about this...
Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.