
SHC with Spark Structured Streaming

Open EDALJO opened this issue 7 years ago • 35 comments

Hi,

I have a Spark Structured Streaming application that should write streaming data to HBase using SHC. It reads data from a location where new CSV files are continuously being created. The catalog I defined works when writing a regular (batch) DataFrame with identical data into HBase. The key components of my streaming application are a DataStreamReader and a DataStreamWriter.

val inputDataStream = spark
      .readStream
      .option("sep", ",")
      .schema(schema)
      .csv("/path/to/data/*.csv")

inputDataStream
      .writeStream
      .outputMode("append")
      .options(
        Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .start

When running the application I'm getting the following message:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.execution.datasources.hbase does not support streamed writing
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:285)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
	at my.package.SHCStreamingApplication$.main(SHCStreamingApplication.scala:153)
	at my.package.SHCStreamingApplication.main(SHCStreamingApplication.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone know a solution or workaround that still uses SHC to write Structured Streaming data to HBase? Thanks in advance!

EDALJO avatar Dec 07 '17 10:12 EDALJO

You can write a custom sink provider that extends StreamSinkProvider. This is my implementation:

package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {   
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

This is an example of how to use it:

inputDF.
   writeStream.
   queryName("hbase writer").
   format("HBase.HBaseSinkProvider").
   option("checkpointLocation", checkPointProdPath).
   option("hbasecat", catalog).
   outputMode(OutputMode.Update()).
   trigger(Trigger.ProcessingTime(30.seconds)).
   start

sutugin avatar Mar 14 '18 11:03 sutugin

Thanks for your answer, it is exactly the type of solution I was looking for. I only had time to test it quickly, but it seems to be working perfectly!

EDALJO avatar Mar 20 '18 15:03 EDALJO

Excellent, glad to help!!!

sutugin avatar Mar 20 '18 16:03 sutugin

Thank you, this helps a lot.

hamroune avatar Apr 09 '18 10:04 hamroune

I've implemented your solution with HBaseSinkProvider by following these steps:

  1. Clone shc.
  2. Put HBaseSinkProvider.scala into core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase.
  3. Compile shc.
  4. Run spark-submit with --jars some_path/shc/core/target/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar

My code is written in Python and is included below. The error is: pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();; I'm new to Spark and have no experience with Scala, so I cannot understand the problem. Can you please help me with it?

def consume(schema_name, brokers, topic, group_id):
    spark = SparkSession \
        .builder \
        .appName('SparkConsumer') \
        .config('hbase.zookeeper.property.clientPort', '2282') \
        .getOrCreate()

    print 'read Avro schema from file: {}...'.format(schema_name)
    schema = avro.schema.parse(open(schema_name, 'rb').read())
    reader = avro.io.DatumReader(schema)
    print 'the schema is read'

    rows = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', brokers) \
        .option('subscribe', topic) \
        .option('group.id', group_id) \
        .option('maxOffsetsPerTrigger', 1000) \
        .option("startingOffsets", "earliest") \
        .load()
    rows.printSchema()

    schema = StructType([
        StructField('consumer_id', StringType(), False),
        StructField('audit_system_id', StringType(), False),
        StructField('object_path', StringType(), True),
        StructField('object_type', StringType(), False),
        StructField('what_action', StringType(), False),
        StructField('when', LongType(), False),
        StructField('where', StringType(), False),
        StructField('who', StringType(), True),
        StructField('workstation', StringType(), True)
    ])

    def decode_avro(msg):
        bytes_reader = io.BytesIO(bytes(msg))
        decoder = avro.io.BinaryDecoder(bytes_reader)
        data = reader.read(decoder)
        return (
            data['consumer_id'],
            data['audit_system_id'],
            data['object_path'],
            data['object_type'],
            data['what_action'],
            data['when'],
            data['where'],
            data['who'],
            data['workstation']
        )

    udf_decode_avro = udf(decode_avro, schema)

    values = rows.select('value')
    values.printSchema()

    changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
    changes.printSchema()

    change_catalog = '''
    {
        "table":
        {
            "namespace": "default",
            "name": "changes",
            "tableCoder": "PrimitiveType"
        },
        "rowkey": "consumer_id",
        "columns":
        {
            "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
            "audit_system_id": {"cf": "d", "col": "audit_system_id", "type": "string"},
            "object_path": {"cf": "d", "col": "object_path", "type": "string"},
            "object_type": {"cf": "d", "col": "object_type", "type": "string"},
            "what_action": {"cf": "d", "col": "what_action", "type": "string"},
            "when": {"cf": "t", "col": "when", "type": "bigint"},
            "where": {"cf": "d", "col": "where", "type": "string"},
            "who": {"cf": "d", "col": "who", "type": "string"},
            "workstation": {"cf": "d", "col": "workstation", "type": "string"}
        }
    }'''

    query = changes \
        .writeStream \
        .format('HBase.HBaseSinkProvider') \
        .option('hbasecat', change_catalog) \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .outputMode("append") \
        .start()

    query.awaitTermination()

merfill avatar Jun 27 '18 11:06 merfill

Try writing to the console or to a file. Do you get the same error?

sutugin avatar Jun 27 '18 13:06 sutugin

No, when I write the records to the console, everything is OK. I'm using the following Python code instead of HBase.HBaseSinkProvider:

query = changes \
    .writeStream \
    .outputMode("append") \
    .format('console') \
    .start()

The output is something like this:

Batch: 7

+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
|consumer_id|audit_system_id|         object_path|object_type|what_action|         when|        where|             who|         workstation|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|    Removed|1520170874201|172.28.26.190|             n/a|                 n/a|
|        111|            222| \172.28.26.190\T...|       File|      Added|1520171584122|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
|        111|            222| \172.28.26.190\T...|       File|      Added|1520171584126|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
only showing top 20 rows

2018-06-27 14:01:26 INFO WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@30f35fc5 committed.
2018-06-27 14:01:26 INFO SparkContext:54 - Starting job: start at NativeMethodAccessorImpl.java:0
2018-06-27 14:01:26 INFO DAGScheduler:54 - Job 15 finished: start at NativeMethodAccessorImpl.java:0, took 0,000037 s
2018-06-27 14:01:26 INFO MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "1e6076ad-b403-46ca-9438-e4913660700d",
  "runId" : "7740c36b-dc2d-4880-8ea6-0efde89b1ef5",
  "name" : null,
  "timestamp" : "2018-06-27T11:01:26.389Z",
  "batchId" : 7,
  "numInputRows" : 669,
  "inputRowsPerSecond" : 883.7516512549538,
  "processedRowsPerSecond" : 1327.3809523809523,
  "durationMs" : {
    "addBatch" : 471,
    "getBatch" : 4,
    "getOffset" : 2,
    "queryPlanning" : 11,
    "triggerExecution" : 503,
    "walCommit" : 15
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[changes]]",
    "startOffset" : { "changes" : { "0" : 7000 } },
    "endOffset" : { "changes" : { "0" : 7669 } },
    "numInputRows" : 669,
    "inputRowsPerSecond" : 883.7516512549538,
    "processedRowsPerSecond" : 1327.3809523809523
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3715c84c"
  }
}

merfill avatar Jun 27 '18 13:06 merfill

Unfortunately I am without a computer right now. Try setting the outputMode to update. If that does not help and you cannot find a solution, email me after July 5 and I will try to help.

sutugin avatar Jun 27 '18 13:06 sutugin

Unfortunately, "update" mode does not work either. I received the same error (see below). Thank you in advance.

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();;\nLogicalRDD [key#52, value#53, topic#54, partition#55, offset#56L, timestamp#57, timestampType#58], true\n\n=== Streaming Query ===\nIdentifier: [id = 66509eea-e706-4745-9b35-f82f09752b43, runId = c0e78fe7-f978-4f64-88cd-801d766f78c5]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaSource[Subscribe[changes]]: {"changes":{"0":7669}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [change#22.consumer_id AS consumer_id#25, change#22.audit_system_id AS audit_system_id#26, change#22.object_path AS object_path#27, change#22.object_type AS object_type#28, change#22.what_action AS what_action#29, change#22.when AS when#30L, change#22.where AS where#31, change#22.who AS who#32, change#22.workstation AS workstation#33]\n+- Project [value#8, decode_avro(value#8) AS change#22]\n +- Project [value#8]\n +- StreamingExecutionRelation KafkaSource[Subscribe[changes]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

merfill avatar Jun 27 '18 14:06 merfill

Please try this version of shc (https://github.com/sutugin/shc) and compile it with the corresponding hbase/phoenix version. I have used it without the Avro format and it worked perfectly.

/**
 * In option must be specified string with HBaseTableCatalog.tableCatalog
 * {{{
 *   inputDF.
 *     writeStream.
 *     format("hbase").
 *     option("checkpointLocation", checkPointProdPath).
 *     options(Map("hbase.catalog" -> catalog)).
 *     outputMode(OutputMode.Update()).
 *     trigger(Trigger.ProcessingTime(30.seconds)).
 *     start
 * }}}
 */

For using Avro, please try the following.

/**
 * In option must be specified string with HBaseTableCatalog.tableCatalog
 * {{{
 *   inputDF.
 *     writeStream.
 *     format("hbase").
 *     option("checkpointLocation", checkPointProdPath).
 *     options(Map("hbase.schema_array" -> schema_array, "hbase.schema_record" -> schema_record, "hbase.catalog" -> catalog)).
 *     outputMode(OutputMode.Update()).
 *     trigger(Trigger.ProcessingTime(30.seconds)).
 *     start
 * }}}
 */

ddkongbb avatar Aug 10 '18 16:08 ddkongbb

Hello @sutugin, I've implemented your solution. However, the data is not getting updated in HBase, and no exception is thrown either. Can you suggest anything in this regard?

swarup5s avatar Sep 17 '18 09:09 swarup5s

Hi @swarup5s, if you share your implementation code, how you use it, and the logs, maybe we can find the problem together.

sutugin avatar Sep 18 '18 04:09 sutugin

Hi @sutugin thanks for your help. Appreciate it.

// this class is under ...org/apache/spark/sql/execution/streaming/
// path. I'm executing from IntelliJ..
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.execution.datasources.hbase.Logging
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

/**
 * My code goes here------------------------------------------------------------------------------
 */
//...
//...

def catalog = s"""{
  |"table":{"namespace":"default", "name":"hbase_table"},
  |"rowkey":"key",
  |"columns":{
  |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  |"subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"},
  |"subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"},
  |"messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"},
  |"lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"}
  |}
  |}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog->cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
val BaseRecorddf = withCatalog(catalog) // record from HBase as Batch or normal Dataframe

val streamdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_addr")
  .option("subscribe", "topic_name")
  .option("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("auto.offset.reset","earliest")
  .option("group.id","realTimeStream")
  .option("enable.auto.commit",true: java.lang.Boolean)
  .load()

//streaming Dataframe here
val schemaStreamDf = streamdf
  .selectExpr("CAST(key AS STRING) as IMSI", "CAST(value AS STRING) as Loc")

//...

// some biz logic here and some join between the batch and streaming dataframe and finaldf is the streaming DF
// which should be written back to the HBase in real time.

try{

    finaldfd.
      writeStream.
      format("org.apache.spark.sql.execution.streaming.HBaseSinkProvider").
      option("checkpointLocation", "some_path_here").
      option("hbasecat",catalog).
      outputMode(OutputMode.Update()).
      trigger(Trigger.ProcessingTime(some_seconds))
      .start()
      .awaitTermination()
    
  }
catch {
  case e: Exception => println(e)
}

I've made some changes. Now a StreamingQueryException is thrown, but as a novice I'm not sure what is going wrong. Here are the logs:

org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = a0997feb-efeb-4976-b8eb-5efaa3c1b8c9, runId = 6f4181f0-ffd4-4aa9-a0bc-475457e4e93e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[topic_name]]: {"topic_name":{"2":282,"1":305,"0":293}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, 1537292624 AS lastTS#82]
+- Project [rowkey#11 AS rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
   +- Filter (cast(cast(lastTS#4 as bigint) as double) < 1.536990223E9)
      +- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
         +- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4, Loc#46]
            +- Join Inner, (rowkey#11 = rowkey#45)
               :- Project [rowkey#0 AS rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
               :  +- Relation[rowkey#0,subscriberName#1,subscriberNumber#2,messageTemplate#3,lastTS#4] HBaseRelation(Map(catalog -> { "table":{"namespace":"default", "name":"hbase_table"}, "rowkey":"key", "columns":{ "rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, "subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"}, "subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"}, "messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"}, "lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"} } }),None)
               +- Project [cast(key#30 as string) AS rowkey#45, cast(value#31 as string) AS Loc#46]
                  +- StreamingExecutionRelation KafkaSource[Subscribe[topic_name]], [key#30, value#31, topic#32, partition#33, offset#34L, timestamp#35, timestampType#36]

On the other hand, the messages are written to the console successfully.

swarup5s avatar Sep 18 '18 17:09 swarup5s

Hello @sutugin, first of all thank you for your great help. I'm experiencing @swarup5s's problem when I call:

data.sparkSession.createDataFrame(data.rdd, data.schema)

The same happens whenever I call something like data.rdd

I think the problem is outside your code, somewhere else. Maybe it is more Spark related?

stefcorda avatar Oct 09 '18 21:10 stefcorda

Hi @sympho410! Unfortunately, I can't find the problem from a few lines of code; I would need to debug and look for the cause. My only guess: try changing the order of the columns to match the order you specified in the schema.

sutugin avatar Oct 11 '18 09:10 sutugin

Hello @sutugin, I just noticed that you replied! I managed to get past that problem thanks to another one of your commits:

val schema = data.schema

val res = data.queryExecution.toRdd.mapPartitions { rows =>
  val converter = CatalystTypeConverters.createToScalaConverter(schema)
  rows.map(converter(_).asInstanceOf[Row])
}

val df = data.sparkSession.createDataFrame(res, schema)

Now it works properly - Thank you again for your help :)
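
For readers landing here, a minimal sketch of the whole sink with that change applied, combining the HBaseSink posted earlier in this thread with the snippet above. The stack traces in this thread (LogicalRDD ... true, checkForBatch) suggest that calling data.rdd inside addBatch re-plans the streaming batch as a batch query, which Spark rejects; going through queryExecution.toRdd appears to avoid that. This assumes Spark 2.x, where Sink is still an internal API, and the "hbasecat" option name used earlier in the thread.

```scala
package HBase

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, Row}

class HBaseSink(options: Map[String, String]) extends Sink {
  // Catalog JSON passed by the caller via .option("hbasecat", catalog)
  private val hBaseCatalog = options.getOrElse("hbasecat", "")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema

    // Use the already planned physical RDD instead of data.rdd, then convert
    // Spark's InternalRow objects back to external Row objects.
    val rows = data.queryExecution.toRdd.mapPartitions { internalRows =>
      val toScala = CatalystTypeConverters.createToScalaConverter(schema)
      internalRows.map(toScala(_).asInstanceOf[Row])
    }

    // A plain batch DataFrame that SHC's batch writer can save.
    val batchDf = data.sparkSession.createDataFrame(rows, schema)
    batchDf.write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        // Per the SHC README, newTable is the number of regions to create
        // if the table does not exist yet.
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
```

The HBaseSinkProvider class from the earlier comment can stay unchanged and keeps returning new HBaseSink(parameters).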

stefcorda avatar Oct 13 '18 09:10 stefcorda

Hello @sutugin and @sympho410, I am working on a similar problem: I want to do bulk puts to HBase from Spark Structured Streaming. I see that the code above tries to do that, but I do not understand the use of the catalog here. It looks like a predefined schema. Since HBase is schema-less, I can add new columns at any time in the future, so how can I fix a catalog in advance? Can either of you explain what I am missing, and what exactly the purpose and meaning of the catalog is? Also, can anyone explain what "5" means in this line: HBaseTableCatalog.newTable -> "5"? Any help is greatly appreciated.

Thanks in advance!

vibnimit avatar Jul 19 '19 09:07 vibnimit

It seems to me that the purpose of the catalog is to structure the data properly for serialization and deserialization. The need to specify the schema is a feature of this library's implementation and is not tied to Structured Streaming. You can try to work around this limitation by generating a schema on the fly from the schema of the data in each batch, but you must be sure that all rows in the batch have the same schema. Alternatively, use a foreach writer and derive the schema for each row separately.
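
A minimal sketch of generating a catalog on the fly, assuming every non-key column goes into a single column family. The object name CatalogSketch, the function buildCatalog, and its parameters are hypothetical helpers for illustration, not part of SHC; the JSON layout follows the catalogs posted in this thread.

```scala
object CatalogSketch {
  /** Builds an SHC-style catalog JSON string.
    *
    * @param columns (field name, catalog type name) pairs, excluding the row key
    */
  def buildCatalog(
      table: String,
      rowKey: String,
      columnFamily: String,
      columns: Seq[(String, String)],
      namespace: String = "default"): String = {
    // The row key column uses the special "rowkey" column family.
    val keyEntry = s""""$rowKey":{"cf":"rowkey", "col":"$rowKey", "type":"string"}"""
    val dataEntries = columns.map { case (name, tpe) =>
      s""""$name":{"cf":"$columnFamily", "col":"$name", "type":"$tpe"}"""
    }
    s"""{
       |"table":{"namespace":"$namespace", "name":"$table"},
       |"rowkey":"$rowKey",
       |"columns":{
       |${(keyEntry +: dataEntries).mkString(",\n")}
       |}
       |}""".stripMargin
  }
}
```

Inside a sink, the column list could be derived from the batch itself, for example from data.schema.fields with each field's dataType.simpleString as the type name; whether SHC accepts every such type name is an assumption to verify against your SHC version.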

sutugin avatar Jul 23 '19 13:07 sutugin

Can anyone please provide a jar with HBaseSink compiled in? I tried building the shc project but I get this error:

HBaseRelation.scala:108: value foreach is not a member of Nothing
        hBaseConfiguration.foreach(_.foreach(e => conf.set(e._1, e._2)))
                                     ^

I tried implementing the "HBaseSink" class in my project with SHC as a Maven dependency, but it is not working. I get this error:

Queries with streaming sources must be executed with writeStream.start();

It would be very helpful if I could get the compiled jar. Thanks

omkarahane avatar Oct 05 '19 14:10 omkarahane

Try building from my fork (https://github.com/sutugin/shc), though I have not updated it for a long time. Just set the Spark version you actually use in pom.xml; for me it is <spark.version>2.3.1</spark.version>

sutugin avatar Oct 08 '19 08:10 sutugin

@sutugin Thanks for replying. I'm working on the Databricks platform, which hosts Spark 2.4.3, so I have access to the ".foreachBatch" API and the above is not needed anymore.
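
A minimal sketch of that foreachBatch approach, assuming Spark 2.4 or later with SHC on the classpath. The object name ForeachBatchToHBase and the parameters input, catalog, and checkpointDir are hypothetical placeholders. The writer is declared as an explicitly typed function value, which sidesteps the overload ambiguity that foreachBatch can show when given a bare lambda on Scala 2.12.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

object ForeachBatchToHBase {
  def start(input: DataFrame, catalog: String, checkpointDir: String): StreamingQuery = {
    // Each micro-batch arrives as an ordinary batch DataFrame, so the
    // regular SHC batch writer can be used without a custom Sink.
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      batch.write
        .options(Map(
          HBaseTableCatalog.tableCatalog -> catalog,
          HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save()
    }

    input.writeStream
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .foreachBatch(writeBatch)
      .start()
  }
}
```

Note that foreachBatch gives at-least-once delivery by default; since HBase puts with the same row key overwrite each other, replayed batches should normally be harmless here.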

omkarahane avatar Oct 08 '19 09:10 omkarahane

@omkarahane Good idea, I already wrote to someone about this method: https://github.com/hortonworks-spark/shc/pull/238#issuecomment-447698100

sutugin avatar Oct 08 '19 09:10 sutugin

@sutugin I'm still facing an issue, though it is different from the one I mentioned above. I have put all the details at the link below; please see if you can help. https://stackoverflow.com/questions/58306725/not-able-to-write-data-to-hbase-using-spark-structured-streaming

Thanks.

omkarahane avatar Oct 10 '19 07:10 omkarahane

@omkarahane, try making a "fat" jar with the sbt dependency libraryDependencies += "com.hortonworks.shc" % "shc-core" % "1.1.0.3.1.2.1-1". I think this will solve the problem with "java.lang.NoClassDefFoundError".
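
A rough sketch of what such a build.sbt might look like. Only the shc-core coordinates come from the line above; the project name, the Scala and Spark versions, the resolver URL, and the use of the sbt-assembly plugin are assumptions to adapt to your environment.

```scala
// build.sbt (sketch; everything except the shc-core coordinates is an assumption)
name := "shc-streaming-sink"
scalaVersion := "2.11.12"

// shc-core is not published to Maven Central; this repository URL is an assumption.
resolvers += "Hortonworks" at "https://repo.hortonworks.com/content/groups/public/"

libraryDependencies ++= Seq(
  // Spark is supplied by the cluster, so keep it out of the fat jar.
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided",
  // Bundled into the fat jar together with its transitive HBase dependencies.
  "com.hortonworks.shc" % "shc-core" % "1.1.0.3.1.2.1-1"
)
```

With the sbt-assembly plugin added in project/plugins.sbt, running sbt assembly would then produce the fat jar to pass to spark-submit or upload as a cluster library.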

sutugin avatar Oct 10 '19 08:10 sutugin

I tried running the job with a fat jar created using Maven, but the issue still wasn't resolved. I guess fat jars created with sbt and Maven would be almost the same?

omkarahane avatar Oct 10 '19 09:10 omkarahane

@omkarahane, maybe this will help you https://github.com/hortonworks-spark/shc/issues/223#issuecomment-375111619

sutugin avatar Oct 10 '19 10:10 sutugin

@sutugin, thanks a lot, you pointed me in the right direction. The HBase jars were missing; I added them and installed them as a library on the cluster so the job has access to them. That solved my initial problem, but now I'm getting another exception:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;

This also seems to be a dependency issue. This is what I have tried:

  1. Uploaded the following jars: json4s-ast, json4s-core, json4s-jackson
  2. Versions tried: 3.4, 3.5, 3.6
  3. Put a Maven dependency in my jar, built a fat jar and uploaded it as a library, so that transitive dependencies can be satisfied.

Still getting the same error.

omkarahane avatar Oct 11 '19 06:10 omkarahane

@omkarahane, similar problems are described here:

  • https://forums.databricks.com/questions/18306/xgboost4j-fails-to-save-on-52-runtime.html
  • https://github.com/dmlc/xgboost/issues/3924

Try building shc against Spark 2.4 and use that jar as a dependency in your project.

sutugin avatar Oct 11 '19 08:10 sutugin

@sutugin @merfill I added sutugin's Scala code and compiled shc as merfill described, but I get an error. I use Java with:

  • spark-2.3.2
  • hbase-2.0.1
  • scala 2.11.12
  • jdk 1.8

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;


public class KafkaStructStream implements Serializable {

    private String servers;
    private String jks;
    private String schema;

    public KafkaStructStream(String[] args) {
//        this.servers = args[0];
//        this.jks = args[1];
    }

    private Dataset<Row> initStructKafka() throws IOException {
        Properties prop = Config.getProp();
        this.schema = prop.getProperty("hbase.traffic.schema");
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[*]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
//                .option("kafka.bootstrap.servers", this.servers)
//                .option("kafka.ssl.truststore.location", this.jks)
                .option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("startingOffsets", "latest")
//                .option("subscribe", kafkaProp.getProperty("kafka.topic"))

                .option("subscribe", "traffic")
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");
    }

    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();

        StructType trafficSchema = new StructType()
                .add("guid", DataTypes.StringType)
                .add("time", DataTypes.LongType)
                .add("end_time", DataTypes.LongType)
                .add("srcip", DataTypes.StringType)
                .add("srcmac", DataTypes.StringType)
                .add("srcport", DataTypes.IntegerType)
                .add("destip", DataTypes.StringType)
                .add("destmac", DataTypes.StringType)
                .add("destport", DataTypes.IntegerType)
                .add("proto", DataTypes.StringType)
                .add("appproto", DataTypes.StringType)
                .add("upsize", DataTypes.LongType)
                .add("downsize", DataTypes.LongType);

        Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
        StreamingQuery query = ds.writeStream()
                .format("HBase.HBaseSinkProvider")
                .option("HBaseTableCatalog.tableCatalog", this.schema)
                .option("checkpointLocation", "/tmp/checkpoint")
                .start();

//        StreamingQuery query = ds.writeStream().format("console")
//                .trigger(Trigger.Continuous("2 seconds"))
//                .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream(args);
        k.run();
    }

}

ERROR

19/11/20 15:35:09 ERROR MicroBatchExecution: Query [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e] terminated with error
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
	at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

=== Streaming Query ===
Identifier: [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[traffic]]: {"traffic":{"0":118641202}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [data#25.guid AS guid#27, data#25.time AS time#28L, data#25.end_time AS end_time#29L, data#25.srcip AS srcip#30, data#25.srcmac AS srcmac#31, data#25.srcport AS srcport#32, data#25.destip AS destip#33, data#25.destmac AS destmac#34, data#25.destport AS destport#35, data#25.proto AS proto#36, data#25.appproto AS appproto#37, data#25.upsize AS upsize#38L, data#25.downsize AS downsize#39L]
+- Project [jsontostructs(StructField(guid,StringType,true), StructField(time,LongType,true), StructField(end_time,LongType,true), StructField(srcip,StringType,true), StructField(srcmac,StringType,true), StructField(srcport,IntegerType,true), StructField(destip,StringType,true), StructField(destmac,StringType,true), StructField(destport,IntegerType,true), StructField(proto,StringType,true), StructField(appproto,StringType,true), StructField(upsize,LongType,true), StructField(downsize,LongType,true), cast(value#22 as string), Some(Asia/Shanghai), true) AS data#25]
   +- Project [cast(topic#9 as string) AS topic#21, cast(value#8 as string) AS value#22]
      +- StreamingExecutionRelation KafkaSource[Subscribe[traffic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
	at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more

631068264 · Nov 20 '19 07:11

Hi @631068264, if you build from my fork, try passing "org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider" or "hbase" to writeStream().format(...) in place of "HBase.HBaseSinkProvider".

sutugin · Nov 21 '19 08:11