alpakka icon indicating copy to clipboard operation
alpakka copied to clipboard

Unable to consume more than 4MB from pubsub

Open mahendra-sawarkar-exa opened this issue 1 year ago • 2 comments

We are using below dependencies in out project:

"com.google.cloud" % "google-cloud-pubsub" % "1.105.1" exclude("io.grpc", "grpc-alts"), "io.grpc" % "grpc-alts" % "1.29.0", "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub-grpc" % "3.0.4"

below is the akka code:

val pubsubSubscription = Helper.getSubscriptionPath(customerName)
val pubsubConfig = PullRequest()
  .withSubscription(pubsubSubscription)
  .withReturnImmediately(true)
  .withMaxMessages(1000)

  GooglePubSub.subscribePolling(pubsubConfig, 500.millis)
  .map { pubsubMessage => print(s"I got message $pubsubMessage") )
  ......

but i am getting following error while consuming data from pubsub

Error to processing messages for Code Code(testUser). io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 4910368 at io.grpc.Status.asRuntimeException(Status.java:533) at akka.grpc.internal.UnaryCallAdapter.onClose(UnaryCallAdapter.scala:40) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:413) at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:721) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Jan 03, 2023 12:16:07 PM io.grpc.internal.AbstractClientStream$TransportState inboundDataReceived INFO: Received data on closed stream

please guide, thanks in advance!

mahendra-sawarkar-exa avatar Jan 03 '23 13:01 mahendra-sawarkar-exa

I had the same limitation on this one. I ended up swapping the SubscriberClient on the GrpcSubscriber with a custom one using reflection. A pr tackling this would be more suitable.

gkatzioura avatar Jan 03 '23 23:01 gkatzioura

@gkatzioura Thanks for suggestion :) is this error per pubsub message or for batch of messages?

mahendra-sawarkar-exa avatar Jan 04 '23 01:01 mahendra-sawarkar-exa