[SUPPORT] Compaction plans are generated too frequently
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
Almost every commit generates a compaction plan. The offline compaction task cannot keep up with that many plans.
To Reproduce
Steps to reproduce the behavior:
These are my configs:
map.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_commits");
map.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
map.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "24");
My Flink checkpoint interval is 150 seconds.
With 24 delta commits per plan and one commit per checkpoint, I expect one compaction plan per hour (24 × 150 s = 3600 s), but every commit generates a compaction plan.
Is there a problem with my config?
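The expectation above can be checked with a small calculation. This is only a sketch in plain Java, not Hudi code: the class and method names are hypothetical, and it assumes that every successful checkpoint produces exactly one delta commit and that the `num_commits` strategy schedules a plan once the configured number of delta commits has accumulated.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate of the compaction schedule (hypothetical helper, not part of Hudi). */
final class CompactionPlanEstimate {

    /**
     * Expected time between two compaction plans, assuming one delta commit
     * per checkpoint and a plan every `deltaCommits` delta commits.
     */
    static Duration expectedPlanInterval(int deltaCommits, Duration checkpointInterval) {
        if (deltaCommits <= 0) {
            throw new IllegalArgumentException("deltaCommits must be positive");
        }
        if (checkpointInterval.isZero() || checkpointInterval.isNegative()) {
            throw new IllegalArgumentException("checkpointInterval must be positive");
        }
        return checkpointInterval.multipliedBy(deltaCommits);
    }

    /** Number of compaction plans per hour implied by the interval above. */
    static double plansPerHour(int deltaCommits, Duration checkpointInterval) {
        long seconds = expectedPlanInterval(deltaCommits, checkpointInterval).getSeconds();
        return 3600.0 / seconds;
    }
}
```

With the values from this issue, `expectedPlanInterval(24, Duration.ofSeconds(150))` is one hour. A plan on every commit corresponds to an effective `deltaCommits` of 1, which is 24 plans per hour instead of 1.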
Environment Description
- Hudi version : 0.11.0
Below is the relevant part of my code:
` // compact
configuration.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED));
configuration.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED));
configuration.set(FlinkOptions.COMPACTION_TASKS, configuration.getInteger(FlinkOptions.COMPACTION_TASKS));
configuration.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, configuration.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY));
configuration.set(FlinkOptions.COMPACTION_DELTA_COMMITS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS));
configuration.set(FlinkOptions.COMPACTION_DELTA_SECONDS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS));
configuration.set(FlinkOptions.COMPACTION_MAX_MEMORY, configuration.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY));
//hive
configuration.set(FlinkOptions.HIVE_SYNC_ENABLED, configuration.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED));
configuration.set(FlinkOptions.HIVE_SYNC_MODE, configuration.getString(FlinkOptions.HIVE_SYNC_MODE));
configuration.set(FlinkOptions.HIVE_SYNC_METASTORE_URIS, configuration.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
configuration.set(FlinkOptions.HIVE_SYNC_JDBC_URL, configuration.getString(FlinkOptions.HIVE_SYNC_JDBC_URL));
configuration.set(FlinkOptions.HIVE_SYNC_TABLE, tableName);
configuration.set(FlinkOptions.HIVE_SYNC_DB, configuration.getString(FlinkOptions.HIVE_SYNC_DB));
configuration.set(FlinkOptions.HIVE_SYNC_USERNAME, configuration.getString(FlinkOptions.HIVE_SYNC_USERNAME));
configuration.set(FlinkOptions.HIVE_SYNC_PASSWORD, configuration.getString(FlinkOptions.HIVE_SYNC_PASSWORD));
configuration.set(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, configuration.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP));
//timeline-server
configuration.setString("hoodie.embed.timeline.server","false");
//index
configuration.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BLOOM.name());
//failed
configuration.set(FlinkOptions.IGNORE_FAILED, false);
long ckpTimeout = rowDataDataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
RowType rowType = RowType.of(false, columnTypes, columnNames);
configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
AvroSchemaConverter.convertToSchema(rowType).toString());
// partitioning
configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, "createYear,createMonth");
configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, "createYear,createMonth");
// bulk_insert mode
final String writeOperation = configuration.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
Pipelines.bulkInsert(configuration, rowType, rowDataDataStream);
} else
// Append mode
if (OptionsResolver.isAppendMode(configuration)) {
Pipelines.append(configuration, rowType, rowDataDataStream, false);
} else {
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism,
rowDataDataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism,
hoodieRecordDataStream);
// compaction
if (StreamerUtil.needsAsyncCompaction(configuration)) {
//mor table and online compaction
Pipelines.compact(configuration, pipeline);
} else if (configuration.getString(FlinkOptions.TABLE_TYPE)
.toUpperCase(Locale.ROOT)
.equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)) {
//cow table
Pipelines.clean(configuration, pipeline);
}
}
}`
@danny0405 help pls
This is the compaction request file on HDFS.
@danny0405 help me, thanks
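To put numbers on how often plans are scheduled, the gaps between the compaction request files can be computed from their names. The sketch below is plain Java and makes some assumptions: the class name is hypothetical, the file names are expected to look like `<instant>.compaction.requested` (as found under the table's `.hoodie` directory), and the instant is assumed to start with a 14-digit `yyyyMMddHHmmss` timestamp, optionally followed by milliseconds, which are ignored.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Computes the time between consecutive compaction plans from timeline file names (hypothetical helper). */
final class CompactionPlanGaps {

    // Assumed naming: 14 to 17 digits of instant time followed by ".compaction.requested".
    private static final String REQUEST_FILE_PATTERN = "\\d{14,17}\\.compaction\\.requested";
    private static final DateTimeFormatter SECONDS_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Returns the gaps between consecutive compaction request instants, oldest first. */
    static List<Duration> gapsBetweenPlans(List<String> fileNames) {
        List<LocalDateTime> instants = fileNames.stream()
                .filter(name -> name.matches(REQUEST_FILE_PATTERN)) // skip commits, cleans, etc.
                .map(name -> LocalDateTime.parse(name.substring(0, 14), SECONDS_FORMAT))
                .sorted()
                .collect(Collectors.toList());

        List<Duration> gaps = new ArrayList<>();
        for (int i = 1; i < instants.size(); i++) {
            gaps.add(Duration.between(instants.get(i - 1), instants.get(i)));
        }
        return gaps;
    }
}
```

Gaps close to the checkpoint interval (150 seconds here) would mean a plan is scheduled on nearly every commit, while gaps close to one hour would match the configured 24 delta commits.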
Hmm, 0.11 is a very old release. Can you show some JM logs to shed some light on the scheduling process?
I cannot find any related logs or clues about the scheduling process in the JM. Compaction plans are generated irregularly.
Okay, is there any way you can upgrade your Hudi release to 0.14.1?