feat: added support for AsyncAPI v3
- Added support for AsyncAPI v3
- Also cleaned up the spec to make it very clear that step-object can be oneOf openapi-step-object or asyncapi-step-object or workflow-step-object
- For AsyncAPI we really need support for timeout, fork and join. However, these are also useful for OpenAPI so added it at the base step object.
- For OpenAPI we need at least one successCriteria but for AsyncAPI it can be optional.
While we've tried to incorporate as much as possible from #270 not everything is covered.
Let's have you join Nick, Mike and myself next week and the next few weeks for some follow up on how this works. We started meeting to discuss the best approach forward as well. As this would fundamentally alter the spec we'll definitely need a bit of time to go through what you have here and what we are looking in to.. see what might overlap, etc. Shoot me a msg on slack with your email so I can add you to our conversations there.
Here is a simple example just for your reference.
arazzo: "1.0.1"
info:
title: "Workflow for placing an Order"
version: "1.0.0"
sourceDescriptions:
- name: "OrderApi"
url: "./openapi/order.yaml"
type: "openapi"
- name: "AsyncOrderApi"
url: "./asyncapi/order.yaml"
type: "asyncapi"
workflows:
- workflowId: "PlaceOrder"
inputs:
required:
- "CreateOrder"
type: "object"
properties:
CreateOrder:
required:
- "orderRequestId"
- "productId"
- "quantity"
type: "object"
properties:
orderRequestId:
type: "string"
productId:
type: "integer"
quantity:
type: "integer"
steps:
- stepId: "CreateOrder"
operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
stepType: "asyncapi"
action: "send"
parameters:
- name: "orderRequestId"
in: "header"
value: "$inputs.CreateOrder.orderRequestId"
requestBody:
payload:
productId: "$inputs.CreateOrder.productId"
quantity: "$inputs.CreateOrder.quantity"
outputs:
orderRequestId: "$message.header.orderRequestId"
- stepId: "ConfirmOrder"
operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
stepType: "asyncapi"
action: "receive"
correlationId: "$steps.CreateOrder.outputs.orderRequestId"
timeout: 6000
outputs:
orderId: "$message.body.orderId"
- stepId: "GetOrderDetails"
operationId: "$sourceDescriptions.OrderApi.getOrder"
parameters:
- name: "orderId"
in: "path"
value: "$steps.ConfirmOrder.outputs.orderId"
successCriteria:
- condition: "$statusCode == 200"
components: {}
As discussed, here is an example with fork and join
arazzo: "1.0.1"
info:
title: "Workflow for placing an Order"
version: "1.0.0"
sourceDescriptions:
- name: "OrderApi"
url: "./openapi/order.yaml"
type: "openapi"
- name: "AsyncOrderApi"
url: "./asyncapi/order.yaml"
type: "asyncapi"
workflows:
- workflowId: "PlaceOrder"
inputs:
required:
- "CreateOrder"
type: "object"
properties:
CreateOrder:
required:
- "orderRequestId"
- "productId"
- "quantity"
type: "object"
properties:
orderRequestId:
type: "string"
productId:
type: "integer"
quantity:
type: "integer"
steps:
- stepType: "asyncapi"
stepId: "ConfirmOrder"
operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
action: "receive"
correlationId: "$inputs.CreateOrder.orderRequestId"
fork: true # Converts ConfirmOrder to a Non Blocking Step
timeout: 6000
outputs:
orderId: "$message.body.orderId"
- stepType: "asyncapi"
stepId: "CreateOrder"
operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
action: "send"
parameters:
- name: "orderRequestId"
in: "header"
value: "$inputs.CreateOrder.orderRequestId"
requestBody:
payload:
productId: "$inputs.CreateOrder.productId"
quantity: "$inputs.CreateOrder.quantity"
- stepId: "GetOrderDetails"
operationId: "$sourceDescriptions.OrderApi.getOrder"
join: # Waits for ConfirmOrder to complete or timeout
- ConfirmOrder # Can also be 'true' to join/wait all
parameters:
- name: "orderId"
in: "path"
value: "$steps.ConfirmOrder.outputs.orderId"
successCriteria:
- condition: "$statusCode == 200"
components: {}
Thanks @nashjain - I will try to propose changes to the specification based on the examples later this week. There will no doubt be a few finer grained points that we'll need to discuss
Taking the above examples into consideration here's a proposal outlining the specification changes to support AsyncAPI in v1.1.0. I've also provided an updated example which complies to the this proposal below. If we're in general agreement, I'll update this PR to reflect the proposal.
Summary of the specification changes to add AsyncAPI support (not exhaustive)
Source Description Object
type
Updated Allowed Values:
openapiarazzoasyncapi(new)
Description/Rationale:
The addition of asyncapi enables referencing AsyncAPI 3.0.0 definitions and allows workflows to model message-driven systems in addition to traditional request/response patterns.
Runtime Expressions
$message
Type: object Available In: Steps with kind: asyncapi and action: receive Description: Provides access to the body and headers of a message received via an AsyncAPI-defined channel. This runtime expression enables event-driven workflows to assert success or extract outputs from message data.
Example:
successCriteria:
- condition: $message.payload != null
- condition: $message.header.correlationId == 'abc123'
$elapsedTime (optional - nice to have)
Type: integer
Available In: After a step completes
Description:
Represents the total time (in milliseconds) that a step took to execute. Can be used in successCriteria and onFailure.criteria to enforce nuanced timeout behaviour or performance related onFailure / onSuccess behaviour.
Example:
successCriteria:
- condition: $elapsedTime < 5000
Step Object
kind
Type: string Allowed Values: openapi, asyncapi, workflow Description: Indicates the type of step. Required when the document contains multiple sourceDescriptions of different types. Enables correct interpretation of fields like operationId, action, etc. We'll clearly explain to implementors how to infer this if omitted and what defaults should be. The generally thought process is that this is good moving forward but we can't make mandatory in minor release. It should be present for those looking to express async types of steps
action
Type: string
Allowed Values: send, receive
Required If: kind is asyncapi
Description: |
Indicates whether the step is sending (publishing) or receiving (subscribing to) a message on a channel described in an AsyncAPI document. Also enables explicit usage mode of webhooks and/or callbacks described in OpenAPI.
timeout
Type: integer
Format: milliseconds
** Description:** |
Defines the maximum allowed execution time for a step in milliseconds. If the step does not complete within the specified duration, it is considered a failure. The default behavior upon timeout is to terminate the workflow equivalent to using an end failure action.
Example:
timeout: 5000
Timeout behavior can be overridden by defining an onFailure block with criteria based on $elapsedTime. This would allow retry behaviour etc. if needed.
Example:
onFailure:
- name: retryTimeout
type: retry
retryLimit: 3
retryAfter: 1000
criteria:
- condition: $elapsedTime >= 5000
correlationId
Type: string (value runtime expression or literal)
Description:
Used in asyncapi steps to associate messages across send/receive operations. Typically references an ID passed in the message header or payload to correlate requests and responses.
Example:
correlationId: $inputs.CreateOrder.orderRequestId
dependsOn
Type: string array
Description:
Specifies a list of step identifiers that must complete (or be waited for) before the current step can begin execution. This enables modelling of explicit execution dependencies within a workflow. Note about forking: we leaning towards not having an explicit fork property in asyncapi kind steps. Instead we can assume that any type of such step with action: receive is by default non-blocking (or asynchronous) in nature. Other steps can leverage dependsOn to ensure the joining type of behaviour.
Example:
dependsOn:
- $steps.ConfirmOrder
Step Execution Semantics
A step is considered successful only when all successCriteria are satisfied. If any condition fails, the step is deemed to have failed, and onFailure logic (if defined) is evaluated and executed.
There is no dedicated timeout field. Instead, timeout behavior must be expressed using $elapsedTime within the successCriteria.
Example:
successCriteria:
- condition: $statusCode == 200
- condition: $elapsedTime < 5000
onFailure:
- name: handleTimeout
type: end
criteria:
- condition: $elapsedTime >= 5000
Updated Example:
arazzo: 1.1.0
info:
title: Workflow for placing an Order
version: 1.0.0
sourceDescriptions:
- name: OrderApi
url: ./openapi/order.yaml
type: openapi
- name: AsyncOrderApi
url: ./asyncapi/order.yaml
type: asyncapi
workflows:
- workflowId: PlaceOrder
inputs:
required:
- CreateOrder
type: object
properties:
CreateOrder:
required:
- orderRequestId
- productId
- quantity
type: object
properties:
orderRequestId:
type: string
productId:
type: integer
quantity:
type: integer
steps:
- kind: asyncapi
stepId: ConfirmOrder
operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
action: receive # Non Blocking Step by default
timeout: 6000
correlationId: $inputs.CreateOrder.orderRequestId
successCriteria:
- condition: $message.payload != null
outputs:
orderId: $message.body.orderId
- kind: asyncapi
stepId: CreateOrder
operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
action: send
parameters:
- name: orderRequestId
in: header
value: $inputs.CreateOrder.orderRequestId
requestBody:
payload:
productId: $inputs.CreateOrder.productId
quantity: $inputs.CreateOrder.quantity
- stepId: GetOrderDetails
operationId: $sourceDescriptions.OrderApi.getOrder
dependsOn:
- $steps.ConfirmOrder
parameters:
- name: orderId
in: path
value: $steps.ConfirmOrder.outputs.orderId
successCriteria:
- condition: $statusCode == 200
components: {}
Thanks @frankkilcommins The proposal mostly looks good to me. I just need sometime to think through a couple of items. I'm currently in Australia. Next week, once I'm back we could jump on a call to discuss and close it.
@nashjain We discussed a bit about the removal of fork: true and kind: async implicitly indicates a fork, so removing that and then using dependsOn instead of join since we have dependsOn in the spec already elsewhere. Also the nature of a "fire and forget" (dont need response) vs "fire and wait for a response" which basically assumes if a dependsOn isn't indicating a given async kind that has a correlation id (or maybe thats not even needed) that it would indicate a fire/forget scenario. What do you think? If you can join the next meeting we can discuss on that call that would be great.
Do we even need kind on the step level? We have the specific source description encoded in the operationId field, why can't we just look it up there?
operationId: "$sourceDescriptions.AsyncOrderApi.PlaceOrder"
But I also have a more general question/concern. I'm not clear about tooling support requirements.
Provided AsyncAPI spec supports so many different protocols (websockets, kafka, mqtt, grpc, sns, sqs, etc, etc) would tooling be expected to support all of those? Or some subset? Or what?
My concern is we may have almost no support from tooling as I don't even understand how "receive" can be implemented for some of those protocols.
Do we even need
kindon the step level? We have the specific source description encoded in the operationId field, why can't we just look it up there?
The proposal for kind is indeed an optional affordance that may:
- simplify/improve validation and schema constructs
- improve readability of Arazzo documents (for those wanting to read the YAML/JSON)
- cater for extensibility of other step types (tbd if applicable for 1.1.0):
human/agent(in-the-loop steps)wait/delay(temporal control steps)
It's not set in stone at this point and as you mention the type of API document can be used via the sourceDescription referenced by the operationId or operationPath. The value of keeping it may come down to how tooling and authors prefer to work, we’re open to feedback here.
But I also have a more general question/concern. I'm not clear about tooling support requirements.
Provided AsyncAPI spec supports so many different protocols (websockets, kafka, mqtt, grpc, sns, sqs, etc, etc) would tooling be expected to support all of those? Or some subset? Or what?
Arazzo itself is agnostic to the specific protocols that can be modelled within AsyncAPI. Tooling implementations may choose to support a subset of protocols depending on use case or runtime environment. We can look to state a documented set of "Recommended" protocols for tooling advertising Arazzo AsyncAPI support.
Off the top of my head (not final), something like:
| Protocol (or format) | Fit for Arazzo | Notes |
|---|---|---|
| Kafka, AMQP, MQTT, NATS, WebSockets | Recommended | High interoperability, strong JSON alignment, stable delivery semantics |
| SNS, SQS, SSE | Partial | Supported but may require additional setup (e.g. polling for SQS) |
| Others (e.g. STOMP, Pulsar, Mercure) | Not currently in scope |
To help perhaps tease out some of the further details, I've created a repo with some examples. Would be great for others to chime in and/or review/enrich the examples. See https://github.com/frankkilcommins/arazzo-examples for details.
@frankkilcommins — +1 on kind.
The proposal for kind is indeed an optional affordance that may:
simplify/improve validation and schema constructs
I disagree. This requires more validation in fact and makes it worse. Now every tool should add some checks to ensure kind matches type? and provide a human readable error, etc.
See example below.
arazzo: 1.1.0
info:
title: Workflow for placing an Order
version: 1.0.0
sourceDescriptions:
- name: OrderApi
url: ./openapi/order.yaml
type: openapi
- name: AsyncOrderApi
url: ./asyncapi/order.yaml
type: asyncapi
steps:
- kind: openapi # ❌ ooops, mismatch here between the source description `type` and `kind`, what do we do here?
stepId: ConfirmOrder
operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
action: receive # Non Blocking Step by default
timeout: 6000
correlationId: $inputs.CreateOrder.orderRequestId
successCriteria:
- condition: $message.payload != null
outputs:
orderId: $message.body.orderId
It just adds unnecessary duplication and also inconsistency:
- using
typein source description - using
kindin step
improve readability of Arazzo documents (for those wanting to read the YAML/JSON)
Docs generators can display it nicely.
cater for extensibility of other step types (tbd if applicable for 1.1.0): human / agent (in-the-loop steps) wait / delay (temporal control steps)
This makes sense in general but then those steps won't be linked to any source descriptions so let's consider it later when those extensibility is introduced
@frankkilcommins — +1 on kind.
@mikeschinkel, any pros/cons you have in mind?
@RomanHotsiy — My +1 came from being on the bi-weekly Arazzo call with @frankkilcommins and @kevinduffey where we discussed this and @frankkilcommins explained the rationale which I remember seemed logical and useful and where the three of us agreed, though I do not remember the specifics.
I will let @frankkilcommins elaborate again, and/or maybe @kevinduffey can chime in.
If you are available Wednesday Nov 26th 7pm EET you could join the next call if you like.
@frankkilcommins @kevinduffey @mikeschinkel I've gone through the updates suggested by @frankkilcommins and it all makes sense to me. Good to go from my point of view. Once we agree on this, I can update the PR and also show a working demo of this spec with Specmatic in a couple of days.
Thanks @nashjain
Regarding kind, there are pros and cons, and I think it would be better to leave the addition of kind to when we're adding support for human in the loop or other kinds of steps (e.g., some folks have been asking for mcp steps too). I also think that having kind with options like openapi or asyncapi seems to be at the wrong level of granularity. IMHO, it would be more valuable to indicate something like api, workflow, human-in-loop, agent-in-loop, mcp etc.
To help evaluation, i've created another branch of the examples, which does not have the kind step property.
https://github.com/frankkilcommins/arazzo-examples/tree/without-kind/v1.1.0-prep/async-api-examples
cc @RomanHotsiy
I'm OK to drop kind, however, just want to call out 2 main advantages from my point of view:
- As a tool author, makes my implementation logic simpler (I'm not hypothetically saying this, I've built a parse, workflow testing and mocking tool for Arazzo. We started without
kindand added it as part of refactoring to simplify our code)- Yes, I agree it adds one more thing to check, but when someone makes a mistake as highlighted above, we still need to deal with it. So just skipping
kinddoes not remove the need for validation and human readable error messages.
- Yes, I agree it adds one more thing to check, but when someone makes a mistake as highlighted above, we still need to deal with it. So just skipping
- As a human reader of the spec, I don't need to deduce the step type by looking at source or presence of action. Just lot more straight forward to grok what is going on.
declarativeOVERimperative?
(I prefer calling it stepType to be consistent with the type declaration in the sourceDescriptions)
@frankkilcommins @kevinduffey @ndenny - as per our meeting on Nov 26th, I've updated the schema. Main changes:
- Removed stepType/kind and instead made action mandatory for asyncapi set. This is used to distinguish the asyncapi step.
- Replaced fork/join with dependsOn for better workflow management