pulsar-client-go appears to report message too large before compressing it.

Open flowchartsman opened this issue 3 years ago • 10 comments

I'm getting "message too large" errors on some large JSON documents that, when compressed, are nowhere near the limit.

In producer_partition.go:

	// if msg is too large
	if len(payload) > int(p.cnx.GetMaxMessageSize()) {
		p.publishSemaphore.Release()
		request.callback(nil, request.msg, errMessageTooLarge)
		p.log.WithError(errMessageTooLarge).
			WithField("size", len(payload)).
			WithField("properties", msg.Properties).
			Error()
		p.metrics.PublishErrorsMsgTooLarge.Inc()
		return
	}

Correct me if I'm wrong, but this appears to check the payload size before any compression.

Whereas in ProducerImpl.java:

int compressedSize = compressedPayload.readableBytes();
if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
    compressedPayload.release();

flowchartsman avatar Jan 22 '21 19:01 flowchartsman

That's right, the Java and C++ clients are more permissive in that the message goes through if it's below the limit after compression. We should do the same thing in Go.

Keep in mind, though, that it won't change the "guaranteed" max size, since there is no guarantee that the compression will actually reduce the size.
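
To illustrate the point about compression not being guaranteed to help, the following sketch compares the zlib-compressed size of a repetitive JSON-like payload with that of pseudo-random bytes of the same length. The helper names (`zlibSize`, `repetitiveDoc`, `randomBytes`) are made up for this example, and zlib is just one codec chosen for convenience.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"math/rand"
	"strings"
)

// zlibSize returns the size of data after zlib compression.
func zlibSize(data []byte) (int, error) {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return 0, err
	}
	if err := w.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

// repetitiveDoc builds roughly n bytes of highly repetitive JSON-like text.
func repetitiveDoc(n int) []byte {
	const unit = `{"key":"value"},`
	return []byte(strings.Repeat(unit, n/len(unit)))
}

// randomBytes builds n pseudo-random bytes from a fixed seed,
// which are effectively incompressible.
func randomBytes(n int) []byte {
	b := make([]byte, n)
	rand.New(rand.NewSource(1)).Read(b)
	return b
}

func main() {
	const n = 1 << 20 // 1 MiB of input in both cases

	for name, data := range map[string][]byte{
		"repetitive JSON": repetitiveDoc(n),
		"random bytes":    randomBytes(n),
	} {
		size, err := zlibSize(data)
		if err != nil {
			fmt.Println("compress failed:", err)
			return
		}
		fmt.Printf("%-16s raw=%d compressed=%d\n", name, len(data), size)
	}
}
```

The repetitive payload shrinks dramatically, while the random payload comes out no smaller than it went in, so only the uncompressed limit can be relied on in general.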

merlimat avatar Jan 22 '21 19:01 merlimat

Keep in mind, though, that it won't change the "guaranteed" max size, since there is no guarantee that the compression will actually reduce the size.

I am not sure what this means. Of course there is never a guarantee with compression...

flowchartsman avatar Jan 22 '21 19:01 flowchartsman

Yes, if the max limit is 5MB, it is only guaranteed to accept up to 5MB. Above that, it's just "best effort" to get the message to fit with compression.

merlimat avatar Jan 22 '21 20:01 merlimat

Okay, so wait, I’m confused. I guess it might help to discuss the intended purpose of MaxMessageSize. If it’s just a knob to adjust cluster performance, storage allocation, and such, that’s one thing. But if it is intended as a guarantee that, for example, a consumer can keep a fixed buffer for handling the payload, then the Go client's current behavior would seem to be correct, am I right? Are the Java and C++ clients actually incorrect in compressing the message before checking the payload size?

flowchartsman avatar Jan 22 '21 20:01 flowchartsman

@flowchartsman as I understand it, Java and C++ are doing it the right way. MaxMessageSize can be considered the buffer size between broker and client, but it is also the buffer size between BookKeeper and Pulsar. Pulsar and BookKeeper do not decompress the message; decompression is a client-side job. This is why we need to ensure that client-side features are consistent across languages.

freeznet avatar Jan 28 '21 14:01 freeznet

Any news about this?

yuvalgut avatar Jun 17 '21 10:06 yuvalgut

Any updates on this issue? We ran into the same problem recently.

cocktail828 avatar Sep 23 '21 06:09 cocktail828

@cocktail828 I see you have checked in a fix for this issue in your fork; is there any way you'd be able to issue a PR for this?

flowchartsman avatar Oct 27 '21 18:10 flowchartsman

This issue is becoming a problem for us again in Go Pulsar Functions, due to occasional large JSON messages which would likely compress very well. They appear to be failing with AddToBatchFailed, which is about as descriptive as saying "there was a failure". Does anyone have time to work on this one? Is there any interest in @cocktail828's fix? I'm happy to clean it up and issue a PR if necessary, so I guess I'll just do that if I don't hear anything else here first.

flowchartsman avatar May 10 '22 14:05 flowchartsman

@flowchartsman Sorry for the late reply. I have just opened a PR for this issue; please take a look. The Go SDK does not currently support chunked messages, so the fix is much simpler. Following the C++ implementation, it checks the compressed message size again when the raw message is too big.
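
The two-step check described here could be sketched in Go roughly as follows. This is based only on the description above, not on the PR or the C++ source, and `exceedsLimit` is a hypothetical helper name.

```go
package main

import "fmt"

// exceedsLimit reports whether a message should be rejected as too large.
// The raw size is checked first; the compressed size is consulted only
// when the raw payload is over the limit.
func exceedsLimit(rawSize, compressedSize, maxSize int) bool {
	if rawSize <= maxSize {
		return false
	}
	return compressedSize > maxSize
}

func main() {
	const maxSize = 5 << 20 // a 5 MiB limit, chosen for illustration

	fmt.Println(exceedsLimit(1<<20, 1<<19, maxSize)) // raw fits: false
	fmt.Println(exceedsLimit(8<<20, 1<<20, maxSize)) // only compressed fits: false
	fmt.Println(exceedsLimit(8<<20, 6<<20, maxSize)) // neither fits: true
}
```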

cocktail828 avatar Jul 12 '22 07:07 cocktail828