azure-functions-kafka-extension
azure-functions-kafka-extension copied to clipboard
Distributed Tracing and kafka trigger
Hello, I am trying to use the distributed tracing improvement in kafka trigger created in this PR : https://github.com/Azure/azure-functions-kafka-extension/pull/396 My testing setup is the following :
Broker : kafka through confluent cloud
Authent SASl_SSL PLAIN
This is a sample code I use :
[FunctionName("ConsumerCCUnit")] public async Task ConsumeCCUnit( [KafkaTrigger( "%kafka_broker_endpoint_cc%", "%kafka_broker_topic_name_cc%", ConsumerGroup = "%kafka_consumer_consumer_group_cc%", Protocol = BrokerProtocol.SaslSsl, AuthenticationMode = BrokerAuthenticationMode.Plain, Username ="%kafka_username_cc%", Password ="%kafka_password_cc%" )] KafkaEventData<string> events, ILogger log, CancellationToken cancellationToken) { log.LogInformation($"C# Kafka trigger function processed a message: {events.Value}"); var foundTraceParent = events.Headers.TryGetFirst("traceparent", out var traceparentInBytes); if (foundTraceParent) { var traceparent = Encoding.UTF8.GetString(traceparentInBytes); log.LogInformation($"Traceparent Header for the event: {traceparent}"); } var activity = Activity.Current; Console.WriteLine("Trace Id of current Activity: " + activity.TraceId); Console.WriteLine("Parent Id of current Activity: " + activity.ParentId); }
I send a message with the traceparent header using kafka cat in a docker container :
sh-4.4$ kafkacat -P -b azure.confluent.cloud:9092 -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=user -X sasl.password=password -t topic "traceparent=00-b39c2de26892485f15dffc3e4a259ef9-ae54938c1ce7d212-00" hello world with traceparent
I can see the message in confluent cloud with the correct header
the result of the following code is :
[2023-10-18T12:27:42.266Z] Executing 'ConsumerCCUnit' (Reason='(null)', Id=b1cc4847-8a48-4cd1-9c49-482d5c173d1b) [2023-10-18T12:27:42.345Z] C# Kafka trigger function processed a message: hello world with traceparent [2023-10-18T12:27:42.363Z] Traceparent Header for the event: 00-b39c2de26892485f15dffc3e4a259ef9-ae54938c1ce7d212-00 Trace Id of current Activity: 38b938aa9140775ad3c83ffc9ebb373d
as you can see, the Trace ID of current activity is 38b938aa9140775ad3c83ffc9ebb373d but the trace Id in the traceparent header is b39c2de26892485f15dffc3e4a259ef9
here are my dependancy (to show i am using version 3.9.0 of kafka extension) :
<PackageReference Include="LplCloud.Logging" Version="1.0.8214" /> <PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" /> <PackageReference Include="Microsoft.Azure.KeyVault" Version="3.0.5" /> <PackageReference Include="Microsoft.Azure.Services.AppAuthentication" Version="1.6.2" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Kafka" Version="3.9.0" /> <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="5.1.3" /> <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.2.0" /> <PackageReference Include="Refit.HttpClientFactory" Version="6.3.2" />