[improve][jdbc] Add time handling support
Main Issue: https://github.com/apache/pulsar/discussions/23109
Motivation
- This PR adds support to the abstract base JDBC sink for inserting datetime values into databases
Modifications
- Added a protected method on the base JDBC sink to handle datetimes
- Parse Avro logical types to extract the correct time information
- Insert correct datetimes into databases
- Fall back to raw data types where datetime handling is not supported
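As a rough illustration of the conversion described in the list above, here is a minimal sketch, not the code from this PR. It assumes the logical type is available as its Avro name (`date`, `time-millis`, `time-micros`, `timestamp-millis`, `timestamp-micros`) and that the raw value arrives as a number; the class name `AvroTimeConversionSketch` and the method `convertLogicalValue` are hypothetical.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;

// Hypothetical helper, for illustration only; not part of the Pulsar code base.
final class AvroTimeConversionSketch {

    /**
     * Maps a raw Avro value to a java.sql type based on its logical type name.
     * Returns the raw value unchanged when the logical type is missing,
     * not handled here, or the value is not numeric (the "fall back" case).
     */
    static Object convertLogicalValue(String logicalTypeName, Object raw) {
        if (logicalTypeName == null || !(raw instanceof Number)) {
            return raw;
        }
        long value = ((Number) raw).longValue();
        switch (logicalTypeName) {
            case "date":
                // Avro "date": days since 1970-01-01
                return Date.valueOf(LocalDate.ofEpochDay(value));
            case "time-millis":
                // Avro "time-millis": milliseconds after midnight;
                // java.sql.Time.valueOf drops the sub-second part
                return Time.valueOf(LocalTime.ofNanoOfDay(value * 1_000_000L));
            case "time-micros":
                // Avro "time-micros": microseconds after midnight
                return Time.valueOf(LocalTime.ofNanoOfDay(value * 1_000L));
            case "timestamp-millis":
                // Avro "timestamp-millis": milliseconds since the epoch (UTC)
                return new Timestamp(value);
            case "timestamp-micros":
                // Avro "timestamp-micros": microseconds since the epoch (UTC)
                return Timestamp.from(Instant.ofEpochSecond(
                        Math.floorDiv(value, 1_000_000L),
                        Math.floorMod(value, 1_000_000L) * 1_000L));
            default:
                return raw;
        }
    }

    public static void main(String[] args) {
        System.out.println(convertLogicalValue("date", 0));
        System.out.println(convertLogicalValue("time-millis", 3_723_000));
        System.out.println(convertLogicalValue("unknown-type", 42));
    }
}
```

The returned object could then be bound with `PreparedStatement.setDate`, `setTime`, or `setTimestamp`, while the fallback path keeps whatever binding the sink already uses for the raw type.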
Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added unit tests to all JDBC database connectors for handling datetimes
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [x] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Thanks for the contribution @omrifried. There are checkstyle errors. Configuring code style and checkstyle in IntelliJ will help address those. Please check the guide at https://pulsar.apache.org/contribute/setup-ide/#configure-code-style .
You can run `mvn -T 1C initialize checkstyle:check` to quickly run the check on the command line.
[INFO] There are 17 errors reported by Checkstyle 10.14.2 with /home/runner/work/pulsar/pulsar/buildtools/src/main/resources/pulsar/checkstyle.xml ruleset.
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[26,1] (imports) ImportOrder: Import java.sql.Date appears after other imports that it should precede
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[65] (regexp) RegexpSingleline: Trailing whitespace
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[77] (sizes) LineLength: Line is longer than 120 characters (found 123).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[79] (sizes) LineLength: Line is longer than 120 characters (found 121).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[97] (sizes) LineLength: Line is longer than 120 characters (found 168).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[255,13] (blocks) RightCurly: '}' at column 13 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[357] (regexp) RegexpSingleline: Trailing whitespace
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[358] (regexp) RegexpSingleline: Trailing whitespace
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[360,21] (naming) LocalVariableName: Name 'timestamp_converted' must match pattern '^[a-z][a-zA-Z0-9]*$'.
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[519,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[522,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[524] (sizes) LineLength: Line is longer than 120 characters (found 122).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[528,13] (coding) FallThrough: Fall through from previous branch of the switch statement.
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[532] (sizes) LineLength: Line is longer than 120 characters (found 219).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[534,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[537,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error: src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[543,13] (coding) FallThrough: Fall through from previous branch of the switch statement.
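For reference, the three non-formatting rule types in the report above (LocalVariableName, RightCurly, FallThrough) are usually resolved with changes like the ones in the sketch below. This is a generic illustration, not the code from `BaseJdbcAutoSchemaSink.java`; the class, method, and variable names are hypothetical.

```java
// Hypothetical example, for illustration only.
final class CheckstyleFixSketch {

    static String describe(int kind, long epochMillis) {
        String label;
        switch (kind) {
            case 0:
                label = "date";
                break; // FallThrough: end each non-empty case with break/return/throw
            case 1:
                label = "time";
                break;
            default:
                label = "raw";
                break;
        }

        // LocalVariableName: camelCase instead of snake_case (timestamp_converted)
        long timestampConverted = epochMillis / 1000L;

        if (timestampConverted >= 0) {
            return label + ":" + timestampConverted;
        } else { // RightCurly: '}' stays on the same line as 'else'
            return label + ":before-epoch";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(0, 5000L));
    }
}
```

The remaining findings (ImportOrder, trailing whitespace, LineLength) are formatting-only and are typically handled by the IDE code style configuration linked above.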
https://github.com/apache/pulsar/tree/master/tests#readme
Apologies for the delayed response @lhotari, I missed the notification on the PR. I will try to get to the integration tests next weekend. I have also confirmed the functionality with Postgres in a running instance, but will add integration tests for more solid test coverage.