faas-flow
faas-flow copied to clipboard
Make AWS states Language interpreter
The main idea here is to make use of AWS State Language to define pipelines
i can take this if you provide info for first steps. As i understand i need to parse aws step defintions in json and create Workflow with needed Dags.
Great. We might not support all the features of step functions, but for now we can create a dag definition based on what we support. One amazon state language specification example I was looking at
{
"Comment": "An example of the Amazon States Language using a choice state.",
"StartAt": "FirstState",
"States": {
"FirstState": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:FUNCTION_NAME",
"Next": "ChoiceState"
},
"ChoiceState": {
"Type" : "Choice",
"Choices": [
{
"Variable": "$.foo",
"NumericEquals": 1,
"Next": "FirstMatchState"
},
{
"Variable": "$.foo",
"NumericEquals": 2,
"Next": "SecondMatchState"
}
],
"Default": "DefaultState"
},
"FirstMatchState": {
"Type" : "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:OnFirstMatch",
"Next": "NextState"
},
"SecondMatchState": {
"Type" : "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:OnSecondMatch",
"Next": "NextState"
},
"DefaultState": {
"Type": "Fail",
"Error": "DefaultStateError",
"Cause": "No Matches!"
},
"NextState": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:FUNCTION_NAME",
"End": true
}
}
}
The corresponding generated dag contructs in faas-flow will be like
dag := flow.Dag()
dag.Node("FirstState").Apply("FUNCTION_NAME")
conditionalDags := dag.ConditionalBranch("ChoiceState",
[]string{"1", "2", "default"}, // possible conditions
func(response []byte) []string {
result := &struct{
Foo int `json:"foo"`
}{}
json.Load(response, &result)
switch result.Foo {
case 1:
return []string{fmt.Sprintf("%s", result. Foo)}
case 2:
return []string{fmt.Sprintf("%s", result. Foo)}
}
return []string{ "default" }
},
)
conditionalDags["1"].Node("FirstMatchState").Apply("OnFirstMatch")
conditionalDags["2"].Node("SecondMatchState").Apply("OnSecondMatch")
conditionalDags["default"].Node("DefaultState").Modify(func(data []byte) ([]byte, error) {
return data, faasflow.AWSDefaultStateError
})
dag.Node("NextState").Apply("FUNCTION_NAME")
dag.Edge("FirstState", "ChoiceState")
dag.Edge("ChoiceState", "NextState")
The goal is to provide an sdk function like
dag = flow.GenerateDag(faasflow.AwsStateLang, defnition)
flow.SetDag(dag)
I can't quite get my head around that how will we create the flow defnition from the parse data, it seems there will be a lot of callbacks and datastructure involved. I guess best to start with simple example, like only dag no conditions. Its great if you can give it a try