logisland
Support for structured stream checkpointing on azure filesystem
The checkpoint location is also used to store the current Kafka offset while writing to the structured stream sink.
Created branch feature/LOGISLAND-587-Support-for-structured-stream-checkpointing-on-azure-filesystem to implement this feature.
Usage with the code currently committed in the branch:
In the StructuredStream configuration, put something like the following (replace the <...> placeholders with relevant strings):
spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
Then, in the KafkaStreamProcessingEngine global configuration, set the matching account key using a custom Spark configuration key, for instance:
spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: +H5IuOtsebY7fO6QyyntmlRLe3G8Rv0jcye6kzE2Wz4NrU3IdB4Q8ocJY2ScY9cQrJNXxUg2WbYJPndMuQWUCQ==
Generic custom Spark configuration keys are introduced in this branch: they allow calling sparkConfig.set("xxx.yyy.zzz", "someValue")
through a dynamic property of KafkaStreamProcessingEngine of the form:
spark.custom.config.xxx.yyy.zzz: someValue
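Putting the two settings together, a job file could look like the sketch below. The overall YAML layout and component class names are assumptions based on typical logisland job configurations, not taken from the branch; replace the <...> placeholders as above.

```yaml
# Hypothetical job fragment combining the engine-level custom Spark key
# with the stream-level checkpoint path.
engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  configuration:
    # Forwarded to sparkConfig.set(...) with the "spark.custom.config."
    # prefix stripped, per the generic custom key mechanism above.
    spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: <myAccountKey>
  streamConfigurations:
    - stream: azure_checkpointed_stream
      component: com.hurence.logisland.stream.spark.structured.StructuredStream
      configuration:
        spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
```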
See this page for examples of the custom fs.azure.* configuration keys (account key, SAS key, ...).
The currently identified problem: although the needed Azure client libraries azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar are correctly packaged in the Spark 2 engine fat jar, together with the references to the needed Hadoop FileSystem implementation classes in META-INF/services/org.apache.hadoop.fs.FileSystem (org.apache.hadoop.fs.azure.NativeAzureFileSystem and org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure), the implementation classes are not found. It seems that the classpath defined by logisland.sh puts the Spark libraries before the logisland ones; once a META-INF/services/org.apache.hadoop.fs.FileSystem file delivered in a jar of the used Spark installation (SPARK_HOME) is found, the dynamic loading (ServiceLoader) mechanism uses it and cannot find our packaged Azure filesystem implementation, which results in a java.io.IOException: No FileSystem for scheme: wasbs exception.
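The classpath-ordering effect can be illustrated with a small, self-contained Java sketch. The directory names and service-file contents below are made up for the demo; it shows that when two classpath entries both ship the same service file, a loader that consults only the first copy (via getResource) never sees the entries declared in the second one.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ServiceFileShadowing {
    static final String SERVICE = "META-INF/services/org.apache.hadoop.fs.FileSystem";

    public static void main(String[] args) throws IOException {
        // One directory plays the role of a jar from $SPARK_HOME/jars
        // (no azure entries), the other the logisland fat jar (azure entries).
        Path sparkJar = dirWithServiceFile("org.apache.hadoop.fs.LocalFileSystem\n");
        Path fatJar = dirWithServiceFile(
                "org.apache.hadoop.fs.azure.NativeAzureFileSystem\n"
              + "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure\n");

        // Classpath order: the Spark entry first, as logisland.sh builds it.
        try (URLClassLoader cl = new URLClassLoader(new URL[] {
                sparkJar.toUri().toURL(), fatJar.toUri().toURL() }, null)) {
            // getResource returns only the FIRST copy found on the classpath,
            // so the azure declarations in the second entry are shadowed.
            try (InputStream in = cl.getResourceAsStream(SERVICE)) {
                String firstCopy = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                System.out.println("first copy declares: " + firstCopy.trim());
                System.out.println("azure visible: " + firstCopy.contains("NativeAzureFileSystem"));
            }
        }
    }

    // Creates a directory containing a service file with the given content.
    static Path dirWithServiceFile(String content) throws IOException {
        Path dir = Files.createTempDirectory("cp");
        Path file = dir.resolve(SERVICE);
        Files.createDirectories(file.getParent());
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        return dir;
    }
}
```

Note that Hadoop's FileSystem loading actually goes through java.util.ServiceLoader, which resolves the service file through the classloader; the sketch only illustrates why the position of the Spark jars on the classpath matters.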
The current workaround I found is to put the needed jars (azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar) in the jars directory of every Spark executor node of the cluster. I will try to see whether I can tweak logisland.sh to change the classpath, or find another solution...
Merged in release/1.4.0, but still requires the workaround.