gpdb
Code changes to support CDC
This PR is the foundation to implement CDC.
1, Add info to logs
Customers can use logical decoding to synchronize data from a source-GP to a target-GP.
If customers want to maintain transaction consistency and atomicity on target-GP, they need to collect logical logs from the coordinator and segments of source-GP, translate the logs into SQL statements, assemble them into transactions, and commit these transactions to target-GP. Transactions on source-GP correspond one-to-one to transactions on target-GP.
The framework is like this:
In order to make this architecture work, the following modifications need to be made:
1.1, distributed_forget log
distributed_forget log is generated by coordinator and is used to mark the end of a distributed transaction.
It contains the distributed transaction ID, but does not contain information about the segments that executed this distributed transaction.
As shown in the figure, this distributed transaction is executed on two segments, but cdc_replayer only received the logs of segment-0 and the distributed_forget log.
The logs on segment-1 were not sent for some reason.
At this time, the replayer does not know whether to commit the transaction because it does not know whether all the logs of the distributed transaction have been completely collected.
I want to add a field named `nsegs` to the `distributed_forget` log to represent the number of segments executing the distributed transaction.
As shown in the example, cdc_replayer learns from the distributed_forget log that this distributed transaction was executed on 2 segments, so it needs to wait for the log on another segment.
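The waiting rule above can be sketched on the replayer side as follows. This is only an illustration under assumptions, not code from this PR: the `PendingDtx` struct and its helper functions are hypothetical names, and the sketch assumes the replayer sees exactly one commit record per participating segment.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical replayer-side state for one distributed transaction. */
typedef struct PendingDtx
{
    uint64_t gxid;           /* distributed transaction ID */
    int      nsegs_expected; /* nsegs from distributed_forget; -1 until it arrives */
    int      nsegs_received; /* segment commit records collected so far */
} PendingDtx;

void pending_dtx_init(PendingDtx *dtx, uint64_t gxid)
{
    dtx->gxid = gxid;
    dtx->nsegs_expected = -1;
    dtx->nsegs_received = 0;
}

/* A segment's commit record for this gxid has been collected. */
void pending_dtx_on_segment_commit(PendingDtx *dtx)
{
    dtx->nsegs_received++;
}

/* The coordinator's distributed_forget record (carrying nsegs) has arrived. */
void pending_dtx_on_forget(PendingDtx *dtx, int nsegs)
{
    dtx->nsegs_expected = nsegs;
}

/* Commit only once the forget record and all segment logs are present. */
bool pending_dtx_can_commit(const PendingDtx *dtx)
{
    return dtx->nsegs_expected >= 0 &&
           dtx->nsegs_received >= dtx->nsegs_expected;
}
```

In the example from the figure, the replayer would hold the transaction after receiving segment-0's log and the `distributed_forget` log, and release it only when segment-1's log arrives.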
1.2, one-phase transaction
A one-phase transaction is a transaction that executes on only one segment. The coordinator does not generate a distributed_forget log for a one-phase transaction.
I want to add a field to the local transaction commit log that identifies whether the transaction is one-phase.
As shown in the figure, when cdc_replayer receives such a commit log, it can commit the transaction without waiting for distributed_forget log.
It should be emphasized that logical decoding only works when wal_level >= logical, so these new fields are written to the xlog only when wal_level >= logical. Our customers' GP clusters currently all run with wal_level < logical, so this code change has no impact on them.
2 Calculating the value of the nsegs field
I use the information returned from QE to QD to calculate the value of the nsegs field in distributed_forget log.
Specifically, I added a bitmap in TMGXACTLOCAL to mark which segments the distributed transaction generated logs on. We are using this field of the TMGXACTLOCAL structure on QD.
processResult in cdbdisp_async.c is the function the QD uses to process messages returned by a QE. The write_log field in pg_conn indicates whether the QE on this connection has generated logs. If logs were generated, we assume that the corresponding QE has executed the distributed transaction and record its segment-id in the bitmap. In this way, when the distributed_forget log is generated, we can check this bitmap to know how many segments have executed the distributed transaction.
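The bookkeeping described above can be sketched as a small bitmap with a mark operation and a population count. This is a simplified stand-in, not the structure added to TMGXACTLOCAL: the `SegWriteMap` type, the function names, and the `MAX_SEGMENTS` bound are all assumptions made for the illustration.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define MAX_SEGMENTS 1024 /* assumed upper bound, for illustration only */
#define WORD_BITS    64
#define NWORDS       (MAX_SEGMENTS / WORD_BITS)

/* Hypothetical bitmap: bit i is set if segment i wrote logs for this transaction. */
typedef struct SegWriteMap
{
    uint64_t words[NWORDS];
} SegWriteMap;

void seg_write_map_reset(SegWriteMap *m)
{
    memset(m->words, 0, sizeof(m->words));
}

/* Record that the QE on segment_id reported having written logs. */
bool seg_write_map_mark(SegWriteMap *m, int segment_id)
{
    if (segment_id < 0 || segment_id >= MAX_SEGMENTS)
        return false;
    m->words[segment_id / WORD_BITS] |= (uint64_t) 1 << (segment_id % WORD_BITS);
    return true;
}

/* Number of distinct segments marked; this would become nsegs. */
int seg_write_map_count(const SegWriteMap *m)
{
    int count = 0;

    for (int i = 0; i < NWORDS; i++)
    {
        uint64_t w = m->words[i];

        while (w != 0)
        {
            w &= w - 1; /* clear the lowest set bit */
            count++;
        }
    }
    return count;
}
```

Using a bitmap rather than a plain counter means a segment that reports results more than once within the same transaction is still counted once.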
3 test cases
Because the code modification only involves the decoding of the distributed_forget log and the decoding of the one-phase transaction commit log, I wrote two corresponding test cases.
The original contrib/test_decoding contains the postgres test cases for logical decoding. Currently, the test cases in contrib/test_decoding do not actually run in the pipeline because wal_level has to be set to logical, so let's just add the new test cases to test_decoding to avoid too much duplicated code.
The code is added with a few differences:
- Add the declaration and definition of the `pg_decode_distributed_forget` function, which produces the corresponding output based on the decoded content of the `distributed_forget` log. Considering ABI testing, we add one global function pointer called `PluginDistributedForgetCb`, which needs to be initialized if the plugin wants to handle the `distributed_forget` WAL log.
- Modify the `pg_output_begin` function. When the user runs the `pg_logical_slot_get_changes` command, `pg_output_begin` is called and outputs something to indicate the beginning of a transaction. I modified this function to add some new output for testing when the transaction is a one-phase transaction.
4 the code change in syncrep.c
The transaction on the coordinator needs to confirm that the sync-standby has received the transaction commit log (xact.c:1774) before committing the transaction.
If there are multiple sync-standbys, we need to wait until the lsn of the slowest sync-standby exceeds the lsn of the coordinator transaction log before the coordinator transaction can be committed.
The problem with the original code is that the logical WALSender will be regarded as a sync-standby and its sync progress lsn will be checked. However, the logical replication is asynchronous and cannot be used as a sync-standby.
The sync_priority of the logical walsender on the coordinator is 0, and this condition can be used to filter. In fact, the original code of postgres does a sync_priority check (syncrep.c:898), where the logic on both sides can be aligned.
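The intended filtering can be modeled with a small sketch. It is not the syncrep.c code: the `WalSenderInfo` struct and both functions are hypothetical, LSNs are modeled as plain 64-bit integers, and the choice to let the commit proceed when no sync-standby exists is an assumption of the sketch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified view of one walsender. */
typedef struct WalSenderInfo
{
    int      sync_priority; /* 0 means "not a sync-standby", e.g. a logical walsender */
    uint64_t flush_lsn;     /* how far this receiver has confirmed */
} WalSenderInfo;

/*
 * Find the LSN of the slowest sync-standby, skipping walsenders whose
 * sync_priority is 0. Returns false if there is no sync-standby at all.
 */
bool slowest_sync_standby_lsn(const WalSenderInfo *senders, size_t n,
                              uint64_t *min_lsn)
{
    bool found = false;

    for (size_t i = 0; i < n; i++)
    {
        if (senders[i].sync_priority == 0)
            continue; /* asynchronous sender: ignore its progress */
        if (!found || senders[i].flush_lsn < *min_lsn)
        {
            *min_lsn = senders[i].flush_lsn;
            found = true;
        }
    }
    return found;
}

/* The commit may proceed once the slowest sync-standby has reached commit_lsn. */
bool commit_may_proceed(const WalSenderInfo *senders, size_t n,
                        uint64_t commit_lsn)
{
    uint64_t min_lsn;

    if (!slowest_sync_standby_lsn(senders, n, &min_lsn))
        return true; /* assumption: nothing to wait for */
    return min_lsn >= commit_lsn;
}
```

Without the `sync_priority == 0` check, a lagging logical walsender would hold back the minimum LSN and block coordinator commits even though it can never act as a sync-standby.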
Suggestions are welcome.
> I want to add a field named 'nsegs' to the distributed_forget log to represent the number of segments executing the distributed transaction.
>
> A one-phase transaction is a transaction that only execute on one segment. The coordinator will not generate the distributed_forget log for one-phase transaction.
Is it possible to unify both fields/flags into one? For example, nsegs equal to 1 indicates a one-phase transaction and nsegs > 1 indicates a normal two-phase transaction (just a rough example, maybe inappropriate). I understand that the current design requires different logs (or sources) to carry them for identifying the cases, but logically, regardless of the different logs, one field could satisfy the requirement of identifying one-phase or two-phase transactions, right? And if this field could be included in some common xlog, that would be unified and simpler.
> And if this field could be included in some common xlog, that would be unified and simpler.
One idea is to write `nseg` in the QE commit WAL:
- If `nseg==1`, it's a 1PC and we can commit immediately;
- If `nseg>1`, it's a 2PC and we need to wait `nseg` of QE records AND the `distributed_forget` record;
- If there's no `nseg`, it's local commit and we can commit immediately.
But to do this we would need to pass the `nseg` from the QD to the QE. Currently the QD passes this info to the QE: https://github.com/greenplum-db/gpdb/blob/c14fc13ea92ae7d18a81d0ada15fabb22b965b0c/src/backend/tcop/postgres.c#L5508
But that seems to be the total number of segments in the cluster. We would need to pass the `nseg` that are participating in the query. I think it shouldn't be hard (check `cdbdisp_dispatchCommandInternal->buildGpQueryString`, there's a `segments` that should be the info we need); we did a similar thing for hot standby dispatch.
If they could be merged into one nseg count logic, that would be easier for the replayer to replay. But currently, no xlog is written on the coordinator for a one-phase transaction. I would like to read the code here to figure out whether such an xlog could be added, but I wonder whether this would be accepted by GP and get merged.
I don't think we need to add extra xlog. Given the statement:
> I want to add a field to the local transaction commit log that identifies whether the transaction is one-phase.
I think we don't need to add a new field to the local commit log; re-using nsegs is enough to identify whether it is a one-phase or two-phase transaction (as @huansong suggested), if my understanding is correct.
Thanks, @huansong! I agree with your solution. But looking at the existing implementation, we have the one_phase flag in the commit xlog, along with the gxid from your merged commit (https://github.com/greenplum-db/gpdb/pull/17057/files). This solution can also handle the previous 3 scenarios:
- If one_phase is set, it is a one-phase commit and we can commit it immediately.
- If gxid is set and the one_phase flag is not set, this is a two-phase commit and we need to wait for `nseg` QE records AND the `distributed_forget` record.
- If there is no gxid, this is a local commit and we can commit it immediately.
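The three scenarios above amount to a small decision function on the replayer side. The sketch below is illustrative only: `CommitRecordInfo`, `CommitAction`, and `classify_commit` are hypothetical names, and treating gxid 0 as "no gxid" is an assumption of the sketch rather than something taken from the xlog format.

```c
#include <stdbool.h>
#include <stdint.h>

#define INVALID_GXID 0 /* assumed sentinel for "no distributed transaction ID" */

/* What the replayer should do with a decoded segment commit record. */
typedef enum CommitAction
{
    COMMIT_NOW_ONE_PHASE,        /* one-phase commit: apply immediately */
    WAIT_FOR_DISTRIBUTED_FORGET, /* two-phase commit: wait for all QE records and the forget record */
    COMMIT_NOW_LOCAL             /* local commit: apply immediately */
} CommitAction;

/* Hypothetical view of the fields decoded from a commit xlog record. */
typedef struct CommitRecordInfo
{
    bool     one_phase; /* one_phase flag in the commit xlog */
    uint64_t gxid;      /* distributed transaction ID, or INVALID_GXID */
} CommitRecordInfo;

CommitAction classify_commit(const CommitRecordInfo *rec)
{
    if (rec->one_phase)
        return COMMIT_NOW_ONE_PHASE;
    if (rec->gxid != INVALID_GXID)
        return WAIT_FOR_DISTRIBUTED_FORGET;
    return COMMIT_NOW_LOCAL;
}
```

The one_phase check comes first because a one-phase commit may also carry a gxid; only a gxid without the flag implies waiting.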
And since the one-phase flag can easily be obtained from the current GPDB code, no further dispatch info needs to be added. We prefer to make as few modifications to the code as possible, so I think the current solution is simpler than dispatching another nseg and writing it in each commit. If you have other considerations, please let me know.
This PR is ready to be reviewed. Please continue reviewing. Thanks very much!