faas-flow icon indicating copy to clipboard operation
faas-flow copied to clipboard

Make AWS states Language interpreter

Open s8sg opened this issue 5 years ago • 2 comments

The main idea here is to make use of AWS State Language to define pipelines

s8sg avatar Feb 20 '19 02:02 s8sg

i can take this if you provide info for first steps. As i understand i need to parse aws step defintions in json and create Workflow with needed Dags.

vtolstov avatar Jul 25 '19 22:07 vtolstov

Great. We might not support all the features of step functions, but for now we can create a dag definition based on what we support. One amazon state language specification example I was looking at

{
  "Comment": "An example of the Amazon States Language using a choice state.",
  "StartAt": "FirstState",
  "States": {
    "FirstState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:FUNCTION_NAME",
      "Next": "ChoiceState"
    },
    "ChoiceState": {
      "Type" : "Choice",
      "Choices": [
        {
          "Variable": "$.foo",
          "NumericEquals": 1,
          "Next": "FirstMatchState"
        },
        {
          "Variable": "$.foo",
          "NumericEquals": 2,
          "Next": "SecondMatchState"
        }
      ],
      "Default": "DefaultState"
    },

    "FirstMatchState": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:OnFirstMatch",
      "Next": "NextState"
    },

    "SecondMatchState": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:OnSecondMatch",
      "Next": "NextState"
    },

    "DefaultState": {
      "Type": "Fail",
      "Error": "DefaultStateError",
      "Cause": "No Matches!"
    },

    "NextState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:FUNCTION_NAME",
      "End": true
    }
  }
}

The corresponding generated dag contructs in faas-flow will be like

       dag := flow.Dag()
       dag.Node("FirstState").Apply("FUNCTION_NAME")
       conditionalDags := dag.ConditionalBranch("ChoiceState",
                []string{"1", "2", "default"}, // possible conditions
                func(response []byte) []string {
                        result := &struct{
                                  Foo int      `json:"foo"`
                        }{} 
                        json.Load(response, &result)
                        switch result.Foo {
                              case 1:
                                      return []string{fmt.Sprintf("%s", result. Foo)}
                              case 2:
                                      return []string{fmt.Sprintf("%s", result. Foo)}
                        } 
                        return []string{ "default" }
                },
       )
       conditionalDags["1"].Node("FirstMatchState").Apply("OnFirstMatch")
       conditionalDags["2"].Node("SecondMatchState").Apply("OnSecondMatch")
       conditionalDags["default"].Node("DefaultState").Modify(func(data []byte) ([]byte, error) {
                return data, faasflow.AWSDefaultStateError
       })
       dag.Node("NextState").Apply("FUNCTION_NAME")
       dag.Edge("FirstState", "ChoiceState")
       dag.Edge("ChoiceState", "NextState")

The goal is to provide an sdk function like

dag  = flow.GenerateDag(faasflow.AwsStateLang, defnition)
flow.SetDag(dag)

I can't quite get my head around that how will we create the flow defnition from the parse data, it seems there will be a lot of callbacks and datastructure involved. I guess best to start with simple example, like only dag no conditions. Its great if you can give it a try

s8sg avatar Jul 26 '19 03:07 s8sg