pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[improve][jdbc] Add time handling support

Open omrifried opened this issue 3 months ago • 2 comments

Main Issue: https://github.com/apache/pulsar/discussions/23109

Motivation

  • This PR adds abstract support for JDBC sink to enable datetime inserts into databases

Modifications

  • Added a protected method on the base jdbc sink to handle datetimes
  • Parse avro logical types to pull correct time information
  • Insert correct datetimes into databases
  • Fall back to raw data types on lack of support

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • *Added unit tests to all JDBC database connectors for handling datetimes

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [x] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

omrifried avatar Sep 22 '25 16:09 omrifried

Thanks for the contribution @omrifried. There are checkstyle errors. Configuring code style and checkstyle in IntelliJ will help address those. Please check the guide at https://pulsar.apache.org/contribute/setup-ide/#configure-code-style .

You can run mvn -T 1C initialize checkstyle:check to quickly run the check on the command line.

[INFO] There are 17 errors reported by Checkstyle 10.14.2 with /home/runner/work/pulsar/pulsar/buildtools/src/main/resources/pulsar/checkstyle.xml ruleset.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[26,1] (imports) ImportOrder: Import java.sql.Date appears after other imports that it should precede
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[65] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[77] (sizes) LineLength: Line is longer than 120 characters (found 123).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[79] (sizes) LineLength: Line is longer than 120 characters (found 121).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[97] (sizes) LineLength: Line is longer than 120 characters (found 168).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[255,13] (blocks) RightCurly: '}' at column 13 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[357] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[358] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[360,21] (naming) LocalVariableName: Name 'timestamp_converted' must match pattern '^[a-z][a-zA-Z0-9]*$'.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[519,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[522,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[524] (sizes) LineLength: Line is longer than 120 characters (found 122).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[528,13] (coding) FallThrough: Fall through from previous branch of the switch statement.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[532] (sizes) LineLength: Line is longer than 120 characters (found 219).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[534,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[537,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[543,13] (coding) FallThrough: Fall through from previous branch of the switch statement.

lhotari avatar Sep 23 '25 12:09 lhotari

https://github.com/apache/pulsar/tree/master/tests#readme

Apologies for the delayed response @lhotari , I missed the notification on the PR. I will try to get to the integration tests next weekend. I also confirmed functionality with Postgres in a running instance, but will add integration tests for more solid test coverage

omrifried avatar Dec 01 '25 05:12 omrifried