luigiext-gcloud

*luigi-gcloud* is a Luigi extension that enables full support for the Google Cloud Platform, making it possible to do complex orchestration between different Google Big Data jobs.

Go to the GitHub wiki for an extensive explanation of every aspect of luigi-gcloud:

https://github.com/alexvanboxel/luigiext-gcloud/wiki

# Luigi-GCloud

Congratulations! You wrote a bunch of Cloud DataFlow pipelines and Spark jobs, or had your arsenal of Pig and Hive scripts migrated to the Google Cloud Platform. Now it's time to bring them into production. Have you thought about reliability, data availability, and retries after failure?

Workflow management should not be an afterthought; it should be part of your development process. But don't start by writing a lot of code yourself: rely on a workflow manager.

Luigi (https://github.com/spotify/luigi) is a workflow manager in which you describe your workflow as a Directed Acyclic Graph (DAG). What makes Luigi unique is that it doesn't use a DSL or XML to describe the graph, but Python code. Write a class that describes the task at hand and all its dependencies, and Luigi does the rest to make sure your task runs.
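To make the idea of "a class per task, plus its dependencies" concrete, here is a minimal sketch in plain Python. It is a toy stand-in, not Luigi's API: the `Task` base class, the `build` function and the `Extract`, `Clean` and `Report` tasks are all hypothetical names invented for this illustration. It only shows the general principle that dependencies run first and completed tasks are skipped.

```python
from typing import List, Set


class Task:
    """Hypothetical base class (not Luigi's): a task declares what it requires."""

    # Shared record of finished task names; a stand-in for checking output targets.
    completed: Set[str] = set()

    def requires(self) -> List["Task"]:
        return []

    def complete(self) -> bool:
        return type(self).__name__ in Task.completed

    def run(self) -> None:
        Task.completed.add(type(self).__name__)


class Extract(Task):
    pass


class Clean(Task):
    def requires(self) -> List[Task]:
        return [Extract()]


class Report(Task):
    def requires(self) -> List[Task]:
        return [Clean(), Extract()]


def build(task: Task, order: List[str]) -> None:
    """Run dependencies depth-first, then the task; skip anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, order)
    task.run()
    order.append(type(task).__name__)


order: List[str] = []
build(Report(), order)
print(order)  # ['Extract', 'Clean', 'Report']
```

Although `Report` lists `Extract` twice in its dependency graph (directly and through `Clean`), it runs only once, because the second visit finds it already complete.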

Luigi-GCloud Examples DAG

Go straight to [[Getting Started]] to run the examples.

What is Luigi-GCloud

Luigi-gcloud is an extension to Luigi that provides consistent integration with all of Google Cloud's Big Data services. Here is an example of a DataProc Spark task:

import luigi


class DataProcSparkCopy(DataProcSparkTask):
    # DataProcSparkTask and GCSFlagTarget are provided by luigi-gcloud.
    day = luigi.DateParameter()

    def output(self):
        return GCSFlagTarget(self.day.strftime('gs://bucket/out/%Y/%m/%d'))

    def job_file(self):
        return "/luigi-spark/build/libs/luigi-spark-copy-1.0.jar"

    def name(self):
        return "spark-copy-${year}-${month}-${day}"

    def main(self):
        return "luigi.gcloud.spark.Copy"

    def args(self):
        return [
            self.day.strftime('gs://bucket/in/%Y/%m/%d'),
            self.day.strftime('gs://bucket/out/%Y/%m/%d')
        ]

    def variables(self):
        return {
            # used in the name customisation
            'year': self.day.strftime('%Y'),
            'month': self.day.strftime('%m'),
            'day': self.day.strftime('%d'),
        }
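To see what the task above produces for a concrete date, the following sketch evaluates the same `strftime` patterns and fills in the `${year}`, `${month}` and `${day}` placeholders of the job name. How luigi-gcloud performs the substitution internally is not shown here; Python's standard `string.Template` is used as an assumed stand-in because it understands the same `${...}` syntax. The date is an arbitrary example value.

```python
import datetime
from string import Template

# Arbitrary example date; in the task this comes from the `day` parameter.
day = datetime.date(2016, 3, 7)

# Same keys and formats as the variables() method above.
variables = {
    'year': day.strftime('%Y'),
    'month': day.strftime('%m'),
    'day': day.strftime('%d'),
}

# Assumption: string.Template stands in for luigi-gcloud's own name customisation.
job_name = Template("spark-copy-${year}-${month}-${day}").substitute(variables)

# The same patterns the task uses in args() and output().
input_path = day.strftime('gs://bucket/in/%Y/%m/%d')
output_path = day.strftime('gs://bucket/out/%Y/%m/%d')

print(job_name)     # spark-copy-2016-03-07
print(input_path)   # gs://bucket/in/2016/03/07
print(output_path)  # gs://bucket/out/2016/03/07
```

Because the input path, output path and job name are all derived from the one `day` parameter, each daily run gets its own job name and its own output location.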

Unique Features

Google Cloud Platform services

luigi-gcloud extends Luigi with transparent integration of the following Google Cloud Platform services:

  • DataProc
  • DataFlow
  • BigQuery
  • Storage

DataProc jobs generated by luigi-gcloud

Cloud Logging integration

Logging to Cloud Logging