Delta Source for Apache Flink

Open dennyglee opened this issue 4 years ago • 2 comments

Build a Flink/Delta source (i.e., Flink reads from Delta Lake), potentially leveraging the Delta Standalone Reader. This expands on issue #74.

PR Plan:

  • PR 5 - Option support in Bounded Mode - DONE
  • PR 6 - Continuous mode without monitoring for changes - DONE
  • PR 6.1 - Package Refactoring - DONE
  • PR 7 - Monitoring for changes in Continuous mode - DONE
  • PR 7.1 - More tests for PR 7 including IT case tests for Continuous mode. - DONE
  • PR 9 - Builder - DONE
  • PR 10 - Builder refactoring to extract Schema from Delta Log. - DONE
  • PR 10.1 - Tests for SourceUtils::buildSourceSchema, DeltaSourceBuilderBase::getSourceSchema and SnapshotSupplier implementations. - DONE
  • PR 11 - Partition Support using Delta log to identify partition columns. - DONE
  • PR 12 - Get BATCH_SIZE for the Format builder from Source options; this value is currently hardcoded. Add validation for option names and values, including the use of options that are not applicable to the given mode via the generic option(...) method. - DONE
  • PR 12.1 - More tests for option type conversion and for handling option(...) validation errors. - DONE
  • PR 13 - Use new DeltaLog API getVersionAtOrAfterTimestamp for continuous getChanges call when using starting timestamp. This uses the "streaming" semantics of timestamp -> version conversion. - DONE
  • @scottsand-db -> review Java docs.
  • PR 14 - Additional IT tests including all options and end to end tests using Source and Sink. - DONE
  • After PR 14 run Sample job using Delta Sink/Source on a real cluster. - DONE
  • PR 15 - simple test case for remote file table path, similar to #340 and #341 - DONE
  • PR 16 - update README - DONE
  • PR 17 - update examples - DONE
  • PR 18 - add "columnNames" key support to .option(...)
  • PR 19 - Flink Sink integration test that creates a Delta Log checkpoint and asserts its correctness - DONE
  • PR 20 - update javadocs - DONE
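
PR 13 in the plan above relies on "at or after" semantics when a starting timestamp is converted to a table version. The following is a minimal sketch of that lookup, not the Delta Standalone implementation: the class name, the method name, the list-of-timestamps input and the empty result for a timestamp later than every commit are all assumptions made for illustration.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical illustration only; this is not the connector's or Delta Standalone's code.
class TimestampToVersionSketch {

    /**
     * Returns the earliest version whose commit timestamp is greater than or
     * equal to the given timestamp, or an empty result if every commit is older.
     *
     * Assumption: commitTimestamps.get(v) is the commit time of version v and
     * the list is sorted in non-decreasing order.
     */
    static OptionalLong versionAtOrAfter(List<Long> commitTimestamps, long timestamp) {
        int lo = 0;
        int hi = commitTimestamps.size(); // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (commitTimestamps.get(mid) < timestamp) {
                lo = mid + 1; // commit is too old, look at later versions
            } else {
                hi = mid;     // candidate found, look for an earlier one
            }
        }
        return lo < commitTimestamps.size() ? OptionalLong.of(lo) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Made-up commit times for versions 0, 1 and 2.
        List<Long> commits = List.of(100L, 200L, 300L);
        System.out.println(versionAtOrAfter(commits, 150L));
    }
}
```

With this rule, a timestamp that falls between two commits resolves to the later commit, so a continuous read starts from the first change made at or after the requested point in time.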

-- All functionalities are in place up to this point. The following PRs are extra tests, eventual bug fixes and code/javadoc adjustments. --

P1 Functionalities

  • Aggregate all option validation errors from the Source Builder and report them as one.
  • Continuous mode from version 0, handling Metadata & Protocol actions.
  • Source metrics. Also update the README.
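
The first P1 item, reporting all option validation errors as one, could be sketched roughly as below. This is an assumption-laden illustration rather than the Source Builder's code: the class, the set of accepted option names and the three rules checked here are hypothetical and chosen only to show the collect-then-throw pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names and rules are assumptions, not the builder's API.
class OptionValidationSketch {

    // Assumed set of accepted option names, used only for this example.
    static final Set<String> KNOWN_OPTIONS = Set.of("startingVersion", "startingTimestamp");

    /** Runs every check and collects all problems instead of stopping at the first one. */
    static List<String> collectErrors(Map<String, String> options) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String name = entry.getKey();
            String value = entry.getValue();
            if (!KNOWN_OPTIONS.contains(name)) {
                errors.add("Unknown option: " + name);
            } else if (value == null || value.trim().isEmpty()) {
                errors.add("Empty value for option: " + name);
            }
        }
        // Assumed rule for illustration: the two starting options exclude each other.
        if (options.containsKey("startingVersion") && options.containsKey("startingTimestamp")) {
            errors.add("Options startingVersion and startingTimestamp cannot be used together");
        }
        return errors;
    }

    /** Throws a single exception that lists every collected validation error. */
    static void validate(Map<String, String> options) {
        List<String> errors = collectErrors(options);
        if (!errors.isEmpty()) {
            throw new IllegalArgumentException(
                "Invalid source options:\n - " + String.join("\n - ", errors));
        }
    }

    public static void main(String[] args) {
        try {
            validate(Map.of("bogus", "x"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Collecting first and throwing once lets a user fix every misconfigured option in a single pass instead of discovering them one build() call at a time.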

~~PR 11 - Annotate with @deprecated the Sink's builder methods whose names start with "with". Add new ones without the "with" prefix and overload partitions with a List argument for the Sink.~~

TODOs

  • Use index for keeping track of already processed paths in SnapshotProcessor - needs new Delta API.
  • Revisit the Delta Standalone APIs for the case when the source sees Metadata/Protocol actions. Update the ActionProcessor class to handle Metadata and Protocol actions.

dennyglee, Aug 04 '21 03:08

Regarding point: "After PR 14 run Sample job using Delta Sink/Source on a real cluster."

Basic manual tests were executed on a Flink cluster running on a local Docker setup. The Task Manager and Job Manager nodes used a shared Docker volume to read from and write to Delta tables. Versions 0.5.0-SNAPSHOT of Delta Standalone and the Flink connector were used.

Performed tests:

  • Bounded mode: Delta Table -> Flink Source -> Flink Sink.
    Source parallelism level = 3, Sink parallelism level = 3. [screenshot: DeltaEndToEnd]
    Source and Sink parallelism level = 3. [screenshot: DeltaEndToEnd2]

  • Continuous mode. Test setup: one job running two pipelines. The first pipeline contains a custom source that generates a continuous stream of events and a Flink Delta Sink that writes the data into a Delta table. The second pipeline contains a Delta Source set up to work in continuous mode, reading the Delta table created by the first pipeline; that source is followed by another Delta Sink. [screenshot: StreamingEndToEnd]

kristoffSC, Jun 21 '22 16:06

Latest javadocs as of commit eaab5866bb7d2c9240fb97c09fc48925a25510da: 0.5.0-SNAPSHOT.zip

scottsand-db, Jul 12 '22 15:07

this has been released in Delta connectors 0.5.0

tdas, Aug 26 '22 22:08