
Custom sink provider for structured streaming

sutugin opened this issue 6 years ago · 16 comments

What changes were proposed in this pull request?

Custom sink provider for using SHC in a structured streaming job (https://github.com/hortonworks-spark/shc/issues/205). All HBase-related options must be prefixed with "hbase.".

How was this patch tested?

inputDF
  .writeStream
  .queryName("hbase writer")
  .format("hbase")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map(
    "hbase.schema_array" -> schema_array,
    "hbase.schema_record" -> schema_record,
    "hbase.catalog" -> catalog))
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(30.seconds))
  .start()

Run a structured streaming job and write to HBase.

sutugin avatar Apr 04 '18 09:04 sutugin

Thanks for this PR, @sutugin

cc: @dongjoon-hyun

weiqingy avatar Apr 05 '18 22:04 weiqingy

You forgot that SHC supports Avro schemas. The user should be able to pass any key in the options to define them.

sbarnoud avatar Apr 10 '18 09:04 sbarnoud

I propose using the following for the options:

import java.util.Locale

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  // Keep only the "hbase."-prefixed options and strip the prefix.
  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6) -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // As per SPARK-16020 arbitrary transformations are not supported, but
    // converting to an RDD allows us to do magic.
    val df = sqlContext.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}

sbarnoud avatar Apr 12 '18 12:04 sbarnoud

Then:

  • the sink doesn't update Spark counters ... it should be done somewhere

  • the sink short name is not registered

And finally, the Avro type is not working because the Avro serializer swaps fields. It looks similar to https://issues.apache.org/jira/browse/SPARK-21402. I'm using the following Avro schema:

{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
	{"name": "id", "type": "string"},
	{"name": "key",  "type": "string"},
	{"name": "last", "type": "boolean"},
	{"name": "columns", "type": {"type": "map", "values": "string"}}]
}

And I get the following exception:

java.lang.ClassCastException: java.lang.Boolean cannot be cast to scala.collection.immutable.Map
        at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$6.apply(Avro.scala:281)
        at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$7.apply(Avro.scala:304)
        at org.apache.spark.sql.execution.datasources.hbase.types.Avro.toBytes(Avro.scala:56)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:221)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:217)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

sbarnoud avatar Apr 12 '18 12:04 sbarnoud

I just declared the Avro schema with the fields in alphabetical order, and it works.

No idea where the problem is, but it has to be corrected.

{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
	{"name": "columns", "type": {"type": "map", "values": "string"}},
	{"name": "id", "type": "string"},
	{"name": "key",  "type": "string"},
	{"name": "last", "type": "boolean"}]
}

sbarnoud avatar Apr 12 '18 14:04 sbarnoud

@sbarnoud, unfortunately I have never used Avro and can't comment on it; the only thing I can assume is that the columns of the incoming streaming dataset must be ordered exactly as specified in the Avro schema. Regarding "the sink doesn't update Spark counters ... it should be done somewhere": what specific counters do you mean?

sutugin avatar Apr 12 '18 16:04 sutugin

Hi,

For counters:

@Override
public void onQueryProgress(QueryProgressEvent event) {
    log.info("QueryProgressEvent event :" + event.progress().numInputRows());
}

QueryProgressEvent contains some counters like numInputRows ... that are not updated.
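
For reference, a minimal Scala sketch of how such a listener can be registered (the Java snippet above is just the onQueryProgress callback; the ProgressLogger name is only illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative helper: attach a listener that logs numInputRows on every progress update.
object ProgressLogger {
  def register(spark: SparkSession): Unit = {
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"QueryProgressEvent: numInputRows = ${event.progress.numInputRows}")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })
  }
}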

For Avro, I found the bug, but didn't manage to test it (my patch is not loaded first ...). The bug is that org.apache.spark.sql.execution.datasources.hbase.types.Avro doesn't use the same schema to serialize and to deserialize. Here is the code:

class Avro(f:Option[Field] = None) extends SHCDataType {

  def fromBytes(src: HBaseType): Any = {
    if (f.isDefined) {
      val m = AvroSerde.deserialize(src, f.get.exeSchema.get)
      val n = f.get.avroToCatalyst.map(_ (m))
      n.get
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: without field metadata, 'fromBytes' conversion can not be supported")
    }
  }

  def toBytes(input: Any): Array[Byte] = {
    // Here we assume the top level type is structType
    if (f.isDefined) {
      val record = f.get.catalystToAvro(input)
      AvroSerde.serialize(record, f.get.schema.get)
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
    }
  }
}

As you can see, the Avro schema used to serialize is f.get.schema.get, which is the dataset schema, instead of f.get.exeSchema.get, which is the user-supplied one. That's why serialization fails if the field names are not in the same order in both.
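
An untested sketch of that change (serialize with the user-supplied exeSchema when it exists, as fromBytes does, and fall back to the dataset schema otherwise):

  def toBytes(input: Any): Array[Byte] = {
    // Here we assume the top level type is structType
    if (f.isDefined) {
      val record = f.get.catalystToAvro(input)
      // Use the user-supplied schema (exeSchema) so the field order matches
      // what fromBytes will later use to deserialize.
      AvroSerde.serialize(record, f.get.exeSchema.getOrElse(f.get.schema.get))
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
    }
  }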

sbarnoud avatar Apr 12 '18 17:04 sbarnoud

@sbarnoud,

  • Spark structured streaming has its own internal mechanism (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis) and the user must decide whether they want to track progress.

  • As for Avro, I don't understand how my PR can solve the problem.

sutugin avatar Apr 12 '18 19:04 sutugin

Hi

Sorry, but metrics reporting doesn't work for me. I have already configured my job to report them, and if I change my sink to Parquet on the same stream, I get them. Did you try it? Do you see the same behavior?

I don't understand why you use regex matching (and not startsWith) here:

private val hbaseSettings = parameters.filterKeys(
    _.toLowerCase matches hbaseOptionPrefix + "*") map {
    case (k, v) => (k.replace(hbaseOptionPrefix, ""), v)
}

If you really want to, please use the right pattern: hbase\..* (\. matches a literal dot, .* matches anything). A corrected sketch follows the REPL output below.

scala>  "hbase.x" matches "hbase.*"
res5: Boolean = true

scala>  "hbasex" matches "hbase.*"
res6: Boolean = true

scala> "hbase.x" matches "hbase\\..*"
res7: Boolean = true

scala> "hbase.x" matches "hbase\\."
res8: Boolean = false

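
A corrected sketch of that filter, assuming hbaseOptionPrefix is the literal string "hbase." as in the quoted code:

// Escape the dot so only keys that literally start with "hbase." match,
// then strip the prefix (drop is safer than replace, which could hit a later occurrence).
private val hbaseSettings = parameters
  .filterKeys(_.toLowerCase(java.util.Locale.ROOT).matches("hbase\\..*"))
  .map { case (k, v) => (k.drop("hbase.".length), v) }
  .toMap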

I will open an issue for Avro.

sbarnoud avatar Apr 13 '18 09:04 sbarnoud

https://stackoverflow.com/questions/48466019/number-of-input-rows-in-spark-structured-streaming-with-custom-sink

That's the reason why metrics are not updated.

sbarnoud avatar Apr 13 '18 11:04 sbarnoud

This code works, and avoids the current comment "allows us to do magic" ;-):

import java.util.Locale

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink

class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink with Logging {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  // Keep only the "hbase."-prefixed options and strip the prefix.
  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6) -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Use a local variable to make sure the map closure doesn't capture the whole DataFrame.
    val schema = data.schema

    // Go through queryExecution.toRdd instead of data.rdd so that the streaming
    // metrics (numInputRows, ...) are reported for the batch (see the StackOverflow link above).
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    val df = sqlContext.sparkSession.createDataFrame(res, schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}

18/04/13 14:26:11 INFO StreamExecution: Streaming query made progress: { "id" : "3197f58d-9dc6-4d02-b081-e8fed6d5b57f", "runId" : "1d313972-6150-48cf-b448-c32411f4d421", "name" : "KafkaToHBaseStream", "timestamp" : "2018-04-13T12:26:02.788Z", "numInputRows" : 2,

sbarnoud avatar Apr 13 '18 12:04 sbarnoud

@sbarnoud, Hi! Thank you, great work!

sutugin avatar Apr 13 '18 13:04 sutugin

Hi,

In my own version I have added support for the short names shc and shcstream.

Could you validate those short names?

core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

org.apache.spark.sql.execution.datasources.hbase.DefaultSource
org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider

org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala

private[sql] class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  override def shortName(): String = "shc" // TODO Validate this name

org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala

class HBaseStreamSinkProvider
  extends DataSourceRegister
    with StreamSinkProvider {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new HBaseStreamSink(sqlContext, parameters)
  }

  override def  shortName(): String = "shcstream" // TODO Validate this name
}
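
If those short names are kept, usage would look roughly like this (df stands for any batch DataFrame; catalog and checkPointProdPath are the placeholders used earlier in the thread):

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Batch write through the registered "shc" short name:
df.write
  .format("shc")
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .save()

// Streaming write through the registered "shcstream" short name,
// using the "hbase."-prefixed options introduced by this PR:
inputDF.writeStream
  .format("shcstream")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map("hbase.catalog" -> catalog))
  .start()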

sbarnoud avatar Dec 13 '18 09:12 sbarnoud

@sbarnoud, hi! I don't mind; are you suggesting changing the names in the PR?

sutugin avatar Dec 15 '18 12:12 sutugin

No, I'll let you decide the names, but I sent you the correct code. The batch and streaming short names must both be defined and be different, and both classes must be listed in the resources file. As far as I can see, that isn't currently the case.

sbarnoud avatar Dec 16 '18 21:12 sbarnoud

I think that with the advent of Spark 2.4 this is no longer relevant; foreachBatch solves all the problems of custom sinks (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch).
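
For example, a minimal sketch of a foreachBatch writer that reuses the existing batch data source (Spark 2.4+; inputDF, catalog and checkPointProdPath are the same placeholders as in the test snippet at the top of this thread):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Sketch only: write each micro-batch with the batch SHC data source, so no custom Sink is needed.
val query = inputDF.writeStream
  .queryName("hbase writer")
  .option("checkpointLocation", checkPointProdPath)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
  .start()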

sutugin avatar Dec 17 '18 01:12 sutugin