[SUPPORT] Intermittent timeouts with timeline server based write markers
Describe the problem you faced
Hello, we are seeing intermittent timeouts with timeline-server-based write markers; the stack trace is below. All of the marker configurations are at their defaults, except that we have lowered the timeout to 30s from the default of 5m. Our workloads run on an EKS cluster with OSS Spark and Hudi. The issue does not show up consistently and we cannot provide reliable reproduction steps, but it tends to appear under higher load. Because of this behavior we have switched the marker type back to DIRECT at the expense of additional S3 API costs. We need help identifying and fixing the problem.
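For reference, the settings mentioned above amount to roughly the following. This is a hedged sketch: the keys are from the Hudi 0.14.x configuration docs, the values are the ones described in this report, and the `props` object stands in for however the DeltaStreamer properties are assembled.

```java
import org.apache.hudi.common.config.TypedProperties;

// Sketch of the relevant settings (keys per Hudi 0.14.x docs; values are ours, not the defaults).
TypedProperties props = new TypedProperties();
// Default is TIMELINE_SERVER_BASED; we fell back to DIRECT as a workaround, at higher S3 API cost.
props.setProperty("hoodie.write.markers.type", "DIRECT");
// Timeout for remote file-system-view / marker requests; default is 300s (5m), lowered to fail fast.
props.setProperty("hoodie.filesystem.view.remote.timeout.secs", "30");
```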
To Reproduce
Steps to reproduce the behavior:
We cannot guarantee the behavior can be reproduced consistently, but here is our setup:
- Run DeltaStreamer from Kafka in continuous mode with timeline-server-based markers
- In each commit, write ~1200 partitions (and, implicitly, that many parquet files and marker requests)
- Run 150 executors with 2 cores each and sufficient memory
- Wait for this to fail :)
Expected behavior
Timeline server based markers consistently succeed.
Environment Description
- Hudi version : 0.14.1
- Spark version : 3.4.x
- Hadoop version : 3.3
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : yes
- Driver resources:
  - Limits: cpu: 8, memory: 6758Mi
  - Requests: cpu: 8, memory: 6758Mi
Additional context
We suspect a bug in the locking behavior in MarkerDirState, which shows up only under higher load. As stated above, the driver itself is sufficiently resourced. We profiled the driver using JProfiler while the problem was occurring but could not find any obvious issues. Kindly let me know what additional information you need here.
Stacktrace
```
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieRemoteException: Failed to create marker file tenant=xxxxxx/date=20250524/3427d9c9-ea16-4af1-89f0-9d6fc1045cdf-0_648-22-13747_20250524161054306.parquet.marker.MERGE
Read timed out
at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:187)
at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.create(TimelineServerBasedWriteMarkers.java:143)
at org.apache.hudi.table.marker.WriteMarkers.create(WriteMarkers.java:95)
at org.apache.hudi.io.HoodieWriteHandle.createMarkerFile(HoodieWriteHandle.java:144)
at org.apache.hudi.io.HoodieMergeHandle.init(HoodieMergeHandle.java:198)
at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:134)
at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:125)
at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
... 30 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(Unknown Source)
at java.base/java.net.SocketInputStream.read(Unknown Source)
at java.base/java.net.SocketInputStream.read(Unknown Source)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.hudi.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.hudi.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.hudi.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151)
at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeRequestToTimelineServer(TimelineServerBasedWriteMarkers.java:233)
at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:184)
... 41 more
```
This is a call from the executor to the driver, and a socket timeout indicates either:
- the driver is GC-ing/unresponsive, or
- the timeline server is overloaded.
@yihua @nsivabalan does this pattern-match any other issues we have seen with the same symptoms?
I think this is blocking @hgudladona from enabling timeline markers.
@hgudladona could you share the number of executor cores you have, to size up the scale of requests hitting the timeline server? (We've seen it handle 90K QPS fine.)
The default value for the timeout (for the view manager and marker API) is 5m; we lowered it to 30s so that timeline view clients fail fast to the secondary view. We also have task retries set to 5, and the task consistently fails on retry as well.
In this case the number of executor tasks in the write stage is ~1200, which is very reasonable, and this works most of the time.
We did load-test the marker API ourselves with K6 and were able to sustain 30K RPS without failures, yet this timeout still happens periodically.
I found https://issues.apache.org/jira/browse/HUDI-5670 in the Hudi JIRA, which has the same stack trace.
One other possibility, even if the driver is responsive, is contention on a lock under higher load, which is also hard to reproduce.
After profiling the driver, this is what we found: there are hundreds of Jetty qtp (QueuedThreadPool) threads that are blocked and remain blocked when we encounter the socket timeouts. It looks like the timeline server (Javalin/Jetty) is overwhelmed and has exhausted its threads for serving new requests due to blocking calls for the timeline view refresh. The marker requests themselves do not need a view refresh, but they appear to be blocked behind the other timeline requests.
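For what it's worth, the same symptom can be confirmed without a profiler. The helper below is a hypothetical sketch (ours, not Hudi code) that uses standard JMX to count Jetty `qtp*` threads stuck in BLOCKED or WAITING inside the driver, which is essentially what the JProfiler view showed.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;

// Hypothetical probe: counts Jetty QueuedThreadPool ("qtp*") threads that are blocked or waiting.
// Idle pool threads typically sit in TIMED_WAITING polling the job queue, so they are excluded.
public final class QtpThreadProbe {
  public static long stuckQtpThreads() {
    ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    ThreadInfo[] infos = mx.getThreadInfo(mx.getAllThreadIds());
    return Arrays.stream(infos)
        .filter(ti -> ti != null && ti.getThreadName().startsWith("qtp"))
        .filter(ti -> ti.getThreadState() == Thread.State.BLOCKED
            || ti.getThreadState() == Thread.State.WAITING)
        .count();
  }
}
```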
Any ideas on how to address this?
Please find this overlay with socket reads. For some reason the view sync seems to be happening a lot more often than I would expect.
We updated the cleaner service, which was running async, to run inline to reduce the chance of the timeline view going out of sync and thereby needing a refresh. The result is as follows: the job has been very stable, and the blocking timeline sync operation is very small. I am not sure if there are any changes since 0.14.1 that fix this behavior.
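For clarity, the cleaner change boils down to something like the following (a hedged sketch; the keys are Hudi's standard clean configs, the values are ours):

```java
import org.apache.hudi.common.config.TypedProperties;

// Run the cleaner inline instead of async, so the timeline mutates less often underneath
// the timeline server's cached view.
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.clean.automatic", "true"); // keep automatic (inline) cleaning on
props.setProperty("hoodie.clean.async", "false");    // previously "true" when the cleaner ran async
```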
would something like this in the ViewHandler work better?
```java
private final ReadWriteLock viewLock = new ReentrantReadWriteLock();
...
/**
* Syncs data-set view if local view is behind.
*/
private boolean syncIfLocalViewBehind(Context ctx) {
// First check with read lock - allows concurrent reads
viewLock.readLock().lock();
try {
if (!isLocalViewBehind(ctx)) {
return false; // View is in sync, no need for write lock
}
} finally {
viewLock.readLock().unlock();
}
// Only if first check indicates sync is needed, get write lock
viewLock.writeLock().lock();
try {
// Double check with write lock
if (isLocalViewBehind(ctx)) {
String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
view.sync();
return true;
}
return false;
} finally {
viewLock.writeLock().unlock();
}
}
```
hey @gudladona: you might have got to the root cause to a fair extent. Our FSV sync on the timeline server might see frequent refresh calls if we are operating with all-async services. What you have proposed should work and avoids contention to some extent, but it still may not completely avoid it: if the client has an updated timeline while the timeline server is lagging, we have to trigger the refresh, and with lazy refresh, just when the refresh kicks in, all executor requests have to wait for the refresh to complete.
If you get a chance to try out the above fix, do let us know how much it reduces contention vs. the baseline.
We are starting to test this and will get back with what we find. My suspicion is that we are hitting the condition below more often than expected. If the remote client view is always behind and the view in the timeline server is ahead, this condition will cause unnecessary thrashing without any meaningful outcome. If that is the case, we need to figure out whether there is a peek operation we can do that ensures the in-memory timeline matches the file system for subsequent calls without a full reload:
```java
// refresh if timeline hash mismatches
if (!localTimelineHash.equals(timelineHashFromClient)) {
  return true;
}
```
Here is what we have found: it turns out the reason for all the refreshes is the mismatch of the timeline hash. There is no scenario where the timeline hash will match but the local timeline is ahead of the instant reported by the client here. AFAICT, this means the code below is dead. You should also consider renaming the function, since even if the timeline server view is ahead, the hash mismatch will cause it to be treated as behind.
```java
// As a safety check, even if hash is same, ensure instant is present
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
```
Realistically, at least in our case, without fixing the logic in isLocalViewBehind we will end up doing many refreshes in async mode regardless of the locking mechanism.
To fix this behavior, we may need to introduce a tracker that skips subsequent refreshes, once a first refresh has been done, for the same timeline hash from the client.
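A minimal sketch of that idea, assuming it lives alongside syncIfLocalViewBehind in the ViewHandler (the field and method names below are ours, not Hudi's):

```java
// Hypothetical tracker: remember the last client timeline hash that already triggered a refresh,
// so a burst of requests carrying the same hash only forces one sync.
private final java.util.concurrent.atomic.AtomicReference<String> lastRefreshedClientHash =
    new java.util.concurrent.atomic.AtomicReference<>();

private boolean needsRefreshFor(String timelineHashFromClient) {
  // getAndSet returns the previous value; only the first request with a new hash sees a mismatch.
  String previous = lastRefreshedClientHash.getAndSet(timelineHashFromClient);
  return !timelineHashFromClient.equals(previous);
}
```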
Please advise how to proceed.
Here is another data point: if we flip the conditions to check first whether the local timeline contains the instant from the remote request, then with the above proposed ReadWriteLock we see only 1 refresh per async job. Subsequent requests always pass the isLocalViewBehind check, both the timeline instant check and the hash matching check, basically working as expected.
However, I am unsure of the repercussions of doing this.
BEFORE
```java
/**
* Determines if local view of table's timeline is behind that of client's view.
*/
private boolean isLocalViewBehind(Context ctx) {
String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
String timelineHashFromClient = getTimelineHashParam(ctx);
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
if (LOG.isDebugEnabled()) {
LOG.debug("Client [ LastTs={}, TimelineHash={}], localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, localTimeline.getInstants());
}
if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
&& HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
return false;
}
String localTimelineHash = localTimeline.getTimelineHash();
// refresh if timeline hash mismatches
if (!localTimelineHash.equals(timelineHashFromClient)) {
return true;
}
// As a safety check, even if hash is same, ensure instant is present
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
}
```
AFTER
```java
/**
* Determines if local view of table's timeline is behind that of client's view.
*/
private boolean isLocalViewBehind(Context ctx) {
String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
String timelineHashFromClient = getTimelineHashParam(ctx);
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
if (LOG.isDebugEnabled()) {
LOG.debug("Client [ LastTs={}, TimelineHash={}], localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, localTimeline.getInstants());
}
if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
&& HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
return false;
}
// As a safety check, even if hash is same, ensure instant is present
if (!localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient)) {
return true; // <<-- this triggers the first refresh; subsequent checks pass this check
}
String localTimelineHash = localTimeline.getTimelineHash();
// refresh if timeline hash mismatches
return !localTimelineHash.equals(timelineHashFromClient); // <<-- after the first refresh, subsequent checks pass this check too
}
```
@hgudladona The timeline server has some issues when running for multiple clients, even with the refresh check fix. The concept of "latest" can change during the execution of some code and will cause the client to have an inconsistent view of the table. Longer term, I think the server will need to incorporate the completion time of the last instant from the client into the calls to the file system view, combined with the changes you recommend above.
In the meantime, I have created a patch where the async services will run with new clients. The write clients themselves were not built with thread safety in mind, so using the same object across threads may also lead to some problems down the road.
https://github.com/apache/hudi/pull/13422