[SUPPORT] Expect job status failed in spark batch model
Describe the problem you faced
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
...
fileWriter.writeAvro(record.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
} else {
fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
}
...
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
}
}
private def commitAndPerformPostOperations(spark: SparkSession,
...
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
...
(commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
log.error(s"${tableInstantInfo.operation} failed with errors")
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
(false, common.util.Option.empty(), common.util.Option.empty())
}
}
I hit a SchemaCompatibilityException in the rewriteRecord method, but it was caught, and the only trace is a log line on the driver: ERROR HoodieSparkSqlWriter$: UPSERT failed with errors. The Spark job status is still SUCCEEDED.
Some questions I need help with:
- Is this behavior meant for streaming scenarios? If an exception were thrown instead, how would the job recover?
- In batch mode, the job status is often used as a trigger condition. How can I handle this case in the current version? Thanks.
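To make the behavior described above concrete, here is a minimal, self-contained sketch of the pattern: per-record exceptions are recorded in a status object instead of being rethrown, so nothing fails unless the caller inspects the status. `SimpleWriteStatus`, `writeAll`, and `failIfErrors` are hypothetical stand-ins written for illustration; they are not Hudi classes or APIs, and the "bad" key prefix is only a way to simulate a schema error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchFailureSketch {

    // Hypothetical stand-in for a write status object: it only collects errors.
    static class SimpleWriteStatus {
        private final List<Throwable> errors = new ArrayList<>();

        void markFailure(String recordKey, Throwable t) {
            errors.add(t);
        }

        boolean hasErrors() {
            return !errors.isEmpty();
        }

        List<Throwable> getErrors() {
            return errors;
        }
    }

    // Mirrors the write path quoted in the issue: the exception is recorded,
    // not rethrown, so the loop (and the job) keeps going.
    static SimpleWriteStatus writeAll(List<String> recordKeys) {
        SimpleWriteStatus status = new SimpleWriteStatus();
        for (String key : recordKeys) {
            try {
                if (key.startsWith("bad")) {
                    // Simulated failure; stands in for a schema error.
                    throw new IllegalStateException("schema mismatch for " + key);
                }
                // A real writer would persist the record here.
            } catch (Throwable t) {
                status.markFailure(key, t);
            }
        }
        return status;
    }

    // What a batch caller could do: convert recorded errors into an exception
    // so the surrounding job ends in a failed state.
    static void failIfErrors(SimpleWriteStatus status) {
        if (status.hasErrors()) {
            RuntimeException e = new RuntimeException(
                "write finished with " + status.getErrors().size() + " record error(s)");
            status.getErrors().forEach(e::addSuppressed);
            throw e;
        }
    }

    public static void main(String[] args) {
        SimpleWriteStatus status = writeAll(Arrays.asList("k1", "bad-k2", "k3"));
        System.out.println("hasErrors = " + status.hasErrors());
        try {
            failIfErrors(status);
        } catch (RuntimeException e) {
            System.out.println("job would fail: " + e.getMessage());
        }
    }
}
```

In a real batch job the exception from a check like `failIfErrors` would be left uncaught, so the scheduler sees a failed run instead of SUCCEEDED.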
@vinothchandar @xushiyan @yihua Could you help answer this question?
Which version of Hudi are you using? We have a config, https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritestreamingignorefailedbatch: when set to true, it ignores failures and proceeds to the next batch. The default value for this was true until 0.12.0.
Can you set this config to false and let us know if you are still facing the issue?
@nsivabalan Thanks for the reply. I am using version 0.11.0, but ours is a batch job, not a streaming job. Following the config you suggested, I found the exception-handling logic for streaming mode in the code. I will try to use it as a reference to implement the same logic in my batch job. Thanks.
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (compactionInstantOps.isPresent) {
asyncCompactorService.enqueuePendingAsyncServiceInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
}
if (clusteringInstant.isPresent) {
asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
))
}
Success((true, commitOps, compactionInstantOps))
case Failure(e) =>
// clean up persist rdds in the write process
data.sparkSession.sparkContext.getPersistentRDDs
.foreach {
case (id, rdd) =>
try {
rdd.unpersist()
} catch {
case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
}
}
log.error(s"Micro batch id=$batchId threw following exception: ", e)
if (ignoreFailedBatch) {
log.info(s"Ignore the exception and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s""
}))
if (ignoreFailedBatch) {
log.info(s"Ignore the errors and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
}
}
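The streaming logic above boils down to a small decision: a write that threw or finished with errors fails the batch unless the ignore flag is set. A rough sketch of how that decision could be carried over to a batch job follows. `WriteOutcome`, `shouldFailJob`, and `handle` are hypothetical names introduced for illustration only; they do not exist in Hudi, and retries are left out.

```java
public class FailedBatchPolicySketch {

    // Hypothetical summary of one write attempt, matching the three cases
    // in the streaming code: success, exception, finished with errors.
    enum WriteOutcome { COMMITTED, THREW_EXCEPTION, FINISHED_WITH_ERRORS }

    // The job should fail when the write did not commit cleanly and the
    // caller has not asked to ignore failed batches.
    static boolean shouldFailJob(WriteOutcome outcome, boolean ignoreFailedBatch) {
        return outcome != WriteOutcome.COMMITTED && !ignoreFailedBatch;
    }

    // Turns the decision into an exception so the batch job exits as failed.
    static void handle(WriteOutcome outcome, boolean ignoreFailedBatch) {
        if (shouldFailJob(outcome, ignoreFailedBatch)) {
            throw new IllegalStateException("Batch write ended up with errors: " + outcome);
        }
    }

    public static void main(String[] args) {
        // Ignoring failures: no exception, the job would report success.
        handle(WriteOutcome.FINISHED_WITH_ERRORS, true);
        System.out.println("ignored errors, job continues");

        // Not ignoring failures: the exception propagates and fails the job.
        try {
            handle(WriteOutcome.FINISHED_WITH_ERRORS, false);
        } catch (IllegalStateException e) {
            System.out.println("job fails: " + e.getMessage());
        }
    }
}
```

Keeping the decision in one small function makes it easy to default a batch job to failing on errors while still allowing an explicit opt-out.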
Thanks for reporting. I understand what you are looking for and have put up a fix here: https://github.com/apache/hudi/pull/7140. I have yet to write tests, but you can try it out and let us know if it looks good.
Feel free to reopen if the fix does not solve your use case.
@KnightChess Hi, the problem has been fixed, but a test is still missing. Can you provide a way to reproduce the problem so that I can add it to the UT?