
[kubernetes] want to increase number of workers in k8s

Open sivankumar86 opened this issue 1 year ago • 1 comments

Platform Version

0.50.34

What step the error happened?

None

Relevant information

The job is failing with: "Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "io.airbyte.workers.process.KubePortManagerSingleton.take()" is null"

Is there a way to scale the number of workers based on the number of pending jobs in k8s? Please point me to how to do this.

I have tried scaling on CPU/memory metrics, but with no luck.

Relevant log output

io.airbyte.workers.exception.WorkerException: Failed to create pod for check step
        at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:188) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.process.AirbyteIntegrationLauncher.check(AirbyteIntegrationLauncher.java:143) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:71) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.general.DefaultCheckConnectionWorker.run(DefaultCheckConnectionWorker.java:44) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]
        at io.airbyte.workers.temporal.TemporalAttemptExecution.get(TemporalAttemptExecution.java:135) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.lambda$runWithJobOutput$1(CheckConnectionActivityImpl.java:133) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at io.airbyte.commons.temporal.HeartbeatUtils.withBackgroundHeartbeat(HeartbeatUtils.java:57) ~[io.airbyte-airbyte-commons-temporal-core-0.50.34.jar:?]
        at io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl.runWithJobOutput(CheckConnectionActivityImpl.java:118) ~[io.airbyte-airbyte-workers-0.50.34.jar:?]
        at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
        at io.temporal.internal.activity.RootActivityInboundCallsInterceptor$POJOActivityInboundCallsInterceptor.executeActivity(RootActivityInboundCallsInterceptor.java:64) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.RootActivityInboundCallsInterceptor.execute(RootActivityInboundCallsInterceptor.java:43) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor.execute(ActivityTaskExecutors.java:95) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.activity.ActivityTaskHandlerImpl.handle(ActivityTaskHandlerImpl.java:92) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handleActivity(ActivityWorker.java:241) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:206) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:179) ~[temporal-sdk-1.17.0.jar:?]
        at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.17.0.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.lang.Thread.run(Thread.java:1589) ~[?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "io.airbyte.workers.process.KubePortManagerSingleton.take()" is null
        at io.airbyte.workers.process.KubeProcessFactory.create(KubeProcessFactory.java:131) ~[io.airbyte-airbyte-commons-worker-0.50.34.jar:?]

sivankumar86 avatar Dec 17 '23 21:12 sivankumar86

Today you can only increase the number of workers using the env variable. Some other users have built external applications to autoscale based on the number of pods. The best path is to use a cron job and calculate the number of workers you'll need.
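
As a rough illustration of the "calculate the number of workers" step, here is a minimal Python sketch that a cron job could run. Everything in it is an assumption made for illustration: the function name, the `jobs_per_worker` capacity figure, and the replica bounds are hypothetical and not part of Airbyte. How the running and pending job counts are obtained, and how the result is applied to the cluster, depends on your installation and is left out.

```python
import math


def desired_worker_replicas(running_jobs: int,
                            pending_jobs: int,
                            jobs_per_worker: int,
                            min_replicas: int = 1,
                            max_replicas: int = 10) -> int:
    """Return a worker replica count for the current job load.

    All parameters are hypothetical tuning knobs, not Airbyte settings:
    jobs_per_worker is how many concurrent jobs you assume one worker
    can handle; min_replicas/max_replicas bound the result.
    """
    if jobs_per_worker <= 0:
        raise ValueError("jobs_per_worker must be positive")
    if running_jobs < 0 or pending_jobs < 0:
        raise ValueError("job counts must be non-negative")

    total_jobs = running_jobs + pending_jobs
    # Round up so that a partially filled worker still gets a replica.
    needed = math.ceil(total_jobs / jobs_per_worker)
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, needed))


if __name__ == "__main__":
    # Example: 3 running + 9 pending jobs, assuming 5 jobs per worker.
    print(desired_worker_replicas(3, 9, jobs_per_worker=5))
```

The clamping keeps at least one worker available when the queue is empty and caps the count so a burst of pending jobs cannot scale the deployment without limit.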

marcosmarxm avatar Dec 26 '23 17:12 marcosmarxm

@marcosmarxm Thanks for the reply. At the moment we run jobs once every 4 hours, plus nightly jobs, so it would be great if we could increase or decrease the number of workers based on the total number of jobs (running + pending). Is there a blog post or document you can point me to?

sivankumar86 avatar Jan 09 '24 01:01 sivankumar86