[SUPPORT] Compaction plans are generated too fast

Open lqbFFF opened this issue 1 year ago • 6 comments

Describe the problem you faced

Almost every commit generates a compaction plan. The offline compaction task cannot keep up with so many compactions.

To Reproduce

Steps to reproduce the behavior:

These are my configs:

    map.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),"num_commits");
    map.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(),"true");
    map.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(),"24");

My Flink checkpoint interval is 150 seconds, so with 24 delta commits it should generate one compaction plan per hour, but almost every commit generates one. Is there a problem with my config?
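The expected cadence can be checked with a small sketch. It assumes one delta commit per successful checkpoint, and `shouldSchedule` is only a simplified model of a `num_commits` trigger, not Hudi's actual implementation; the class and method names are hypothetical.

```java
import java.time.Duration;

/** Back-of-the-envelope check of the expected compaction cadence (illustrative only). */
final class CompactionCadence {

    /**
     * Expected time between compaction plans under a num_commits strategy,
     * assuming each successful checkpoint produces exactly one delta commit.
     */
    static Duration expectedPlanInterval(int deltaCommits, Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(deltaCommits);
    }

    /**
     * Simplified model of the trigger: schedule a plan only once at least
     * `threshold` delta commits have completed since the last compaction.
     */
    static boolean shouldSchedule(int deltaCommitsSinceLastCompaction, int threshold) {
        return deltaCommitsSinceLastCompaction >= threshold;
    }

    public static void main(String[] args) {
        Duration interval = expectedPlanInterval(24, Duration.ofSeconds(150));
        System.out.println("Expected plan interval: " + interval.toMinutes() + " min");
        System.out.println("Schedule after 1 commit?   " + shouldSchedule(1, 24));
        System.out.println("Schedule after 24 commits? " + shouldSchedule(24, 24));
    }
}
```

Under these assumptions the interval is 24 × 150 s = 3600 s, so a plan after every commit suggests the threshold of 24 is not the value the scheduler actually sees.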

Environment Description

  • Hudi version : 0.11.0

Below is part of my code: `
    //compact
    configuration.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED));
    configuration.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, configuration.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED));
    configuration.set(FlinkOptions.COMPACTION_TASKS, configuration.getInteger(FlinkOptions.COMPACTION_TASKS));
    configuration.set(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, configuration.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY));
    configuration.set(FlinkOptions.COMPACTION_DELTA_COMMITS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS));
    configuration.set(FlinkOptions.COMPACTION_DELTA_SECONDS, configuration.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS));
    configuration.set(FlinkOptions.COMPACTION_MAX_MEMORY, configuration.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY));

    //hive
    configuration.set(FlinkOptions.HIVE_SYNC_ENABLED, configuration.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED));
    configuration.set(FlinkOptions.HIVE_SYNC_MODE, configuration.getString(FlinkOptions.HIVE_SYNC_MODE));
    configuration.set(FlinkOptions.HIVE_SYNC_METASTORE_URIS, configuration.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
    configuration.set(FlinkOptions.HIVE_SYNC_JDBC_URL, configuration.getString(FlinkOptions.HIVE_SYNC_JDBC_URL));
    configuration.set(FlinkOptions.HIVE_SYNC_TABLE, tableName);
    configuration.set(FlinkOptions.HIVE_SYNC_DB, configuration.getString(FlinkOptions.HIVE_SYNC_DB));
    configuration.set(FlinkOptions.HIVE_SYNC_USERNAME, configuration.getString(FlinkOptions.HIVE_SYNC_USERNAME));
    configuration.set(FlinkOptions.HIVE_SYNC_PASSWORD, configuration.getString(FlinkOptions.HIVE_SYNC_PASSWORD));
    configuration.set(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, configuration.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP));

    //timeline-server
    configuration.setString("hoodie.embed.timeline.server","false");

    //index
    configuration.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BLOOM.name());

    //failed
    configuration.set(FlinkOptions.IGNORE_FAILED, false);

    long ckpTimeout = rowDataDataStream.getExecutionEnvironment()
            .getCheckpointConfig().getCheckpointTimeout();
    configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

    RowType rowType = RowType.of(false, columnTypes, columnNames);
    configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA,
            AvroSchemaConverter.convertToSchema(rowType).toString());

    //partitioning
    configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
    configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
    configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, "createYear,createMonth");
    configuration.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, "createYear,createMonth");

    // bulk_insert mode
    final String writeOperation = configuration.get(FlinkOptions.OPERATION);
    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        Pipelines.bulkInsert(configuration, rowType, rowDataDataStream);
    } else
        // Append mode
        if (OptionsResolver.isAppendMode(configuration)) {
            Pipelines.append(configuration, rowType, rowDataDataStream, false);
        } else {
            DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism,
                    rowDataDataStream);
            DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism,
                    hoodieRecordDataStream);

            // compaction
            if (StreamerUtil.needsAsyncCompaction(configuration)) {
                //mor table and online compaction
                Pipelines.compact(configuration, pipeline);
            } else if (configuration.getString(FlinkOptions.TABLE_TYPE)
                    .toUpperCase(Locale.ROOT)
                    .equals(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE)) {
                //cow table
                Pipelines.clean(configuration, pipeline);
            }
        }
}`

lqbFFF, Feb 20 '24 08:02

@danny0405 help pls

lqbFFF, Feb 20 '24 08:02

[Image: 企业微信截图_17084193598057] These are the compaction request files on HDFS.

lqbFFF, Feb 20 '24 08:02

@danny0405 help me, thanks

lqbFFF, Feb 20 '24 09:02

Hmm, 0.11 is a very old release. Can you share some JobManager (JM) logs that shed light on the scheduling process?

danny0405, Feb 21 '24 04:02

> Hmm, 0.11 is a very old release. Can you share some JobManager (JM) logs that shed light on the scheduling process?

I cannot find any related logs or clues about the scheduling process in the JM. Compaction plans are generated irregularly.
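Without scheduler logs, one way to quantify how irregular the plans are is to count completed delta commits between consecutive compaction requests, using only the file names in the table's `.hoodie` directory. The sketch below assumes the usual `<instant>.deltacommit` and `<instant>.compaction.requested` naming with fixed-width instant times; the class name and the sample file names are hypothetical, and reading the listing from HDFS is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Counts completed delta commits between compaction requests (illustrative only). */
final class TimelineGaps {

    /**
     * For each compaction request, returns how many completed delta commits
     * appeared since the previous compaction request.
     * Assumes names look like "<instant>.deltacommit" or "<instant>.compaction.requested".
     */
    static List<Integer> deltaCommitsPerPlan(List<String> fileNames) {
        List<String> relevant = new ArrayList<>();
        for (String name : fileNames) {
            // Ignore inflight/requested delta commits and unrelated files.
            if (name.endsWith(".deltacommit") || name.endsWith(".compaction.requested")) {
                relevant.add(name);
            }
        }
        // Fixed-width instant times sort chronologically as plain strings.
        relevant.sort(Comparator.comparing((String n) -> n.substring(0, n.indexOf('.'))));

        List<Integer> gaps = new ArrayList<>();
        int sinceLastPlan = 0;
        for (String name : relevant) {
            if (name.endsWith(".deltacommit")) {
                sinceLastPlan++;
            } else {
                gaps.add(sinceLastPlan);
                sinceLastPlan = 0;
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Hypothetical listing; real names would come from the .hoodie directory.
        List<String> listing = List.of(
                "hoodie.properties",
                "20240220080000.deltacommit",
                "20240220080000.deltacommit.inflight",
                "20240220080230.deltacommit",
                "20240220080231.compaction.requested",
                "20240220080500.deltacommit",
                "20240220080501.compaction.requested");
        System.out.println(deltaCommitsPerPlan(listing)); // prints [2, 1]
    }
}
```

With `compaction.delta_commits` set to 24, each count would be expected to be around 24; counts near 1 would confirm that plans are scheduled on almost every commit.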

lqbFFF, Feb 21 '24 05:02

Okay, is there any way you can upgrade your Hudi release to 0.14.1?

danny0405, Feb 22 '24 02:02