kafka-delta-ingest
Add in logic for "Create Delta Table if not exist"
While playing around with KDI, I realized it doesn't have logic to "Create Delta Table if not exist". If you point the container at a filesystem that has no Delta table, it complains:

Whereas if you point it at an existing Delta table, it works fine:

I think this should be trivial to add. I created a "KDI Java" just for the heck of it after chatting with @thovoll, and it's a matter of checking whether the transaction log version satisfies !(log.snapshot().getVersion() > -1), i.e. the table has no commits yet:
https://github.com/mdrakiburrahman/kafka-delta-ingest-adls/blob/2802eead5174e5fc00da047470572d5fd4c76981/1.KDI-Java/src/main/java/com/microsoft/kdi/KDI.java#L261
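For readers without the linked repo open, here is a rough, library-free sketch of what that check amounts to. It assumes a table on a local filesystem and relies only on the Delta convention that commits live in `_delta_log` as 20-digit, zero-padded `<version>.json` files. `DeltaLogProbe` and its methods are hypothetical names for illustration; they are not part of KDI or of the linked KDI.java, which reads the version through the log snapshot instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not a KDI or Delta library API.
class DeltaLogProbe {
    // Commit files are named <20-digit zero-padded version>.json
    private static final Pattern COMMIT = Pattern.compile("\\d{20}\\.json");

    // Highest committed version among the given _delta_log file names,
    // or -1 when there are no commit files (mirrors the "> -1" check).
    static long latestVersion(Collection<String> logFileNames) {
        return logFileNames.stream()
                .filter(name -> COMMIT.matcher(name).matches())
                .mapToLong(name -> Long.parseLong(name.substring(0, 20)))
                .max()
                .orElse(-1L);
    }

    // Same as above, but lists <tableDir>/_delta_log on the local filesystem.
    static long latestVersion(Path tableDir) {
        Path logDir = tableDir.resolve("_delta_log");
        if (!Files.isDirectory(logDir)) {
            return -1L; // no log directory means no table yet
        }
        try (Stream<Path> files = Files.list(logDir)) {
            List<String> names = files
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toList());
            return latestVersion(names);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True when at least one commit exists, i.e. version > -1.
    static boolean tableExists(Path tableDir) {
        return latestVersion(tableDir) > -1;
    }
}
```

An ingest process could call `DeltaLogProbe.tableExists(path)` at startup and only take the "create table" branch when it returns false. Against object storage the listing would go through the storage client rather than `java.nio.file`, so treat this as a sketch of the logic, not a drop-in.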
My use case is I'm pointing KDI at a large number of Change Data Capture Topics from Debezium - so it can start pumping out Delta Tables at scale. I can't assume the Delta Tables already exist - so this would be great to have!
@mdrakiburrahman would you use the first message received to infer the schema of the Delta table? There's a small matter of concurrency to contend with as well: what should happen if two processes access object storage simultaneously and try to create the table at the same time?
Perhaps specifying the schema when launching the application is an option? Schema inference is gnarly in the real world, in my experience.
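To make the two points above concrete, here is a minimal sketch of "create if absent" where the first commit body is supplied by the caller (for example, built from a schema given at launch) and the race is settled by an atomic create-new write of version 0. It assumes a local filesystem, where `StandardOpenOption.CREATE_NEW` gives an atomic check-and-create. `DeltaTableBootstrap` and `createIfAbsent` are hypothetical names, and `firstCommitBody` is a placeholder: a real first commit would carry the table's protocol and metaData actions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only; not a KDI or Delta library API.
class DeltaTableBootstrap {
    // Version 0 of the transaction log.
    static final String FIRST_COMMIT = "00000000000000000000.json";

    // Returns true if this caller wrote version 0, false if another
    // writer had already created it. firstCommitBody is a placeholder
    // for the JSON actions of the first commit.
    static boolean createIfAbsent(Path tableDir, String firstCommitBody) {
        Path logDir = tableDir.resolve("_delta_log");
        try {
            // Succeeds quietly if the directory is already there.
            Files.createDirectories(logDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // CREATE_NEW fails if the file exists, so only one writer wins.
            Files.write(logDir.resolve(FIRST_COMMIT),
                    firstCommitBody.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false; // lost the race; the table now exists
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The losing process simply carries on and appends to the table the winner created. On object stores that lack an atomic put-if-absent, the same idea would need some external lock or coordination service, and a production version would also want to avoid readers seeing a half-written commit file.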