
Add non-Delta as Sink

Open • ravi-databricks opened this issue 1 year ago • 4 comments

Support non-Delta sinks using the metadata approach.

  • In metadata, if the sink is non-Delta, use a Structured Streaming approach with foreachBatch (see the sketch below)
  • Use DAB to deploy non-DLT pipelines to the Databricks workspace
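
A minimal sketch of what the foreachBatch approach could look like, assuming a JDBC target and a sink_options map loaded from metadata (the names, source table, and target here are illustrative assumptions, not part of dlt-meta):

from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# Assumed sink metadata, e.g. loaded from the onboarding file (illustrative)
sink_options = {
    "url": "jdbc:postgresql://host:5432/db",
    "dbtable": "target_table",
}

def write_to_non_delta_sink(batch_df: DataFrame, batch_id: int) -> None:
    """Write each micro-batch to the non-Delta sink described by metadata."""
    batch_df.write.format("jdbc").options(**sink_options).mode("append").save()

(
    spark.readStream.table("bronze.events")  # source table name is an assumption
    .writeStream.foreachBatch(write_to_non_delta_sink)
    .option("checkpointLocation", "/tmp/checkpoints/non_delta_sink")
    .start()
)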

ravi-databricks commented Apr 17 '24 20:04

This feature can be implemented using DLT's sink API as described below:

API guide

Create a sink

To create a sink, use the new create_sink() API. It accepts three arguments:

  • A string for the sink name
  • A string specifying the format (can be either kafka or delta)
  • A map of sink options, formatted as {string: string}

All of the sink options available in DBR are supported, e.g. all Kafka authentication options.

create_sink(<sink_name>, <format>, <options_string_string_map>)

Code examples

Create a Kafka sink

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "subscribe": "my_topic"
  }
)

Create a Delta sink by giving the file system path to the table


create_sink(
  "my_delta_sink",
  "delta",
  { "path": "//path/to/my/delta/table" }
)

Create a Delta sink by giving the table name in UC

create_sink(
  "my_delta_sink",
  "delta",
  { "tableName": "my_catalog.my_schema.my_table" }
)

Use append_flow to write to a sink

Once the sink object is created, you can set up an append_flow that writes to the sink.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name="flow", target="my_sink")
def flowFunc():
  return read_stream("xxx")

ravi-databricks commented Aug 16 '24 16:08

Added bronze_sinks and silver_sinks options in the onboarding file as below:

[
   {
      "name":"sink_name1",
      "format":"delta",
      "options":{
         "tableName":"uc.tablename"
      }
   },
   {
      "name":"sink_name2",
      "format":"kafka",
      "options":{
         "kafka.bootstrap.servers":"{kafka_sink_broker}",
         "topic":"{kafka_sink_topic}"
      }
   }
]
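
For reference, a rough sketch of how these entries could map onto sink spec objects; the DLTSink shape shown here is an assumption for illustration and may differ from the actual class in dlt-meta:

import json
from dataclasses import dataclass

@dataclass
class DLTSink:
    # Assumed shape; the real DLTSink in dlt-meta may carry more fields
    name: str
    format: str
    options: dict

def parse_sinks(sinks_json: str) -> list[DLTSink]:
    """Parse a bronze_sinks/silver_sinks JSON array into sink specs."""
    return [
        DLTSink(name=s["name"], format=s["format"], options=s.get("options", {}))
        for s in json.loads(sinks_json)
    ]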

Added write_to_sinks in dataflow_pipeline.py under AppendFlowWriter:

    @staticmethod
    def write_to_sinks(sinks: list[DLTSink], write_to_sink):
        """Write to Sink."""
        for sink in sinks:
            dlt.create_sink(sink.name, sink.format, sink.options)
            dlt.append_flow(name=f"{sink.name}_flow", target=sink.name)(write_to_sink)

The above code can be invoked while performing the write:

        if self.dataflowSpec.sinks:
            dlt_sinks = DataflowSpecUtils.get_sinks(self.dataflowSpec.sinks, self.spark)
            AppendFlowWriter.write_to_sinks(dlt_sinks, self.write_to_delta)
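
For context, whatever callable is passed as write_to_sink is decorated by dlt.append_flow, so it must return a streaming DataFrame. A minimal sketch, assuming a bronze view name for illustration:

import dlt

def write_to_delta():
    """Flow function: return the streaming DataFrame that feeds each sink.
    The view name "bronze_customer" is an assumption for illustration."""
    return dlt.read_stream("bronze_customer")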

ravi-databricks commented Sep 04 '24 02:09

Once DLT Direct Publishing Mode is in Public Preview (PuPr), this will be merged into the release branch.

ravi-databricks commented Sep 18 '24 20:09

  • Added support for Kafka, Event Hubs, and external Delta tables as sinks
  • An example for Kafka as a sink is shown here
  • Added select_exp and where_clause (filter condition) for every target sink; a sketch of how these could be applied follows the example below
        "bronze_sinks": [
                {
                    "name": "bronze_customer_kafka_sink1",
                    "format": "kafka",
                    "options": {
                        "kafka_sink_servers_secret_scope_name":"{kafka_sink_servers_secret_scope_name}",
                        "kafka_sink_servers_secret_scope_key":"{kafka_sink_servers_secret_scope_key}",
                        "kafka.security.protocol":"PLAINTEXT",
                        "topic":"{kafka_sink_topic}_1"
                    },
                    "select_exp":["value"],
                    "where_clause":"value is not null"  
                },
                {
                    "name": "bronze_customer_kafka_sink2",
                    "format": "kafka",
                    "options": {
                        "kafka_sink_servers_secret_scope_name":"{kafka_sink_servers_secret_scope_name}",
                        "kafka_sink_servers_secret_scope_key":"{kafka_sink_servers_secret_scope_key}",
                                               "kafka.security.protocol":"PLAINTEXT",
                        "kafka.security.protocol":"PLAINTEXT",
                        "topic":"{kafka_sink_topic}_2"
                    },
                    "select_exp":["value"],
                    "where_clause":"value is not null"                    
                }
        ] 
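
A plausible way to apply select_exp and where_clause inside the generated flow function; make_sink_flow is a hypothetical helper under assumed wiring, though selectExpr and where are standard PySpark DataFrame operations:

import dlt

def make_sink_flow(source_view, select_exp, where_clause):
    """Build a flow function that projects and filters before the sink write.
    Hypothetical helper; the actual dlt-meta implementation may differ."""
    def flow():
        df = dlt.read_stream(source_view)
        if select_exp:
            df = df.selectExpr(*select_exp)
        if where_clause:
            df = df.where(where_clause)
        return df
    return flow

# Example wiring for the first sink above:
# dlt.append_flow(name="bronze_customer_kafka_sink1_flow",
#                 target="bronze_customer_kafka_sink1")(
#     make_sink_flow("bronze_customer", ["value"], "value is not null"))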

ravi-databricks commented Feb 28 '25 01:02