lakeFS
[Bug]: Clients don't support rate limiting
What happened?
Current Behavior:
I've launched a lakeFS instance with rate limiting configured, in order to avoid throttling from the backend engine (e.g. DynamoDB) and to put a high-water mark on the infrastructure's throughput.
The task fails with ApiException: Too Many Requests raised by the client. Because a retried task repeats the same requests, it fails again on retry, and eventually the whole job fails.
Example Jupyter notebook I used to test it:
val AmazonReviews = spark.read.parquet("s3a://amazon-reviews-pds/parquet/")
val partial = AmazonReviews.limit(117647)
val reviews = spark.read.parquet("s3://MY-REPO/stress-test/abc/b")
reviews.write.parquet("lakefs://test2/main/amazon-reviews")
reviews.write.parquet("lakefs://stress-dynamo/main/amazon-reviews")
val reviews = spark.read.parquet("lakefs://stress-dynamo/test3/amazon-reviews")
reviews.repartition(4000).write.mode(SaveMode.Overwrite).parquet("lakefs://stress-dynamo/test3/amazon-reviews-repartition-4000/")
Expected Behavior
I would expect lakeFS clients to handle HTTP status code 429 (and possibly the additional response headers, such as Retry-After) by buffering/holding requests and retrying, instead of failing.
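As a rough illustration of what "handling" could mean here, a minimal sketch, assuming the shaded ApiException exposes the usual OpenAPI getCode() and getResponseHeaders() accessors; the RateLimitHint helper name is hypothetical:

```java
import io.lakefs.hadoop.shade.api.ApiException;

import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not part of any lakeFS client): given a failed API call,
 * decide whether it was rate limited and how long the server asked us to wait.
 */
final class RateLimitHint {
    private static final int HTTP_TOO_MANY_REQUESTS = 429;

    /** Returns the suggested wait in milliseconds, or -1 when the failure is not a 429. */
    static long retryAfterMillis(ApiException e, long defaultMillis) {
        if (e.getCode() != HTTP_TOO_MANY_REQUESTS) {
            return -1L;
        }
        Map<String, List<String>> headers = e.getResponseHeaders();
        if (headers != null) {
            for (Map.Entry<String, List<String>> h : headers.entrySet()) {
                // Header-name case may vary depending on the HTTP stack.
                if ("Retry-After".equalsIgnoreCase(h.getKey()) && !h.getValue().isEmpty()) {
                    try {
                        // Retry-After is commonly a delay in whole seconds; the HTTP-date
                        // form is not handled in this sketch.
                        return Long.parseLong(h.getValue().get(0).trim()) * 1000L;
                    } catch (NumberFormatException ignored) {
                        // Unparseable value: fall through to the default below.
                    }
                }
            }
        }
        return defaultMillis;
    }

    private RateLimitHint() {}
}
```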
lakeFS Version
0.90
Deployment
Kubernetes, Helm Chart, DynamoDB
Affected Clients
No response
Relevant logs output
Job aborted.
Caused by: Job aborted due to stage failure.
Caused by: IOException: statObject
Caused by: ApiException: Too Many Requests
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:659)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:365)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 30.0 failed 4 times, most recent failure: Lost task 31.3 in stage 30.0 (TID 35121) (10.154.175.51 executor 10): java.io.IOException: statObject
at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
... 32 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3288)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3222)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3216)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3216)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1415)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1415)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1415)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3497)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3438)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3426)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1169)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2715)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2698)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:330)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: statObject
at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Contact Details
No response
@ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There it should have been fixed by issue #4189 / PR #4190.
Did you observe GC getting stuck because of this?
Why it happens in lakeFSFS
I observed this before we abandoned the work on LakeFSOutputCommitter. Like LakeFSFS, the committer never retries a failed call, and API calls to lakeFS can fail under load.
Proposal
The solution in LakeFSOutputCommitter was to add a retry wrapper (the same one from #4190, in fact) around calls to the lakeFS API. That would solve most of the issue: Spark uses a fixed number of executors, and each executor writes in a single-threaded manner, so backing off should be enough to eventually succeed.
Ideally we would also respect the Retry-After header on the 429 response, treating it as a minimum retry interval. In practice such headers are tuned more towards a single success than sustained throughput, so they may underestimate the time to retry; consider adjusting the value in any case.
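A minimal sketch of such a wrapper (not the actual #4190 code), assuming rate limiting surfaces as the shaded ApiException with code 429; it reuses the hypothetical RateLimitHint helper sketched in the issue body above:

```java
import io.lakefs.hadoop.shade.api.ApiException;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative retry wrapper: exponential backoff with jitter on 429,
 * treating the server's Retry-After value, when present, as a floor.
 */
final class RetryOn429 {
    private static final int MAX_ATTEMPTS = 7;
    private static final long BASE_BACKOFF_MILLIS = 200;
    private static final long MAX_BACKOFF_MILLIS = 30_000;

    interface ApiCall<T> {
        T call() throws ApiException;
    }

    static <T> T withRetry(ApiCall<T> call) throws ApiException, InterruptedException {
        long backoff = BASE_BACKOFF_MILLIS;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (ApiException e) {
                if (e.getCode() != 429 || attempt >= MAX_ATTEMPTS) {
                    throw e; // not rate limited, or out of attempts
                }
                // Honour Retry-After as a minimum wait; otherwise back off exponentially.
                long hinted = RateLimitHint.retryAfterMillis(e, backoff); // helper sketched earlier
                long wait = Math.max(hinted, backoff);
                // Jitter spreads retries across executors so they don't return all at once.
                Thread.sleep(ThreadLocalRandom.current().nextLong(wait / 2, wait + 1));
                backoff = Math.min(backoff * 2, MAX_BACKOFF_MILLIS);
            }
        }
    }

    private RetryOn429() {}
}
```

A call site like LakeFSFileSystem.getFileStatus could then wrap its existing statObject call as withRetry(() -> objects.statObject(...)) without changing the arguments it already passes.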
@ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There it should have been fixed by issue #4189 / PR #4190.
Did you observe GC getting stuck because of this?
I haven't run GC to test it TBH, haven't got to it yet. It's something we'll need to do.
Supporting the rate limit above means first extending the lakeFS OpenAPI spec with a retry-after (backoff) response, and then adding client support for it. Note that the issue above can also originate from client timeouts: by default the generated Java SDK uses a 10-30 second per-request timeout. That can generate a large number of requests, while lakeFS itself uses the underlying storage SDK, which performs backoff and already supports the header/mechanism we would like to provide to our clients.
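For illustration, assuming the shaded ApiClient exposes the stock OpenAPI okhttp setters (setBasePath, setConnectTimeout, setReadTimeout, setWriteTimeout), adjusting those per-request timeouts would look roughly like this; the endpoint and values are placeholders, not recommendations:

```java
import io.lakefs.hadoop.shade.api.ApiClient;

/** Illustration only: endpoint and timeout values are placeholders. */
final class ClientTimeouts {
    static ApiClient configure() {
        ApiClient client = new ApiClient();
        client.setBasePath("https://lakefs.example.com/api/v1"); // placeholder endpoint
        client.setConnectTimeout(5_000);  // milliseconds; generated clients often default to 10s
        client.setReadTimeout(30_000);    // per-request read timeout
        client.setWriteTimeout(30_000);
        return client;
    }

    private ClientTimeouts() {}
}
```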
This issue is now marked as stale after 90 days of inactivity, and will be closed soon. To keep it, mark it with the "no stale" label.
This is still an issue: the server rate-limits requests, and clients simply fail instead of backing off, which is poor behaviour.
See also #4957 and #5664 (which is blocked on fixing this); I will add others as I find them.