Discarding Corrupt Message due to apparent decompression failure
Expected behavior
Expect more logs to help debug what is going on with decompression
Actual behavior
We get the message
{"log":{"consumerID":95,"level":"ERROR",
"msg":"Discarding corrupted message","msgID":{"entryId":1792,"ledgerId":758404,"partition":-1},"name":"","subscription":"reader-czjug","time":"2025-02-12T10:46:13.616639169Z",
"topic":"persistent://XXXX/wav/084b74e6-f4c6-4ff8-9bff-d35370e6a77b",
"validationError":1},"stream":"stdout","timestamp":1739357173616}
which looks like it is coming from here https://github.com/apache/pulsar-client-go/blob/4e71a47a4f6174f883905e740ba35f2498494ed1/pulsar/consumer_partition.go#L2172-L2182
validationError:1 appears to be a decompression error
https://github.com/apache/pulsar-client-go/blob/4e71a47a4f6174f883905e740ba35f2498494ed1/pulsar/internal/pulsar_proto/PulsarApi.pb.go#L996
which suggests it originates here: https://github.com/apache/pulsar-client-go/blob/4e71a47a4f6174f883905e740ba35f2498494ed1/pulsar/consumer_partition.go#L1228-L1232
Given that there are no other log lines, a likely cause is this code path: https://github.com/apache/pulsar-client-go/blob/4e71a47a4f6174f883905e740ba35f2498494ed1/pulsar/consumer_partition.go#L2148-L2151
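To make the reading of validationError:1 concrete, here is a minimal sketch that decodes a "Discarding corrupted message" log line and maps the numeric code to a name. The lookup table is a local mirror of the CommandAck ValidationError enum as I understand it from the Pulsar protocol definition, so it should be checked against PulsarApi.proto. The names `validationErrorNames`, `discardLog`, and `describeDiscard` are hypothetical and not part of pulsar-client-go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// validationErrorNames is a local mirror of the CommandAck ValidationError
// enum (assumption: values as defined in PulsarApi.proto).
var validationErrorNames = map[int]string{
	0: "UncompressedSizeCorruption",
	1: "DecompressionError",
	2: "ChecksumMismatch",
	3: "BatchDeSerializeError",
	4: "DecryptionError",
}

// discardLog models only the fields of the log line that are needed here.
type discardLog struct {
	Log struct {
		MsgID struct {
			EntryID  int64 `json:"entryId"`
			LedgerID int64 `json:"ledgerId"`
		} `json:"msgID"`
		ValidationError int `json:"validationError"`
	} `json:"log"`
}

// describeDiscard parses one JSON log line and returns a short summary
// of the form "ledgerId:entryId discarded: <error name>".
func describeDiscard(line []byte) (string, error) {
	var entry discardLog
	if err := json.Unmarshal(line, &entry); err != nil {
		return "", fmt.Errorf("parse log line: %w", err)
	}
	name, ok := validationErrorNames[entry.Log.ValidationError]
	if !ok {
		name = fmt.Sprintf("Unknown(%d)", entry.Log.ValidationError)
	}
	return fmt.Sprintf("%d:%d discarded: %s",
		entry.Log.MsgID.LedgerID, entry.Log.MsgID.EntryID, name), nil
}

func main() {
	line := []byte(`{"log":{"msg":"Discarding corrupted message","msgID":{"entryId":1792,"ledgerId":758404,"partition":-1},"validationError":1}}`)
	summary, err := describeDiscard(line)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(summary)
}
```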
Steps to reproduce
Sorry, I don't have reproduction steps, but maybe we could add some logging here? https://github.com/apache/pulsar-client-go/blob/4e71a47a4f6174f883905e740ba35f2498494ed1/pulsar/consumer_partition.go#L2148-L2151
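As a rough illustration of the kind of logging meant here, the sketch below wraps a decompression call so that a failure carries the codec name and the payload sizes. It is not the client's actual code: `decompressFunc`, `decompressWithContext`, and `errFake` are hypothetical names, and the real provider interface in pulsar-client-go may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// decompressFunc is a hypothetical stand-in for a compression provider's
// decompress call; it is not the pulsar-client-go interface.
type decompressFunc func(compressed []byte, uncompressedSize int) ([]byte, error)

// decompressWithContext calls fn and, on failure, wraps the error with the
// details that would help debugging: codec, input length, expected output length.
func decompressWithContext(codec string, fn decompressFunc, compressed []byte, uncompressedSize int) ([]byte, error) {
	out, err := fn(compressed, uncompressedSize)
	if err != nil {
		return nil, fmt.Errorf("decompress failed (codec=%s, compressedLen=%d, expectedUncompressedLen=%d): %w",
			codec, len(compressed), uncompressedSize, err)
	}
	if len(out) != uncompressedSize {
		return nil, fmt.Errorf("decompress size mismatch (codec=%s, got=%d, want=%d)",
			codec, len(out), uncompressedSize)
	}
	return out, nil
}

// errFake simulates a failure inside the underlying codec.
var errFake = errors.New("fake decompress error")

func main() {
	failing := func([]byte, int) ([]byte, error) { return nil, errFake }
	_, err := decompressWithContext("LZ4", failing, []byte{1, 2, 3}, 10)
	fmt.Println(err)
}
```

Wrapping with `%w` keeps the original error available to `errors.Is` and `errors.As`, so callers can still classify it while the log line gains the extra context.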
System configuration
Pulsar version: 2.11.3 and 3.3.2 (we upgraded the brokers and still see the same issue)
Pulsar Golang client: 0.14.0
The consumer would print the error log here: https://github.com/apache/pulsar-client-go/blob/024e230d32fc6a0d5586b8c8bc6b34a208b9e9d0/pulsar/internal/connection.go#L765-L770
Could you check this log? Here is an example:
time="2025-02-20T18:29:28+08:00" level=error msg="handle message Id: ledgerId:97 entryId:2 partition:-1" consumerID=1 error="fake decompress error" local_addr="127.0.0.1:59035" remote_addr="pulsar://localhost:6650"
Thanks @RobertIndie, yes, I see a number of them, all with the same message:
{"log":{"consumerID":76,
"error":"unexpected EOF","level":"ERROR","local_addr":{"IP":"10.108.35.29","Port":51180,"Zone":""},
"msg":"handle message Id: ledgerId:60350 entryId:361 partition:-1",
"remote_addr":{"ForceQuery":false,"Fragment":"","Host":"platform-pulsar-broker-0.platform-pulsar-broker.t-bt.svc.cluster.local:6650","OmitHost":false,"Opaque":"","Path":"","RawFragment":"","RawPath":"","RawQuery":"","Scheme":"pulsar","User":null},"time":"2025-02-20T13:46:26.095528909Z"},"stream":"stdout","timestamp":1740059186095}
Sorry for the late reply. Seems I misunderstood and thought it was just a logging issue.
Which compression algorithm were you using? Could you try using another client, like a Java client, to consume that topic if the issue happens again? This will help us determine if the message is actually corrupted or if there's a problem with the Go client's decompression. How often does this issue occur? Does it only affect a few messages within a topic?
@RobertIndie we are using CompressionType.LZ4
We have other services consuming the same data that are written in Java, and they do not report any errors. We estimate it occurs in roughly 2% of our audio workflows. The downside is that we are streaming binary data (audio), so the corruption of any one packet disrupts the entire stream.
we are using CompressionType.LZ4
That's strange. The LZ4 decompression doesn't seem to use the reader, so it shouldn't cause the EOF error. Maybe I missed something. However, I noticed that version v2.0.5 of LZ4 is outdated, so I submitted a PR to upgrade it to v4: https://github.com/apache/pulsar-client-go/pull/1341. The new version of LZ4 has resolved many EOF errors, but I'm not sure if it addresses this issue.
I also submitted another PR to improve the decompression error message to include more useful information: https://github.com/apache/pulsar-client-go/pull/1342.
After merging this PR, we can try again and hopefully get more helpful information.
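On the "unexpected EOF" text in the logs above: in Go's standard library that string is the message of `io.ErrUnexpectedEOF`, which signals that input ended before the expected amount of data was read. Whether the error in this issue originates from that value or from inside the LZ4 library is not established here. The sketch below only shows how a truncated payload produces that message with standard-library calls; `readExactly` is a hypothetical helper, not client code.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// readExactly tries to read exactly n bytes from payload and returns
// io.ErrUnexpectedEOF if the payload is non-empty but shorter than n.
func readExactly(payload []byte, n int) ([]byte, error) {
	buf := make([]byte, n)
	if _, err := io.ReadFull(bytes.NewReader(payload), buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	// Three bytes are available but eight are requested, so the read is cut short.
	_, err := readExactly([]byte{1, 2, 3}, 8)
	fmt.Println(err, errors.Is(err, io.ErrUnexpectedEOF))
}
```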
Awesome - thank you @RobertIndie !!!
@nodece would it be possible to include these two PRs (or at least the logging one) in the 0.15.0 release or will it be in a subsequent release?
https://github.com/apache/pulsar-client-go/releases/tag/v0.15.0-candidate-1
@frankjkelly It has been a long time since the last release candidate was submitted for voting, but no PMC members have participated in the vote. As a result, I will no longer proceed with the release process.
If you want to use the latest version, please use the following command:
go get github.com/apache/pulsar-client-go@master
@nodece Sorry to hear about the release. My company has a contract with StreamNative so perhaps I can create an ask of the organization and see if that will help move the release along (with or without the changes).
@frankjkelly StreamNative has multiple PMC members, so that would be helpful.
@nodece I can help move the release forward. Or would you like to hand the 0.15.0 release over to me?