
Stream job, Failed to write deletion vectors [Bug]

Open macdoor opened this issue 1 year ago • 5 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Paimon version

paimon-flink-1.19-0.9-20240803.002144-49.jar
paimon-flink-1.20-0.9-20240806.002229-1.jar

Compute Engine

java version "22.0.2" 2024-07-16
Java(TM) SE Runtime Environment Oracle GraalVM 22.0.2+9.1 (build 22.0.2+9-jvmci-b01)
Java HotSpot(TM) 64-Bit Server VM Oracle GraalVM 22.0.2+9.1 (build 22.0.2+9-jvmci-b01, mixed mode, sharing)

Flink 1.19.1, Flink 1.20.0
MinIO 2024-08-03T04:33:23Z

Minimal reproduce step

Stream jobs that run correctly with paimon-flink-1.19-0.8.2.jar fail with the following error when run with the 0.9 snapshot jars:

index-9326e638-d025-4e6b-a647-ef39ecee0a45-8.zip

2024-08-04 13:35:43
java.io.IOException: Could not perform checkpoint 10 for operator Writer : claim_report_head (10/12)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.IOException: java.lang.RuntimeException: Failed to write deletion vectors.
	at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:225)
	at org.apache.paimon.flink.sink.GlobalFullCompactionSinkWrite.prepareCommit(GlobalFullCompactionSinkWrite.java:184)
	at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:121)
	at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:189)
	at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
	at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
	... 22 more
Caused by: java.lang.RuntimeException: Failed to write deletion vectors.
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:156)
	at org.apache.paimon.index.IndexFileHandler.writeDeletionVectorsIndex(IndexFileHandler.java:290)
	at org.apache.paimon.deletionvectors.DeletionVectorsMaintainer.writeDeletionVectorsIndex(DeletionVectorsMaintainer.java:100)
	at org.apache.paimon.compact.CompactDeletionFile.generateFiles(CompactDeletionFile.java:45)
	at org.apache.paimon.compact.CompactDeletionFile$LazyCompactDeletionFile.getOrCompute(CompactDeletionFile.java:121)
	at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:217)
	at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:239)
	at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:220)
	... 33 more
Caused by: java.io.FileNotFoundException: File does not exist: s3://paimon-prod/raw_data.db/claim_report_head/index/index-9326e638-d025-4e6b-a647-ef39ecee0a45-8
	at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:361)
	at org.apache.flink.fs.s3presto.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
	at org.apache.paimon.flink.FlinkFileIO.getFileStatus(FlinkFileIO.java:79)
	at org.apache.paimon.fs.FileIO.getFileSize(FileIO.java:200)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter$SingleIndexFileWriter.writtenIndexFile(DeletionVectorIndexFileWriter.java:138)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.emptyIndexFile(DeletionVectorIndexFileWriter.java:96)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.write(DeletionVectorIndexFileWriter.java:59)
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:154)
	... 40 more

Here is the full Flink taskexecutor log file: flink-dict-taskexecutor-0-cmtt-dict-17.log.zip
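
Reading the trace bottom-up, the method names suggest the failure is in the empty-index-file path: DeletionVectorIndexFileWriter.emptyIndexFile() produces an index file with no entries, and SingleIndexFileWriter.writtenIndexFile() then calls FileIO.getFileSize() on it, which cannot find the object on S3. Below is a minimal sketch of that stat-after-write pattern against the plain Hadoop FileSystem API; it is an illustration of the failing call sequence, not Paimon's code, and the bucket/path names are made up.

// Sketch of the stat-after-write pattern the trace exercises; this is an
// illustration against the plain Hadoop FileSystem API, not Paimon's code.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatAfterWriteSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative location; the reporters hit s3:// (Presto) and s3a:// (Hadoop).
        Path indexFile = new Path("s3a://my-bucket/table/index/index-demo-0");
        FileSystem fs = indexFile.getFileSystem(new Configuration());

        // Mirrors what emptyIndexFile() appears to do: open and close the
        // output stream without writing any bytes.
        FSDataOutputStream out = fs.create(indexFile);
        out.close();

        // Mirrors writtenIndexFile() -> FileIO.getFileSize(): stat the file we
        // just "wrote". This is the call that throws FileNotFoundException in
        // both reported traces.
        long size = fs.getFileStatus(indexFile).getLen();
        System.out.println("index file size: " + size);
    }
}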

What doesn't meet your expectations?

I expect the same behavior as with Paimon 0.8.2.

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

macdoor avatar Aug 04 '24 21:08 macdoor

Upgraded to paimon-flink-1.20-0.9-20240806.002229-1.jar; same exception.

macdoor avatar Aug 06 '24 01:08 macdoor

I hit the same exception. It looks like it only happens with non-partitioned tables.

Caused by: java.io.IOException: java.lang.RuntimeException: Failed to write deletion vectors.
	at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:220)
	at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:121)
	at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:189)
	at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
	at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1263)
	... 22 more
Caused by: java.lang.RuntimeException: Failed to write deletion vectors.
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:143)
	at org.apache.paimon.index.IndexFileHandler.writeDeletionVectorsIndex(IndexFileHandler.java:204)
	at org.apache.paimon.deletionvectors.DeletionVectorsMaintainer.prepareCommit(DeletionVectorsMaintainer.java:101)
	at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:204)
	at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:207)
	at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:215)
	... 32 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://path-to-table/index/index-9599ef01-f4ea-489a-b870-dd8fd5f6526d-0
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
	at org.apache.paimon.flink.FlinkFileIO.getFileStatus(FlinkFileIO.java:79)
	at org.apache.paimon.fs.FileIO.getFileSize(FileIO.java:193)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter$SingleIndexFileWriter.writtenIndexFile(DeletionVectorIndexFileWriter.java:138)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.emptyIndexFile(DeletionVectorIndexFileWriter.java:96)
	at org.apache.paimon.deletionvectors.DeletionVectorIndexFileWriter.write(DeletionVectorIndexFileWriter.java:59)
	at org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.write(DeletionVectorsIndexFile.java:141)
	... 37 more

prm-xingcan avatar Aug 07 '24 22:08 prm-xingcan
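
Note that both traces fail at the same call, FileIO.getFileSize on a just-written deletion-vector index file, once through PrestoS3FileSystem and once through S3AFileSystem. Until there is a fix, one possible mitigation, assuming your Paimon version allows altering this option on an existing table (otherwise the table would have to be recreated without it), is to switch deletion vectors off; the table name below is illustrative:

-- Possible mitigation only, not a fix: turning deletion vectors off avoids
-- the DeletionVectorIndexFileWriter path that stats the index file on S3.
-- Assumes 'deletion-vectors.enabled' can be altered on an existing table.
ALTER TABLE paimon_catalog.raw_data.claim_report_head
  SET ('deletion-vectors.enabled' = 'false');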

My table is a non-partitioned PK table.

macdoor avatar Aug 07 '24 23:08 macdoor

How do you create your Paimon table? Could you provide its DDL? What streaming job are you running? Could you provide its SQL?

tsreaper avatar Aug 08 '24 03:08 tsreaper

I made a simplified example.

DROP TABLE default_catalog.default_database.orders;
CREATE TABLE default_catalog.default_database.orders (
    order_id    BIGINT,
    price       DOUBLE,
    customer_id INT,
    order_time  TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    'fields.order_id.min' = '0',
    'fields.customer_id.min' = '1',
    'fields.customer_id.max' = '10000'
);

DROP TABLE paimon_catalog.raw_data.custom_last_order;
CREATE TABLE paimon_catalog.raw_data.custom_last_order (
    order_id    BIGINT,
    price       DOUBLE,
    customer_id INT,
    order_time  TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'bucket' = '3',
    'changelog-producer' = 'lookup',
    'deletion-vectors.enabled' = 'true',
    'sink.parallelism' = '3',
    'merge-engine' = 'partial-update',
    'fields.order_time.sequence-group' = 'order_id',
    'fields.order_id.aggregate-function' = 'first_value',
    'sequence.field' = 'order_time',
    'write-buffer-spillable' = 'true',
    'full-compaction.delta-commits' = '10'
);

INSERT INTO paimon_catalog.raw_data.custom_last_order
SELECT order_id, price, customer_id, order_time
FROM default_catalog.default_database.orders;

flink-dict-taskexecutor-0-cmtt-dict-17 2.log.zip

macdoor avatar Aug 08 '24 05:08 macdoor
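
From the thread so far, the ingredients that appear to matter in the DDL above are the non-partitioned primary-key table, 'deletion-vectors.enabled' = 'true', and an S3 warehouse; the merge-engine and compaction options look incidental. A trimmed repro sketch under that (unverified) assumption, with illustrative names:

-- Unverified minimal repro sketch: a non-partitioned PK table on an S3
-- warehouse with deletion vectors enabled; the other options in the
-- original DDL are assumed incidental.
CREATE TABLE paimon_catalog.raw_data.custom_last_order_min (
    customer_id INT,
    order_time  TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'bucket' = '3',
    'deletion-vectors.enabled' = 'true'
);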

I did some more testing and found that both streaming and batch jobs trigger the exception.

macdoor avatar Aug 11 '24 18:08 macdoor

It looks like this is a problem only in S3...

JingsongLi avatar Aug 12 '24 03:08 JingsongLi

Local disk, HDFS, and OSS are all OK...

JingsongLi avatar Aug 12 '24 03:08 JingsongLi