Reduce number of Gax related threads, likely by providing common executor to GAX clients

Open damccorm opened this issue 3 years ago • 30 comments

When looking at thread stacks for a pipeline reading pubsub and writing to BQ using the WriteApi there were 3600 Gax- prefixed threads.

The number of threads, and possibly performance, could be improved by finding where these threads come from and having them use a shared executor.
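As a rough way to reproduce the count from inside a running JVM, here is a minimal sketch that tallies live threads by name prefix. The class and method names (`ThreadPrefixCounter`, `countWithPrefix`, `countByPrefix`) are made up for illustration; only `Thread.getAllStackTraces()` is a real JDK call.

```java
import java.util.Map;
import java.util.TreeMap;

/** Counts live JVM threads by name prefix, e.g. "Gax-". Illustrative helper only. */
final class ThreadPrefixCounter {

  /** Returns how many live threads have a name starting with the given prefix. */
  static long countWithPrefix(String prefix) {
    return Thread.getAllStackTraces().keySet().stream()
        .filter(thread -> thread.getName().startsWith(prefix))
        .count();
  }

  /** Groups live threads by the text up to and including the first '-' in their name. */
  static Map<String, Integer> countByPrefix() {
    Map<String, Integer> counts = new TreeMap<>();
    for (Thread thread : Thread.getAllStackTraces().keySet()) {
      String name = thread.getName();
      int dash = name.indexOf('-');
      String prefix = dash < 0 ? name : name.substring(0, dash + 1);
      counts.merge(prefix, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println("Gax- threads: " + countWithPrefix("Gax-"));
    System.out.println(countByPrefix());
  }
}
```

Calling `countByPrefix()` from a worker would show which pools dominate the thread count, which is a starting point for finding where the threads are created.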

Imported from Jira BEAM-13826. Original Jira may contain additional context. Reported by: scwhittle.

damccorm avatar Jun 04 '22 22:06 damccorm

John, assigning to you to prioritize since this could be hit by users when using various I/O connectors (for example, BigQueryIO sink with the STORAGE_WRITE_API mode).

chamikaramj avatar Aug 23 '22 23:08 chamikaramj

IIUC the appropriate fix is to adjust client libraries used in various IOs to use a shared channel (@lukecwik can advise).

TheNeuralBit avatar Aug 23 '22 23:08 TheNeuralBit

I thought that the main bug had already been fixed.

reuvenlax avatar Aug 23 '22 23:08 reuvenlax

Is there any documentation on where we use GAX, or a pointer to a design pattern for this?

johnjcasey avatar Aug 24 '22 13:08 johnjcasey

@reuvenlax do you have a link to the fix?

Also cc'ing @scwhittle, who filed the original issue.

chamikaramj avatar Aug 24 '22 20:08 chamikaramj

@reuvenlax maybe you're thinking of https://github.com/apache/beam/pull/16619 which fixed a different thread explosion in RetryManager?

TheNeuralBit avatar Aug 24 '22 20:08 TheNeuralBit

Is there any documentation on where we use GAX, or a pointer to a design pattern for this?

I'm not sure about documentation. My understanding from offline discussion with @lukecwik is that gax-java is built on grpc-java and is used by the Google client libraries that the various GCP IOs depend on. The client libraries generally have ways to specify an executor, which would allow us to supply a shared one. See here for an example in the Spanner client.

TheNeuralBit avatar Aug 24 '22 20:08 TheNeuralBit

Got it. I suppose I will need to do some research on executor options to know which one is the best for this use case

johnjcasey avatar Aug 25 '22 14:08 johnjcasey

@lukecwik do you have a recommendation on executors to try?

johnjcasey avatar Aug 25 '22 15:08 johnjcasey

Yes, the scheduled executor service should be shared, but it is the wrong scope of object to share. Based on GCP client library best practices, the recommendation is to share the client instance itself across multiple usages.

lukecwik avatar Aug 25 '22 15:08 lukecwik

A somewhat simple change would be to update the various get methods in BigQueryServicesImpl to be cached. If you pass in the same options, you get the existing client instance.

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L183
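A minimal sketch of the "same options, same client" idea, assuming the options object implements `equals`/`hashCode` and the client is safe to share across threads. `ClientCache` is a hypothetical name, not something in BigQueryServicesImpl.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical cache that hands out one client per distinct options value.
 * K must implement equals/hashCode; C must be safe to share across threads.
 */
final class ClientCache<K, C> {
  private final ConcurrentHashMap<K, C> clients = new ConcurrentHashMap<>();
  private final Function<K, C> factory;

  ClientCache(Function<K, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached client for these options, creating it on first use. */
  C get(K options) {
    // computeIfAbsent runs the factory at most once per key, even under contention.
    return clients.computeIfAbsent(options, factory);
  }

  int size() {
    return clients.size();
  }
}
```

Note that this sketch never closes or evicts clients, so it only fits cases where the set of distinct options is small and stable.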

johnjcasey avatar Aug 25 '22 15:08 johnjcasey

That would make sense. We would need to check that the objects being managed there aren't stateful in a way that matters when they are used across multiple threads (e.g. state transitions like open->closed, or buffering data for a bundle). If they aren't, the object can be marked @ThreadSafe, making it bundle agnostic.

lukecwik avatar Aug 25 '22 16:08 lukecwik

Also, is there a way to close the client in a way where it cleans up those resources?

This would be useful to not leak instances of the client if the options change over time (e.g. dynamic destinations).
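One way this could look, as a sketch only: a bounded cache that closes the least recently used client when it is evicted. `ClosingClientCache` and `maxSize` are hypothetical names, and the sketch assumes an evicted client is no longer in use by any other thread; a real implementation would likely need reference counting or similar to guarantee that.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical bounded client cache that closes clients when they are evicted. */
final class ClosingClientCache<K, C extends AutoCloseable> {
  private final Function<K, C> factory;
  private final Map<K, C> clients;

  ClosingClientCache(int maxSize, Function<K, C> factory) {
    this.factory = factory;
    // accessOrder=true makes iteration order least-recently-used first.
    this.clients = new LinkedHashMap<K, C>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, C> eldest) {
        if (size() <= maxSize) {
          return false;
        }
        closeQuietly(eldest.getValue());
        return true;
      }
    };
  }

  /** Returns the cached client for the key, creating it (and possibly evicting another) if absent. */
  synchronized C get(K key) {
    C client = clients.get(key);
    if (client == null) {
      client = factory.apply(key);
      clients.put(key, client);
    }
    return client;
  }

  synchronized int size() {
    return clients.size();
  }

  private static void closeQuietly(AutoCloseable closeable) {
    try {
      closeable.close();
    } catch (Exception e) {
      // Assumption for this sketch: a failed close is not fatal, so it is ignored.
    }
  }
}
```

With dynamic destinations, the bound keeps the number of open clients (and the threads behind them) from growing with the number of tables seen over time.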

lukecwik avatar Aug 25 '22 17:08 lukecwik

We do cache the client instances. The problem is that today, each destination table requires a separate client.

On Thu, Aug 25, 2022 at 8:45 AM Lukasz Cwik wrote:

Yes the scheduled executor service should be shared but it is the wrong scope of object to be shared. Based upon GCP client library best practices the recommendation is to share the client instance itself across multiple usages https://cloud.google.com/apis/docs/client-libraries-best-practices#reuse_client_objects_and_sessions .

reuvenlax avatar Aug 25 '22 19:08 reuvenlax

Got it, so we shouldn't add additional caching, but should instead cache the transport so that all the various clients use the same one?

johnjcasey avatar Aug 25 '22 20:08 johnjcasey

Speaking of, where does that caching occur?

johnjcasey avatar Aug 25 '22 20:08 johnjcasey

Yes, but at least today there is no way of doing that AFAICT. We might have to wait for the BigQuery support to land.

reuvenlax avatar Aug 26 '22 16:08 reuvenlax

The client libraries allow injecting our own transport/threadpool via BigQueryWriteSettings.Builder#setTransportChannelProvider and BigQueryWriteSettings.Builder#setBackgroundExecutorProvider; the resulting settings are then used to create the BigQueryWriteClient.

I still think we should be re-using the client across bundles by having a global object pool of BQ Options -> Client, but using a single shared scheduled thread pool is likely the easiest change.

lukecwik avatar Aug 26 '22 17:08 lukecwik

We are reusing clients across bundles for streaming write API. The DoFns that use this keep a global static cache of table name -> client

reuvenlax avatar Aug 27 '22 06:08 reuvenlax

I believe that to be sufficient caching from a client level.

Is there no way to have distinct clients share a transport, or a transport provider?

johnjcasey avatar Aug 29 '22 15:08 johnjcasey

Did you see Luke's suggestion about that?

The client libraries allow injecting our own transport/threadpool via BigQueryWriteSettings.Builder#setTransportChannelProvider and BigQueryWriteSettings.Builder#setBackgroundExecutorProvider.

So you could have a static shared transport and threadpool across all the cached clients
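To illustrate the effect on thread count, here is a JDK-only sketch. `FakeClient` is a hypothetical stand-in for a real client. With the actual libraries, the shared executor would presumably be wrapped in gax's `FixedExecutorProvider` and passed to the settings builder's `setBackgroundExecutorProvider`, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedExecutorSketch {

  /** Hypothetical stand-in for a client that runs background work on an injected executor. */
  static final class FakeClient {
    private final ScheduledExecutorService executor;

    FakeClient(ScheduledExecutorService executor) {
      this.executor = executor;
    }

    /** Runs a no-op task on the executor and waits for it to finish. */
    void ping() {
      try {
        executor.submit(() -> {}).get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  /** Thread factory that names daemon threads and counts how many it creates. */
  static ThreadFactory countingFactory(String prefix, AtomicInteger created) {
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + created.incrementAndGet());
      thread.setDaemon(true);
      return thread;
    };
  }

  /** One pool per client: returns the total number of threads created. */
  static int threadsWithPerClientPools(int clients, int poolSize) {
    AtomicInteger created = new AtomicInteger();
    List<ExecutorService> pools = new ArrayList<>();
    for (int i = 0; i < clients; i++) {
      ScheduledExecutorService pool =
          Executors.newScheduledThreadPool(poolSize, countingFactory("PerClient-", created));
      pools.add(pool);
      new FakeClient(pool).ping();
    }
    pools.forEach(ExecutorService::shutdownNow);
    return created.get();
  }

  /** One static-style pool shared by all clients: returns the total number of threads created. */
  static int threadsWithSharedPool(int clients, int poolSize) {
    AtomicInteger created = new AtomicInteger();
    ScheduledExecutorService shared =
        Executors.newScheduledThreadPool(poolSize, countingFactory("Shared-", created));
    for (int i = 0; i < clients; i++) {
      new FakeClient(shared).ping();
    }
    shared.shutdownNow();
    return created.get();
  }

  public static void main(String[] args) {
    System.out.println("per-client pools: " + threadsWithPerClientPools(100, 4) + " threads");
    System.out.println("shared pool:      " + threadsWithSharedPool(100, 4) + " threads");
  }
}
```

In the per-client case the thread count grows with the number of clients (at least one thread each), while the shared pool stays bounded by its configured size regardless of how many clients use it.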

scwhittle avatar Sep 01 '22 18:09 scwhittle

@lukecwik or @scwhittle do either of you have a suggestion on what transport to use? I'm fairly unfamiliar with the details of these

johnjcasey avatar Sep 13 '22 17:09 johnjcasey

There is only one gRPC channel provider, InstantiatingGrpcChannelProvider, which is also the default. You could experiment with creating a fixed one, but I would suggest calling setBackgroundExecutorProvider on all the clients with a FixedExecutorProvider, and updating https://github.com/apache/beam/blob/9be9a43c1b85be9b2f78e2943f6092ccb88e13b4/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L136 to return a ScheduledExecutorService, which is then supplied to the FixedExecutorProvider.

lukecwik avatar Sep 13 '22 22:09 lukecwik

That makes sense, though we will have to do some testing to verify that changing from a 'normal' thread pool executor to a scheduled thread pool executor doesn't impact the rest of our GCP IOs

johnjcasey avatar Sep 14 '22 15:09 johnjcasey

It looks like a variety of our tests assume the ThreadPoolExecutor instead of a scheduled one, and time out when a scheduled executor is default. I believe the solution will be to instead provide a second thread pool for places that need scheduled execution

johnjcasey avatar Sep 15 '22 17:09 johnjcasey

How do they assume this?

If it's easy to clean up, I would prefer that we use only one.

lukecwik avatar Sep 15 '22 23:09 lukecwik

It isn't explicit, but we have 14 tests that fail with varieties of timeouts when we use the scheduled executor.

johnjcasey avatar Sep 16 '22 13:09 johnjcasey

What tests are they? I'm curious if there is some trivial pattern to them that is causing the test to fail.

lukecwik avatar Sep 16 '22 17:09 lukecwik

https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/23996/

org.apache.beam.runners.fnexecution.control.RemoteExecutionTest mostly.

When I separated to just the scheduled executor, that was causing issues in BQ itself as well.

johnjcasey avatar Sep 19 '22 14:09 johnjcasey

The ThreadPoolExecutor implementation uses a synchronous queue meaning that scheduling a task will block till a thread takes it. Could this be the difference?
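A small JDK-only experiment can make the difference concrete. As far as the JDK behaviour goes, `ThreadPoolExecutor.execute` with a `SynchronousQueue` does not block the submitter: it offers the task to an idle worker and, if none is waiting, starts a new thread (up to the maximum pool size). A `ScheduledThreadPoolExecutor` instead has a fixed number of threads and an unbounded queue, so a task that waits on another queued task can stall. Whether this is what the failing tests hit is an assumption; `HandoffVsQueueDemo` is a made-up name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HandoffVsQueueDemo {

  /**
   * Submits a task that waits for a second, later-submitted task.
   * Returns true if the first task finishes within 500 ms, false if it times out.
   */
  static boolean dependentTasksComplete(ExecutorService executor) {
    CountDownLatch signal = new CountDownLatch(1);
    try {
      // First task occupies a thread until the second task has run.
      Future<?> waiter = executor.submit(() -> {
        signal.await();
        return null;
      });
      // Second task can only run if another thread is available.
      executor.execute(signal::countDown);
      waiter.get(500, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Passing `Executors.newCachedThreadPool()` (which is backed by a `SynchronousQueue`) lets both tasks run on separate threads, whereas `Executors.newScheduledThreadPool(1)` leaves the second task queued behind the first until the timeout. Code that implicitly relies on every submitted task getting its own thread would time out in the same way on a small scheduled pool.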

lukecwik avatar Sep 21 '22 03:09 lukecwik