
Spring Cloud 2021.0.1 breaks the order of Kafka message consumption when Spring Cloud Sleuth function support is enabled

Open: davidmelia opened this issue 2 years ago (5 comments)

Describe the bug: Upgrading from Spring Cloud 2020.0.4 to Spring Cloud 2021.0.1 breaks the ordering of Kafka messages when using flatMapSequential. I think this is a Spring Cloud Sleuth issue because setting spring.sleuth.function.enabled=false makes the problem go away.

I have a sample, detailed below, that prints the offsets.

Spring Cloud 2021.0.1 (out-of-order Kafka messages):

kafka_offset=0
kafka_offset=4
kafka_offset=65
kafka_offset=1
kafka_offset=2
kafka_offset=3 

Spring Cloud 2020.0.4 OR spring.sleuth.function.enabled=false (in-order Kafka messages):

kafka_offset=0
kafka_offset=1
kafka_offset=2
kafka_offset=3
kafka_offset=4
kafka_offset=5

This is an issue when using flatMapSequential:

  @Bean
  public Function<Flux<Message<Map<String, BigDecimal>>>, Mono<Void>> fxRates() {
    return events -> events.flatMap(event -> {
      System.out.println("kafka_offset=" + event.getHeaders().get("kafka_offset"));
      return Mono
          .defer(() -> Mono.just("A thing"))
          .delayElement(Duration.ofMillis(10)) // the delay hands off to another thread: an easy WebClient simulation
          .then();
    }, 1).then(); // concurrency 1: only one message in flight at a time
  }
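As a stdlib-only illustration (no Reactor or Kafka), the sketch below models what flatMap(..., 1) does in fxRates(): with a maximum concurrency of 1, the next message is handled only after the previous inner Mono completes, even though the delay runs on another thread. It chains CompletableFuture stages to get the same one-at-a-time behaviour; OneAtATime and process are hypothetical names. Under that model the user function alone should not reorder messages, which is consistent with the problem disappearing when spring.sleuth.function.enabled=false.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OneAtATime {

    // Handles each offset only after the previous one's async work has finished,
    // mirroring flatMap(..., 1): the "delay" runs on a pool thread, but the next
    // message is not started until it completes.
    static List<Long> process(List<Long> offsets) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Long> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (Long offset : offsets) {
                chain = chain.thenCompose(ignored -> {
                    seen.add(offset); // stands in for the println in fxRates()
                    return CompletableFuture.runAsync(OneAtATime::pause, pool); // stands in for delayElement
                });
            }
            chain.join();
            return new ArrayList<>(seen);
        } finally {
            pool.shutdown();
        }
    }

    private static void pause() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(0L, 1L, 2L, 3L, 4L, 5L))); // [0, 1, 2, 3, 4, 5]
    }
}
```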

Sample: https://github.com/davidmelia/spring-boot-webflux-sink-only

  1. Create the Kafka Topic: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  2. Run KafkaTopicPopulator which will populate this topic with 100 events
  3. Start the app
  4. The logs show the offsets out of order
  5. Either downgrade Spring Cloud OR set spring.sleuth.function.enabled=false, and the ordering is correct
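To make step 4 less eyeball-driven, here is a small sketch that scans log lines for the kafka_offset=N output printed by fxRates() and reports the first offset that does not increase. It assumes a single-partition topic, as created in step 1, so offsets should appear in strictly increasing order; OffsetOrderCheck is a hypothetical helper, not part of the sample repository. The demo input reuses the offsets from the report above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetOrderCheck {

    // Matches the lines printed by the sample, e.g. "kafka_offset=4".
    private static final Pattern OFFSET = Pattern.compile("kafka_offset=(\\d+)");

    // Extracts offsets from log lines in the order they appear; other lines are ignored.
    static List<Long> offsets(List<String> logLines) {
        List<Long> result = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = OFFSET.matcher(line);
            if (m.find()) {
                result.add(Long.parseLong(m.group(1)));
            }
        }
        return result;
    }

    // Returns the index of the first offset that is not greater than its predecessor,
    // or -1 if the offsets are strictly increasing (single partition, consumed in order).
    static int firstOutOfOrder(List<Long> offsets) {
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) <= offsets.get(i - 1)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> broken = List.of("kafka_offset=0", "kafka_offset=4", "kafka_offset=65",
                "kafka_offset=1", "kafka_offset=2", "kafka_offset=3");
        List<String> fixed = List.of("kafka_offset=0", "kafka_offset=1", "kafka_offset=2",
                "kafka_offset=3", "kafka_offset=4", "kafka_offset=5");
        System.out.println(firstOutOfOrder(offsets(broken))); // 3
        System.out.println(firstOutOfOrder(offsets(fixed)));  // -1
    }
}
```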

davidmelia, Mar 21 '22

Most likely we're messing up the Flux in Sleuth's stream Flux support. I will try to take a look when I have time, but feel free to debug this section of Sleuth in the meantime: https://github.com/spring-cloud/spring-cloud-sleuth/blob/3.1.x/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceFunctionAroundWrapper.java#L134-L137

marcingrzejszczak, Mar 21 '22

@marcingrzejszczak this might be a terrible solution (adding flatMapSequential to the existing reactorFluxStream), but I can't see another way, since Kafka is sequential:

  private Object reactorFluxStream(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction,
      Publisher<Message> messagePublisher) {
    if (log.isDebugEnabled()) {
      log.debug("Will instrument a stream Flux function");
    }

    Flux<Message> flux = Flux.from(messagePublisher)
        // ensure there are no previous spans
        .doOnNext(m -> tracer.withSpan(null))
        .map(msg -> this.traceMessageHandler.wrapInputMessage(msg,
            inputDestination(targetFunction.getFunctionDefinition())))
        // patch: flatMapSequential with concurrency 1 so messages are emitted in upstream (Kafka offset) order
        .flatMapSequential(msg -> Flux.deferContextual(ctx -> {
          MessageAndSpansAndScope messageAndSpansAndScope = ctx.get(MessageAndSpansAndScope.class);
          messageAndSpansAndScope.messageAndSpans = msg;
          messageAndSpansAndScope.span = msg.childSpan;
          setNameAndTag(targetFunction, msg.childSpan);
          messageAndSpansAndScope.scope = tracer.withSpan(msg.childSpan);
          return Mono.just(msg.msg);
        }), 1);
    if (targetFunction.isConsumer()) {
      return targetFunction.apply(reactorStreamConsumer(flux));
    }
    final Publisher<Message> function = ((Publisher<Message>) targetFunction.apply(flux));
    if (function instanceof Mono) {
      return messageMono(targetFunction, (Mono<Message>) function);
    }
    return messageFlux(targetFunction, (Flux<Message>) function);
  }
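For readers less familiar with the difference this patch relies on: Reactor's flatMap merges inner publishers and forwards their values as they arrive, so with asynchronous inners and concurrency above 1 the output can follow completion order, whereas flatMapSequential subscribes eagerly but buffers so values are emitted in source order (with a concurrency of 1, as in the patch, it handles one inner at a time). Below is a stdlib-only sketch of those two emission orders using CompletableFuture. It is an analogy, not Reactor's implementation, and it does not reproduce the Sleuth bug; the class name OrderingDemo and the delays are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderingDemo {

    // Starts one async task per offset; each task sleeps for its delay and returns the offset.
    private static List<CompletableFuture<Long>> startAll(List<Long> offsets, List<Long> delaysMs,
                                                          ExecutorService pool) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (int i = 0; i < offsets.size(); i++) {
            long offset = offsets.get(i);
            long delay = delaysMs.get(i);
            futures.add(CompletableFuture.supplyAsync(() -> {
                sleep(delay);
                return offset;
            }, pool));
        }
        return futures;
    }

    // flatMap-like: each result is recorded as soon as its task finishes (completion order).
    static List<Long> completionOrder(List<Long> offsets, List<Long> delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, offsets.size()));
        try {
            List<Long> out = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<Void>> done = new ArrayList<>();
            for (CompletableFuture<Long> f : startAll(offsets, delaysMs, pool)) {
                done.add(f.thenAccept(out::add));
            }
            CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
            return new ArrayList<>(out);
        } finally {
            pool.shutdown();
        }
    }

    // flatMapSequential-like: tasks still run concurrently, but results are emitted
    // in source order, so an early finisher waits for its predecessors.
    static List<Long> sourceOrder(List<Long> offsets, List<Long> delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, offsets.size()));
        try {
            List<Long> out = new ArrayList<>();
            for (CompletableFuture<Long> f : startAll(offsets, delaysMs, pool)) {
                out.add(f.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(0L, 1L, 2L, 3L);
        List<Long> delays = List.of(200L, 10L, 100L, 50L);
        System.out.println("completion order: " + completionOrder(offsets, delays)); // timing-dependent
        System.out.println("source order:     " + sourceOrder(offsets, delays));     // [0, 1, 2, 3]
    }
}
```

Because completion order depends on thread scheduling, only the source-order output is deterministic; the buffering that guarantees it is also why an order-preserving operator can hold back fast messages behind slow ones, which matters for brokers that do not need ordering.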

I appreciate that Spring has to cover other messaging systems too. Very tricky problem!

davidmelia, Mar 23 '22

I wonder how we could approach this more globally. Am I right that if we used flatMapSequential, we could break non-sequential brokers?

marcingrzejszczak, Mar 24 '22

@marcingrzejszczak we couldn't figure it out, so we had to copy your TraceFunctionPatchAutoConfiguration and augment it with the above flatMapSequential hack, as we are only using Kafka. I appreciate this is a poor solution, but it unblocked our move to Spring Boot 2.6.x.

Unless there is something in the context that identifies the messaging system, so that you can change the flow based on it, I can't see how the current change will ever work for Kafka. Tough problem.

davidmelia, Apr 27 '22

I think I need to talk to @olegz about this...

marcingrzejszczak, Apr 27 '22

Please upgrade to Micrometer Tracing. Spring Cloud Sleuth is feature complete and out of OSS support.

marcingrzejszczak, Feb 09 '24