[FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file
@yuxiqian PTAL
Thanks for the update. Now we know the blocking has two causes:
- We did not handle the empty queue.
- The BinaryLogClient is missing a LifecycleListener; more specifically, nothing throws an exception in onCommunicationFailure.

To fix it, I would prefer to add an anonymous LifecycleListener that throws the exception directly, and for the empty queue, just return null and remove the current binlog file from the list. This fix should be straightforward and easy to test (using a binlog file that contains only rotate events, and using the same server id). What do you think?
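The two fixes proposed above can be sketched roughly as follows. This is a minimal, JDK-only illustration, not the connector's code: `TimestampSearchSketch`, `FailureListener`, and `firstTimestampOrNull` are hypothetical names, and `FailureListener` only stands in for the `onCommunicationFailure` callback of a real LifecycleListener. The idea is that the waiting thread polls with a timeout instead of blocking forever, returns null when the queue stays empty, and rethrows any failure the listener recorded.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class TimestampSearchSketch {

    /** Hypothetical stand-in for the failure callback of a lifecycle listener. */
    public interface FailureListener {
        void onCommunicationFailure(Exception cause);
    }

    private static final long POLL_SLICE_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

    private final BlockingQueue<Long> timestamps = new LinkedBlockingQueue<>();
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    /** Listener that records the failure so the waiting thread can rethrow it. */
    public FailureListener listener() {
        return failure::set;
    }

    /** Called by the (hypothetical) event handler when an event timestamp is seen. */
    public void offerTimestamp(long timestampMillis) {
        timestamps.offer(timestampMillis);
    }

    /**
     * Returns the first timestamp, or null if none arrives before the timeout.
     * Throws IllegalStateException if the listener reported a failure.
     */
    public Long firstTimestampOrNull(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                Exception cause = failure.get();
                if (cause != null) {
                    throw new IllegalStateException("binlog client failed", cause);
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null; // empty queue: caller can skip this binlog file
                }
                // Wait in short slices so a recorded failure is noticed quickly.
                Long ts = timestamps.poll(
                        Math.min(remaining, POLL_SLICE_NANOS), TimeUnit.NANOSECONDS);
                if (ts != null) {
                    return ts;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

A null result here corresponds to the "binlog file with only rotate events" case, where the caller would drop the current file from its list and move on.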
I agree with most of these points, but why throw an exception to end the job instead of retrying? In my tests, the blocked task retried just once and then worked normally. Here are some of my thoughts:
1. If a flink-cdc job always ends with an exception at startup because the BinaryLogClient listener registration failed, what should we do in that situation?
2. We can only remove the current binlog file because it contains only rotate events.
3. A new flink-cdc job takes some time before it really starts working.
Directly throwing the exception is the simplest solution. To handle it better, you could also retry connecting for some specific exceptions, but it should not retry for all exceptions. I don't think it's safe to go directly to the retry logic here.
By the way, there may be similar implementations in Debezium, but I'm not sure. I will check later.
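As a rough illustration of "retry for some specific exceptions, but not for all", here is a minimal JDK-only sketch. Treating IOException as the retryable case is purely an assumption for the example; which exceptions are actually safe to retry is exactly what this thread has not settled. `SelectiveRetrySketch`, `ConnectAction`, and `callWithRetry` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class SelectiveRetrySketch {

    /** Hypothetical connect step that may fail with a transient I/O error. */
    public interface ConnectAction<T> {
        T run() throws IOException;
    }

    /**
     * Runs the action, retrying only on IOException (assumed transient here).
     * Any other exception propagates immediately and ends the attempt.
     */
    public static <T> T callWithRetry(ConnectAction<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (IOException e) {
                last = e; // assumed transient: try again
            }
        }
        throw new UncheckedIOException(
                "gave up after " + maxAttempts + " attempts", last);
    }
}
```

Bounding the number of attempts keeps a persistent failure from turning back into the silent blocking this PR is trying to remove.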
How can we retry when registering the listener fails? By the way, I am also looking forward to finding similar implementations in Debezium.
The BinaryLogClient registration fails because, when no server id is set, BinaryLogClient uses the default 65535 as its server id, so we need to set a server id in each source.
A binlog file must have a timestamp in v4, so we don't need to consider an empty timestamp.
Even if I flush the logs twice with the mysql client, the binlog file still has a FormatDescriptionEventData event and a timestamp.
@whhe PTAL
I think adding a server id can indeed solve the issue, and a LifecycleListener is also needed for other exceptions. Can you add a basic implementation like ReaderThreadLifecycleListener please?
Where can I add a basic implementation like ReaderThreadLifecycleListener?
Maybe we should consider fixing the BinaryLogClient LifecycleListener issue in a new PR, because another PR should also be fixed: https://github.com/apache/flink-cdc/pull/1915/
+1, creating a new pr for it makes sense to me.
@shiyiky Thanks for this PR. Please add some description about this bug and changes from this PR.
@ruanhang1993 Desc:
- If the binaryLogClient does not have a registered server-id, it uses the default 65535. If there is only one source in a Flink job, this works normally.
- If multiple sources in a Flink job register binaryLogClients using the default 65535, registration may fail because one of the sources is still searching for the timestamp of the binlog file and has not yet released the client registered with 65535 as the server-id.
- If registration fails, the BlockingQueue stays empty and the source waits forever without doing anything.
Changes:
When registering the binaryLogClient to search for the timestamp, use the server-id unique to each source.
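To make the change concrete, here is a small JDK-only sketch of deriving a distinct server id instead of falling back to the shared default 65535. It assumes each source is configured with its own non-overlapping server-id range and that each reader picks an id by its index; `ServerIdSketch` and `serverIdFor` are hypothetical names and not the connector's actual API.

```java
final class ServerIdSketch {

    /**
     * Picks a server id from the inclusive range [rangeStart, rangeEnd]
     * using the reader's index, so that no two readers share an id.
     */
    public static int serverIdFor(int rangeStart, int rangeEnd, int readerIndex) {
        if (rangeStart > rangeEnd) {
            throw new IllegalArgumentException("rangeStart must be <= rangeEnd");
        }
        int size = rangeEnd - rangeStart + 1;
        if (readerIndex < 0 || readerIndex >= size) {
            throw new IllegalArgumentException(
                    "range of size " + size + " cannot serve reader index " + readerIndex);
        }
        return rangeStart + readerIndex;
    }
}
```

The resulting id would then be set on the client used for the timestamp search before it connects, so a second source no longer collides with a client that is still registered under 65535.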