flink-cdc
[mysql-cdc] Fix MySqlSplitSerializer ConcurrentModificationException
Iterating over tableSchemas in MySqlSplitSerializer may cause a java.util.ConcurrentModificationException.
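The failure mode can be illustrated with a minimal, hypothetical sketch. It is not the PR's code: `FailFastIterationSketch`, `visitKeys`, `visitKeysOfCopy`, and the `db.tN` keys are invented, and a plain `Map<String, String>` stands in for the real `tableSchemas` map. `HashMap` iterators are fail-fast, so a structural change (such as adding a new key) made while an iteration is still in progress makes the following `next()` call throw. That is the `HashMap$HashIterator.nextNode` frame at the top of the stack traces later in this thread. This sketch triggers it from a single thread for determinism. Across threads the same check is best-effort and timing-dependent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-threaded illustration of HashMap's fail-fast iterator. */
class FailFastIterationSketch {

    /**
     * Visits every key of {@code schemas}, running {@code mutator} once right after the
     * first entry is visited (standing in for another component adding a table schema).
     */
    static List<String> visitKeys(Map<String, String> schemas, Runnable mutator) {
        List<String> visited = new ArrayList<>();
        boolean mutated = false;
        for (Map.Entry<String, String> entry : schemas.entrySet()) {
            visited.add(entry.getKey());
            if (!mutated) {
                mutator.run(); // structural change while the iterator is still live
                mutated = true;
            }
        }
        return visited;
    }

    /** Same loop over a private copy, so later changes to the original map are not seen. */
    static List<String> visitKeysOfCopy(Map<String, String> schemas, Runnable mutator) {
        return visitKeys(new HashMap<>(schemas), mutator);
    }
}
```

With three entries in the live map, `visitKeys(live, () -> live.put("db.t4", "s4"))` throws `ConcurrentModificationException`. `visitKeysOfCopy` with the same mutator visits exactly the three original keys.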
@leonardBang Would you help to review the PR?
Thanks for the contribution, but could you explain when the ConcurrentModificationException would be thrown?
In our case, we monitor 10+ tables, each with 100-800 million rows, and this exception is thrown intermittently.
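The intermittent behavior is consistent with a cross-thread race, although nobody in this thread confirms which component mutates the map. The following sketch assumes that one thread writes the schema map (for example, while chunks of large tables are still being split) and that the checkpoint serializer reads it on another thread. A common mitigation under that assumption is to guard every write with a lock, take the copy under the same lock, and iterate only the copy. A copy taken without the lock would itself iterate the live map and could still throw. `GuardedSchemaSketch` and its methods are hypothetical names for illustration, not flink-cdc APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder: all access to the live map goes through one lock. */
class GuardedSchemaSketch {
    private final Map<String, String> schemas = new HashMap<>();

    /** Writer side, e.g. a thread that discovers table schemas (assumption). */
    synchronized void put(String tableId, String schema) {
        schemas.put(tableId, schema);
    }

    /** Reader side: the copy is taken under the same lock, so it never races with put(). */
    synchronized Map<String, String> snapshot() {
        return new HashMap<>(schemas);
    }

    /**
     * Runs a writer thread that adds {@code tables} entries while the calling thread keeps
     * iterating fresh snapshots, as a checkpoint serializer would. Returns the final size.
     */
    static int runConcurrently(int tables) {
        GuardedSchemaSketch holder = new GuardedSchemaSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < tables; i++) {
                holder.put("db.t" + i, "schema-" + i);
            }
        });
        writer.start();
        long charsWritten = 0;
        while (writer.isAlive()) {
            // Iterating the private copy cannot throw ConcurrentModificationException.
            for (Map.Entry<String, String> entry : holder.snapshot().entrySet()) {
                charsWritten += entry.getKey().length(); // stand-in for writing bytes
            }
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder.snapshot().size();
    }
}
```

Replacing the field with a `ConcurrentHashMap` is another option. Its iterators are weakly consistent and never throw this exception, but a checkpoint could then capture a map that is still being filled.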
I ran into the same error. The details are below. I went through the code but couldn't find what triggers it. Do you have any suggestions?
2023-05-24 04:22:15,229 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint 4 for job c078560b7f5b002fe1b193b2364705ed. (1 consecutive failed attempts so far)
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469) ~[?:1.8.0_332]
at java.util.HashMap$EntryIterator.next(HashMap.java:1503) ~[?:1.8.0_332]
at java.util.HashMap$EntryIterator.next(HashMap.java:1501) ~[?:1.8.0_332]
at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:186) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:165) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:182) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:88) ~[?:?]
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:50) ~[?:?]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:385) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:370) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$5(SourceCoordinator.java:244) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:329) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_332]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_332]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
2023-08-17 15:39:03.678 [Checkpoint Timer] WARN [CheckpointFailureManager:114] - Failed to trigger or complete checkpoint 1 for job 19fa67e78565fcbbe4ce359b211dba9c. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$18(CheckpointCoordinator.java:2235)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2234)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:995)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:973)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:701)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:187)
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:166)
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:183)
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:89)
at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:50)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:514)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:499)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:373)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:458)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
... 8 common frames omitted
2023-08-17 15:39:03.716 [pool-18-thread-1] INFO [JdbcConnection:962] - Connection gracefully closed
I also ran into this error.
Thanks for fixing the bug, @sandyfog. I appended a commit to fix it.
@sandyfog @leonardBang Please help to review it.
@ruanhang1993 Could you please rebase this PR onto the latest master branch?
This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.