
[Bug] Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException and lost data

Open dylenWu opened this issue 2 years ago • 12 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

1.14.6

Flink CDC version

2.3

Database and its version

MySQL

Minimal reproduce step

Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:15987. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP

What did you expect to see?

The Flink job can restart from its checkpoint.

What did you see instead?

The job did not restart, and data was lost.

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

dylenWu avatar Jun 15 '23 08:06 dylenWu

please assign this issue to me @leonardBang

dylenWu avatar Jun 15 '23 08:06 dylenWu

I found the cause: our binlog client was restarted during consumption, but the restart position landed on a WRITE_ROWS binlog event. As a result, when that event was deserialized, the table information it needs could not be found, because before parsing a WRITE_ROWS event the corresponding TABLE_MAP event must be parsed first. For example, when consumption restarts from position 9510298, MissingTableMapEventException appears: to consume a WRITE_ROWS event, the TABLE_MAP event must be read first so that the table schema can be obtained and the rows parsed normally.

[screenshot: binlog events around position 9510298]
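In other words, a row event only carries a numeric table id, which must be resolved through metadata delivered by the TABLE_MAP event of the same event group. A minimal illustrative sketch of this dependency (not the library's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Why WRITE_ROWS depends on a preceding TABLE_MAP event: row events only
// carry a table id, resolved through a cache filled by TABLE_MAP events.
public class TableMapLookupSketch {
    // table id -> table metadata (database, table name, column types),
    // as delivered by the TABLE_MAP event of the same event group
    private final Map<Long, String> tableMapEventByTableId = new HashMap<>();

    void onTableMap(long tableId, String tableMetadata) {
        tableMapEventByTableId.put(tableId, tableMetadata);
    }

    String resolveWriteRows(long tableId) {
        String metadata = tableMapEventByTableId.get(tableId);
        if (metadata == null) {
            // This is the situation behind MissingTableMapEventException:
            // reading started mid event group, so no TABLE_MAP was seen.
            throw new IllegalStateException(
                    "No TableMapEventData has been found for table id:" + tableId);
        }
        return metadata;
    }
}
```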

dylenWu avatar Jun 15 '23 08:06 dylenWu

1. We can see the problem in the code below. When a MissingTableMapEventException occurs, the program does not throw it but keeps running, so the WRITE_ROWS events encountered in the meantime are discarded, ultimately causing data loss. In the BinaryLogClient class, I found that MissingTableMapEventException is caught but, unlike EOFException and SocketException, is not rethrown. This means that if our upper-layer listener implementation of onEventDeserializationFailure does nothing with it, the exception is never handled again.

```java
private void listenForEventPackets() throws IOException {
    // ...
    } catch (Exception e) {
        Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
        if (cause instanceof EOFException || cause instanceof SocketException) {
            throw e;
        }
        if (isConnected()) {
            for (LifecycleListener lifecycleListener : lifecycleListeners) {
                // the failure is reported to listeners but NOT rethrown,
                // so the read loop simply continues with the next packet
                lifecycleListener.onEventDeserializationFailure(this, e);
            }
        }
        continue;
    }
    // ...
} finally {
    if (isConnected()) {
        if (completeShutdown) {
            disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
        } else {
            disconnectChannel();
        }
    }
}
```
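For reference, this is roughly how user code attaches such a listener to the client; a minimal sketch, with placeholder host and credentials:

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import java.io.IOException;

public class ListenerSketch {
    public static void main(String[] args) throws IOException {
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "user", "secret");
        client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {
            @Override
            public void onEventDeserializationFailure(BinaryLogClient c, Exception ex) {
                // The only place a deserialization failure surfaces to user
                // code; if this method does nothing, the event is skipped
                // and the client keeps streaming.
            }
        });
        client.connect(); // blocks and streams binlog events
    }
}
```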

2. Since the BinaryLogClient code belongs to Zendesk, we cannot modify it, so it can only be handled at the upper layer through a LifecycleListener. Reading further, I found the application-layer listener implementation in the MySqlStreamingChangeEventSource class:

```java
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
    if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
        // ...
        logStreamingSourceState();
        errorHandler.setProducerThrowable(wrap(ex));
    } else if (eventDeserializationFailureHandlingMode /* ... */) {
        // ...
    } else {
        // ...
    }
}
```

3. Then look at the ErrorHandler implementation; we can see that the error is handed to the queue:

```java
public void setProducerThrowable(Throwable producerThrowable) {
    // ...
    if (first) {
        if (retriable) {
            queue.producerException(new RetriableException(
                    "An exception occurred in the change event producer. This connector will be restarted.",
                    producerThrowable));
        }
        // ...
    }
}
```

4. After the error is handed to the queue, I found that the queue's poll method only throws it conditionally: the error is raised only when, within the timeout window, a drain of the queue returns no binlog records.

```java
public List<T> poll() throws InterruptedException {
    // ...
    while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
        // the producer exception is only checked when no records were drained
        throwProducerExceptionIfPresent();
        // ...
    }
}
```

5. The resulting problem: as long as data keeps arriving, the error is never thrown. Because nothing fails for a long stretch, the job keeps running normally during that period even though the error was logged, and if a checkpoint completes before the error is finally raised, that data is lost.
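To make steps 4 and 5 concrete, here is a self-contained toy model (not Debezium's actual ChangeEventQueue; the timeout handling is omitted) showing how a stored producer exception stays hidden while records keep flowing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the poll semantics described above: the producer exception
// is only surfaced by a poll that drains zero records.
public class PollSemanticsDemo {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile RuntimeException producerException;

    void producerFailed(RuntimeException e) { producerException = e; }
    void produce(String record) { queue.add(record); }

    List<String> poll() {
        List<String> records = new ArrayList<>();
        // Mirrors the `queue.drainTo(records, maxBatchSize) == 0` guard:
        // while records keep arriving, the stored exception stays hidden.
        if (queue.drainTo(records, 100) == 0 && producerException != null) {
            throw producerException;
        }
        return records;
    }

    public static void main(String[] args) {
        PollSemanticsDemo q = new PollSemanticsDemo();
        q.producerFailed(new RuntimeException("deserialization failed"));
        q.produce("row-1");                // the stream is still busy
        System.out.println(q.poll());      // returns [row-1], exception hidden
        System.out.println(q.poll());      // queue now empty -> throws
    }
}
```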

dylenWu avatar Jun 15 '23 08:06 dylenWu

> please assign this issue to me @leonardBang

Assigned to you @dylenWu, please keep going, looking forward to your analysis.

leonardBang avatar Jun 15 '23 08:06 leonardBang

I think we should add this logic to this method to let the client stop receiving messages, so that the queue poll() method can throw the exception.

[screenshot: proposed change in onEventDeserializationFailure]

Please review it @leonardBang
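A hypothetical sketch of that change, mirroring the FAIL branch shown earlier (this is an illustration of the idea, not the merged fix):

```java
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
    if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
        logStreamingSourceState();
        errorHandler.setProducerThrowable(wrap(ex));
        try {
            // stop receiving events: once the queue drains empty,
            // poll() will finally surface the producer exception
            client.disconnect();
        } catch (IOException e) {
            // secondary failure; the producer throwable is already set
        }
    }
}
```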

dylenWu avatar Jun 16 '23 03:06 dylenWu

@dylenWu The option to just ignore the exception is useful. I encountered a weird case where consuming from a later offset throws an exception that the binlog file does not exist. I tried to manually specify an earlier offset, and the error showed up. In my case I'm relying on the upsert semantics of the sink operators, so having some data duplication is tolerable.

qidian99 avatar Jun 19 '23 03:06 qidian99

> @dylenWu The option to just ignore the exception is useful. I encountered a weird case where consuming from a later offset throws an exception that the binlog file does not exist. I tried to manually specify an earlier offset, and the error showed up. In my case I'm relying on the upsert semantics of the sink operators, so having some data duplication is tolerable.

We cannot ignore the exception; otherwise it will cause data loss.

dylenWu avatar Jun 19 '23 06:06 dylenWu

@dylenWu

[screenshots: BinlogClient reconnect and ROTATE event handling]

I found that when the BinlogClient reconnects because of a heartbeat timeout, the entries previously kept in tableMapEventByTableId are wiped, because the ROTATE event clears tableMapEventByTableId, and the next event then fails to parse. Could this be the cause of the problem?
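A simplified, self-contained sketch of the failure mode described above (assumed behavior for illustration, not the library's exact code):

```java
import java.util.HashMap;
import java.util.Map;

// After a heartbeat-triggered reconnect the server sends a ROTATE event
// first; clearing the TABLE_MAP cache on ROTATE means the next row event
// can no longer resolve its table id.
public class RotateClearsCacheSketch {
    enum EventType { ROTATE, TABLE_MAP, WRITE_ROWS }

    private final Map<Long, String> tableMapEventByTableId = new HashMap<>();

    void onEvent(EventType type, long tableId, String metadata) {
        switch (type) {
            case TABLE_MAP:
                tableMapEventByTableId.put(tableId, metadata);
                break;
            case ROTATE:
                tableMapEventByTableId.clear(); // cached TABLE_MAP data is lost
                break;
            case WRITE_ROWS:
                if (!tableMapEventByTableId.containsKey(tableId)) {
                    throw new IllegalStateException(
                            "No TableMapEventData has been found for table id:" + tableId);
                }
                break;
        }
    }
}
```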

zhenyimo avatar Jun 19 '23 08:06 zhenyimo

When consuming a WRITE_ROWS binlog event, the TABLE_MAP event must be consumed first to obtain the table information; otherwise the WRITE_ROWS event cannot be parsed. But when the BinlogClient reconnects because of a heartbeat timeout, the position it resumes from may be in the middle of a transaction, so instead of consuming the TABLE_MAP event first, it directly consumes WRITE_ROWS data. See the screenshots I posted above for details.

dylenWu avatar Jun 20 '23 03:06 dylenWu

This problem also occurs in 2.4.1. Is there a plan to fix it in 2.4.2?

GuXianWei avatar Oct 12 '23 06:10 GuXianWei

> When consuming a WRITE_ROWS binlog event, the TABLE_MAP event must be consumed first to obtain the table information; otherwise the WRITE_ROWS event cannot be parsed. But when the BinlogClient reconnects because of a heartbeat timeout, the position it resumes from may be in the middle of a transaction, so instead of consuming the TABLE_MAP event first, it directly consumes WRITE_ROWS data. See the screenshots I posted above for details.

Thanks for the explanation. I want to upgrade the CDC in our production environment from 2.1 to 2.4, but because of the compatibility issue in https://github.com/ververica/flink-cdc-connectors/issues/1795#issuecomment-1342237192 I can only start a new 2.4 CDC job rather than restore from the old job's checkpoint, starting it from the binlog position the old job last consumed. The problem is that this position is usually a WRITE_ROWS event with no schema data. So how is the "start from a specified binlog position" feature supposed to be used? Should I search the binlog myself for the last record that is not a WRITE_ROWS event? Then another problem arises: if my source database has 100+ tables, how can I make sure every table has its schema at the position I choose?
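For context, TABLE_MAP events are re-emitted inside every event group (transaction) that touches a table, so the starting position only needs to fall on a transaction boundary (e.g. before a GTID/BEGIN event), not after a TABLE_MAP for each of the 100+ tables. A hedged sketch of the specific-offset startup, assuming the flink-cdc 2.4 MySqlSource builder API; the host, credentials, table list, and offset are placeholders:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Start reading from an explicit binlog file/position; the chosen position
// should sit on a transaction boundary, not inside an event group, to avoid
// the MissingTableMapEventException discussed in this issue.
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("mysql-host")
        .port(3306)
        .databaseList("mydb")
        .tableList("mydb.*")
        .username("user")
        .password("secret")
        .startupOptions(StartupOptions.specificOffset("mysql-bin.000042", 154L))
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```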

ldwnt avatar Dec 26 '23 07:12 ldwnt

@dylenWu do you think the PR https://github.com/ververica/flink-cdc-connectors/pull/3065 would help with this issue? With it, tableMapEventByTableId would not be cleared on the ROTATE event created by a binlog client restart.

shikai93 avatar Feb 18 '24 13:02 shikai93

Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen avatar Mar 20 '24 08:03 PatrickRen