Feature: Add Support for AsyncAPI v2/v3 as a Source Definition
Adding support for AsyncAPI definitions would extend the types of workflows that can be described with Arazzo, as many organizations are now combining Request/Response APIs (OpenAPI) with Event-Driven architectures (AsyncAPI).
The latest version of AsyncAPI is v3, but since v2 is still the most widely adopted version, the intention is to provide support for both versions as feasible, and prioritize v3 only when supporting both becomes impractical.
Required Changes to Arazzo Specification
- Add asyncapi as a source definition type.
- Extend the meaning of workflow.step.operationId to include pointing to an AsyncAPI operationId (either v2 or v3).
- Extend the meaning of workflow.step.parameters.in:
  - header should also include AsyncAPI message headers.
  - path should include AsyncAPI channel.parameters, as they are similar to OpenAPI path parameters (or consider adding a channel value for disambiguation).
- Runtime expressions already include support for $message.body and $message.header; these would be used for context in successCriteria.
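The proposed parameter mapping can be illustrated with a minimal Python sketch. It is not part of the proposal: bind_parameters is a hypothetical helper, parameter values are assumed to be already resolved from runtime expressions, and channel addresses are assumed to use {name} placeholders, as AsyncAPI channel addresses do.

```python
import re
from typing import Any


def bind_parameters(
    address: str, parameters: list[dict[str, Any]]
) -> tuple[str, dict[str, Any]]:
    """Split resolved step parameters into a channel address and message headers.

    Hypothetical mapping taken from the proposal:
      in: header -> AsyncAPI message headers
      in: path   -> AsyncAPI channel parameters, substituted into the address
    """
    headers: dict[str, Any] = {}
    channel_params: dict[str, str] = {}
    for param in parameters:
        location = param["in"]
        if location == "header":
            headers[param["name"]] = param["value"]
        elif location == "path":
            channel_params[param["name"]] = str(param["value"])
        else:
            raise ValueError(f"unsupported location for an AsyncAPI step: {location!r}")

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in channel_params:
            raise KeyError(f"missing channel parameter: {name}")
        return channel_params[name]

    # Replace every {name} placeholder in the channel address.
    resolved_address = re.sub(r"\{([^}]+)\}", substitute, address)
    return resolved_address, headers


address, headers = bind_parameters(
    "orders/{orderId}/status",
    [
        {"name": "orderId", "in": "path", "value": 42},
        {"name": "correlationId", "in": "header", "value": "abc123"},
    ],
)
# address == "orders/42/status"; headers == {"correlationId": "abc123"}
```

If a separate channel value were adopted for disambiguation, only the elif branch would change.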
Additional Considerations for Broker-Intermediated APIs
While AsyncAPI and OpenAPI are similar, and it’s tempting to use the operationId in AsyncAPI, there is a key difference: broker-mediated APIs are reciprocal (publishing/sending on one side is subscribing/receiving on the other).
- Sometimes, an API might only document one side’s perspective. For example:
- A service that publishes events might document its "publish" operations.
- However, it might not document the "subscribe" operation that consumers would use to receive these events (consumers just use the "publish" operation documentation and tooling).
- Teams and organizations may model this symmetry differently, and sometimes the "inverse" operation may not be defined at all.
To support this, we would need an alternative way to reference the reciprocal operation.
Additional Changes to Arazzo Specification
When the desired operationId is not documented in the existing AsyncAPI, we can support referencing the channel instead of the operation and specify the reciprocal action (send|receive).
- Add workflow.step.channel as a fixed field pointing to an AsyncAPI channel.
- Add workflow.step.action with supported values send|receive.
The combination of channel+action would be mutually exclusive with operationId, with operationId being the preferred option when available.
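A tool could check this rule with something like the following Python sketch. The function name and error messages are hypothetical. It encodes only what is stated here: operationId and channel are mutually exclusive, a channel needs an action, and action is limited to send or receive.

```python
from typing import Any

VALID_ACTIONS = {"send", "receive"}


def validate_async_step(step: dict[str, Any]) -> list[str]:
    """Return the problems with how an AsyncAPI step targets its operation."""
    errors: list[str] = []
    has_operation = "operationId" in step
    has_channel = "channel" in step
    has_action = "action" in step

    if has_operation and has_channel:
        errors.append("operationId and channel are mutually exclusive")
    if not has_operation and not has_channel:
        errors.append("one of operationId or channel is required")
    # channel alone is ambiguous: the reciprocal action must be stated explicitly.
    if has_channel and not has_action:
        errors.append("channel requires an action (send|receive)")
    if has_action and step["action"] not in VALID_ACTIONS:
        errors.append(f"invalid action {step['action']!r}; expected send or receive")
    return errors
```

An operationId together with an action is accepted, so the same check still works if action later becomes required for every AsyncAPI step.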
Meaning of AsyncAPI Steps
Send/Publish
A workflow step pointing to a Send/Publish operation will send a message with the workflow.step.requestBody.
workflow.step.parameters will also be used to populate AsyncAPI message.headers and channel.parameters.
The only meaningful successCriteria is "OK, message accepted," so it may be omitted.
This action is non-blocking and has no outputs; it will immediately continue to the next step, either onSuccess or onFailure.
This is an example of a workflow step sending a message:
```yaml
- stepId: "send_order_received"
  operationId: "$sourceDescriptions.OrdersAsyncAPI.onOrderEvent"
  requestBody:
    contentType: "application/json"
    payload: |-
      {
        "id": "{$steps.place_order.outputs.orderId}",
        "orderTime": "{$steps.place_order.outputs.order.orderTime}",
        "status": "RECEIVED",
        "customerDetails": {$steps.fetch_customer_details.outputs.customer},
        "restaurantDetails": {$steps.fetch_restaurant_details.outputs.restaurant},
        "orderItems": [
          {$inputs.menuItem}
        ]
      }
  onSuccess:
    - name: "goto_wait_for_downstream"
      type: "goto"
      stepId: "wait_RestaurantsStatusChannel_accepted"
```
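The non-blocking send semantics can be sketched in Python with an in-memory stand-in for the broker. Everything here is hypothetical: InMemoryBroker and run_send_step are illustrative names, and the step is a simplified, already-resolved dict (channel, payload, headers) rather than a raw Arazzo step. The fallback rules assumed for missing actions (continue to the next step on success, end on failure) are an assumption for this sketch.

```python
from collections import defaultdict
from typing import Any, Optional


class InMemoryBroker:
    """Hypothetical broker stand-in: each channel is just a list of messages."""

    def __init__(self) -> None:
        self.channels: dict[str, list[dict[str, Any]]] = defaultdict(list)

    def publish(self, channel: str, message: dict[str, Any]) -> bool:
        self.channels[channel].append(message)
        return True  # "OK, message accepted" is the only outcome a sender observes


def run_send_step(
    broker: InMemoryBroker, step: dict[str, Any], step_order: list[str]
) -> Optional[str]:
    """Publish without waiting for consumers; return the next stepId (None ends the workflow)."""
    message = {"headers": step.get("headers", {}), "payload": step["payload"]}
    accepted = broker.publish(step["channel"], message)
    for action in step.get("onSuccess" if accepted else "onFailure", []):
        if action["type"] == "goto":
            return action["stepId"]
        if action["type"] == "end":
            return None
    if not accepted:
        return None  # assumed default: a failed send with no onFailure ends the workflow
    # Assumed default: no matching onSuccess action means "continue in document order".
    position = step_order.index(step["stepId"])
    return step_order[position + 1] if position + 1 < len(step_order) else None
```

The step has no outputs. Its only effect is the published message and the choice of the next step.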
Receive/Subscribe
This is a blocking action: the workflow will wait until successCriteria is met or it times out.
In the following example, in the wait_KitchenOrdersStatusChannel_accepted step, the workflow would "wait until a message is received where the message’s body contains a customerOrderId that matches the orderId of the previous step’s output."
```yaml
- stepId: "wait_KitchenOrdersStatusChannel_accepted"
  channel: "$sourceDescriptions.RestaurantsOpenAPI.KitchenOrdersStatusChannel"
  action: "receive"
  successCriteria:
    - condition: "$message.body.customerOrderId == $steps.place_order.outputs.orderId"
```
This step can include outputs using the contexts $message.body and $message.header.
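The blocking receive can be sketched in Python with asyncio. The sketch assumes messages arrive on an asyncio.Queue as dicts with "header" and "body" keys. It also assumes that non-matching messages are simply dropped and that the timeout is given in seconds. receive_until and demo are hypothetical names.

```python
import asyncio
from typing import Any, Callable


async def receive_until(
    queue: asyncio.Queue,
    criteria: Callable[[dict[str, Any]], bool],
    timeout_s: float,
) -> dict[str, Any]:
    """Block until a message satisfies `criteria`; raise asyncio.TimeoutError otherwise."""

    async def wait_for_match() -> dict[str, Any]:
        while True:
            message = await queue.get()
            if criteria(message):
                return message
            # In this sketch, messages that do not match are simply dropped.

    return await asyncio.wait_for(wait_for_match(), timeout_s)


async def demo() -> dict[str, Any]:
    queue: asyncio.Queue = asyncio.Queue()
    place_order_outputs = {"orderId": "o-1"}
    await queue.put({"header": {"source": "kitchen"}, "body": {"customerOrderId": "o-0", "status": "ACCEPTED"}})
    await queue.put({"header": {"source": "kitchen"}, "body": {"customerOrderId": "o-1", "status": "ACCEPTED"}})
    message = await receive_until(
        queue,
        # successCriteria: $message.body.customerOrderId == $steps.place_order.outputs.orderId
        lambda m: m["body"]["customerOrderId"] == place_order_outputs["orderId"],
        timeout_s=1.0,
    )
    # Step outputs built from $message.body and $message.header.
    return {"customerOrderId": message["body"]["customerOrderId"], "source": message["header"]["source"]}
```

asyncio.run(demo()) skips the o-0 message and returns the outputs of the o-1 message.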
Support for Parallel Invocation: Forking and Joining
When using non-blocking steps, there may be a need for parallel invocation, involving forking and joining.
Parallel Invocation and Looping can also be applied to OpenAPI Request/Response steps.
Because of its complexity, this could be addressed in a separate issue for further discussion.
@ivangsa thank you for the very thoughtful proposal. Below I'll aim to process the proposal, add comments/remarks, seek clarification, or acknowledge the suggestion.
General Assumptions:
The latest version of AsyncAPI is v3, but since v2 is still the most widely adopted version, the intention is to provide support for both versions as feasible, and prioritize v3 only when supporting both becomes impractical.
Acknowledge - we should strive to support both unless we prove that impractical.
High-level changes to the Arazzo Specification:
Add asyncapi as a source definition type.
Acknowledge - this is as expected.
Extend the meaning of workflow.step.operationId to include pointing to an AsyncAPI operationId (either v2 or v3)
Acknowledge - this is as expected.
Extend the meaning of workflow.step.parameters.in: header should also include AsyncAPI message headers.
Acknowledge - this is as expected.
Extend the meaning of workflow.step.parameters.in: path should include AsyncAPI channel.parameters, as they are similar to OpenAPI path parameters (or consider adding a channel value for disambiguation).
I have a preference for a new channel value to be explicit but others are welcome to share their preferences too and we'll work towards one of these options.
Runtime expressions already include support for $message.body and $message.header; these would be used for context in successCriteria.
Acknowledge. These are being removed in 1.0.1 for hygiene purposes, so this would involve adding them back in and reestablishing the text and construct of the Runtime Expressions to support messages.
Dealing with reciprocal pub/sub scenarios when only one side is documented
When the desired operationId is not documented in the existing AsyncAPI, we can support referencing the channel instead of the operation and specify the reciprocal action (send|receive).
Add workflow.step.channel as a fixed field pointing to an AsyncAPI channel.
To follow the pattern used for OpenAPI (where operationId and operationPath are mutually exclusive), this might be better expressed as a mutually exclusive pair of fields: channelId (v3) and channelPath (v2).
Add workflow.step.action with supported values send|receive
Do we have any opinions on the choice of send|receive v publish|subscribe in this context? Are there pros/cons to the nomenclature choice?
The combination of channel+action would be mutually exclusive with operationId, with operationId being the preferred option when available.
Acknowledge - if we introduce this (or channelId / channelPath) they would be mutually exclusive with operationId and we'd clarify recommendations.
Remark: Generally in Arazzo we have a goal to be as deterministic as possible and not to make assumptions; we rely on the underlying API descriptions to express what is possible. If we go down a path that allows workflows to be described across omitted parts of an AsyncAPI description, are we asking for trouble in the long run? Will this cause challenges for tooling with respect to validation/parsing, OR is it an undisputed given that the channels always express both the ability to send and receive? I would like some further input from the AsyncAPI community/core team on this. Albeit laborious to require authors/providers to document both sides of the coin, perhaps that's the only way we can be sure what's possible. Will we also have to make assumptions about message schemas, parameters, etc.?
Step and Messages - Send/Publish
A workflow step pointing to a Send/Publish operation will send a message with the workflow.step.requestBody.
Acknowledge - this is as expected.
workflow.step.parameters will also be used to populate AsyncAPI message.headers and channel.parameters.
Acknowledge - this is as expected.
The only meaningful successCriteria is "OK, message accepted," so it may be omitted.
It would be good practice to specify the criteria. Assuming an HTTP status code will be returned, it can be set (especially if you want to handle onSuccess or onFailure flows).
This action is non-blocking and has no outputs; it will immediately continue to the next step, either onSuccess or onFailure.
I assume if no onSuccess is described, then the next sequential step is what we continue with.
Step and Messages - Receive/Subscribe
What is described makes sense, but I wonder whether we would benefit from the ability to set a wait/timeout.
Since most of the replies are just acknowledgments, here are the two main items:
Do we have any opinions on the choice of send|receive v publish|subscribe in this context? Are there pros/cons to the nomenclature choice?
I would prefer send|receive since it is the terminology used in the latest AsyncAPI v3. This keeps it simple and consistent. (For workflows, I would choose wait instead of receive because that better reflects what a workflow does. However, I believe it would be wiser to stick to send|receive.)
Remark: Generally in Arazzo we have a goal to be as deterministic as possible and not to make assumptions; we rely on the underlying API descriptions to express what is possible. If we go down a path that allows workflows to be described across omitted parts of an AsyncAPI description, are we asking for trouble in the long run?
I understand your concern. This is a long-standing source of discussion and confusion for broker-based APIs, and there are many differing approaches and opinions: documenting both ends, documenting only from the provider's point of view, or only from the consumer's point of view.
How different teams and organizations manage this varies wildly, which is why I propose supporting both approaches.
we rely on the underlying API descriptions to express what is possible
Even if both ends are described in their respective APIs, these APIs don't necessarily describe "what is possible"; they simply describe what a specific application is doing.
For instance, consider an "Order Updates Channel." The existence of an API documenting the "publish" operation does not necessarily mean any other application should be publishing to, or writing a workflow for, that channel.
- The official approach recommended by the AsyncAPI core team for v3 is for each application to document all the operations it performs on each channel, so if this recommendation is followed, both ends would be documented.
This means that if a new application wants to consume a message that is already documented, it should write a new AsyncAPI definition to document that operation (which is internal and not an actual API), and that definition requires a $ref across to the original definition, which may be behind authentication.
This approach mixes "public API information" (what others can do with your API) with internal information (what your application does with other apps' APIs). This undermines the use of API definitions for self-service and discovery. And there is no standardized way to differentiate between public APIs and internal ones.
This is problematic, which is why there are different options and approaches.
- With AsyncAPI v3 there is also the "official" possibility of defining only channels and messages (operations are optional), so an organization may choose to describe only channels with AsyncAPI.
OR is it an undisputed given that the channels always express both the ability to send and receive?
I don't think this is an "undisputed given". I believe there are cases where teams/orgs publish messages to a channel that cannot be used to subscribe (e.g., publishing to a message broker via an HTTP bridge, or an internal process that dispatches messages from one channel to another). Although I have never seen this personally, I think in these particular cases there would be an API definition for the "reciprocal" action; otherwise it would not be discoverable or usable. (What would be the use of a channel that can only be published to, or only subscribed to?)
My worry is that if we only support one way, we may be leaving valid options/approaches behind.
The 'human-in-the-loop' discussion (#353) should perhaps also be considered as part of any proposals here.
Hi, I haven't had the time to champion this feature. If anyone is interested in taking it over, feel free to pick it up! Otherwise, I think it is better to close this issue until someone can drive it forward.
Still looking into it; we may reopen later, but thank you for all that you have done on it so far.
I'm going to reopen as this is something we want to drive forward :-) Just a timing challenge. Thanks for your initial inputs into this @ivangsa
I mentioned this in the human-in-the-loop issue, but I think it might work here as well: we could start working on this using the Arazzo extension mechanism to avoid breaking/changing the spec at this time. I came up with four possible scenarios for async use; there are likely more.
1. Fire and Forget Pattern
Description: Send an asynchronous message without waiting for any response. The workflow continues immediately after sending.
Use Cases:
- Event notifications (user signed up, order placed)
- Logging/auditing events
- Triggering background processes
- Broadcasting updates to multiple consumers
Key Characteristics:
- Non-blocking execution
- No response expected
- No correlation needed
- Fastest pattern, lowest complexity
Example Scenario: After creating an order, send an "OrderCreated" event to notify inventory, shipping, and analytics services, then immediately proceed to show confirmation to the user.
```yaml
arazzo: 1.0.1
info:
  title: Async Order Processing
  version: 1.0.0
sourceDescriptions:
  - name: orderAsyncAPI
    url: https://example.com/asyncapi.yaml
    type: openapi # Still use openapi type for now
    x-is-asyncapi: true # Extension to indicate AsyncAPI
    x-asyncapi-version: "3.0.0"
workflows:
  - workflowId: processOrder
    steps:
      - stepId: sendOrderEvent
        operationId: placeholder # Required field, ignored when x-async used
        x-async-operation:
          type: send
          channel: "orders/created"
          operationId: "$sourceDescriptions.orderAsyncAPI.publishOrderCreated"
          fire-and-forget: true
        requestBody:
          contentType: application/json
          payload:
            orderId: $inputs.orderId
            timestamp: $inputs.timestamp
        # No successCriteria needed for fire-and-forget
        onSuccess:
          - name: continue
            type: goto
            stepId: nextStep
```
2. Async Request with Deferred Response Pattern
Description: Send an asynchronous request, continue with other work, then wait for the response at a later step in the workflow.
Use Cases:
- Long-running operations (file processing, report generation)
- External service integrations with variable response times
- Operations where you can do useful work while waiting
- Optimizing workflow performance by parallelizing independent tasks
Key Characteristics:
- Correlation required between request and response
- Flexible positioning of wait step
- Can timeout if response doesn't arrive
- Allows workflow optimization
Example Scenario: Submit a fraud check request, continue processing the order details, then wait for fraud check results before final approval.
```yaml
steps:
  - stepId: sendOrderRequest
    operationId: placeholder
    x-async-operation:
      type: send
      channel: "orders/requests"
      correlationId: "$inputs.orderId"
      x-correlation-key: "orderId" # Field in message to match
    requestBody:
      contentType: application/json
      payload:
        orderId: $inputs.orderId
        items: $inputs.items
    outputs:
      correlationId: $x-async.correlationId
  - stepId: doOtherWork
    operationId: someRestApiCall
    # ... regular sync operations
  - stepId: waitForOrderConfirmation
    operationId: placeholder
    x-async-operation:
      type: receive
      channel: "orders/confirmations"
      timeout: 30 # seconds
      correlation:
        id: $steps.sendOrderRequest.outputs.correlationId
        match: "$message.body.orderId"
    successCriteria:
      - condition: $x-async.messageReceived == true
      - condition: $message.body.status == 'confirmed'
    outputs:
      confirmation: $message.body
      receivedAt: $message.timestamp
```
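One subtlety of this pattern is that the reply may arrive while the workflow is still busy with doOtherWork. A runner therefore has to buffer messages by correlation key until the wait step claims them. The Python sketch below shows one way to do that with asyncio. Correlator and its methods are hypothetical, messages are assumed to be dicts with a "body" key, and timeouts are in seconds as in the YAML above.

```python
import asyncio
from typing import Any


class Correlator:
    """Buffers replies by correlation key so a later receive step can still claim them."""

    def __init__(self, key_field: str) -> None:
        self.key_field = key_field  # e.g. "orderId", the field matched in the message body
        self._buffered: dict[str, dict[str, Any]] = {}
        self._waiters: dict[str, asyncio.Future] = {}

    def on_message(self, message: dict[str, Any]) -> None:
        key = message["body"].get(self.key_field)
        waiter = self._waiters.pop(key, None)
        if waiter is not None and not waiter.done():
            waiter.set_result(message)  # a receive step is already blocked on this key
        else:
            self._buffered[key] = message  # nobody is waiting yet: keep it for later

    async def wait_for(self, key: str, timeout_s: float) -> dict[str, Any]:
        if key in self._buffered:
            return self._buffered.pop(key)
        future = asyncio.get_running_loop().create_future()
        self._waiters[key] = future
        try:
            return await asyncio.wait_for(future, timeout_s)
        finally:
            self._waiters.pop(key, None)


async def demo() -> tuple[str, str]:
    correlator = Correlator(key_field="orderId")
    # The reply arrives early, before the wait step starts.
    correlator.on_message({"body": {"orderId": "A1", "status": "confirmed"}})
    early = await correlator.wait_for("A1", timeout_s=1.0)
    # The reply arrives later, while the wait step is already blocked.
    asyncio.get_running_loop().call_later(
        0.01, correlator.on_message, {"body": {"orderId": "B2", "status": "confirmed"}}
    )
    late = await correlator.wait_for("B2", timeout_s=1.0)
    return early["body"]["status"], late["body"]["orderId"]
```

Without the buffer, an early reply would be lost and the wait step would time out even though the response was delivered.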
3. Parallel Aggregation Pattern
Description: Send multiple async requests in parallel, then collect and aggregate all responses before proceeding.
Use Cases:
- Gathering data from multiple services
- Price comparisons across vendors
- Availability checks across locations
- Building composite responses from microservices
Key Characteristics:
- Multiple simultaneous async operations
- Wait strategies (all/any/quorum)
- Timeout handling for slow responders
- Result aggregation logic
Example Scenario: Check product availability across 5 warehouses simultaneously, collect all responses within 10 seconds, then select the optimal fulfillment location.
```yaml
steps:
  # Each async operation is its own step with a tag
  - stepId: checkInventory
    operationId: placeholder
    x-async-operation:
      type: send
      channel: "inventory/check"
      correlationId: "inv-${{inputs.orderId}}"
    x-track-for-aggregation: "availability-check"
    requestBody:
      contentType: application/json
      payload:
        sku: $inputs.productSku
        quantity: $inputs.requestedQty
        orderId: $inputs.orderId
  - stepId: checkWarehouse2
    operationId: placeholder
    x-async-operation:
      type: send
      channel: "warehouse2/check"
      correlationId: "wh2-${{inputs.orderId}}"
    x-track-for-aggregation: "availability-check"
    requestBody:
      contentType: application/json
      payload:
        sku: $inputs.productSku
        quantity: $inputs.requestedQty
        orderId: $inputs.orderId
  - stepId: checkWarehouse3
    operationId: placeholder
    x-async-operation:
      type: send
      channel: "warehouse3/check"
      correlationId: "wh3-${{inputs.orderId}}"
    x-track-for-aggregation: "availability-check"
    requestBody:
      contentType: application/json
      payload:
        sku: $inputs.productSku
        quantity: $inputs.requestedQty
        orderId: $inputs.orderId
  # Aggregate all operations with matching tag
  - stepId: aggregateAvailability
    operationId: placeholder
    x-async-aggregate:
      wait-for-group: "availability-check"
      timeout: 10
      strategy: "all" # or "any" or "quorum: 2"
      channels:
        - channel: "inventory/response"
          correlationMatch: "inv-${{inputs.orderId}}"
          outputAs: "mainInventory"
        - channel: "warehouse2/response"
          correlationMatch: "wh2-${{inputs.orderId}}"
          outputAs: "warehouse2"
        - channel: "warehouse3/response"
          correlationMatch: "wh3-${{inputs.orderId}}"
          outputAs: "warehouse3"
    successCriteria:
      - condition: $x-async-aggregate.completed == true
    outputs:
      # Raw responses from each source
      mainInventory: $x-async-aggregate.responses.mainInventory
      warehouse2Stock: $x-async-aggregate.responses.warehouse2
      warehouse3Stock: $x-async-aggregate.responses.warehouse3
      # Computed aggregate values
      totalAvailable: |
        ${{
          x-async-aggregate.responses.mainInventory.available +
          x-async-aggregate.responses.warehouse2.available +
          x-async-aggregate.responses.warehouse3.available
        }}
      # Find best warehouse (most stock)
      bestWarehouse: |
        ${{
          x-async-aggregate.responses.mainInventory.available >=
            Math.max(x-async-aggregate.responses.warehouse2.available,
                     x-async-aggregate.responses.warehouse3.available)
            ? "main"
            : x-async-aggregate.responses.warehouse2.available >=
              x-async-aggregate.responses.warehouse3.available
              ? "warehouse2"
              : "warehouse3"
        }}
      # Detailed availability array
      availabilityDetails:
        - location: "main"
          available: $x-async-aggregate.responses.mainInventory.available
          leadTime: $x-async-aggregate.responses.mainInventory.leadTimeDays
        - location: "warehouse2"
          available: $x-async-aggregate.responses.warehouse2.available
          leadTime: $x-async-aggregate.responses.warehouse2.leadTimeDays
        - location: "warehouse3"
          available: $x-async-aggregate.responses.warehouse3.available
          leadTime: $x-async-aggregate.responses.warehouse3.leadTimeDays
      # Metadata about the aggregation
      aggregationMetadata:
        completedAt: $x-async-aggregate.completedAt
        responsesReceived: $x-async-aggregate.receivedCount
        timedOut: $x-async-aggregate.timedOut
        failedCorrelations: $x-async-aggregate.failed
  # Use the aggregated results
  - stepId: selectFulfillmentLocation
    operationId: reserveInventory
    parameters:
      - name: warehouse
        in: query
        value: $steps.aggregateAvailability.outputs.bestWarehouse
      - name: quantity
        in: query
        value: $inputs.requestedQty
    requestBody:
      contentType: application/json
      payload:
        orderId: $inputs.orderId
        sku: $inputs.productSku
        availabilitySnapshot: $steps.aggregateAvailability.outputs.availabilityDetails
```
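The all/any/quorum wait strategies of x-async-aggregate can be sketched in Python with asyncio.wait. This is an illustration under assumptions, not a runner implementation. aggregate, reply_after and demo are hypothetical names. Each reply is modelled as an awaitable keyed by its outputAs name, and "quorum: 2" is expressed as strategy="quorum" plus quorum=2. Failing to reach the threshold is reported as timedOut.

```python
import asyncio
from typing import Any, Awaitable


async def aggregate(
    replies: dict[str, Awaitable[Any]],
    strategy: str,
    timeout_s: float,
    quorum: int = 1,
) -> dict[str, Any]:
    """Wait for correlated replies under an all / any / quorum strategy."""
    tasks = {asyncio.ensure_future(aw): name for name, aw in replies.items()}
    needed = {"all": len(tasks), "any": 1, "quorum": quorum}[strategy]
    responses: dict[str, Any] = {}
    pending = set(tasks)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s

    while pending and len(responses) < needed:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        done, pending = await asyncio.wait(
            pending, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            if task.exception() is None:  # failed replies are not counted
                responses[tasks[task]] = task.result()

    # Stop waiting on slow responders once the strategy is met or time runs out.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    completed = len(responses) >= needed
    return {
        "responses": responses,
        "receivedCount": len(responses),
        "completed": completed,
        "timedOut": not completed,  # simplification: any shortfall counts as a timeout
    }


async def reply_after(delay_s: float, value: Any) -> Any:
    await asyncio.sleep(delay_s)
    return value


async def demo(strategy: str, quorum: int = 1) -> dict[str, Any]:
    # Hypothetical warehouses; warehouse3 is too slow for the 0.2 s window.
    return await aggregate(
        {
            "mainInventory": reply_after(0.01, {"available": 5}),
            "warehouse2": reply_after(0.02, {"available": 9}),
            "warehouse3": reply_after(5.0, {"available": 1}),
        },
        strategy=strategy,
        timeout_s=0.2,
        quorum=quorum,
    )
```

With strategy "all", the slow warehouse makes the aggregation incomplete. With "quorum" and quorum=2, it completes as soon as the first two replies arrive.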
4. Post-Workflow Callback Pattern
Description: Initiate async operations that will complete after the workflow ends, with results delivered via webhooks or other callback mechanisms.
Use Cases:
- Long-running background jobs (hours/days)
- Human-in-the-loop processes
- External system integrations
- Decoupled processing pipelines
Key Characteristics:
- Workflow completes before async operation
- Requires callback endpoint registration
- External correlation/tracking needed
- Most complex pattern
Example Scenario: Submit a video for processing, immediately return a tracking ID to the user, then send webhook notifications when transcoding completes, thumbnails are generated, and the video is published.
```yaml
workflows:
  - workflowId: longRunningProcess
    x-async-callbacks:
      enabled: true
      correlationStore: "redis://correlation-service" # Optional external service
      webhookBase: "$inputs.callbackUrl"
    steps:
      - stepId: startAsyncProcess
        operationId: placeholder
        x-async-operation:
          type: send
          channel: "processing/start"
          trackingId: "$inputs.trackingId"
        x-callback-registration:
          events:
            - event: "processing.completed"
              webhook: "$inputs.callbackUrl/completed"
              filter: "$message.trackingId == $inputs.trackingId"
            - event: "processing.failed"
              webhook: "$inputs.callbackUrl/failed"
              filter: "$message.trackingId == $inputs.trackingId"
        requestBody:
          contentType: application/json
          payload:
            trackingId: $inputs.trackingId
            data: $inputs.processData
        outputs:
          trackingId: $inputs.trackingId
          callbacksRegistered: $x-async.callbacks
      - stepId: returnTrackingInfo
        operationId: placeholder
        x-final-response:
          immediate: true # Don't wait for async completion
        outputs:
          trackingId: $steps.startAsyncProcess.outputs.trackingId
          status: "processing"
          callbackUrls: $steps.startAsyncProcess.outputs.callbacksRegistered
```
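Because the workflow ends before the events arrive, something outside the workflow run has to keep the callback registrations and match incoming events against them. The Python sketch below illustrates that with a hypothetical in-memory CallbackRegistry standing in for the external correlation store. It records deliveries instead of making real webhook calls. The filter is hard-coded to the trackingId comparison used in the YAML.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CallbackRegistration:
    event: str
    webhook: str
    tracking_id: str


@dataclass
class CallbackRegistry:
    """Hypothetical store that outlives the workflow run (e.g. backed by a correlation service)."""

    registrations: list[CallbackRegistration] = field(default_factory=list)
    delivered: list[tuple[str, dict[str, Any]]] = field(default_factory=list)

    def register(self, tracking_id: str, callback_url: str, events: dict[str, str]) -> list[str]:
        """Register one webhook per event; returns the URLs (the callbacksRegistered output)."""
        urls = []
        for event, suffix in events.items():
            url = f"{callback_url}/{suffix}"
            self.registrations.append(CallbackRegistration(event, url, tracking_id))
            urls.append(url)
        return urls

    def dispatch(self, event: str, message: dict[str, Any]) -> int:
        """Deliver a broker event to matching webhooks; returns the number of deliveries."""
        count = 0
        for reg in self.registrations:
            # Equivalent of filter: "$message.trackingId == $inputs.trackingId"
            if reg.event == event and message.get("trackingId") == reg.tracking_id:
                # A real runner would call the webhook here; this sketch only records it.
                self.delivered.append((reg.webhook, message))
                count += 1
        return count


registry = CallbackRegistry()
urls = registry.register(
    "t-1",
    "https://client.example/hooks",
    {"processing.completed": "completed", "processing.failed": "failed"},
)
```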
Based on the conversation in #367 and taking its examples into consideration, here's a draft proposal of specification changes to add initial support for AsyncAPI v3.0 to Arazzo v1.1.0. I've also provided an updated example below that complies with this proposal. If we're in general agreement, I'll update PR #367 to reflect the proposal.
Summary of the specification changes to add AsyncAPI support (not exhaustive)
Source Description Object
type
Updated Allowed Values:
openapi, arazzo, asyncapi (new)
Description/Rationale:
The addition of asyncapi enables referencing AsyncAPI 3.0.0 definitions and allows workflows to model message-driven systems in addition to traditional request/response patterns.
Runtime Expressions
$message
Type: object
Available In: Steps with kind: asyncapi and action: receive
Description: Provides access to the body and headers of a message received via an AsyncAPI-defined channel. This runtime expression enables event-driven workflows to assert success or extract outputs from message data.
Example:
```yaml
successCriteria:
  - condition: $message.payload != null
  - condition: $message.header.correlationId == 'abc123'
```
$elapsedTime (optional - nice to have)
Type: integer
Available In: After a step completes
Description:
Represents the total time (in milliseconds) that a step took to execute. Can be used in successCriteria and onFailure.criteria to enforce nuanced timeout behaviour or performance related onFailure / onSuccess behaviour.
Example:
```yaml
successCriteria:
  - condition: $elapsedTime < 5000
```
Step Object
kind
Type: string
Allowed Values: openapi, asyncapi, workflow
Description: Indicates the type of step. Required when the document contains multiple sourceDescriptions of different types. Enables correct interpretation of fields like operationId, action, etc. We'll clearly explain to implementors how to infer this if omitted and what the defaults should be. The general thought process is that this is good moving forward, but we can't make it mandatory in a minor release. It should be present for those looking to express async types of steps.
action
Type: string
Allowed Values: send, receive
Required If: kind is asyncapi
Description: Indicates whether the step is sending (publishing) or receiving (subscribing to) a message on a channel described in an AsyncAPI document.
timeout
Type: integer
Format: milliseconds
Description: Defines the maximum allowed execution time for a step in milliseconds. If the step does not complete within the specified duration, it is considered a failure. The default behavior upon timeout is to terminate the workflow, equivalent to using an end failure action.
Example:
```yaml
timeout: 5000
```
Timeout behavior can be overridden by defining an onFailure block with criteria based on $elapsedTime. This would allow retry behaviour etc. if needed.
Example:
```yaml
onFailure:
  - name: retryTimeout
    type: retry
    retryLimit: 3
    retryAfter: 1000
    criteria:
      - condition: $elapsedTime >= 5000
```
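Here is a minimal Python sketch of these timeout semantics. It assumes the step is a callable that reports its $elapsedTime in milliseconds. A real runner would cancel the in-flight step at the deadline rather than wait for it to report. It also assumes the onFailure retry above applies whenever the timeout is exceeded, and that retryAfter is in milliseconds. run_with_timeout_policy is a hypothetical name, and sleep is injectable so the behaviour can be checked without real delays.

```python
import time
from typing import Callable


def run_with_timeout_policy(
    attempt: Callable[[int], float],
    timeout_ms: int,
    retry_limit: int,
    retry_after_ms: int,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[str, int]:
    """Apply the proposed timeout semantics to one step.

    attempt(n) runs the step and returns its $elapsedTime in ms. A run that
    reaches timeout_ms fails; the onFailure retry action (criteria
    $elapsedTime >= timeout_ms) retries up to retry_limit times, after which
    the default end-failure applies. Returns (outcome, attempts_made).
    """
    attempts = 0
    while True:
        attempts += 1
        elapsed_ms = attempt(attempts)
        if elapsed_ms < timeout_ms:
            return "success", attempts
        retries_used = attempts - 1
        if retries_used < retry_limit:
            sleep(retry_after_ms / 1000)  # retryAfter, converted to seconds
            continue
        return "end-failure", attempts  # default: end the workflow as a failure
```

With retryLimit: 3, a step that keeps timing out runs four times in total (one attempt plus three retries) before the workflow ends.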
correlationId
Type: string (runtime expression or literal value)
Description:
Used in asyncapi steps to associate messages across send/receive operations. Typically references an ID passed in the message header or payload to correlate requests and responses.
Example:
```yaml
correlationId: $inputs.CreateOrder.orderRequestId
```
dependsOn
Type: string array
Description:
Specifies a list of step identifiers that must complete (or be waited for) before the current step can begin execution. This enables modelling of explicit execution dependencies within a workflow. Note about forking: we are leaning towards not having an explicit fork property in asyncapi kind steps. Instead, we can assume that any such step with action: receive is non-blocking (asynchronous) by default. Other steps can leverage dependsOn to achieve joining behaviour.
Example:
```yaml
dependsOn:
  - $steps.ConfirmOrder
```
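Tooling would likely need to check dependsOn references before running anything. The Python sketch below uses the standard library's graphlib to flag unknown targets and dependency cycles, and it returns one valid ordering. check_depends_on is a hypothetical helper. It assumes references always take the $steps.<stepId> form shown above.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any


def check_depends_on(steps: list[dict[str, Any]]) -> tuple[list[str], list[str]]:
    """Return (errors, order): unknown dependsOn targets, cycles, and one valid ordering."""
    step_ids = {step["stepId"] for step in steps}
    graph: dict[str, set[str]] = {}
    errors: list[str] = []
    for step in steps:
        predecessors = set()
        for ref in step.get("dependsOn", []):
            target = ref.removeprefix("$steps.")
            if target not in step_ids:
                errors.append(f"{step['stepId']}: unknown dependsOn target {ref!r}")
            else:
                predecessors.add(target)
        graph[step["stepId"]] = predecessors
    try:
        order = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        errors.append(f"dependsOn cycle: {exc.args[1]}")  # args[1] lists the cycle
        order = []
    return errors, order


errors, order = check_depends_on([
    {"stepId": "ConfirmOrder"},
    {"stepId": "CreateOrder"},
    {"stepId": "GetOrderDetails", "dependsOn": ["$steps.ConfirmOrder"]},
])
```

The ordering only proves that the dependencies are satisfiable. When each step actually starts is still governed by the blocking/non-blocking rules discussed above.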
Step Execution Semantics
A step is considered successful only when all successCriteria are satisfied. If any condition fails, the step is deemed to have failed, and onFailure logic (if defined) is evaluated and executed.
There is no dedicated timeout field. Instead, timeout behavior must be expressed using $elapsedTime within the successCriteria.
Example:
```yaml
successCriteria:
  - condition: $statusCode == 200
  - condition: $elapsedTime < 5000
onFailure:
  - name: handleTimeout
    type: end
    criteria:
      - condition: $elapsedTime >= 5000
```
Updated Example:
```yaml
arazzo: 1.1.0
info:
  title: Workflow for placing an Order
  version: 1.0.0
sourceDescriptions:
  - name: OrderApi
    url: ./openapi/order.yaml
    type: openapi
  - name: AsyncOrderApi
    url: ./asyncapi/order.yaml
    type: asyncapi
workflows:
  - workflowId: PlaceOrder
    inputs:
      required:
        - CreateOrder
      type: object
      properties:
        CreateOrder:
          required:
            - orderRequestId
            - productId
            - quantity
          type: object
          properties:
            orderRequestId:
              type: string
            productId:
              type: integer
            quantity:
              type: integer
    steps:
      - kind: asyncapi
        stepId: ConfirmOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: receive # Non Blocking Step by default
        timeout: 6000
        correlationId: $inputs.CreateOrder.orderRequestId
        successCriteria:
          - condition: $message.payload != null
        outputs:
          orderId: $message.body.orderId
      - kind: asyncapi
        stepId: CreateOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: send
        parameters:
          - name: orderRequestId
            in: header
            value: $inputs.CreateOrder.orderRequestId
        requestBody:
          payload:
            productId: $inputs.CreateOrder.productId
            quantity: $inputs.CreateOrder.quantity
      - stepId: GetOrderDetails
        operationId: $sourceDescriptions.OrderApi.getOrder
        dependsOn:
          - $steps.ConfirmOrder
        parameters:
          - name: orderId
            in: path
            value: $steps.ConfirmOrder.outputs.orderId
        successCriteria:
          - condition: $statusCode == 200
components: {}
```
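The execution order this example implies can be simulated in Python with asyncio. The simulation shows one possible reading of the proposal, and all of its pieces are hypothetical in-memory stand-ins: the broker queue, the service behind AsyncOrderApi, and the GET call behind OrderApi.getOrder. ConfirmOrder (receive) starts in the background. CreateOrder (send) fires and continues. GetOrderDetails waits on ConfirmOrder because of its dependsOn. Correlation uses the orderRequestId header, and the 6000 ms timeout becomes 6.0 seconds.

```python
import asyncio
from typing import Any


async def place_order_workflow(inputs: dict[str, Any]) -> dict[str, Any]:
    """Simulate the PlaceOrder workflow with hypothetical in-memory stand-ins."""
    request_id = inputs["CreateOrder"]["orderRequestId"]
    confirmations: asyncio.Queue = asyncio.Queue()
    orders_db: dict[str, dict[str, Any]] = {}

    async def order_service(message: dict[str, Any]) -> None:
        # Stand-in for the service behind AsyncOrderApi: creates the order, then replies.
        await asyncio.sleep(0.01)
        order_id = f"order-{message['header']['orderRequestId']}"
        orders_db[order_id] = {"orderId": order_id, **message["payload"]}
        await confirmations.put(
            {"header": {"correlationId": message["header"]["orderRequestId"]},
             "payload": {"orderId": order_id}}
        )

    async def confirm_order() -> dict[str, Any]:
        # ConfirmOrder: kind asyncapi, action receive; matched on correlationId.
        while True:
            message = await confirmations.get()
            if message["header"]["correlationId"] == request_id:
                return {"orderId": message["payload"]["orderId"]}  # step outputs

    # Non-blocking receive: started in the background with timeout: 6000 ms.
    confirm_task = asyncio.create_task(asyncio.wait_for(confirm_order(), timeout=6.0))

    # CreateOrder: kind asyncapi, action send; the header parameter carries orderRequestId.
    send_message = {
        "header": {"orderRequestId": request_id},
        "payload": {
            "productId": inputs["CreateOrder"]["productId"],
            "quantity": inputs["CreateOrder"]["quantity"],
        },
    }
    service_task = asyncio.create_task(order_service(send_message))

    # GetOrderDetails: dependsOn ConfirmOrder, so join on its outputs first.
    confirm_outputs = await confirm_task
    order = orders_db[confirm_outputs["orderId"]]  # stand-in for GET /orders/{orderId}
    await service_task
    return {"statusCode": 200, "order": order}
```

Starting the receive before the send matters: a reply that arrives early is already covered by a listening step.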