
Support for structured stream checkpointing on the Azure filesystem

Open mathieu-rossignol opened this issue 2 years ago • 3 comments

The checkpoint location is also used to store the current Kafka offsets while writing to the structured stream sink.

mathieu-rossignol avatar Aug 24 '21 14:08 mathieu-rossignol

Usage with the code currently committed in the branch:

In the StructuredStream configuration, set something like the following (replace the <...> placeholders with your own values):

spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
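For context, the property above sits inside the stream's configuration block in a logisland job file. A minimal sketch, assuming the usual logisland YAML layout; the stream name is hypothetical and the component class name is my assumption, only `spark.base.checkpoint.path` comes from this issue:

```yaml
# Hypothetical job fragment; only spark.base.checkpoint.path is from this issue.
streamConfigurations:
  - stream: azure_checkpointed_stream   # hypothetical name
    component: com.hurence.logisland.stream.spark.structured.StructuredStream  # assumed class
    configuration:
      spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
```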

Then, in the KafkaStreamProcessingEngine global configuration, set the matching account key through a custom Spark configuration key, for instance:

spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: +H5IuOtsebY7fO6QyyntmlRLe3G8Rv0jcye6kzE2Wz4NrU3IdB4Q8ocJY2ScY9cQrJNXxUg2WbYJPndMuQWUCQ==

Generic custom Spark configuration keys are introduced in this branch; they allow calling sparkConfig.set("xxx.yyy.zzz", "someValue") through a dynamic property of KafkaStreamProcessingEngine of the form:

spark.custom.config.xxx.yyy.zzz: someValue
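The prefix-stripping behavior described above can be sketched as follows. This is an illustrative reimplementation, not logisland's actual code; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "spark.custom.config.*" dynamic-property handling:
// strip the prefix and forward the remainder to sparkConfig.set(key, value).
public class CustomSparkConfig {

    static final String PREFIX = "spark.custom.config.";

    /** Return the (key, value) pairs to pass to sparkConfig.set(...). */
    public static Map<String, String> extract(Map<String, String> dynamicProps) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : dynamicProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                // "spark.custom.config.xxx.yyy.zzz" -> "xxx.yyy.zzz"
                out.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net", "secret");
        props.put("kafka.input.topics", "logisland_raw"); // no prefix: ignored
        System.out.println(extract(props));
    }
}
```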

See this page for examples of the custom fs.azure.* configuration keys (account key, SAS key, ...).


The currently identified problem: although the needed Azure client libraries (azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar) are packaged in the Spark 2 engine fat jar, together with references to the required Hadoop FileSystem implementation classes in META-INF/services/org.apache.hadoop.fs.FileSystem (org.apache.hadoop.fs.azure.NativeAzureFileSystem and org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure), the implementation classes are not found at runtime. The classpath built by logisland.sh puts the Spark libraries before the logisland ones, so once a META-INF/services/org.apache.hadoop.fs.FileSystem resource shipped in a jar of the local Spark installation (SPARK_HOME) is found, the dynamic loading (ServiceLoader) mechanism uses it and never sees our packaged Azure filesystem implementation. The result is: java.io.IOException: No FileSystem for scheme: wasbs

The workaround I found so far is to drop the needed jars (azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar) into the jars directory of every Spark executor node. I will try to tweak logisland.sh to change the classpath order, or find another solution...
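An untested alternative worth noting (my assumption, not something verified in this issue): Hadoop consults an explicit fs.&lt;scheme&gt;.impl configuration key before falling back to the META-INF/services lookup, so the ServiceLoader resolution could possibly be bypassed through the same custom-key mechanism introduced in this branch:

```yaml
# Hypothetical workaround sketch: declare the wasbs implementation class
# explicitly so Hadoop does not rely on the ServiceLoader discovery.
# Untested in this context; the class must still be loadable from the fat jar.
spark.custom.config.fs.wasbs.impl: org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure
spark.custom.config.fs.wasb.impl: org.apache.hadoop.fs.azure.NativeAzureFileSystem
```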

mathieu-rossignol avatar Aug 24 '21 16:08 mathieu-rossignol

Merged into release/1.4.0, but the workaround is still required.

mathieu-rossignol avatar Sep 01 '21 16:09 mathieu-rossignol