iis icon indicating copy to clipboard operation
iis copied to clipboard

Repartition affiliation matching final output before generating reports

Open marekhorst opened this issue 8 months ago • 0 comments

According to the redmine ticket: https://support.openaire.eu/issues/10546 affiliation matching has failed during the very last phase of report generation due to to exceeding the predefined spark.network.timeout set to 600 seconds:

Job aborted due to stage failure: Task 451 in stage 58.0 failed 4 times, most recent failure: Lost task 451.3 in stage 58.0 (TID 746545, eos-m2-sn12.ocean.icm.edu.pl, executor 1041): org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:270)
    at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:736)
    at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:689)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by spark.network.timeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:266)
    ... 17 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 20 more

The thing is increasing the timeout does not address the core reason of the failure.

Affiliation matching failed at very simple distinct operation run over the publication OAIds for which there was at least one affiliation matched. In particular this line 50 from the AffMatchReportGenerator

long docWithAtLeastOneOrgCount = matchedDocOrganizations.map(v->v.getDocumentId()).distinct().count();

The map() operation is the simplest possible (and it was part of prior stage 57) and the distinct() operation is being run on ~226 Mi of publication ids to be reduced to ~13 Mi of unique ids according to the numbers from the previous run.

So nothing fancy but still caused the fatal failure. One pretty straightforward fix would be running the repartition(1000) prior to distinct() operation in order to balance the load.

marekhorst avatar Apr 23 '25 15:04 marekhorst