zenml icon indicating copy to clipboard operation
zenml copied to clipboard

Spark Integration

Open bcdurak opened this issue 3 years ago • 0 comments

Hey everyone and welcome to the long-awaited Spark PR.

With this PR, we go over a new integration with ZenML and you can think of it as our first steps toward the world of distributed programming.

CAUTION: This PR already has the docker configuration branch feature/ENG-1071-docker-flexibility merged.

Summary

Within the integration, you will find two distinct groups of implementations:

  • Materializers: SparkDataFrameMaterializer and SparkModelMaterializers
  • Step Operator: SparkStepOperator (base) and KubernetesSparkStepOperator

While materializers are quite straightforward and support the PySpark DataFrame API and the models in their machine learning library MLLib respectively, the step operators are a bit more complicated. They are responsible for launching a Spark job based on the given step and there are a few things to consider here.

Let's talk about the configuration first. Under the base class, you will find 4 instance parameters. - master is the master URL for the cluster. You might see different schemes for different cluster managers which are supported by Spark like Mesos, YARN, or Kubernetes. Within the context of this PR, the implementation supports Kubernetes as a cluster manager. - deploy_mode can either be 'cluster' (default) or 'client' and it decided where the driver node of the application will run. - submit_args is the JSON string of a dict, which will be used to define additional params if required (Spark has quite a lot of different parameters, so including them, all in the step operator was not implemented).

For Kubernetes, there are also some additional important parameters: - kubernetes_namespace is the namespace under which the driver and executor pods will run. - kubernetes_service_account is the service account that will be used by various Spark components (to create and watch the pods). - docker_parent_image (which originally comes from the PipelineDockerImageBuilder base class) indicates the name of a base image that has Spark enabled.

In addition to these parameters, the launch method of the step operator gets a DockerConfiguration and a ResourceConfiguration. Utilizing all of this, the overall configuration happens in 4 base methods:

  • _resource_configuration which translates our configuration object to Spark's resource configuration.
  • _backend_configuration which is responsible for cluster-manager-specific configuration.
  • _io_configuration is one of the critical ones. Even though we have materializers, Spark might require additional packages and configuration to work with a specific filesystem. For this implementation, I have implemented a way for Spark to recognize that it is operating with an S3 artifact store and configure itself accordingly. While this works on S3 at the moment, this also means that other artifact store flavors might be problematic if left without any additional configuration.
  • _additional_configuration takes the submit args and utilizes them to create a further configuration.

Once the configuration is completed, _launch_spark_job comes into play. This takes the completed config object and is responsible to run it on the given master URL with the specified deploy_mode. In the base class, this is achieved by using spark-submit as using pyspark makes us use spark-shell which does not support cluster as a deploy-mode.

All the configuration methods and the launch method are extendible. In fact, the KubernetesSparkStepOperator is implemented by just overwriting the _backend_configuration method of the base class.

Possible areas for improvement

  • IMPORTANT: This is a bit challenging to explain but I will give it a shot here. In an ideal case, when you are submitting an application to Spark, you have various ways to define additional packages which are required within the environment where Spark is running. As I was using an S3 artifact store, I quickly found out that this was the case for me. When I tried to install these packages (through various means) in the cluster mode, they only got installed to the driver pods but not the executor pods, which ended up failing my jobs. As a solution, I figured out that I can install these packages myself and create the base image using Spark's docker-image-tool locally (which copies over your local Spark installation along with the packages). I would very much like to avoid this solution and not force people to do what I did if they want to make it work with any artifact store which requires additional packages.
  • Once we figure out how to parse it, we can add a new ResourceConfiguration to the implementation for Spark.
  • Adding some integration tests

Small TODOs

  • Write the corresponding docs pages
  • Finishing up the docstrings
  • Finishing up the README of the example
  • Detect whether Spark is installed
  • Update the integration table

Pre-requisites

Please ensure you have done the following:

  • [X] I have read the CONTRIBUTING.md document.
  • [ ] If my change requires a change to docs, I have updated the documentation accordingly.
  • [ ] If I have added an integration, I have updated the integrations table and the corresponding website section.
  • [ ] I have added tests to cover my changes.

Types of changes

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [X] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to change)
  • [ ] Other (add details above)

bcdurak avatar Aug 10 '22 15:08 bcdurak