
[SUPPORT] Expect job status to be FAILED in Spark batch mode

Open · KnightChess opened this issue 2 years ago

Describe the problem you faced

In Hudi's write handle, a failure on a single record is caught and recorded on the WriteStatus instead of being rethrown:

@Override
  public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
    ...
          fileWriter.writeAvro(record.getRecordKey(),
              rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
        } else {
          fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
        }
        ...
    } catch (Throwable t) {
      // Not throwing exception from here, since we don't want to fail the entire job
      // for a single record
      writeStatus.markFailure(record, t, recordMetadata);
      LOG.error("Error writing record " + record, t);
    }
  }

On the driver side, HoodieSparkSqlWriter then only logs the errors and skips the commit, without failing the job:

private def commitAndPerformPostOperations(spark: SparkSession,
                                             ...
                                            ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
      log.info("Proceeding to commit the write.")
      ...
      (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
    } else {
      log.error(s"${tableInstantInfo.operation} failed with errors")
      if (log.isTraceEnabled) {
        log.trace("Printing out the top 100 errors")
        writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
          .take(100)
          .foreach(ws => {
            log.trace("Global error :", ws.getGlobalError)
            if (ws.getErrors.size() > 0) {
              ws.getErrors.foreach(kt =>
                log.trace(s"Error for key: ${kt._1}", kt._2))
            }
          })
      }
      (false, common.util.Option.empty(), common.util.Option.empty())
    }
  }

I hit a SchemaCompatibilityException in the rewriteRecord method, but it was caught, and the driver only printed a log line: ERROR HoodieSparkSqlWriter$: UPSERT failed with errors. The Spark job status was still SUCCEEDED.

Some questions I need help with:

  • Is this behavior intended for streaming scenarios? If an exception were thrown instead, how could the job be recovered?
  • In batch mode, the job status is often used as a downstream trigger condition. How can this case be handled in the current version (see the sketch after this list)? Thanks.
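
A minimal batch-side guard, as a sketch only (not from this thread; spark, df, basePath, and hudiOptions are assumed to be in scope): since a failed batch makes no commit, the timeline not advancing after the write indicates that errors were swallowed.

import org.apache.hudi.common.table.HoodieTableMetaClient

// Return the timestamp of the latest completed commit on the table, if any.
def latestCommit(basePath: String): Option[String] = {
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(spark.sparkContext.hadoopConfiguration)
    .setBasePath(basePath)
    .build()
  val last = metaClient.getActiveTimeline.getCommitsTimeline
    .filterCompletedInstants.lastInstant()
  if (last.isPresent) Some(last.get.getTimestamp) else None
}

val before = latestCommit(basePath)
df.write.format("hudi").options(hudiOptions).mode("append").save(basePath)
val after = latestCommit(basePath)
// No new commit after an append means the write did not succeed end-to-end.
if (after == before) {
  throw new IllegalStateException(s"Hudi write to $basePath produced no new commit; failing the job")
}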

KnightChess avatar Sep 15 '22 07:09 KnightChess

@vinothchandar @xushiyan @yihua Could you help answer this question?

KnightChess avatar Sep 21 '22 07:09 KnightChess

Which version of Hudi are you using? We have a config, https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritestreamingignorefailedbatch: when set to true, it ignores failures and proceeds to the next batch. The default value for this was true until 0.12.0.

Can you set this config to false and let us know if you are still facing issues?
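
For reference, a sketch of how that flag might be passed on a Hudi structured streaming write (hudiOptions, the checkpoint path, and the base path are placeholders, not from this thread):

df.writeStream
  .format("hudi")
  .options(hudiOptions) // placeholder map: table name, record key, precombine field, ...
  .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
  .option("checkpointLocation", "/tmp/hudi_checkpoints") // placeholder
  .outputMode("append")
  .start("/tmp/hudi/my_table") // placeholder base path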

nsivabalan avatar Oct 23 '22 04:10 nsivabalan

@nsivabalan thanks for the reply. I am using version 0.11.0, but ours is a batch job, not a streaming job. Following the config you advised into the code, I found the exception-handling logic for the streaming mode (excerpted below); I will try to adapt it to implement the same logic in my batch job. Thanks.

      Try(
        HoodieSparkSqlWriter.write(
          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
      ) match {
        case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
          log.info(s"Micro batch id=$batchId succeeded"
            + (commitOps.isPresent match {
            case true => s" for commit=${commitOps.get()}"
            case _ => s" with no new commits"
          }))
          writeClient = Some(client)
          hoodieTableConfig = Some(tableConfig)
          if (compactionInstantOps.isPresent) {
            asyncCompactorService.enqueuePendingAsyncServiceInstant(
              new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
          }
          if (clusteringInstant.isPresent) {
            asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
              State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
            ))
          }
          Success((true, commitOps, compactionInstantOps))
        case Failure(e) =>
          // clean up persist rdds in the write process
          data.sparkSession.sparkContext.getPersistentRDDs
            .foreach {
              case (id, rdd) =>
                try {
                  rdd.unpersist()
                } catch {
                  case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
                }
            }
          log.error(s"Micro batch id=$batchId threw following exception: ", e)
          if (ignoreFailedBatch) {
            log.info(s"Ignore the exception and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(e)
          }
        case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
          log.error(s"Micro batch id=$batchId ended up with errors"
            + (commitOps.isPresent match {
              case true =>  s" for commit=${commitOps.get()}"
              case _ => s""
            }))
          if (ignoreFailedBatch) {
            log.info(s"Ignore the errors and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
          }
      }
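
A rough batch-mode adaptation of the handler above, as a sketch (not Hudi code; df, hudiOptions, basePath, and log are assumed to be in scope):

import scala.util.{Failure, Success, Try}

Try {
  df.write.format("hudi").options(hudiOptions).mode("append").save(basePath)
} match {
  case Success(_) =>
    // Note: before the fix discussed below, per-record errors do not raise an
    // exception here, so pair this with a commit check like the earlier sketch.
    log.info(s"Hudi batch write to $basePath returned without exception")
  case Failure(e) =>
    log.error(s"Hudi batch write to $basePath failed", e)
    throw e // rethrow so the Spark application ends up FAILED
}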

KnightChess avatar Oct 24 '22 08:10 KnightChess

Thanks for reporting; I see what you are looking for. I have put up a fix here: https://github.com/apache/hudi/pull/7140. Tests are yet to be written, but you can try it out and let us know if it looks good.

nsivabalan avatar Nov 04 '22 18:11 nsivabalan

Feel free to reopen if the fix does not solve your use case.

nsivabalan avatar Nov 04 '22 18:11 nsivabalan

@KnightChess Hi, the problem has been fixed, but a test is still missing. Can you provide a way to reproduce the problem, so that I can add it to the UTs?
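
One hypothetical repro sketch (untested; table, column, and path names are made up): seed a table, then upsert a batch with an incompatible type change on the same column, which should fail during record rewrite since string to int is not an Avro-compatible promotion.

import org.apache.spark.sql.SaveMode
import spark.implicits._ // assumes a SparkSession named `spark` with Hudi support

val basePath = "/tmp/hudi/repro_tbl"
val opts = Map(
  "hoodie.table.name" -> "repro_tbl",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.datasource.write.operation" -> "upsert"
)
// schema v1: `value` is a string
Seq((1, "a", 1L)).toDF("id", "value", "ts")
  .write.format("hudi").options(opts).mode(SaveMode.Overwrite).save(basePath)
// schema v2: `value` becomes an int, an incompatible change for the Avro rewrite
Seq((1, 2, 2L)).toDF("id", "value", "ts")
  .write.format("hudi").options(opts).mode(SaveMode.Append).save(basePath)
// expectation with the fix: the second write throws instead of logging
// "UPSERT failed with errors" while the job status stays SUCCEEDED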

Zouxxyy avatar Nov 13 '22 15:11 Zouxxyy