alpakka
Unable to consume more than 4MB from pubsub
We are using the following dependencies in our project:
"com.google.cloud" % "google-cloud-pubsub" % "1.105.1" exclude("io.grpc", "grpc-alts"), "io.grpc" % "grpc-alts" % "1.29.0", "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub-grpc" % "3.0.4"
Below is the Akka Streams code:
val pubsubSubscription = Helper.getSubscriptionPath(customerName)

val pubsubConfig = PullRequest()
  .withSubscription(pubsubSubscription)
  .withReturnImmediately(true)
  .withMaxMessages(1000)

GooglePubSub.subscribePolling(pubsubConfig, 500.millis)
  .map { pubsubMessage => print(s"I got message $pubsubMessage") }
......
But I am getting the following error while consuming data from Pub/Sub:
Error to processing messages for Code Code(testUser).
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 4910368
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at akka.grpc.internal.UnaryCallAdapter.onClose(UnaryCallAdapter.scala:40)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:413)
    at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:721)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Jan 03, 2023 12:16:07 PM io.grpc.internal.AbstractClientStream$TransportState inboundDataReceived
INFO: Received data on closed stream
Please guide, thanks in advance!
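For context: 4194304 bytes is gRPC's default maximum inbound message size (4 MiB), and the failing pull response was 4910368 bytes, so the cap applies to the whole pulled batch, not to individual Pub/Sub messages. One mitigation that needs no library changes is lowering `withMaxMessages` so a full batch stays under the limit. A minimal sizing sketch; the 50 KiB average message size and the 20% headroom factor are assumptions for illustration, not measured values:

```scala
// gRPC's default maximum inbound message size is 4 MiB (4194304 bytes).
val grpcMaxBytes: Int = 4 * 1024 * 1024

// Hypothetical average message size for this subscription (an assumption;
// measure your real payloads before relying on this figure).
val avgMessageBytes: Int = 50 * 1024

// Keep ~20% headroom for protobuf framing, attributes and ack IDs.
val headroom: Double = 0.8

// Largest batch that should still fit under the gRPC limit.
val safeMaxMessages: Int = ((grpcMaxBytes * headroom) / avgMessageBytes).toInt

println(safeMaxMessages) // 65 with the numbers above
```

With a value like this you would call `.withMaxMessages(safeMaxMessages)` instead of `.withMaxMessages(1000)`; the trade-off is more pull round-trips for the same throughput.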
I had the same limitation. I ended up swapping the SubscriberClient on the GrpcSubscriber with a custom one using reflection. A PR tackling this properly would be more suitable.
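The generic gRPC-level fix is to raise the inbound limit on the underlying channel. A minimal sketch using plain grpc-java (requires `io.grpc` on the classpath; the 16 MiB value is an arbitrary example, and wiring such a channel into Alpakka's GrpcSubscriber is exactly what currently requires the reflection workaround):

```scala
import io.grpc.ManagedChannelBuilder

// Build a channel whose inbound message limit is raised from the 4 MiB default.
// 16 MiB here is an arbitrary example value, not a recommendation.
val channel = ManagedChannelBuilder
  .forAddress("pubsub.googleapis.com", 443)
  .useTransportSecurity()
  .maxInboundMessageSize(16 * 1024 * 1024)
  .build()
```

Note that Pub/Sub messages can individually be up to 10 MB, so even a per-message-safe limit needs to be well above the 4 MiB default if large payloads are expected.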
@gkatzioura Thanks for the suggestion :) Is this error raised per Pub/Sub message or per batch of messages?