tis icon indicating copy to clipboard operation
tis copied to clipboard

希望增加paimon数据源

Open kevinlin299 opened this issue 10 months ago • 6 comments

批量、实时同步希望增加paimon数据源的目标端写入

kevinlin299 avatar Mar 04 '25 08:03 kevinlin299

会加的,之前加过hudi 数据端,花费了不少时间,上线之后 用者寥寥,后来下线了。现在看起来paimon 流行度更高

baisui1981 avatar Mar 04 '25 09:03 baisui1981

批量写入:https://github.com/alibaba/DataX/pull/2083/files# Flink CDC pipeline 相关:https://www.processon.com/diagraming/6774c59e7af75c38daf1c1f0 view pipeline-connectors Paimon 相关配置:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/connectors/pipeline-connectors/paimon/

baisui1981 avatar Mar 16 '25 12:03 baisui1981

在 Flink CDC 中,YAML 配置文件的解析和组装的实现主要涉及以下核心类和流程:


1. 配置解析入口类

Flink CDC 通常通过 YamlParserPipelineOptionsFactory 类加载并解析 YAML 文件。例如:

// 类似代码逻辑(伪代码)
Yaml yaml = new Yaml();
Map<String, Object> config = yaml.load(inputStream);

这些类会将 YAML 的层级结构(如 sourcesinkpipeline)解析为键值对或 POJO 对象。


2. Source/Sink 工厂类

Flink CDC 通过 工厂模式 动态创建 Source 和 Sink 实例,核心类包括:

  • MysqlSourceFactory
    解析 source 配置,生成 DebeziumSourceFunction 或 Flink CDC 的新版 MySQLSource

    public class MysqlSourceFactory implements SourceFactory {
        public Source createSource(Map<String, String> config) {
            String host = config.get("hostname");
            int port = Integer.parseInt(config.get("port"));
            // 使用正则解析 tables 等参数
            return MySQLSource.builder().hostname(host).port(port)...build();
        }
    }
    
  • org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory
    解析 sink 配置,构建 Paimon 的 CatalogTableSink

    public class PaimonDataSinkFactory implements DataSinkFactory {
        public DataSink createDataSink(Map<String, String> config) {
            String warehouse = config.get("catalog.properties.warehouse");
            Catalog catalog = Catalog.create(...);
            return new PaimonDataSink(catalog, ...);
        }
    }
    

3. Pipeline 组装类

通过 PipelineBuilderJobGraphGenerator 类将 Source 和 Sink 组合成 Flink 作业:

// 伪代码
Source source = MysqlSourceFactory.create(sourceConfig);
Sink sink = PaimonSinkFactory.create(sinkConfig);

DataStream<RowData> stream = env.fromSource(source, ...);
stream.sinkTo(sink);

env.execute(pipelineConfig.getName());

此类会处理 pipeline.parallelism 等全局参数,并调用 Flink 的 StreamExecutionEnvironment 提交作业。


4. 底层依赖的关键组件

  • DebeziumDeserializationSchema
    负责将 Debezium 的变更事件(CDC 数据)解析为 Flink 的 RowData
  • PaimonCatalog
    管理 Paimon 表的元数据,与 catalog.properties.warehouse 配置直接关联。
  • Flink 的 SPI 机制
    通过 META-INF/services 下的 SPI 文件自动加载 SourceFactorySinkFactory 实现。

完整流程总结:

  1. YAML 解析:通过 SnakeYAML 等库将文本配置转换为内存对象。
  2. 工厂类实例化:根据 source.type=mysqlsink.type=paimon 找到对应的工厂类。
  3. 参数映射:将 hostnametables 正则、server-id 等参数传递给底层 Connector(如 Debezium)。
  4. 作业组装:将 Source 和 Sink 连接到 Flink 的 DataStream API,设置并行度后提交作业。

如果需要深入调试,可以重点关注 flink-cdc-connectors 模块中 *Factory 类和 YAML 解析相关的工具类。

baisui1981 avatar May 08 '25 09:05 baisui1981

另外,还有一个艰巨的任务需要整合 flink-cdc 的pipeline到现有TIS的执行流程中来,整个整合流程不能改变现有TIS增量执行流程的执行方式,

baisui1981 avatar May 14 '25 03:05 baisui1981

paimon 技术资料:https://flink-learning.org.cn/article/detail/c44518100be2ba050f754ab9e5bf66a9

baisui1981 avatar Jun 01 '25 13:06 baisui1981

在分布式环境中传输 CommitMessage 对象需要可靠的序列化方案。Apache Paimon 提供了专门的序列化机制来处理 CommitMessage,以下是完整的实现方案:

核心方案:使用 Paimon 内置序列化器

Paimon 提供了 CommitMessageSerializer 专门用于序列化 CommitMessage 对象,这是最安全且兼容性最好的方式。

import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;

// 1. 在写入节点序列化
public byte[] serializeCommitMessages(List<CommitMessage> messages) {
    // 获取表结构信息(需从表中获取)
    RowType rowType = table.rowType();
    RowType partitionType = getPartitionType(table); // 自定义方法获取分区类型
    
    // 创建序列化器
    CommitMessageSerializer serializer = new CommitMessageSerializer(partitionType, rowType);
    
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    
    try {
        // 写入消息数量
        out.writeInt(messages.size());
        
        // 序列化每条消息
        for (CommitMessage message : messages) {
            byte[] msgBytes = serializer.serialize(message);
            out.writeInt(msgBytes.length); // 写入长度
            out.write(msgBytes);           // 写入内容
        }
    } finally {
        out.close();
    }
    
    return baos.toByteArray();
}

// 2. 在提交节点反序列化
public List<CommitMessage> deserializeCommitMessages(byte[] data, Table table) {
    // 获取表结构(提交节点也需要访问表元数据)
    RowType rowType = table.rowType();
    RowType partitionType = getPartitionType(table);
    
    CommitMessageSerializer serializer = new CommitMessageSerializer(partitionType, rowType);
    List<CommitMessage> messages = new ArrayList<>();
    
    ByteArrayInputStream bais = new ByteArrayInputStream(data);
    DataInputStream in = new DataInputStream(bais);
    
    try {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            int length = in.readInt();
            byte[] msgBytes = new byte[length];
            in.readFully(msgBytes);
            messages.add(serializer.deserialize(serializer.getVersion(), msgBytes));
        }
    } finally {
        in.close();
    }
    
    return messages;
}

// 辅助方法:获取分区类型
private RowType getPartitionType(Table table) {
    List<DataField> fields = new ArrayList<>();
    for (String partitionKey : table.partitionKeys()) {
        fields.add(new DataField(0, partitionKey, DataTypes.STRING()));
    }
    return new RowType(fields);
}

分布式架构实现

sequenceDiagram
    participant MySQL as MySQL分表
    participant Worker as 写入Worker
    participant Storage as 分布式存储
    participant Coordinator as 提交协调器
    
    loop 每个分表
        MySQL->>Worker: 拉取数据
        Worker->>Worker: 创建BatchTableWrite
        Worker->>Worker: 写入数据
        Worker->>Worker: prepareCommit()
        Worker->>Storage: 序列化CommitMessage
    end
    
    Coordinator->>Storage: 收集所有CommitMessage
    Coordinator->>Coordinator: 反序列化所有消息
    Coordinator->>Coordinator: 创建BatchTableCommit
    Coordinator->>Coordinator: commit(所有消息)
    Coordinator->>Hive: 刷新元数据

优化建议:减少元数据传输

在分布式环境中,为避免每个节点都访问元数据存储,可以使用以下优化:

// 在Driver节点提前获取并分发元数据
public class SchemaInfo implements Serializable {
    private final byte[] rowTypeBytes;
    private final byte[] partitionTypeBytes;
    
    public SchemaInfo(Table table) {
        this.rowTypeBytes = serializeRowType(table.rowType());
        this.partitionTypeBytes = serializeRowType(getPartitionType(table));
    }
    
    public CommitMessageSerializer createSerializer() {
        return new CommitMessageSerializer(
            deserializeRowType(partitionTypeBytes),
            deserializeRowType(rowTypeBytes)
        );
    }
    
    private byte[] serializeRowType(RowType rowType) {
        // 使用Paimon的RowTypeSerializer
        RowTypeSerializer serializer = new RowTypeSerializer();
        return serializer.serialize(rowType);
    }
    
    private RowType deserializeRowType(byte[] bytes) {
        RowTypeSerializer serializer = new RowTypeSerializer();
        return serializer.deserialize(bytes);
    }
}

// 使用示例(Worker节点)
SchemaInfo schemaInfo = getFromDriver(); // 从Driver获取
CommitMessageSerializer serializer = schemaInfo.createSerializer();

完整分布式流程

  1. 初始化阶段

    // Driver节点
    Table paimonTable = catalog.getTable(...);
    SchemaInfo schemaInfo = new SchemaInfo(paimonTable);
    broadcastSchemaToWorkers(schemaInfo);
    
  2. 写入阶段(Worker节点)

    BatchTableWrite writer = writeBuilder.newWrite();
    
    // 写入数据
    for (RowData row : mysqlData) {
        writer.write(row);
    }
    
    // 准备提交并序列化
    List<CommitMessage> messages = writer.prepareCommit();
    byte[] serialized = serializeCommitMessages(messages, schemaInfo);
    
    // 存储到共享存储(HDFS/S3)
    storeToSharedStorage(shardId, serialized);
    
  3. 提交阶段(Coordinator节点)

    List<CommitMessage> allMessages = new ArrayList<>();
    
    // 收集所有分片的CommitMessage
    for (int shardId : allShardIds) {
        byte[] data = readFromSharedStorage(shardId);
        List<CommitMessage> shardMessages = deserializeCommitMessages(data, paimonTable);
        allMessages.addAll(shardMessages);
    }
    
    // 原子提交
    try (BatchTableCommit commit = writeBuilder.newCommit()) {
        commit.commit(allMessages);
    
        // 刷新Hive元数据
        refreshHiveMetastore(paimonTable);
    } catch (Exception e) {
        commit.abort(allMessages);
        throw e;
    }
    

关键注意事项

  1. 元数据一致性

    • 所有Worker必须使用相同表结构的SchemaInfo
    • 在同步期间表结构不能变更
  2. 序列化版本控制

    // 在序列化数据中包含Paimon版本
    out.writeUTF(PaimonVersion.getVersion());
    
    // 反序列化时校验版本
    String version = in.readUTF();
    if (!PaimonVersion.isCompatible(version)) {
        throw new VersionMismatchException(...);
    }
    
  3. 处理大消息

    • 当CommitMessage过大时(>10MB),建议直接存储文件引用而非完整数据:
    // Worker节点
    String tmpPath = "hdfs://tmp/"+UUID.randomUUID();
    try (OutputStream out = createHdfsFile(tmpPath)) {
        out.write(serializedData);
    }
    return tmpPath; // 只传输路径
    
    // Coordinator节点
    byte[] data = readHdfsFile(tmpPath);
    
  4. 错误处理增强

    // 使用两步提交协议增强可靠性
    public class TransactionCoordinator {
        private Map<Integer, String> shardLocations = new ConcurrentHashMap<>();
    
        public void prepare(int shardId, byte[] data) {
            String path = storeData(data);
            shardLocations.put(shardId, path);
            markPrepared(shardId);
        }
    
        public void commit() {
            if (allShardsPrepared()) {
                List<CommitMessage> all = loadAllMessages();
                doCommit(all);
                markCommitted();
            }
        }
    
        public void rollback() {
            for (String path : shardLocations.values()) {
                deleteFile(path);
            }
        }
    }
    

这种方案能保证在分布式环境下可靠地传输 CommitMessage,同时保持 Paimon 提交的原子性和一致性,特别适合处理分库分表到数据湖的同步场景。

baisui1981 avatar Jun 12 '25 23:06 baisui1981

Q:那paimon 基于flink-cdc 的pipeline 数据同步方案,最终的commit的操作是依赖flink的checkoutpoint的执行而生效的,站在用户的角度肯定期望数据生效越快越好,那我们应该如何平衡这对矛盾呢? flink 的checkpoint的执行频率设置多少合适?

在 Paimon 基于 Flink CDC 的 Pipeline 数据同步方案中,commit 操作确实与 Flink Checkpoint 强绑定(每次成功的 Checkpoint 触发 Paimon Sink 提交一次事务,使数据可见)。用户期望数据“生效越快越好”(低延迟),但这与频繁 Checkpoint 带来的性能损耗确实存在矛盾。平衡这对矛盾需要综合考虑业务需求、数据量、集群资源和 Paimon 的特性。

以下是如何进行平衡的策略和建议:

1. 核心策略:理解“生效”的涵义,分层优化

  • “可见性” vs “可查询性”:
    • 可见性 (Visibility): 指数据被提交到 Paimon 的主分支 (main / default branch),对新写入可见。这由 Checkpoint 触发的 commit 保证。
    • 可查询性 (Queryability): 指数据对读取查询(尤其是最新快照查询)是高效可用的。这受到 Paimon 内部 Compaction(合并小文件)和 Snapshot 管理的影响。即使数据 commit 了(可见),如果小文件过多或 Compaction 滞后,查询性能也可能很差。
  • 优化重点: 优先保证数据在合理延迟内 commit(可见),同时确保 Paimon 的 Compaction 策略能跟上写入速度,避免查询性能恶化。不能只追求极低的 Checkpoint 间隔而忽略 Compaction。

2. 设置 Flink Checkpoint 间隔的关键考量因素

没有一个放之四海皆准的“最佳”间隔,需要根据具体场景调整。考虑以下因素:

  • 业务容忍的数据延迟 (RPO - Recovery Point Objective): 这是最核心的驱动因素。业务能接受数据延迟多久可见?
    • 要求秒级可见? -> 考虑 5s - 30s 的间隔。
    • 分钟级可见可接受? -> 1min - 5min 通常是性能和容错的良好平衡点。
    • 允许几分钟延迟? -> 5min - 10min+ 可以显著降低开销。
    • 务必与业务方明确沟通这个容忍度! 避免过度优化。
  • 数据变更速率:
    • 高吞吐 (High Throughput): 每秒处理大量记录或大记录。更倾向于稍长的间隔 (e.g., 1-5 min)。频繁 Checkpoint 在高吞吐下带来的序列化、网络、存储 I/O 开销巨大,极易成为瓶颈,导致整体吞吐骤降甚至背压/失败。此时保证稳定写入更重要。
    • 低吞吐 (Low Throughput): 每秒处理记录较少或记录小。可以尝试更短的间隔 (e.g., 10s - 60s),对系统整体压力相对较小。
  • 状态大小 (State Size):
    • 大状态 (Large State): Paimon Sink 状态大(表多、分区多、Bucket 多、数据量大)。强烈建议使用更长间隔 (e.g., >= 5 min)务必开启 RocksDB 状态后端和增量 Checkpoint。频繁做大状态快照是性能灾难。
    • 小状态 (Small State): 状态量小。对间隔长度敏感性较低,可根据吞吐和延迟需求选择更短间隔。
  • 集群资源 (Cluster Resources):
    • 资源充足 (Adequate Resources): 有足够的 CPU、网络带宽、磁盘 I/O 容量。可以支持相对更短的间隔。
    • 资源紧张 (Limited Resources): 必须使用较长间隔 (e.g., 5-10 min+)。频繁 Checkpoint 在资源不足时极易导致作业不稳定、失败或影响其他作业。
  • Paimon Compaction 能力:
    • 即使数据 commit 很快,如果 Paimon 的 compaction.dedicated 作业(推荐使用)或 full-compaction.delta-commits 配置跟不上写入速度,小文件会快速膨胀,严重影响后续写入和查询性能。缩短 Checkpoint 间隔(增加 commit 频率)会直接增加 Compaction 的压力。需要确保 Compaction 资源足够且配置合理 (如 num-sorted-run.compaction-trigger)。

3. 推荐的 Checkpoint 间隔设置范围 & 起始点

  • 通用起点 (Good Starting Point): 1 - 5 分钟 (60000ms - 300000ms)
    • 这个范围对大多数中等规模、中等吞吐的场景是一个性能和延迟的良好折衷。
    • 例如:execution.checkpointing.interval: 2min (120000ms)
  • 低延迟优先 (Lower Latency Priority - Use with Caution): 10 - 60 秒 (10000ms - 60000ms)
    • 仅适用于: 状态小 吞吐低 资源非常充足 业务确实需要秒级/几十秒级可见。
    • 风险: 在高吞吐/大状态下极易导致性能问题。必须密切监控!
  • 高吞吐/大状态/资源敏感优先 (Stability Priority): 5 - 10+ 分钟 (300000ms - 600000ms+)
    • 适用于: CDC 源头表多、数据量大、更新频繁、集群资源紧张的场景。
    • 牺牲一点延迟换取更高的稳定性和吞吐量。

4. 关键优化手段 (Beyond Checkpoint Interval)

仅仅调整间隔不够,必须结合以下优化:

  • 启用并优化 RocksDB + 增量 Checkpoint (Mandatory for Large State):
    state.backend: rocksdb
    state.backend.incremental: true // 绝对关键!大幅减少每次 Checkpoint 传输量
    state.backend.rocksdb.memory.managed: true // Flink 管理 RocksDB 内存,通常推荐
    state.backend.rocksdb.memory.write-buffer-ratio: 0.4 // 适当调整 MemTable 内存占比
    state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1 // 索引/Filter 内存占比
    // 根据内存大小调整 block cache 等参数
    
  • 配置合理的 Checkpoint 超时 (checkpointTimeout):
    • 必须大于预期的平均 Checkpoint 持续时间。设置过短会导致频繁超时失败。
    • 初始值建议设为间隔的 2-5 倍。例如间隔 2min,超时可设 4-10min
    • 监控调整:观察 Dashboard 上 Checkpoint Duration,确保大部分 Checkpoint 在超时前完成。
  • 优化 Paimon Sink 配置:
    • sink.parallelism: 设置合适并行度,避免单个 Subtask 压力过大。通常与上游 CDC Source 并行度匹配或稍大。
    • bucket: 合理设置 Bucket 数量(如 10-100,根据数据量),保证写入分散。
    • write-buffer-size / write-buffer-spillable: 调整 Paimon Sink Writer 的内存缓冲区大小。
    • compaction.dedicated (强烈推荐): 启用独立 Compaction 作业,避免 Compaction 阻塞写入。这是保证高频 commit 下查询性能的关键!
    • full-compaction.delta-commits: 如果使用自动 Full Compaction,设置触发 Full Compaction 的 Delta Commit 次数。需要平衡小文件数量和 Compaction 开销。
    • changelog-producer: 根据源库变更日志情况选择合适模式 (none, input, lookup),影响状态大小和恢复行为。
  • 启用对齐超时 (alignmentTimeout):
    execution.checkpointing.aligned-checkpoint-timeout: 10s // 或更高
    
    • 在存在数据倾斜时,可以防止慢 Channel 拖累整个 Checkpoint,缩短对齐阶段阻塞时间,对提高吞吐有益。但这会在超时后降级为至少一次语义(对于 Paimon Sink,其内部机制通常能保证最终精确一次,但恢复时可能有短暂重复)。需要评估业务对短暂重复的容忍度。
  • 监控、监控、再监控!
    • Flink Dashboard: Checkpoint Duration (Sync/Async), Checkpoint Size, Alignment Duration, Alignment Buffered, Checkpointing Pending, Num Completed / Failed Checkpoints。确保持续时间稳定且远小于间隔和超时,无堆积。
    • 网络/CPU/磁盘 I/O: 观察 Checkpoint 期间资源使用峰值是否成为瓶颈。
    • Paimon Metrics/Logs: 关注 commit 耗时、文件生成速率、Compaction 状态(积压量、耗时)。观察 snapshot 文件的增长是否健康。
    • 作业吞吐量 (numRecordsInPerSecond 等): 确保没有因为频繁 Checkpoint 导致显著下降。

5. 总结与决策流程

  1. 明确业务需求: 确定可接受的最大数据延迟 (RPO)。这是 Checkpoint 间隔的上限目标。
  2. 评估负载: 分析数据源变更速率(记录数/s, 数据量 MB/s)、表结构(状态大小估算)。
  3. 审视资源: 评估 Flink 集群可用资源(CPU, 内存, 网络带宽, 存储 I/O)。
  4. 设置初始间隔: 基于步骤 1-3,在推荐范围内 (1-5 min) 选择一个保守值作为起点 (如 2 min)。
  5. 强制配置: 务必启用 RocksDB + incremental: true。配置合理的 checkpointTimeout (建议间隔的 2-5 倍)。
  6. 配置 Paimon: 合理设置 sink.parallelism, bucket, 启用 compaction.dedicated=true,调整 full-compaction.delta-commits 等。
  7. 压力测试 & 监控: 在生产级数据量和压力下运行。严密监控 Flink Checkpoint 指标、系统资源、Paimon Commit/Compaction 状态和作业吞吐量。
  8. 迭代优化:
    • 如果延迟可接受且系统稳定: 保持当前配置。
    • 如果延迟要求更严且资源有余量: 谨慎地尝试缩短间隔 (如 1 min),每次只调整一点,并重复监控。优先考虑优化 Paimon Compaction 和 RocksDB。
    • 如果出现性能瓶颈 (吞吐下降、Checkpoint 慢/失败、资源吃紧): 立即延长间隔 (如 3 min, 5 min)。优先解决 Compaction 积压或 RocksDB 性能问题。
    • 如果 Checkpoint 完成时间远小于间隔: 说明有缩短间隔的空间(如果业务需要),但也要看资源占用峰值。
  9. 权衡取舍: 时刻记住,将间隔从 5min 降到 1min 可能只将可见延迟减少 4min,但性能开销可能增加数倍甚至导致不稳定。确保业务收益值得这个代价。

结论: 对于大多数生产环境下的 Paimon CDC Pipeline,将 Flink Checkpoint 间隔设置在 1 到 5 分钟 之间是一个比较安全且平衡的选择2 分钟 是一个常用的、良好的默认起点。绝不能为了追求理论上的最低延迟而无限制地缩短间隔。 必须结合 RocksDB 增量 Checkpoint、Paimon 独立 Compaction、细致的监控和基于实际负载的调优,才能在满足业务延迟期望的同时,保障 Pipeline 的高吞吐和稳定性。

baisui1981 avatar Jun 24 '25 04:06 baisui1981

Q:有一个比较迫切的问题,在使用了flink-cdc 的pipeline功能来执行mysql到paimon 表的数据同步功能,在paimon 表中设置了sequence id ,但是发现执行 数据同步过程中,发现导入的速度比起没有设置sequence id 之前 断崖式下降,能告诉我如何解决吗?

在 Flink CDC + Paimon 的同步管道中,启用 sequence.field 后性能断崖式下降是一个常见且棘手的问题,核心原因是 Sequence ID 的引入强制 Paimon 在写入时进行严格的乱序处理和数据回滚,这带来了显著的开销。以下是根本原因分析和针对性解决方案:


一、性能下降的根本原因

  1. 全局排序与乱序处理 (SequenceFieldProcessor):

    • 没有 Sequence ID 时,Paimon Sink 只需按 bucket + partition 局部有序写入即可,效率较高。
    • 启用 Sequence ID 后:Paimon 必须确保 sequence.field 的值在整个表(或主键分区)内严格单调递增
    • 对于同一主键的记录,如果后到达的记录其 Sequence ID 小于 当前表中已存在的该主键记录的 Sequence ID,Paimon 必须执行 回滚:标记旧记录为删除,写入新记录。这个过程涉及:
      • 频繁的 主键查找 (可能触发 RocksDB Get 操作)
      • 额外的 删除标记 写入
      • 潜在的 小文件爆炸 (每次更新可能生成新文件)
  2. RocksDB Lookup 开销 (尤其致命!):

    • 为了实现跨记录的主键比较和 Sequence ID 比较,Paimon Sink 的 SequenceFieldProcessor 必须频繁查询 RocksDB 状态,获取当前主键对应的最新 Sequence ID。
    • 这种 随机读 (Lookup) 操作是 RocksDB 的性能杀手,尤其是在:
      • 主键基数高 (大量不同 Key)
      • 更新频繁 (CDC 源头表 Update 多)
      • 数据分布不均匀 (热点 Key)
    • 高频率的 Lookup 会:
      • 严重拖慢 MemTable Flush 和 Compaction
      • 阻塞写入线程,极大降低吞吐量
      • 耗尽 CPU 和 I/O 资源
  3. Compaction 压力剧增:

    • 频繁的回滚操作会产生大量 删除标记 (Delete Tombstones)小文件
    • Paimon 需要更频繁、更耗时的 Compaction 来合并文件、清理标记,这会:
      • 占用大量后台 CPU 和 I/O
      • 与写入争抢资源,形成恶性循环
      • 如果 Compaction 跟不上,小文件堆积会进一步恶化读写性能
  4. 网络与序列化开销:

    • 每个记录的处理逻辑更复杂,序列化/反序列化开销增加。

二、针对性解决方案 (从最有效到辅助)

优先级 1:解决 RocksDB Lookup 瓶颈 (最关键!)

  • 启用 Partial-Update (部分更新):

    CREATE TABLE ... WITH (
        ...
        'merge-engine' = 'partial-update', -- 核心配置!
        'partial-update.ignore-delete' = 'true' -- 可选,忽略源头DELETE事件
    );
    
    • 原理: 不再依赖 Sequence ID 进行全局排序和回滚。对于同一主键的多次更新,Paimon 直接将非主键字段按列合并 (Merge on Read)。写入时只需追加新值,无需查找旧值。
    • 优势:
      • 彻底消除 Sequence ID 比较所需的 RocksDB Lookup!
      • 写入吞吐量可恢复甚至超过无 Sequence ID 的水平。
      • 减少删除标记和小文件。
    • 适用场景: 绝大多数 CDC 同步场景 (尤其是事实表、日志表)。只要下游能接受最终一致性 (合并后的最新值)。
    • 注意:
      • 需确保 sequence.field 字段本身不参与 Partial-Update 合并 (通常它是元数据字段如 _version, _timestamp,用于判断新旧)。
      • 配置 'fields.<sequence_field_name>.ignore' = 'true' 或在 CREATE TABLE 语句中不声明该字段。
      • 理解语义:Partial-Update 不是精确覆盖,而是按字段合并。对强一致性要求极高的场景 (如账户余额) 需谨慎测试。
  • 优化 RocksDB 配置 (减轻 Lookup 压力):

    # flink-conf.yaml 或 Per-Job 配置
    state.backend: rocksdb
    state.backend.incremental: true # 必须开启!
    state.backend.rocksdb.memory.managed: true
    # 增大 Block Cache (缓解 Lookup 读放大)
    state.backend.rocksdb.block.cache-size: 256mb # 根据 TM 内存调整,建议 >= 256MB
    # 增大 MemTable 内存 (缓冲写入,减少 Lookup 触发的 Flush)
    state.backend.rocksdb.writebuffer.size: 128mb # 默认 64MB,可加倍
    state.backend.rocksdb.writebuffer.count: 4 # 默认 2,可加倍
    # 优化 Compaction (减少后台压力)
    state.backend.rocksdb.compaction.level.max-size-level-base: 256mb # L1 大小
    state.backend.rocksdb.compaction.level.target-file-size-base: 64mb
    

优先级 2:优化 Paimon 写入与 Compaction

  • 启用 Dedicated Compaction (绝对推荐!):

    CREATE TABLE ... WITH (
        ...
        'compaction.dedicated' = 'true' -- 独立 Compaction 作业
    );
    
    • 将资源密集的 Compaction 任务从写入作业剥离,避免争抢资源导致写入卡顿。
    • 必须配套使用,否则高频更新下 Compaction 会阻塞写入。
  • 调整 Paimon Writer 参数:

    CREATE TABLE ... WITH (
        ...
        'write-buffer-size' = '256m', -- 增大写入缓冲区 (默认 64m)
        'write-buffer-spillable' = 'true', -- 允许内存不足时 Spill 到磁盘
        'sink.parallelism' = '8' -- 增加 Sink 并行度 (与上游 CDC Source 匹配或略高)
    );
    
  • 合理设置 Bucket 数量:

    CREATE TABLE ... WITH (
        ...
        'bucket' = '8' -- 避免过多或过少 (建议 4-16 起步,按数据量调整)
    );
    
    • Bucket 过多:小文件问题加重,管理开销大。
    • Bucket 过少:并发度低,且单个 Bucket 内 Lookup 压力集中。

优先级 3:调整 Flink Checkpoint 与序列化

  • 适当增加 Checkpoint 间隔:
    execution.checkpointing.interval: 2min # 避免过频 Checkpoint 加剧压力
    execution.checkpointing.timeout: 10min # 确保超时足够长
    
  • 使用高效序列化:
    • 确保 Paimon 表 Schema 中的字段使用 Flink 原生支持的高效类型 (如 INT, BIGINT, STRING),避免复杂嵌套类型。

优先级 4:审视数据源头与 Sequence Field 选择

  • Sequence Field 的选择至关重要:
    • 理想情况:严格单调递增与事件实际发生顺序一致 的字段。如 MySQL 的 BIGINT AUTO_INCREMENT 主键或 Binlog 的 event_timestamp (需确保时钟同步)。
    • 避免使用 UPDATE_TIME非单调字段,极易导致乱序和频繁回滚。
  • 检查 CDC 源头是否有异常:
    • 大事务? (导致长时间延迟)
    • 频繁 DDL? (导致解析异常)
    • 网络抖动? (导致数据延迟到达)

三、推荐方案组合 (根据场景选择)

  1. 通用高效方案 (首选):

    • merge-engine = 'partial-update' (消除 Lookup)
    • compaction.dedicated = 'true' (隔离 Compaction)
    • 优化 RocksDB 配置 (增大 block.cache-size, writebuffer.size)
    • 合理设置 bucketsink.parallelism
  2. 必须保留 Sequence ID 全局排序的场景 (如强一致维度表):

    • 极致优化 RocksDB (重点在 block.cache-size, writebuffer.count/size)
    • compaction.dedicated = 'true' (必须!)
    • 增大 write-buffer-size
    • 增加 TM 内存与 CPU 资源
    • 严格选择单调递增的 sequence.field (如 MySQL 自增主键)

四、诊断命令

在应用优化后,通过以下命令监控效果:

-- 查看 Paimon 表最新快照文件信息
CALL sys.snapshot_identifier('database_name.table_name');

-- 检查 Compaction 状态
CALL sys.compaction_status('database_name.table_name');

-- 监控 Flink 作业 RocksDB 指标 (Flink WebUI)
*   rocksdb.block-cache-usage
*   rocksdb.estimate-num-keys
*   rocksdb.get-latency
*   rocksdb.write-amplification

总结: 解决 sequence.field 导致的性能骤降,核心在于 消除或减轻 RocksDB Lookup 开销partial-update 是最有效的解决方案。若必须保留全局排序,则需全力优化 RocksDB 内存配置、启用 Dedicated Compaction 并确保 Sequence 字段选择得当。

baisui1981 avatar Jun 24 '25 05:06 baisui1981

Q: 我理解 FSM(File-Sorted Merge Tree) 在paimon中起的作用是作为数据查询索引使用的,那根据官网提到的paimon 表的可使用parquet 作为存储格式,那 fsm 和parquet文件各自在 paimon 中起什么作用,有什么关系呢?

您对FSM的作用理解需要稍作修正——FSM不是单纯的索引,而是Paimon的底层存储架构范式,而Parquet是FSM架构中数据文件的具体存储格式。两者是架构层与实现层的关系。以下是详细解析:


1. FSM:Paimon的存储架构核心

FSM定义了数据在Paimon中的组织逻辑

  • 分层管理:数据按时间/版本分层(如L0新数据 → L1合并层 → L2归档层)
  • 全局有序:每层内文件按键(主键/分区键)排序,高层文件覆盖低层
  • 合并策略:通过Compaction合并小文件、清理过期数据
  • 索引系统:在文件级别构建Min-Max索引、布隆过滤器等
  • ACID保障:通过Snapshot机制实现原子提交

本质:一套管理数据文件生命周期、排序规则和访问方法的架构标准


2. Parquet:FSM架构中的物理存储格式

在FSM框架下,每个具体的数据文件(SST文件)物理存储采用Parquet格式

graph LR
A[FSM架构] --> B[L0层文件]
A --> C[L1层文件]
A --> D[L2层文件]
B --> E[Parquet格式]
C --> E
D --> E

Parquet在此扮演的角色:

  • 列式存储:按列压缩编码(如RLE、字典编码),提升分析查询效率
  • 内置统计信息:每个Row Group记录min/max/null count等(增强FSM的Min-Max过滤
  • 高效压缩:支持Snappy/ZSTD,减少存储空间
  • Schema演化:支持新增列等变更(与FSM的Schema Evolution协同)

本质:FSM架构中每个数据文件的具体存储实现


3. FSM与Parquet的协同工作原理

通过一个查询流程看两者如何配合:

sequenceDiagram
participant Query as 查询引擎
participant FSM as FSM管理器
participant Parquet as Parquet文件

Query->>FSM: 请求查询“user_id=1001”
FSM->>FSM: 1. 使用Min-Max索引过滤不包含1001的文件
FSM->>FSM: 2. 用布隆过滤器筛选可能文件
FSM->>Parquet: 3. 传递需读取的文件列表
Parquet->>Parquet: 4. 按列读取+应用谓词下推
Parquet-->>Query: 返回结果

关键协作点:

步骤 FSM职责 Parquet职责
文件过滤(1)(2) 利用文件级Min-Max索引跳过无关文件 提供Row Group级统计加速过滤
数据读取(4) 确定文件读取顺序 列式存储+谓词下推减少IO

4. 为什么需要两层设计?

FSM解决的核心问题

  • 高频写入:通过追加写+Compaction支持流式写入
  • 数据版本:Snapshot管理实现时间旅行
  • 全局有序:跨文件保证主键查询效率

Parquet解决的核心问题

  • 分析性能:列式存储优化扫描查询
  • 存储效率:高压缩比降低存储成本
  • 生态兼容:通用格式兼容Spark/Trino等引擎

分工优势:FSM管“哪些数据在哪”,Parquet管“数据如何高效存储读取”


5. 性能优化中的实际体现

当您进行调优时:

  • FSM层配置
    -- 控制文件大小与合并行为
    'snapshot.time-retained' = '1h'     -- 快照保留
    'compaction.max-size-amplification-percent' = '200' -- 合并阈值
    
  • Parquet层配置
    -- 优化单文件性能
    'parquet.block-size' = '128MB'       -- 行组大小
    'parquet.compression' = 'ZSTD'       -- 压缩算法
    

总结

概念 角色 类比
FSM 数据架构框架 图书馆的图书分类系统(杜威十进制)
Parquet 数据存储格式 图书的具体印刷格式(纸质/电子书)

FSM决定数据如何分层、排序、合并,而Parquet决定数据在磁盘上的二进制组织形式。两者协同使得Paimon既能支持高吞吐CDC写入,又能高效服务分析查询,这正是数据湖表的核心价值所在。

baisui1981 avatar Jun 24 '25 06:06 baisui1981

Q: paimon 中 底层(Level 0或更低层)的多个小文件,单个小文件中存储的记录数是有什么参数控制的,我理解只要适当增加单个文件中存储的记录条数,就能减少compaction执行的频率了吧?

您对 Paimon 小文件生成机制和 Compaction 频率的理解基本正确。在 Apache Paimon 中,控制底层(特别是 Level 0)小文件大小的核心参数直接影响着 Compaction 的触发频率。通过适当增加单个文件中存储的记录条数(或文件大小),确实可以有效减少 L0 层小文件的数量,从而降低 Compaction 被触发的频率。

以下是控制 L0 层小文件大小和记录数的关键参数及其工作原理:

🧩 1. 核心控制参数:write-buffer-sizetarget-file-size

  • write-buffer-size (默认值: 128mb):

    • 作用: 这是控制内存中 Buffer 大小的参数。当 Flink 或 Spark 任务向 Paimon 写入数据时,数据首先会缓存在内存中的一个排序缓冲区(Sorter Buffer)中。
    • 原理: 当一个 Buffer 被填满(达到 write-buffer-size)时,它会被排序并刷写(Spill)到磁盘,形成一个 L0 文件
    • 影响: write-buffer-size 直接决定了单个 L0 文件的最小期望大小。如果你期望每个 L0 文件大约是 256MB,那么就应该设置 write-buffer-size=256mb。Buffer 满了就刷写,自然就生成了一个 ~256MB 的文件。
  • target-file-size (默认值: 128mb):

    • 作用: 这是 Paimon 最终期望生成的稳定数据文件(通常是经过 Compaction 合并到更高层级的文件)的目标大小
    • 原理:Compaction 过程中(无论是 Universal 还是 Size-Tiered),Paimon 会尝试将输入的小文件合并,并输出大小接近 target-file-size 的文件到更高的层级(如 L1, L2)。
    • 影响 L0 的间接方式: 虽然 target-file-size 主要影响 Compaction 的输出,但它间接影响 L0 文件的“合并潜力”。如果 target-file-size 设置得很大(比如 1GB),那么 Compaction 需要积累更多的 L0 小文件(累计大小达到 ~1GB)才会触发合并操作。这相当于放宽了触发 Compaction 的“累计大小”阈值。
    • 注意: target-file-size 不直接控制单个 L0 文件的大小。L0 文件大小主要由 write-buffer-size 和 Checkpointing 决定。

🧩 2. 其他影响 L0 文件大小和记录数的因素

  • Checkpointing / Commit Interval (Flink Streaming):

    • 在 Flink 流式写入场景下,Flink 的 Checkpoint 间隔是决定 L0 文件大小和数量的最关键因素之一
    • 原理: Paimon 的 Sink 算子通常会在 Flink Checkpoint 时执行 snapshotState。为了确保精确一次的语义,当前内存 Buffer 中的数据(即使未达到 write-buffer-size)也会在 Checkpoint 时强制刷写到磁盘,生成一个 L0 文件。
    • 影响: 如果 Flink 的 Checkpoint 间隔(checkpoint.interval)很短(例如 10 秒),即使 write-buffer-size 设置为 256MB,也可能因为每个 Checkpoint 只积累了少量数据(比如 50MB)就被强制刷出,从而导致产生大量远小于 write-buffer-size 的 L0 小文件。
    • 结论:要减少 L0 小文件数量,在流式写入场景下,适当增加 Flink Checkpoint 间隔 (checkpoint.interval) 是至关重要的,让 Buffer 有更多时间积累接近 write-buffer-size 的数据量再刷出。但需权衡故障恢复时间(RTO)。
  • sort-spill-threshold (默认值: 未明确设置,通常内部管理):

    • 这个参数控制内存排序缓冲区在内存中最多能容纳多少行数据。当行数超过此阈值时,即使 Buffer 的内存占用未达到 write-buffer-size,也可能触发部分数据溢写(Spill)。这主要用于防止 OOM。
    • 影响: 如果记录非常大(例如宽表、大 JSON 对象),即使记录数不多,也可能快速占满内存 Buffer (write-buffer-size) 而刷写。如果记录非常小(例如计数器更新),则 sort-spill-threshold 可能先达到,导致按记录数刷写。调整 sort-spill-threshold 主要应对特殊数据分布场景,一般优先调整 write-buffer-size 和 Checkpoint 间隔。

✅ 如何通过增加文件大小/记录数减少 Compaction 频率

  1. 增大 write-buffer-size:

    • 这是最直接有效的方法。例如,从默认的 128mb 增加到 256mb512mb
    • 效果: 每个 L0 文件平均变大(更接近新设置的 buffer size),L0 层积累到触发 Compaction 阈值(如 Universal 的 num-sorted-run.stop-trigger)所需的文件数量变少,从而降低触发频率。
    • 注意: 需要确保 TaskManager 有足够的 JVM Heap 或 Managed Memory 来容纳更大的 Buffer。否则可能导致 OOM 或频繁 GC。
  2. 增大 Flink Checkpoint 间隔 (checkpoint.interval):

    • 对于流式写入,这是避免超小 L0 文件的关键。根据业务容忍的延迟和恢复时间,尽可能增加间隔(例如从 30s 增加到 1min 或 5min)。
    • 效果: 显著减少因频繁 Checkpoint 强制刷写产生的远小于 write-buffer-size 的微型 L0 文件数量。让大多数 L0 文件大小接近 write-buffer-size
  3. 增大 target-file-size:

    • 例如从 128mb 增加到 256mb512mb
    • 效果: 主要在 Universal Compaction 中影响较大。因为 Universal 在合并时会尽量将一组文件合并成一个接近 target-file-size 的大文件。增大 target-file-size 意味着 Compaction 需要积累更多的 L0 文件(总数据量更大)才会触发合并。对于 Size-Tiered,它定义了更高层级文件的大小目标,也间接影响合并策略。
    • 注意: 过大的 target-file-size 可能影响查询效率(读取大文件可能慢)和 Compaction 本身的资源消耗(合并更大量数据)。
  4. 调整 Compaction 触发阈值 (如 Universal 的 num-sorted-run.stop-trigger):

    • 虽然不是直接控制文件大小,但配合增大文件大小,调整这个阈值效果更好。例如,如果文件平均大了 2 倍,那么将 num-sorted-run.stop-trigger 从默认的 5 增加到 8 或 10,可能仍然能保持或延长触发间隔,同时允许 L0 积累更多文件(但总数据量更大),进一步提升读性能。
    • 效果: 直接放宽了触发 Compaction 的条件,让 L0 层可以堆积更多的文件(但每个文件更大了)才触发合并。

⚠️ 总结与权衡

  • 核心策略: 增大 write-buffer-size 并适当增大 Flink Checkpoint 间隔 是减少 L0 小文件数量、从而降低 Compaction 频率的最有效手段。这直接让每个 L0 文件包含更多记录/更大体积。
  • 辅助策略: 增大 target-file-size 和/或 Compaction 触发阈值(如 num-sorted-run.stop-trigger)可以进一步减少 Compaction 频率,因为它们提高了触发合并所需的“数据量”或“文件数”门槛。
  • 重要权衡:
    • 内存资源: 增大 write-buffer-size 会增加每个 Writer Task 的内存需求。
    • 故障恢复: 增大 Flink Checkpoint 间隔会增加故障恢复时需要重放的数据量(RTO 可能变长)。
    • 读取延迟: 过大的 L0 文件或过少的 Compaction 可能导致查询(尤其是点查)需要扫描更多的 L0 文件,增加读取延迟。需要监控读性能。
    • 空间放大: 减少 Compaction 频率可能导致短时间内存在更多冗余数据(旧版本数据、删除标记等),增加存储空间占用(空间放大)。需要配合合理的快照过期策略。

建议: 从调整 write-buffer-size (如 256MB) 和 Flink checkpoint.interval (如 1-5分钟) 开始,观察 L0 文件平均大小是否显著增大、Compaction 频率是否下降。再根据效果和资源/延迟情况,考虑是否调整 target-file-size 或 Compaction 触发参数。务必监控集群资源(内存、CPU)、Compaction 耗时、查询延迟和存储空间变化。📊

baisui1981 avatar Jun 24 '25 07:06 baisui1981

已经实现

baisui1981 avatar Jul 29 '25 00:07 baisui1981