
[SUPPORT] Intermittent timeouts with timeline server based write markers

Open hgudladona opened this issue 6 months ago • 11 comments

Describe the problem you faced

Hello, we are seeing intermittent timeouts with timeline-server-based write markers. The stack trace is below. All the marker configurations are at their defaults, except the timeout, which we have lowered to 30s from the default of 5m. Our workloads run on an EKS cluster with OSS Spark and Hudi. The issue does not show up consistently, so we cannot provide reproducible steps, but it tends to appear under higher load. Because of this behavior we have reverted the marker type to DIRECT at the expense of S3 API costs. We need help identifying and fixing the problem.
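For reference, these are the configs involved, roughly as we run them (a sketch using the documented 0.14.x keys; worth double-checking the exact names):

    # timeline-server-based markers
    hoodie.write.markers.type=TIMELINE_SERVER_BASED
    # timeout for remote timeline server calls (default 300s); lowered to 30s to fail fast
    hoodie.filesystem.view.remote.timeout.secs=30
    # current workaround, at the cost of extra S3 API calls:
    # hoodie.write.markers.type=DIRECT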

To Reproduce

Steps to reproduce the behavior:

We cannot guarantee the behavior can be reproduced consistently, but here is our setup:

  1. Run the DeltaStreamer from Kafka in continuous mode with timeline-server-based markers (a rough sketch of such an invocation is below)
  2. In each commit, write ~1200 partitions (and, implicitly, the corresponding parquet files and marker requests)
  3. Run 150 executors with 2 cores each and sufficient memory
  4. Wait for this to fail :)
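Not our exact command, but a minimal sketch of the kind of DeltaStreamer invocation described in step 1 (jar, source class, table type, and paths are placeholders):

    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      hudi-utilities-bundle_2.12-0.14.1.jar \
      --table-type MERGE_ON_READ \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --target-base-path s3://<bucket>/<table-path> \
      --target-table <table> \
      --continuous \
      --hoodie-conf hoodie.write.markers.type=TIMELINE_SERVER_BASED \
      --hoodie-conf hoodie.filesystem.view.remote.timeout.secs=30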

Expected behavior

Timeline server based markers consistently succeed.

Environment Description

  • Hudi version : 0.14.1

  • Spark version : 3.4.x

  • Hadoop version : 3.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes

  • Driver resources:

   Limits:
     cpu:     8
     memory:  6758Mi
   Requests:
     cpu:     8
     memory:  6758Mi

Additional context

We suspect this to be a bug in the locking behavior in MarkerDirState, which shows up only under higher load. As stated above, the driver itself is sufficiently resourced. We profiled the driver with JProfiler while the problem was occurring but could not find any obvious issues. Kindly let me know what additional information you need here.

Stacktrace

	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieRemoteException: Failed to create marker file tenant=xxxxxx/date=20250524/3427d9c9-ea16-4af1-89f0-9d6fc1045cdf-0_648-22-13747_20250524161054306.parquet.marker.MERGE
Read timed out
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:187)
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.create(TimelineServerBasedWriteMarkers.java:143)
	at org.apache.hudi.table.marker.WriteMarkers.create(WriteMarkers.java:95)
	at org.apache.hudi.io.HoodieWriteHandle.createMarkerFile(HoodieWriteHandle.java:144)
	at org.apache.hudi.io.HoodieMergeHandle.init(HoodieMergeHandle.java:198)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:134)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:125)
	at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
	... 30 more
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(Unknown Source)
	at java.base/java.net.SocketInputStream.read(Unknown Source)
	at java.base/java.net.SocketInputStream.read(Unknown Source)
	at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
	at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
	at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
	at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
	at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
	at org.apache.hudi.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
	at org.apache.hudi.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
	at org.apache.hudi.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
	at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
	at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
	at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
	at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
	at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
	at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
	at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151)
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeRequestToTimelineServer(TimelineServerBasedWriteMarkers.java:233)
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:184)
	... 41 more

hgudladona avatar May 25 '25 14:05 hgudladona

This is a call from the executor to the driver, and a socket timeout indicates either

  • the driver is GC-ing/unresponsive, or
  • the timeline server is overloaded.

@yihua @nsivabalan any pattern matching with other issues we have seen with the same symptoms?

I think this is blocking @hgudladona from enabling timeline markers..

@hgudladona could you share the number of executor cores you have, to size up the scale of requests hitting the timeline server. (we've seen it handle 90K qps fine)

vinothchandar avatar May 30 '25 18:05 vinothchandar

The default value for the timeout (for the view manager and marker API) is 5m; we have lowered it to 30s so that timeline view clients fail fast over to the secondary view. We also have task retries set to 5, and the retries also consistently fail.

In this case the number of executor tasks in the write stage is ~1200, which is very reasonable, and this works most of the time.

We did test the marker API ourselves with K6 and were able to produce 30K RPS without failures, yet this timeout still happens periodically.

I found https://issues.apache.org/jira/browse/HUDI-5670 in the Hudi JIRA, which has the same stack trace.

One other possibility, if the driver is in fact responsive, is contention on a lock under higher load, which is also hard to reproduce.

hgudladona avatar May 30 '25 18:05 hgudladona

After profiling the driver, this is what we found: there are hundreds of Jetty qtp (QueuedThreadPool) threads that are blocked, and they remained blocked while we were encountering the socket timeouts. It looks like the timeline server (Javalin/Jetty) is overwhelmed and has exhausted its threads for serving new requests, due to blocking calls for the timeline view refresh. The marker requests themselves do not need a view refresh, but they appear to be blocked behind the other timeline requests.

Any ideas on how to address this?
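For context, the embedded timeline server's Jetty thread pool appears to be tunable through a write config; we have not tried it, and the exact key and default for 0.14.1 are worth verifying:

    # assumption: key name per our reading of HoodieWriteConfig; verify against 0.14.1
    hoodie.embed.timeline.server.threads=<desired Jetty server thread count>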

(profiler screenshots attached)

gudladona avatar Jun 03 '25 16:06 gudladona

Please find this overlaid with the socket reads. For some reason the view sync seems to be happening a lot more often than I would expect.

(screenshots attached)

hgudladona avatar Jun 03 '25 19:06 hgudladona

We updated the cleaner service, which was running async, to run inline, to reduce the chance of the timeline view going out of sync and thereby needing a refresh. The result is as follows: the job has been very stable and the blocking timeline-sync operation is very small. I am not sure if there are any changes since 0.14.1 that fix this behavior.

(profiler screenshots attached)
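For reference, the change amounts to roughly the following cleaner settings (a sketch assuming the standard clean config keys):

    # run cleaning inline as part of each commit instead of via the async cleaner service
    hoodie.clean.automatic=true
    hoodie.clean.async=false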

hgudladona avatar Jun 04 '25 00:06 hgudladona

Would something like this in the ViewHandler work better?

  private final ReadWriteLock viewLock = new ReentrantReadWriteLock();

...

  /**
   * Syncs data-set view if local view is behind.
   */
  private boolean syncIfLocalViewBehind(Context ctx) {
    // First check with read lock - allows concurrent reads
    viewLock.readLock().lock();
    try {
        if (!isLocalViewBehind(ctx)) {
            return false;  // View is in sync, no need for write lock
        }
    } finally {
        viewLock.readLock().unlock();
    }

    // Only if first check indicates sync is needed, get write lock
    viewLock.writeLock().lock();
    try {
        // Double check with write lock
        if (isLocalViewBehind(ctx)) {
            String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
            SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
            view.sync();
            return true;
        }
        return false;
    } finally {
        viewLock.writeLock().unlock();
    }
  }

hgudladona avatar Jun 08 '25 02:06 hgudladona

Hey @gudladona: you might have got to the root cause to a fair extent. Our FSV sync on the timeline server can end up with frequent refresh calls if we are operating w/ all async services. What you have proposed should work and avoids the contention to some extent, but it may not completely avoid it: if the client has an updated timeline but the timeline server is lagging, we have to trigger the refresh, and w/ lazy refresh, just when the refresh kicks in, all executor requests have to wait for the refresh to complete.

If you get a chance to try out the above fix, do let us know how much it reduces the contention vs the baseline.

nsivabalan avatar Jun 09 '25 18:06 nsivabalan

We are starting to test this and will get back with what we find. My suspicion is that we are hitting this condition more often than expected: if the remote client view is always behind and the view in the timeline server is ahead, this condition will cause unnecessary thrashing without any meaningful outcome. If that is the case, we need to figure out whether there is a peek operation we can do that ensures the in-memory timeline matches the file system for subsequent calls, without a full reload.

    // refresh if timeline hash mismatches
    if (!localTimelineHash.equals(timelineHashFromClient)) {
      return true;
    }

hgudladona avatar Jun 10 '25 23:06 hgudladona

Here is what we have found: it turns out the reason for all the refreshes is a mismatch of the timeline hash. There is no scenario where the timeline hash will match but the local timeline is ahead of the instant reported by the client here. AFAICT, what this means is that the code below is dead. You should consider renaming the function, as even if the timeline server view is ahead, the hash mismatch will cause it to be treated as behind.

// As a safety check, even if hash is same, ensure instant is present
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);

Realistically, at least in our case, without fixing the logic in isLocalViewBehind, and regardless of the locking mechanism, we will end up doing many refreshes in async mode.

To fix this behavior, we may need to introduce a tracker that, after the first refresh, skips subsequent refreshes for the same timeline hash from the client.
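A rough sketch of what I mean (illustrative names, not existing Hudi code): remember the last client timeline hash we already refreshed for, per base path, and skip redundant syncs triggered by the same stale hash.

    // hypothetical helper, not actual Hudi code
    // (uses java.util.Map and java.util.concurrent.ConcurrentHashMap)
    private final Map<String, String> lastRefreshedClientHash = new ConcurrentHashMap<>();

    private boolean shouldRefreshFor(String basePath, String timelineHashFromClient) {
      // refresh only once per distinct client hash; later requests carrying the same
      // (stale) hash reuse the already-synced view instead of triggering another reload
      return !timelineHashFromClient.equals(lastRefreshedClientHash.get(basePath));
    }

    private void markRefreshed(String basePath, String timelineHashFromClient) {
      lastRefreshedClientHash.put(basePath, timelineHashFromClient);
    }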

Please advise how to proceed.

hgudladona avatar Jun 11 '25 14:06 hgudladona

Here is another data point: if we flip the conditions to first check that the local timeline contains the instant from the remote request, then with the above proposed ReadWriteLock we see only one refresh per async job; subsequent requests always pass the isLocalViewBehind check (both the timeline instant check and the hash matching check), basically working as expected.

However, I am unsure of the repercussions of doing this.

BEFORE

    /**
     * Determines if local view of table's timeline is behind that of client's view.
     */
    private boolean isLocalViewBehind(Context ctx) {
      String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
      String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
      String timelineHashFromClient = getTimelineHashParam(ctx);
      HoodieTimeline localTimeline =
          viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Client [ LastTs={}, TimelineHash={}], localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, localTimeline.getInstants());
      }

      if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
          && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
        return false;
      }

      String localTimelineHash = localTimeline.getTimelineHash();
      // refresh if timeline hash mismatches
      if (!localTimelineHash.equals(timelineHashFromClient)) {
        return true;
      }

      // As a safety check, even if hash is same, ensure instant is present
      return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
    }

AFTER

    /**
     * Determines if local view of table's timeline is behind that of client's view.
     */
    private boolean isLocalViewBehind(Context ctx) {
      String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
      String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
      String timelineHashFromClient = getTimelineHashParam(ctx);
      HoodieTimeline localTimeline =
          viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Client [ LastTs={}, TimelineHash={}], localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, localTimeline.getInstants());
      }

      if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
          && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
        return false;
      }


      // As a safety check, even if hash is same, ensure instant is present
      if (!localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient)) {
        return true; // <-- this triggers the first refresh; subsequent checks pass this check
      }

      String localTimelineHash = localTimeline.getTimelineHash();
      // refresh if timeline hash mismatches
      return !localTimelineHash.equals(timelineHashFromClient); // <-- after the first refresh, subsequent checks pass this check too
    }

hgudladona avatar Jun 12 '25 01:06 hgudladona

@hgudladona The timeline server has some issues when running for multiple clients, even with the refresh check fix. The concept of "latest" can change during the execution of some code and will cause the client to have an inconsistent view of the table. Longer term, I think the server will need to incorporate the completion time of the last instant from the client into the calls to the file system view, combined with the changes you recommend above.

In the meantime, I have created a patch where the async services run with new clients. The write clients themselves were not built with thread safety in mind, so using the same object across threads may also lead to some problems down the road.

https://github.com/apache/hudi/pull/13422

the-other-tim-brown avatar Jun 12 '25 02:06 the-other-tim-brown