silverback
silverback copied to clipboard
Using ApplyTo on SkipMessageErrorPolicy
Is it possible to apply rules on Skip ?
public void Configure(IEndpointsConfigurationBuilder builder)
{
builder
.AddKafkaEndpoints(
endpoints => endpoints
// Configure the properties needed by all consumers/producers
.Configure(
config =>
{
// The bootstrap server address is needed to connect
config.BootstrapServers = _configuration.GetValue<string>("Kafka:BootstrapServers");
})
.AddInbound<SampleMessage>(
endpoint => endpoint
.ConsumeFrom("samples-basic")
.Configure(config =>
{
config.GroupId = "WebApi1";
})
.OnError(policy => policy.Skip(opts => opts.ApplyTo<ArgumentException>()))
)
.AddOutbound<SampleMessage>(
endpoint => endpoint
.ProduceTo("samples-basic")));
}
When de exception occurs the consumer stops:
[19:02:04 INF] Processing inbound message. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
[19:02:04 INF] Received 69
[19:02:05 ERR] Error occurred processing the inbound message. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
System.Reflection.TargetInvocationException: Exception has been thrown by the target of an invocation.
---> System.ArgumentException: Value does not fall within the expected range.
at WebApi.SampleMessageSubscriber.OnMessageReceived(SampleMessage message) in C:\repos\dev.azure.com\superdigital\arq-opentelemetry\samples\WebApi\SampleMessageSubscriber.cs:line 18
--- End of inner exception stack trace ---
at System.RuntimeMethodHandle.InvokeMethod(Object target, Span`1& arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
at System.Reflection.RuntimeMethodInfo.Invoke(Object obj, BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
...
[19:02:05 FTL] Fatal error occurred processing the consumed message. The consumer will be stopped. | endpointName: samples-basic, messageType: WebApi.SampleMessage, WebApi, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null, messageId: 916776ce-85db-4729-b882-0ab48ac8479e, offset: [0]@1, kafkaKey: 916776ce-85db-4729-b882-0ab48ac8479e
As per the log, the ArgumentException is wrapped in a TargetInvokationException so you'd need to check the inner exception instead (using ApplyWhen).
I know it's not ideal but I noticed just recently that the exceptions thrown by sync subscribers (not returning Task) aren't unwrapped correctly. It's possible that I will fix this in the next major release.
Thanks! @BEagle1984. The subscriber was just a sample and it was void, I turned that into async Task and it solved ;)
Should I keep this issue open ?
Yes, let's keep this open. I'll fix it for the sync methods too eventually. 👍