azure-event-hubs-spark
Missing events during write to event hub, unable to catch exceptions
- Spark version: 2.4.4
- spark-eventhubs artifactId and version: 2.3.14
We read a stream from Event Hubs with readStream, then use writeStream with foreachBatch. Inside foreachBatch we run some operations and write the microbatch to another set of event hubs.
During the write, a few events go missing at random. We are confident our own processing is not dropping them, because our DQ check would catch that.
The other issue: if we give an event hub name that doesn't exist, the write doesn't throw an exception; it just moves on to the next microbatch.
Here is the sample code:
import java.util.concurrent.TimeUnit
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val inputconnectionString = ConnectionStringBuilder("Endpoint=sb://xyz.servicebus.windows.net/;SharedAccessKeyName=SendAuthorizationRule;SharedAccessKey=Xyz=;EntityPath=TestInputEventHub").build
val inputehConf = EventHubsConf(inputconnectionString)
  .setConsumerGroup("XYZ")
  .setStartingPosition(EventPosition.fromEnqueuedTime("XYZ"))
  .setMaxEventsPerTrigger("XYZ")
  .setReceiverTimeout("XYZ")
val reader = sparkSession
  .readStream
  .format("eventhubs")
  .options(inputehConf.toMap)
  .load()
val streamingQueryHandle = reader
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    logger.log("Starting StreamProcessor", isDebug = false)
    EventStreamProcessor.process(batchDF, batchId, scdProviders)
  }
  .outputMode("update")
  .option("checkpointLocation", AppConfigs.checkpointLocation)
  .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(AppConfigs.triggerIntervalDurationInSeconds)))
  .start()
// Runs once for each microbatch
def process(batchDF: DataFrame, batchId: Long, scdProviders: mutable.Map[String, SCDProvider]): Unit = {
  batchDF.persist()
  val outputDF = <some operations>
  val outputconnectionString = "Endpoint=sb://xyz.servicebus.windows.net/;SharedAccessKeyName=SendAuthorizationRule;SharedAccessKey=Xyz=;EntityPath=TestOutputEventHub"
  val outputehConf = EventHubsConf(outputconnectionString)
  try {
    // Wrap each row as JSON in the "body" column, then write the batch out
    outputDF.toJSON.selectExpr("value as body")
      .write
      .format("eventhubs")
      .options(outputehConf.toMap)
      .save()
  } catch {
    case ex: Exception =>
      println("Unable to save the files to EH " + outputehConf.toMap)
      ex.printStackTrace()
      throw ex
  }
}
I need to be able to catch an exception when some events are not persisted to the event hub during the write. Also, why don't I get an exception when I write to an event hub that doesn't even exist?
Any help or pointers are appreciated.
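As a stopgap for the second problem, the connection string can at least be sanity-checked before each write. The sketch below is plain Python rather than the Scala above, and assumes the SAS-key form of connection string used in this thread. `parse_connection_string` and `require_entity_path` are hypothetical helper names, not part of the spark-eventhubs connector. It only checks the shape of the string and the hub name it carries; it cannot tell whether that hub actually exists.

```python
from typing import Dict

# Keys present in the connection strings shown in this thread (assumption:
# SAS-key form; other authentication forms use different keys).
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' pairs into a dict.

    Only the first '=' separates key from value, because access keys
    often end in '=' padding.
    """
    parts: Dict[str, str] = {}
    for segment in conn_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment (no '='): {segment!r}")
        parts[key] = value
    return parts


def require_entity_path(conn_str: str, expected_hub: str) -> None:
    """Raise ValueError if a key is missing or EntityPath is not the expected hub."""
    parts = parse_connection_string(conn_str)
    missing = [k for k in REQUIRED_KEYS if not parts.get(k)]
    if missing:
        raise ValueError(f"Connection string is missing: {', '.join(missing)}")
    if parts["EntityPath"] != expected_hub:
        raise ValueError(
            f"EntityPath is {parts['EntityPath']!r}, expected {expected_hub!r}"
        )
```

Calling `require_entity_path(conn_str, "TestOutputEventHub")` at the top of each microbatch would fail fast on a typo in the hub name, provided the expected name comes from a separate, trusted config value.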
Hello! The same happens to me in Python. In my case it happens with write, read, writeStream and readStream.
The bug is probably on the Scala side: no error is raised there, so none is raised in Python either (according to the source code, the Python API depends on the Scala one).
My team and I built the code below for our streaming process; we force the error to surface with awaitTermination(). Maybe something similar will work in your batch process:
from pyspark.sql.utils import StreamingQueryException

try:
    stream = read_sDF.writeStream \
        .format("eventhubs") \
        .options(**eventhubConfigurations) \
        .option("checkpointLocation", eventhubCheckpointPath)
    # start streaming
    streamManager = stream.start()
    # force error here, if any
    streamManager.awaitTermination()
except StreamingQueryException as error:
    # match on the error message text used in the source code
    if error.desc.startswith("Detected schema change"):
        print("Schema changed, await...")
        if streamManager.status['isTriggerActive']:
            # force stop
            streamManager.stop()
    else:
        # unrecognized error, re-raise the original exception
        raise
Comments:
- In Databricks, the line "streamManager.awaitTermination()" can make data displays disappear.
- The line "streamManager.stop()" is necessary because the process doesn't update its status (isTriggerActive).
Hope it helps.
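Separately from forcing exceptions, the random drops described in the original post can be detected after the fact by reconciling event IDs. This is a minimal sketch, assuming each event carries a unique ID and that the IDs read back from the output hub are collected somewhere else; `find_missing` and `assert_batch_delivered` are hypothetical names, and nothing here is part of the connector.

```python
from collections import Counter
from typing import Dict, Iterable


def find_missing(sent_ids: Iterable[str], received_ids: Iterable[str]) -> Dict[str, int]:
    """Return {event_id: shortfall} for IDs received fewer times than sent."""
    sent = Counter(sent_ids)
    received = Counter(received_ids)
    # Counter subtraction keeps only positive counts, so extra (duplicate)
    # deliveries on the receiving side are ignored.
    return dict(sent - received)


def assert_batch_delivered(batch_id: int,
                           sent_ids: Iterable[str],
                           received_ids: Iterable[str]) -> None:
    """Raise RuntimeError if any event of the batch was not seen downstream."""
    missing = find_missing(sent_ids, received_ids)
    if missing:
        total = sum(missing.values())
        raise RuntimeError(
            f"Batch {batch_id}: {total} event(s) missing, e.g. {sorted(missing)[:10]}"
        )
```

This only reports the loss; it does not explain why the write dropped the events, and it relies on the downstream read being complete before the comparison runs.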
We already have this:
streamingQueryHandle.awaitTermination()
Also, @nyaghma, why is it that when we try this
connStr="Endpoint=sb://xyzEHN.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=WrongKey;EntityPath=NonExistingEH"
we don't get any errors when doing
...
.write
.format("eventhubs")