shc
Custom sink provider for structured streaming
What changes were proposed in this pull request?
Custom sink provider for using shc in a structured streaming job. https://github.com/hortonworks-spark/shc/issues/205 All HBase-related options must be set with the "hbase." prefix.
How was this patch tested?
inputDF
  .writeStream
  .queryName("hbase writer")
  .format("hbase")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map("hbase.schema_array" -> schema_array, "hbase.schema_record" -> schema_record, "hbase.catalog" -> catalog))
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(30.seconds))
  .start()
Run the structured streaming job and write to HBase.
Thanks for this PR, @sutugin
cc: @dongjoon-hyun
You forgot that SHC supports Avro schemas. The user should be able to pass any key in the options to define them.
For the options, I propose to use:
class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6).toString -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    /** As per SPARK-16020, arbitrary transformations are not supported, but
      * converting to an RDD allows us to do magic.
      */
    val df = sqlContext.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}
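As a small, self-contained illustration of how that prefix stripping behaves with the options from the usage example above (placeholder values, not from the PR):

val opts = Map(
  "hbase.schema_array"  -> "<avro array schema>",
  "hbase.schema_record" -> "<avro record schema>",
  "hbase.catalog"       -> "<shc catalog json>",
  "checkpointLocation"  -> "/tmp/checkpoint")   // not forwarded: no "hbase." prefix

val forwarded = opts.keySet
  .filter(_.toLowerCase(java.util.Locale.ROOT).startsWith("hbase."))
  .map(k => k.drop(6) -> opts(k))                // drop the 6 characters of "hbase."
  .toMap
// forwarded == Map("schema_array" -> ..., "schema_record" -> ..., "catalog" -> ...)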
Then:
-) the sink doesn't update Spark counters ... it should be done somewhere
-) the sink short name is not registered
And finally, the Avro type is not working because the Avro serializer swaps fields. It looks similar to https://issues.apache.org/jira/browse/SPARK-21402. I'm using the following Avro schema:
{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
{"name": "id", "type": "string"},
{"name": "key", "type": "string"},
{"name": "last", "type": "boolean"},
{"name": "columns", "type": {"type": "map", "values": "string"}}]
}
And I get the following exception:
java.lang.ClassCastException: java.lang.Boolean cannot be cast to scala.collection.immutable.Map
at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$6.apply(Avro.scala:281)
at org.apache.spark.sql.execution.datasources.hbase.types.SchemaConverters$$anonfun$createConverterToAvro$7.apply(Avro.scala:304)
at org.apache.spark.sql.execution.datasources.hbase.types.Avro.toBytes(Avro.scala:56)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:221)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation$$anonfun$org$apache$spark$sql$execution$datasources$hbase$HBaseRelation$$convertToPut$1$1.apply(HBaseRelation.scala:217)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
If I just declare the Avro schema with the fields in alphabetical order of the field names, it works.
I have no idea where the problem is, but it has to be corrected:
{"namespace": "example.avro", "type": "record", "name": "DocumentPojo","fields": [
{"name": "columns", "type": {"type": "map", "values": "string"}},
{"name": "id", "type": "string"},
{"name": "key", "type": "string"},
{"name": "last", "type": "boolean"}]
}
@sbarnoud, unfortunately I have never used Avro and can't comment on it. The only thing I can assume is that in the incoming streaming dataset the columns need to be ordered exactly as specified in the Avro schema. As for "the sink doesn't update Spark counters ... it should be done somewhere" - what specific counters do you mean?
Hi,
For counters:
@Override
public void onQueryProgress(QueryProgressEvent event) {
    log.info("QueryProgressEvent event: " + event.progress().numInputRows());
}
QueryProgressEvent contains some counters like numInputRows ... that are not updated.
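For reference, here is a minimal Scala sketch of how such a listener can be registered (the listener class name and the println are illustrative, not from the PR):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative listener that reports numInputRows on every progress update.
class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"numInputRows = ${event.progress.numInputRows}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

val spark = SparkSession.builder().getOrCreate()
spark.streams.addListener(new ProgressLogger)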
For Avro, I found the bug, but I didn't succeed in testing it (my patch is not loaded first ...). The bug is that org.apache.spark.sql.execution.datasources.hbase.types.Avro doesn't use the same schema to serialize and to deserialize. Here is the code:
class Avro(f: Option[Field] = None) extends SHCDataType {

  def fromBytes(src: HBaseType): Any = {
    if (f.isDefined) {
      val m = AvroSerde.deserialize(src, f.get.exeSchema.get)
      val n = f.get.avroToCatalyst.map(_(m))
      n.get
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: without field metadata, 'fromBytes' conversion can not be supported")
    }
  }

  def toBytes(input: Any): Array[Byte] = {
    // Here we assume the top level type is structType
    if (f.isDefined) {
      val record = f.get.catalystToAvro(input)
      AvroSerde.serialize(record, f.get.schema.get)
    } else {
      throw new UnsupportedOperationException(
        "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
    }
  }
}
As you can see, the Avro schema used to serialize is f.get.schema.get, which is the dataset schema, instead of f.get.exeSchema.get, which is the user-supplied one. That's why serialization fails when the field names are not in the same order in both schemas.
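If that diagnosis is right, a hedged (untested) sketch of the fix would be to prefer the user-supplied schema in toBytes as well:

def toBytes(input: Any): Array[Byte] = {
  if (f.isDefined) {
    val record = f.get.catalystToAvro(input)
    // Serialize with the user-supplied Avro schema (exeSchema), as fromBytes already does,
    // and only fall back to the dataset-derived schema when it is absent.
    AvroSerde.serialize(record, f.get.exeSchema.getOrElse(f.get.schema.get))
  } else {
    throw new UnsupportedOperationException(
      "Avro coder: Without field metadata, 'toBytes' conversion can not be supported")
  }
}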
@sbarnoud,
- Spark structured streaming has its own internal mechanism for this (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis), and the user must decide whether they want to track progress.
- As for Avro, I don't understand how my PR can solve that problem.
Hi
Sorry, but metrics reporting doesn't work for me. I have already configured my job to report them, and if I change my sink to Parquet on the same stream, I get them. Did you try it? Do you see the same behavior?
I didn't understand why you use pattern matching (and not startsWith) here:
private val hbaseSettings = parameters.filterKeys(
    _.toLowerCase matches hbaseOptionPrefix + "*") map {
  case (k, v) => (k.replace(hbaseOptionPrefix, ""), v)
}
If you really want to use a regex, please use the right pattern: hbase\..* (\. matches a dot, .* matches anything):
scala> "hbase.x" matches "hbase.*"
res5: Boolean = true
scala> "hbasex" matches "hbase.*"
res6: Boolean = true
scala> "hbase.x" matches "hbase\\..*"
res7: Boolean = true
scala> "hbase.x" matches "hbase\\."
res8: Boolean = false
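With that pattern, a sketch of the corrected filter could look like this (assuming hbaseOptionPrefix is the literal, already lower-case string "hbase."):

private val hbaseSettings = parameters
  .filterKeys(_.toLowerCase(java.util.Locale.ROOT) matches "hbase\\..*")
  .map { case (k, v) => (k.replace(hbaseOptionPrefix, ""), v) }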
I will open an issue for Avro.
https://stackoverflow.com/questions/48466019/number-of-input-rows-in-spark-structured-streaming-with-custom-sink
That's the reason why metrics are not updated.
This code works, and avoids the current comment "allows us to do magic" ;-):
class HBaseStreamSink(sqlContext: SQLContext, options: Map[String, String]) extends Sink with Logging {

  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"

  val specifiedHBaseParams =
    options
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("hbase."))
      .map { k => k.drop(6).toString -> options(k) }
      .toMap

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // use a local variable to make sure the map closure doesn't capture the whole DataFrame
    val schema = data.schema
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    val df = sqlContext.sparkSession.createDataFrame(res, schema)
    df.write
      .options(specifiedHBaseParams)
      .format(defaultFormat)
      .save()
  }
}
18/04/13 14:26:11 INFO StreamExecution: Streaming query made progress: { "id" : "3197f58d-9dc6-4d02-b081-e8fed6d5b57f", "runId" : "1d313972-6150-48cf-b448-c32411f4d421", "name" : "KafkaToHBaseStream", "timestamp" : "2018-04-13T12:26:02.788Z", "numInputRows" : 2,
@sbarnoud, Hi! Thank you, great work!
Hi,
In my own version I have added support for the short names:
-) shc
-) shcstream
Could you validate those short names?
core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.sql.execution.datasources.hbase.DefaultSource
org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider
org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala
private[sql] class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {
override def shortName(): String = "shc" // TODO Validate this name
org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
class HBaseStreamSinkProvider
  extends DataSourceRegister
  with StreamSinkProvider {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new HBaseStreamSink(sqlContext, parameters)
  }

  override def shortName(): String = "shcstream" // TODO Validate this name
}
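If those short names get registered, usage would look roughly like this (a sketch reusing the names from the earlier example; the short names are still pending validation):

// Streaming write via the hypothetical "shcstream" short name
// instead of the fully-qualified provider class.
inputDF
  .writeStream
  .queryName("hbase writer")
  .format("shcstream")
  .option("checkpointLocation", checkPointProdPath)
  .options(Map("hbase.catalog" -> catalog))
  .outputMode(OutputMode.Update())
  .start()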
@sbarnoud, hi! I don't mind, or are you suggesting I change the names in the PR?
No, I'll let you decide the names, but I sent you the correct code. Both the batch and streaming short names must be defined and different, and both classes must be listed in the resources file. As far as I can see, that isn't currently the case.
I think that with the advent of Spark 2.4 this is no longer relevant; foreachBatch will solve all the problems of custom sinks (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch).
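For reference, a hedged sketch of that approach with Spark 2.4's foreachBatch, writing each micro-batch through the existing batch SHC data source (the "catalog" option key is the one the batch API uses; adjust to your setup):

// Batch-writer function reused for every micro-batch; defining it as a typed
// function value avoids the foreachBatch overload ambiguity on Scala 2.12.
val writeBatchToHBase: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.write
    .options(Map("catalog" -> catalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()
}

inputDF
  .writeStream
  .queryName("hbase writer")
  .option("checkpointLocation", checkPointProdPath)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(30.seconds))
  .foreachBatch(writeBatchToHBase)
  .start()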