debezium-connector-db2
INVESTIGATION ONLY: 1.7 z/OS changes
@jpechane: I am creating a new PR with the changes, as I had to delete the older branch due to some confusion. This one contains only the z/OS changes on top of the 1.7 branch.
@aminnezarat Hi, is it possible you could try and test this PR?
Hi @jpechane
Thanks for the new PR. We have tested it in our team and ran into a new error.
Our environment is: z/OS 1.12, DB2 for z/OS 10.1, connector 2.3.1.
@aminnezarat Is it possible that the LSN can have a dynamic length in your case? If yes, the fix could be relatively easy in the compareTo method: the array lengths will be compared first, and the content only if the lengths are the same.
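A minimal sketch of that idea (hypothetical class and method names; the connector's real logic lives in io.debezium.connector.db2.Lsn.compareTo and works on the int[] returned by getUnsignedBinary()):

```java
public class LsnCompareSketch {
    // Sketch only: length-aware unsigned comparison for variable-size LSNs.
    // A shorter LSN sorts first; contents are compared byte by byte only
    // when the lengths match.
    public static int compareUnsigned(byte[] thisU, byte[] thatU) {
        if (thisU.length != thatU.length) {
            return Integer.compare(thisU.length, thatU.length);
        }
        for (int i = 0; i < thisU.length; i++) {
            int cmp = Integer.compare(thisU[i] & 0xFF, thatU[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }
}
```

The `& 0xFF` masking is needed because Java bytes are signed, while LSN bytes must compare as unsigned values.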
@jpechane In DB2 10.1 (for z/OS) we have a 10-byte RBA (instead of an LSN), so it seems we should use 10 bytes instead of 16 bytes in the code below in Lsn (the compareTo method):
final int[] thisU = getUnsignedBinary();
final int[] thatU = o.getUnsignedBinary();
After changing the length to 10 bytes, we faced a new error, as below:
@vahidsh1 Could you please share the source tree you use? Unfortunately I cannot match the line numbers.
Dear @jpechane, Thank you for your attention. The branch address is: https://github.com/vahidsh1/debezium-connector-db2-1.7_ZOS_NewChanges/tree/lsn-change-10to16byte/
and the error is:
@vahidsh1 Could you please share the schema of the sysibm.sysdummy1 table?
Dear @jpechane,
Thank you, do you mean the following information?
@vahidsh1 As a workaround, could you modify io.debezium.connector.db2.Db2Connection.timestampOfLsn(Lsn) to just return Instant.ofEpochMilli(0)?
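Sketched standalone, the suggested workaround amounts to the following (hypothetical class; the real method is io.debezium.connector.db2.Db2Connection.timestampOfLsn(Lsn)):

```java
import java.time.Instant;

public class TimestampWorkaround {
    // Sketch of the suggested workaround: skip the real LSN-to-timestamp
    // lookup (which fails on this z/OS setup) and always return the
    // Unix epoch instead.
    public static Instant timestampOfLsn(Object lsn) {
        return Instant.ofEpochMilli(0);
    }
}
```

This sacrifices correct source timestamps in the emitted change events, but it lets streaming proceed so that the remaining problems can be isolated.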
I changed the code as you mentioned: the previous error is solved, and the outputs are as below:
It seems that after the initial snapshot (full snapshot), each time a new record is inserted into the source table (the ARRA table), Debezium re-checks all the records that were previously inserted into the CDARRA table. Is that right? (This conclusion is based on the output lines in the images above.) Thank you again. Regards, Vahid Shahbazi
[email protected] https://www.linkedin.com/in/vahidshahbazi/
@vahidsh1 So we are moving forward. Could you please disable the code at lines https://github.com/vahidsh1/debezium-connector-db2-1.7_ZOS_NewChanges/blob/lsn-change-10to16byte/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java#L192-L200
We'll then need to find out why it is not working. For now, let our goal be getting the streaming to run; we can then try to fix the issues one by one.
Dear @jpechane,
Thank you again.
I disabled the mentioned lines (192-200 in Db2StreamingChangeEventSource.java).
After inserting first record the below logs were displayed:
After inserting second record the below logs were displayed:
@vahidsh1 Hi, I am sorry, I was on vacation, so I am only able to continue with this now. Are you sure you removed the lines similar to:

```java
// After restart for changes that were executed before the last committed offset
if (tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) {
    LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn,
            lastProcessedPositionOnStart);
    tableWithSmallestLsn.next();
    continue;
}
```

?
Dear @jpechane Hi, yes, you are right; the problem was solved after commenting out the mentioned lines. Thank you. Could you please help with the next needed changes?
@jpechane
Hi again, we faced another problem:
The problem was related to the "increment" method in the Lsn.java class, which assumed the 16-byte LSN rather than the 10-byte LSN of DB2 10.1 for z/OS.
We had to change some code as below:
```java
public Lsn increment() {
    final BigInteger bi = new BigInteger(this.toString().replace(":", ""), 16).add(BigInteger.ONE);
    final byte[] biByteArray = bi.toByteArray();
    final byte[] lsnByteArray = new byte[10]; // was: new byte[16]
    for (int i = 0; i < biByteArray.length; i++) {
        lsnByteArray[i + 10 - biByteArray.length] = biByteArray[i]; // was: i + 16 - biByteArray.length
    }
    return Lsn.valueOf(lsnByteArray);
}
```
Now this problem was solved.
https://github.com/vahidsh1/debezium-connector-db2-1.7_ZOS_NewChanges/compare/main...develop
@vahidsh1 Excellent! So to sum up, we need to be able to:
- configure the size of the LSN (and make sure LSNs are properly compared)
- disable the LSN-to-timestamp conversion
If we now try to do the necessary changes for 2.4, would you be willing to give it a test ride again?
Thanks
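The first point could be sketched by making the byte length a parameter of the increment logic shown earlier, rather than a hard-coded 10 or 16 (names here are illustrative, not the connector's actual API):

```java
import java.math.BigInteger;

public class LsnSizeSketch {
    // Sketch: increment an LSN of configurable byte length
    // (16 bytes on DB2 LUW, 10 bytes for the DB2 10.1 z/OS RBA).
    public static byte[] increment(byte[] lsn, int lsnSize) {
        final BigInteger bi = new BigInteger(1, lsn).add(BigInteger.ONE);
        final byte[] biByteArray = bi.toByteArray();
        final byte[] lsnByteArray = new byte[lsnSize];
        // Right-align the BigInteger bytes in the fixed-size buffer,
        // dropping any leading sign/overflow bytes that do not fit.
        int srcStart = Math.max(0, biByteArray.length - lsnSize);
        for (int i = srcStart; i < biByteArray.length; i++) {
            lsnByteArray[i + lsnSize - biByteArray.length] = biByteArray[i];
        }
        return lsnByteArray;
    }
}
```

A connector option carrying this size could then drive both increment and comparison, so neither length needs to be hard-coded.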
Dear @jpechane Hi again, sorry for my late reply. Sure, I will work on it, but before that I face another problem: in the source table I have a column "RATE" (DECIMAL(13, 9), e.g. 1234.123456789). Debezium recognizes this column as {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"9","connect.decimal.precision":"13"}}, but when I insert "RATE":"1.234" in the source, after replication via Debezium it comes out as "RATE":"SY1YgA==" instead of "RATE":"1.234"! It seems this is related to the fact that in DB2 for z/OS we have PACKED DECIMAL instead of DECIMAL. I am now working on this issue, to change the Debezium DB2 connector so that it converts PACKED DECIMAL to DECIMAL. Could you please guide me? Regards, Vahid
@vahidsh1 This is expected; please see https://debezium.io/documentation/reference/2.3/connectors/db2.html#db2-decimal-types. I believe the documentation is also missing https://debezium.io/documentation/reference/2.3/connectors/sqlserver.html#sqlserver-property-decimal-handling-mode, so the correct description for the decimal type would be https://debezium.io/documentation/reference/2.3/connectors/sqlserver.html#sql-server-decimal-values
Dear @jpechane By adding "decimal.handling.mode": "double" the problem is solved; the decimal columns are now correct after replication via Debezium. Thank you for your guidance. Now could you please advise me on the next steps to merge these changes into version 2.4? Best regards, Vahid Shahbazi
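For reference, the fix is a single connector configuration property (abbreviated configuration; all other required properties omitted):

```json
{
  "connector.class": "io.debezium.connector.db2.Db2Connector",
  "decimal.handling.mode": "double"
}
```

With `double`, DECIMAL values are emitted as plain floating-point numbers instead of the binary `org.apache.kafka.connect.data.Decimal` encoding, trading exactness for readability, as described in the documentation linked above.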
Hello @jpechane and @vahidsh1,
I am Srinath, and I am looking for embedded CDC for DB2 z/OS. I am trying to connect a DB2 database with the debezium-db2 connector but am facing an issue. I have seen your solution; what values should be given for the properties you customized?
custom.db.type, custom.cdc.program.schema, custom.cdc.table.schema
Caused by: com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-204, SQLSTATE=42704, SQLERRMC=ASNCDC.IBMSNAP_REGISTER, DRIVER=4.13.127
Hello @srinathramya
1- What is your DB2 and z/OS version?
2- Do you have the SQL replication product (InfoSphere Replication Server) on your z/OS?
3- The configuration should be as below:

```json
{
  "name": "db2-connector-ABCD",
  "config": {
    "connector.class": "io.debezium.connector.db2.Db2Connector",
    "database.hostname": "IP address",
    "database.server.name": "DSNP",
    "database.port": "DDF port number",
    "database.user": "username",
    "database.password": "password",
    "snapshot.mode": "initial",
    "database.dbname": "subsystem_name",
    "custom.db.type": "ZOS",
    "custom.cdc.table.schema": "your schema",
    "custom.cdc.program.schema": "ASNCDC",
    "database.cdcschema": "ASNCDC",
    "topic.prefix": "XYZ",
    "table.include.list": "dbname.tablename",
    "database.history.kafka.topic": "topic name",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "topic name"
  }
}
```

Regards, Shahbazi
Hello @vahidsh1 ,
Thanks for the reply.
1- What is your DB2 and z/OS version? Ans: 12.15
2- Do you have the SQL replication product (InfoSphere Replication Server) on your z/OS? Ans: No
But it seems the ASNCDC schema is not found in our DB2 database. Do we need to create it manually?
Regards Srinath.
Hi Dear @srinathramya
You need IRS (InfoSphere Replication Server) to read the DB2 logs, analyze them, extract the changed records from the logs, and store the extracted records into a temporary table. The default schema for this temporary table is ASN (you can change it to ASNCDC). After the extracted records are stored in the temporary table, Debezium connects to this table (under the ASNCDC schema) to read the records and convert them into Kafka topics. So you need a process that extracts the database changes and stores them under the ASNCDC schema; after that you can use Debezium.
https://debezium.io/documentation/reference/1.1/connectors/db2.html https://www.ibm.com/support/pages/q-replication-and-sql-replication-product-documentation-pdf-format-version-101-linux-unix-and-windows
Regards, Vahid
Dear @jpechane Could you please advise me on the next steps to merge these changes into version 2.5? Best regards, Vahid Shahbazi
Dear @AshwinMK64, I hope this message finds you well. I wanted to reach out to discuss the modifications I made to the version you provided in your pull request. Over the past few months, I have made some modifications to ensure compatibility with DB2 v10 and z/OS 1.12. I am pleased to inform you that the results have been satisfactory. Now, I believe it is appropriate to apply these modifications to your original version. However, I would appreciate your guidance on the next steps. Could you please advise on how we should proceed? Thank you for your attention to this matter. I look forward to hearing from you soon.
Replaced with https://github.com/debezium/debezium-connector-db2/pull/154 for Debezium 2.x
@vahidsh1 Could you please take a look at the new PR?