
[Bug]: Clients don't support rate limiting

Open ortz opened this issue 2 years ago • 6 comments

What happened?

Current Behavior: I launched a lakeFS instance with rate limiting configured, in order to avoid throttling from the backend engine (e.g. DynamoDB) and to set a high-water mark on the infrastructure's throughput. The task fails with ApiException: Too Many Requests raised by the client; even when the task is retried, it retries the same requests, so it fails again and eventually the whole job fails.

Example Jupyter notebook I used to test it:

val AmazonReviews = spark.read.parquet("s3a://amazon-reviews-pds/parquet/")
val partial = AmazonReviews.limit(117647)
val reviews = spark.read.parquet("s3://MY-REPO/stress-test/abc/b")
reviews.write.parquet("lakefs://test2/main/amazon-reviews")
reviews.write.parquet("lakefs://stress-dynamo/main/amazon-reviews")
val reviews = spark.read.parquet("lakefs://stress-dynamo/test3/amazon-reviews")

reviews.repartition(4000).write.mode(SaveMode.Overwrite).parquet("lakefs://stress-dynamo/test3/amazon-reviews-repartition-4000/")

Expected Behavior

Expected Behavior: I would expect lakeFS clients to handle HTTP Status Code 429 (and possibly the additional response headers, such as Retry-After) and buffer/hold requests instead of failing.
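
To illustrate the expected client behaviour, here is a minimal sketch (not lakeFS code) of an OkHttp interceptor that retries 429 responses and honours Retry-After. It assumes the generated Java SDK is backed by OkHttp, as the shaded ApiClient in the stack trace suggests; the class name, retry limit, and base delay are hypothetical, and retrying this way is only safe for idempotent calls such as statObject.

import java.io.IOException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

// Hypothetical sketch: retry 429 responses, using Retry-After as a lower bound on the wait.
public class RetryAfterInterceptor implements Interceptor {
    private static final int MAX_RETRIES = 5;            // assumed limit
    private static final long DEFAULT_BACKOFF_MS = 500;  // assumed base delay

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        Response response = chain.proceed(request);
        int attempt = 0;
        while (response.code() == 429 && attempt < MAX_RETRIES) {
            long waitMs = DEFAULT_BACKOFF_MS * (1L << attempt); // exponential backoff
            String retryAfter = response.header("Retry-After");
            if (retryAfter != null) {
                try {
                    // A numeric Retry-After (seconds) is treated as a minimum wait.
                    waitMs = Math.max(waitMs, Long.parseLong(retryAfter.trim()) * 1000L);
                } catch (NumberFormatException ignored) {
                    // The HTTP-date form of Retry-After is not handled in this sketch.
                }
            }
            response.close(); // release the connection before retrying
            try {
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while backing off", e);
            }
            response = chain.proceed(request);
            attempt++;
        }
        return response;
    }
}

Installed as an application interceptor on the SDK's HTTP client, something like this would hold requests back under load instead of surfacing Too Many Requests to Spark.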

lakeFS Version

0.90

Deployment

Kubernetes, Helm Chart, DynamoDB

Affected Clients

No response

Relevant logs output

Job aborted.
Caused by: Job aborted due to stage failure.
Caused by: IOException: statObject
Caused by: ApiException: Too Many Requests
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:659)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:365)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
	at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 30.0 failed 4 times, most recent failure: Lost task 31.3 in stage 30.0 (TID 35121) (10.154.175.51 executor 10): java.io.IOException: statObject
	at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
	at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
	at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
	at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
	at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
	at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
	at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3288)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3222)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3216)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3216)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1415)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1415)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1415)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3497)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3438)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3426)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1169)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2715)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2698)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:330)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:202)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:198)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:591)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:178)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:591)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:567)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:198)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:198)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:183)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:174)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:228)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:898)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:1)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:45)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw$$iw.<init>(command-3805131591057868:47)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw$$iw.<init>(command-3805131591057868:49)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw$$iw.<init>(command-3805131591057868:51)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$$iw.<init>(command-3805131591057868:53)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read.<init>(command-3805131591057868:55)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<init>(command-3805131591057868:59)
	at $line72eb909cf0414a729132966c8ac9cf5247.$read$.<clinit>(command-3805131591057868)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print$lzycompute(<notebook>:7)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval$.$print(<notebook>:6)
	at $line72eb909cf0414a729132966c8ac9cf5247.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:221)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:1145)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:1098)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$22(DriverLocal.scala:765)
	at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:41)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$20(DriverLocal.scala:750)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:32)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:30)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:283)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:282)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:318)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:303)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:62)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:728)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:622)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:614)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:533)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:568)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:438)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:381)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:232)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: statObject
	at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:751)
	at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.lakefs.hadoop.shade.api.ApiException: Too Many Requests
	at io.lakefs.hadoop.shade.api.ApiClient.handleResponse(ApiClient.java:1031)
	at io.lakefs.hadoop.shade.api.ApiClient.execute(ApiClient.java:944)
	at io.lakefs.hadoop.shade.api.ObjectsApi.statObjectWithHttpInfo(ObjectsApi.java:1115)
	at io.lakefs.hadoop.shade.api.ObjectsApi.statObject(ObjectsApi.java:1089)
	at io.lakefs.LakeFSFileSystem.getFileStatus(LakeFSFileSystem.java:747)
	at io.lakefs.LakeFSFileSystem.create(LakeFSFileSystem.java:204)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:335)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:524)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:455)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:444)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:163)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:145)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:417)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:341)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Contact Details

No response

ortz · Jan 26 '23 11:01

@ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There it should have been fixed by issue #4189 / PR #4190.

Did you observe GC getting stuck because of this?

arielshaqed · Jan 26 '23 13:01

Why it happens in lakeFSFS

I observed this before we abandoned the work on LakeFSOutputCommitter. Like LakeFSFS, the committer would never retry a failed call, and API calls to lakeFS can fail under load.

Proposal

The solution in LakeFSOutputCommitter was to add a retry wrapper (the same one from #4190, in fact) around calls to the lakeFS API. This solves most of the issue: Spark has a fixed number of executors, and each executor writes in a single-threaded manner, so backing off should be enough to eventually succeed.

Here we would ideally also respect the Retry-After header on the 429 response, as a minimum retry interval. In practice such headers are optimized more towards a single success than sustained throughput, so they may underestimate the time to wait; consider adjusting the value in any case.
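
A rough sketch of the wrapper proposed above (not the actual code from #4190): wrap each lakeFS API call, back off exponentially with jitter on 429, and treat a numeric Retry-After as a lower bound on the wait. The getCode()/getResponseHeaders() accessors follow the usual OpenAPI-generated ApiException; the class name and constants are hypothetical.

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import io.lakefs.hadoop.shade.api.ApiException; // the shaded client seen in the stack trace

// Hypothetical retry wrapper around lakeFS API calls.
public final class RateLimitRetry {
    private RateLimitRetry() {}

    public static <T> T call(Callable<T> apiCall, int maxAttempts, long baseDelayMs) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return apiCall.call();
            } catch (ApiException e) {
                if (e.getCode() != 429 || attempt >= maxAttempts) {
                    throw e; // not rate limited, or out of retries
                }
                // Exponential backoff with jitter.
                long waitMs = baseDelayMs * (1L << (attempt - 1))
                        + ThreadLocalRandom.current().nextLong(baseDelayMs);
                // Respect a numeric Retry-After (seconds) as a minimum wait; header-name case varies by client.
                Map<String, List<String>> headers = e.getResponseHeaders();
                List<String> retryAfter = headers == null ? null
                        : headers.getOrDefault("Retry-After", headers.get("retry-after"));
                if (retryAfter != null && !retryAfter.isEmpty()) {
                    try {
                        waitMs = Math.max(waitMs, Long.parseLong(retryAfter.get(0).trim()) * 1000L);
                    } catch (NumberFormatException ignored) {
                        // HTTP-date form not handled in this sketch.
                    }
                }
                Thread.sleep(waitMs);
            }
        }
    }
}

Usage would look like RateLimitRetry.call(() -> objects.statObject(...), 8, 500) around each SDK call, with objects being the ObjectsApi instance.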

arielshaqed · Jan 26 '23 13:01

@ortz I understand why this happens for HadoopFS, but not why it would happen for the metadata client with GC. There it should have been fixed by issue #4189 / PR #4190.

Did you observe GC getting stuck because of this?

I haven't run GC to test it TBH, haven't got to it yet. It's something we'll need to do.

ortz · Jan 26 '23 13:01

Fixing the rate limiting above means first extending the lakeFS OpenAPI spec to return a Retry-After (backoff) response, and then adding the client-side support for it. Note that the issue can also originate from client timeouts: by default, the generated Java SDK uses a 10-30 second per-request timeout. That alone can produce a large number of requests, while lakeFS itself relies on the underlying storage SDK, which already performs backoff and supports the same header/mechanism we would like to provide to our clients.
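
On the timeout point, as a hedged illustration only (the package name, endpoint, and values are assumptions, and the exact setters depend on the generated SDK version): the generated Java client exposes per-request timeouts that can be raised so that slow, throttled responses are waited out rather than abandoned and re-sent.

import io.lakefs.clients.api.ApiClient; // generated lakeFS Java SDK; exact package may differ per release

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        ApiClient client = new ApiClient();
        client.setBasePath("https://lakefs.example.com/api/v1"); // assumed endpoint
        // Longer per-request timeouts (milliseconds) reduce abandoned-and-resent requests
        // when the server is throttling; the values are illustrative.
        client.setConnectTimeout(30_000);
        client.setReadTimeout(120_000);
        client.setWriteTimeout(120_000);
    }
}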

nopcoder · Jan 26 '23 13:01

This issue is now marked as stale after 90 days of inactivity, and will be closed soon. To keep it, mark it with the "no stale" label.

github-actions[bot] · Nov 01 '23 14:11

This is still an issue: server-side rate limiting causes clients to fail, which is poor behaviour.

See also #4957 and #5664 (which is blocked on fixing this); I will add others as I find them.

arielshaqed · Nov 05 '23 13:11