[improve][admin][broker] Admin API: stream internal topic stats
Motivation
The JSON may be huge when individuallyDeletedMessages inside ManagedLedgerInternalStats$CursorStats contains many ranges
Modifications
Instead of returning the object and letting the framework do the encoding, switch to StreamingOutput and perform the JSON encoding manually. A further step would be to start serving results as soon as they are available, instead of building the whole object in memory and then encoding it, but that is far more involved.
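A minimal sketch of the idea, assuming a simplified data shape: a map from cursor name to its individuallyDeletedMessages string. So that it compiles with the JDK alone, `JsonBody` is a hypothetical local interface with the same single-method shape as `javax.ws.rs.core.StreamingOutput` (`void write(OutputStream)`); `CursorStatsJson`, `cursorsBody` and `writeString` are illustrative names, not the PR's actual code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in with the same shape as javax.ws.rs.core.StreamingOutput,
// declared locally so this sketch needs no JAX-RS dependency.
interface JsonBody {
    void write(OutputStream out) throws IOException;
}

final class CursorStatsJson {
    private CursorStatsJson() {}

    // Returns a body that encodes
    // {"cursors":{"<name>":{"individuallyDeletedMessages":"..."}}}
    // straight onto the response stream instead of handing an object to the framework.
    static JsonBody cursorsBody(Map<String, String> deletedRangesByCursor) {
        return out -> {
            // Deliberately not closed: the container owns the response stream.
            Writer w = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
            w.write("{\"cursors\":{");
            boolean first = true;
            for (Map.Entry<String, String> e : deletedRangesByCursor.entrySet()) {
                if (!first) {
                    w.write(',');
                }
                first = false;
                writeString(w, e.getKey());
                w.write(":{\"individuallyDeletedMessages\":");
                writeString(w, e.getValue());
                w.write('}');
            }
            w.write("}}");
            w.flush(); // push buffered characters to `out` without closing it
        };
    }

    // Minimal JSON string escaping: quotes, backslashes and control characters.
    static void writeString(Writer w, String s) throws IOException {
        w.write('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': w.write("\\\""); break;
                case '\\': w.write("\\\\"); break;
                case '\n': w.write("\\n"); break;
                case '\r': w.write("\\r"); break;
                case '\t': w.write("\\t"); break;
                default:
                    if (c < 0x20) {
                        w.write(String.format("\\u%04x", (int) c));
                    } else {
                        w.write(c);
                    }
            }
        }
        w.write('"');
    }
}
```

In a real JAX-RS resource the same lambda could be returned as a `StreamingOutput`, for example `Response.ok((StreamingOutput) out -> { ... }, MediaType.APPLICATION_JSON).build()`, and the framework would call `write` with the response stream. Note that only the encoding is streamed here: the stats themselves are still fully materialized before encoding starts.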
Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [x] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/eolivelli/pulsar/pull/25
@eolivelli have you had a chance to check if Gzip compression helps with large responses? Pulsar admin client changes: #22464, broker side changes: #21667, #22463 and #22370 .
> The JSON may be huge when individuallyDeletedMessages inside ManagedLedgerInternalStats$CursorStats contains many ranges
@eolivelli do you have any figures to share? How large is the JSON response? How many individuallyDeletedMessages are there?
> @eolivelli have you had a chance to check if Gzip compression helps with large responses? Pulsar admin client changes: #22464, broker side changes: #21667, #22463 and #22370 .
I have seen those PRs, great stuff, but the problem here is that serialization happens in an unpredictable place, like this:
"pulsar-web-48-18" #174 prio=5 os_prio=0 cpu=1421.36ms elapsed=456.08s tid=0x00007f6a54002260 nid=0x342c0 waiting on condition [0x00007f6e5c951000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base/Native Method)
- parking to wait for <0x00001007fe5a5b38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:341)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base/AbstractQueuedSynchronizer.java:506)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base/ForkJoinPool.java:3465)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base/ForkJoinPool.java:3436)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base/AbstractQueuedSynchronizer.java:1623)
at org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:241)
at org.eclipse.jetty.server.HttpOutput.channelWrite(HttpOutput.java:270)
at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:873)
at org.glassfish.jersey.servlet.internal.ResponseWriter$NonCloseableOutputStreamWrapper.write(ResponseWriter.java:301)
at org.glassfish.jersey.message.internal.CommittingOutputStream.write(CommittingOutputStream.java:200)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$UnCloseableOutputStream.write(WriterInterceptorExecutor.java:276)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2203)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._writeStringSegments(UTF8JsonGenerator.java:1343)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator.writeString(UTF8JsonGenerator.java:517)
at com.fasterxml.jackson.databind.ser.std.StringSerializer.serialize(StringSerializer.java:41)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:733)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:774)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeOptionalFields(MapSerializer.java:869)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:760)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:720)
at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:35)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:733)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:774)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1572)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:1061)
at org.glassfish.jersey.jackson.internal.jackson.jaxrs.base.ProviderBase.writeTo(ProviderBase.java:647)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:242)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:227)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:85)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:61)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:139)
at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1116)
at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:635)
at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:373)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:363)
at org.glassfish.jersey.server.ServerRuntime$AsyncResponder$3.run(ServerRuntime.java:857)
> How many individuallyDeletedMessages are there?

More than 100,000.
> I have seen those PRs, great stuff, but the problem here is that serialization happens in an unpredictable place, like this:
I'm trying to understand this more deeply. What is the impact of using the default serialization in this case? How does the StreamingOutput-based serialization improve the situation?
@lhotari I guess the default implementation in Jetty is something like:
```java
String jsonString = JSON.toJsonString(object);
httpOutput.writeString(jsonString);
```
It generates the whole jsonString first and then sends it to the client. With this PR, the JSON string no longer has to be built in memory first; the result is written to the client directly. That should reduce heap memory usage and avoid extra memory copies. @eolivelli, am I right? I support the PR; it could help a lot when the response body is huge.
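The two strategies contrasted in this comment can be sketched with the JDK alone. `CountingOutputStream` is a hypothetical stand-in for the servlet output stream that records how bytes arrive; `EncodingStrategies` and the 8 KiB buffer size are likewise assumptions for illustration. Both methods emit identical bytes, but the first holds the entire encoded payload in a `StringBuilder`, a `String` and a `byte[]` before one large write, while the second never holds more than one buffer of encoded output, similar to the chunked `UTF8JsonGenerator._flushBuffer` writes into Jetty's `HttpOutput` visible in the stack trace above. In both cases the input list itself is fully in memory.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Records how bytes reach the "client": total bytes, write calls, largest single write.
final class CountingOutputStream extends OutputStream {
    long totalBytes;
    int writeCalls;
    int largestWrite;

    @Override
    public void write(int b) {
        record(1);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        record(len);
    }

    private void record(int len) {
        totalBytes += len;
        writeCalls++;
        largestWrite = Math.max(largestWrite, len);
    }
}

final class EncodingStrategies {
    private EncodingStrategies() {}

    // Strategy 1: build the whole JSON array in memory, then write it in one go.
    // Ranges are assumed to need no JSON escaping in this sketch.
    static void buildThenWrite(List<String> ranges, OutputStream out) throws IOException {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < ranges.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(ranges.get(i)).append('"');
        }
        sb.append(']');
        out.write(sb.toString().getBytes(StandardCharsets.UTF_8)); // whole payload on the heap at once
        out.flush();
    }

    // Strategy 2: encode element by element through a fixed-size buffer,
    // so the underlying stream only ever sees writes of at most 8192 bytes.
    static void streamWrite(List<String> ranges, OutputStream out) throws IOException {
        BufferedOutputStream bos = new BufferedOutputStream(out, 8192);
        bos.write('[');
        for (int i = 0; i < ranges.size(); i++) {
            if (i > 0) {
                bos.write(',');
            }
            bos.write(('"' + ranges.get(i) + '"').getBytes(StandardCharsets.UTF_8));
        }
        bos.write(']');
        bos.flush(); // not closed: the caller owns `out`
    }
}
```

Either strategy can still block the calling thread on each write if the client reads slowly; streaming bounds the memory held for the encoded output, not the time spent writing it.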
@dao-jun, this doesn't seem to be the case based on the stack trace that Enrico shared. Even without this PR, the default solution is already "streaming". According to the stack trace, it uses this class to create the JSON response: https://github.com/eclipse-ee4j/jersey/blob/2.41/media/json-jackson/src/main/java/org/glassfish/jersey/jackson/internal/jackson/jaxrs/base/ProviderBase.java#L583-L659