Feature: Add Support for AsyncAPI v2/v3 as Source Definition

Open ivangsa opened this issue 1 year ago • 8 comments

Adding support for AsyncAPI definitions would extend the types of workflows that can be described with Arazzo, as many organizations are now combining Request/Response APIs (OpenAPI) with Event-Driven architectures (AsyncAPI).

The latest version of AsyncAPI is v3, but since v2 is still the most widely adopted version, the intention is to provide support for both versions as feasible, and prioritize v3 only when supporting both becomes impractical.

Required Changes to Arazzo Specification

  • Add asyncapi as a source definition type.
  • Extend the meaning of workflow.step.operationId to include pointing to an AsyncAPI operationId (either v2 or v3).
  • Extend the meaning of workflow.step.parameters.in:
    • header should also include AsyncAPI message headers.
    • path should include AsyncAPI channel.parameters, as they are similar to OpenAPI path parameters (or consider adding a channel value for disambiguation).
  • Runtime expressions already include support for $message.body and $message.header; these would be used for context in successCriteria.
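
As a rough sketch of the changes listed above (not part of any released Arazzo version), a source description of type asyncapi and a step that populates a message header and a channel parameter might look like the following. The source name OrdersAsyncAPI and the operation onOrderEvent come from the example further down in this issue; the URL, the workflow id, the parameter names, and the reuse of in: path for a channel parameter are assumptions for illustration only.

```yaml
sourceDescriptions:
  - name: OrdersAsyncAPI
    url: https://example.com/orders-asyncapi.yaml  # hypothetical location
    type: asyncapi  # proposed value; Arazzo 1.0.x only allows openapi and arazzo

workflows:
  - workflowId: notifyOrderReceived  # hypothetical workflow
    steps:
      - stepId: send_order_received
        operationId: $sourceDescriptions.OrdersAsyncAPI.onOrderEvent
        parameters:
          - name: correlationId  # hypothetical AsyncAPI message header
            in: header
            value: $inputs.correlationId
          - name: restaurantId  # hypothetical AsyncAPI channel parameter
            in: path
            value: $inputs.restaurantId
```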

Additional Considerations for Broker-Intermediated APIs

While AsyncAPI and OpenAPI are similar, and it’s tempting to use the operationId in AsyncAPI, there is a key difference: broker-mediated APIs are reciprocal (publishing/sending on one side is subscribing/receiving on the other).

  • Sometimes, an API might only document one side’s perspective. For example:
    • A service that publishes events might document its "publish" operations.
    • However, it might not document the "subscribe" operation that consumers would use to receive these events (consumers just use the "publish" operation documentation and tooling).
  • Teams and organizations may model this symmetry differently, and sometimes the "inverse" operation may not be defined at all.

To support this, we would need an alternative way to reference the reciprocal operation.

Additional Changes to Arazzo Specification

When the desired operationId is not documented in the existing AsyncAPI, we can support referencing the channel instead of the operation and specify the reciprocal action (send|receive).

The combination of channel+action would be mutually exclusive with operationId, with operationId being the preferred option when available.

Meaning of AsyncAPI Steps

Send/Publish

A workflow step pointing to a Send/Publish operation will send a message with the workflow.step.requestBody.

workflow.step.parameters will also be used to populate AsyncAPI message.headers and channel.parameters.

The only meaningful successCriteria is "OK, message accepted," so it may be omitted.

This action is non-blocking and has no outputs; it will immediately continue to the next step, either onSuccess or onFailure.

This is an example of a workflow step sending a message:

  - stepId: "send_order_received"
    operationId: "$sourceDescriptions.OrdersAsyncAPI.onOrderEvent"
    requestBody:
      contentType: "application/json"
      payload: |-
        {
            "id": "{$steps.place_order.outputs.orderId}",
            "orderTime": "{$steps.place_order.outputs.order.orderTime}",
            "status": "RECEIVED",
            "customerDetails": {$steps.fetch_customer_details.outputs.customer},
            "restaurantDetails": {$steps.fetch_restaurant_details.outputs.restaurant},
            "orderItems": [
            {$inputs.menuItem}
            ]
        }
    onSuccess:
    - name: "goto_wait_for_downstream"
      type: "goto"
      stepId: "wait_RestaurantsStatusChannel_accepted"

Receive/Subscribe

This is a blocking action: the workflow will wait until successCriteria is met or it times out.

In the following example, in the wait_KitchenOrdersStatusChannel_accepted step, the workflow would "wait until a message is received where the message’s body contains a customerOrderId that matches the orderId of the previous step’s output."

  - stepId: "wait_KitchenOrdersStatusChannel_accepted"
    channel: "$sourceDescriptions.RestaurantsOpenAPI.KitchenOrdersStatusChannel"
    action: "receive"
    successCriteria:
    - condition: "$message.body.customerOrderId == $steps.place_order.outputs.orderId"

This step can include outputs using the contexts $message.body and $message.header.
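
A minimal sketch of such outputs, reusing the receive step above. The payload field status and the header traceId are hypothetical names chosen for illustration; they are not defined anywhere in this proposal.

```yaml
- stepId: wait_KitchenOrdersStatusChannel_accepted
  channel: $sourceDescriptions.RestaurantsOpenAPI.KitchenOrdersStatusChannel
  action: receive
  successCriteria:
    - condition: $message.body.customerOrderId == $steps.place_order.outputs.orderId
  outputs:
    kitchenStatus: $message.body.status  # hypothetical payload field
    traceId: $message.header.traceId     # hypothetical message header
```

Later steps could then read these values as $steps.wait_KitchenOrdersStatusChannel_accepted.outputs.kitchenStatus, in the same way as outputs of OpenAPI steps.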

Support for Parallel Invocation: Forking and Joining

When using non-blocking steps, there may be a need for parallel invocation, involving forking and joining.

Parallel Invocation and Looping can also be applied to OpenAPI Request/Response steps.

Because of the complexity of this, we could address this in a separate issue for further discussion.

ivangsa avatar Nov 07 '24 09:11 ivangsa

@ivangsa thank you for the very thoughtful proposal. Below I'll aim to process the proposal, add comments/remarks, seek clarification, or acknowledge the suggestion.

General Assumptions:

The latest version of AsyncAPI is v3, but since v2 is still the most widely adopted version, the intention is to provide support for both versions as feasible, and prioritize v3 only when supporting both becomes impractical.

Acknowledge - we should strive to support both unless we prove that impractical.

High-level changes to the Arazzo Specification:

Add asyncapi as a source definition type.

Acknowledge - this is as expected.

Extend the meaning of workflow.step.operationId to include pointing to an AsyncAPI operationId (either v2 or v3)

Acknowledge - this is as expected.

Extend the meaning of workflow.step.parameters.in:

  • header should also include AsyncAPI message headers.

Acknowledge - this is as expected.

Extend the meaning of workflow.step.parameters.in:

  • path should include AsyncAPI channel.parameters, as they are similar to OpenAPI path parameters (or consider adding a channel value for disambiguation).

I have a preference for a new channel value to be explicit but others are welcome to share their preferences too and we'll work towards one of these options.
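
For comparison, a sketch of what the explicit option could look like. The value channel for in is only a proposal here and is not valid in Arazzo 1.0.x; the parameter name and the channel address in the comment are hypothetical.

```yaml
parameters:
  - name: restaurantId  # hypothetical parameter of a channel such as restaurants/{restaurantId}/orders
    in: channel         # proposed new value, distinct from path
    value: $inputs.restaurantId
```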

Runtime expressions already include support for $message.body and $message.header; these would be used for context in successCriteria.

Acknowledge. These are being removed in 1.0.1 for hygiene purposes, so this would involve adding them back in and reestablishing the text and construct of the Runtime Expressions to support messages.

Dealing with reciprocal pub/sub scenarios when only one side is documented

When the desired operationId is not documented in the existing AsyncAPI, we can support referencing the channel instead of the operation and specify the reciprocal action (send|receive). Add workflow.step.channel as a fixed field pointing to an AsyncAPI channel

To follow the pattern used for OpenAPI, this might be more applicable as a mutually exclusive construct of a channelId (v3) and a channelPath (v2).
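
A sketch of how that pair could mirror operationId and operationPath. Both field names are hypothetical at this point, as are the step ids, the source name, and the channel names; the channelPath form assumes the same JSON Pointer convention that operationPath uses today.

```yaml
# AsyncAPI v3: channels have identifiers, so channelId could mirror operationId
- stepId: wait_for_status_v3
  channelId: $sourceDescriptions.OrdersAsyncAPI.orderStatus  # hypothetical channel id
  action: receive

# AsyncAPI v2: channels are keyed by name, so channelPath could mirror operationPath
- stepId: wait_for_status_v2
  channelPath: '{$sourceDescriptions.OrdersAsyncAPI.url}#/channels/orders~1status'  # "/" escaped as ~1
  action: receive
```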

Add workflow.step.action with supported values send|receive

Do we have any opinions on the choice of send|receive v publish|subscribe in this context? Are there pros/cons to the nomenclature choice?

The combination of channel+action would be mutually exclusive with operationId, with operationId being the preferred option when available.

Acknowledge - if we introduce this (or channelId / channelPath) they would be mutually exclusive with operationId and we'd clarify recommendations.

Remark: Generally in Arazzo we have a goal to be as deterministic as possible and to not make assumptions; we rely on the underlying API descriptions to express what is possible. If we go down a path that allows workflows to be described across omitted parts of an AsyncAPI description, are we asking for trouble in the long run? Will this cause challenges for tooling w.r.t. validation/parsing, OR is it an undisputed given that the channels always express both the ability to send and receive? I would like some further input from the AsyncAPI community/core team on this. Although it is laborious to require authors/providers to document both sides of the coin, perhaps that's the only way we can be sure what's possible. Will we also have to make assumptions on message schemas / parameters etc?

Step and Messages - Send/Publish

A workflow step pointing to a Send/Publish operation will send a message with the workflow.step.requestBody.

Acknowledge - this is as expected.

workflow.step.parameters will also be used to populate AsyncAPI message.headers and channel.parameters.

Acknowledge - this is as expected.

The only meaningful successCriteria is "OK, message accepted," so it may be omitted.

It would be good practice to specify the criteria. Assuming an HTTP status code will be returned, it can be set (especially if you want to handle onSuccess or onFailure flows).
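
A sketch of explicit criteria on a send step. It assumes the broker is reached through an HTTP front end that answers 202 Accepted, which will not hold for every protocol; the action name retry_send is hypothetical.

```yaml
- stepId: send_order_received
  operationId: $sourceDescriptions.OrdersAsyncAPI.onOrderEvent
  successCriteria:
    - condition: $statusCode == 202  # assumes an HTTP-fronted broker
  onFailure:
    - name: retry_send  # hypothetical action name
      type: retry
      retryAfter: 1     # seconds, as defined for Arazzo failure actions
      retryLimit: 3
```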

This action is non-blocking and has no outputs; it will immediately continue to the next step, either onSuccess or onFailure.

I assume if no onSuccess is described, then the next sequential step is what we continue with.
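
In other words, something like the sketch below, where the send step has no onSuccess and execution falls through to the next step in document order. The second step and the source name are hypothetical, and channel/action are the fields proposed in this issue rather than existing Arazzo fields.

```yaml
steps:
  - stepId: send_order_received
    operationId: $sourceDescriptions.OrdersAsyncAPI.onOrderEvent
    # no onSuccess: continue with the next sequential step
  - stepId: wait_for_kitchen_status  # hypothetical follow-up step
    channel: $sourceDescriptions.OrdersAsyncAPI.KitchenOrdersStatusChannel
    action: receive
```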

Step and Messages - Receive/Subscribe

What is described makes sense, but I wonder whether we would benefit from the ability to set a wait/timeout.

frankkilcommins avatar Nov 20 '24 10:11 frankkilcommins

Since most of the replies are just acknowledgments, here are the two main items:

Do we have any opinions on the choice of send|receive v publish|subscribe in this context? Are there pros/cons to the nomenclature choice?

I would prefer send|receive since it is the terminology used in the latest AsyncAPI v3. This keeps it simple and consistent. (For workflows, I would choose wait instead of receive because that better reflects what a workflow does. However, I believe it would be wiser to stick to send|receive.)

Remark: Generally in Arazzo we have a goal to be as deterministic as possible and to not make assumptions; we rely on the underlying API descriptions to express what is possible. If we go down a path that allows workflows to be described across omitted parts of an AsyncAPI description, are we asking for trouble in the long run?

I understand your concern. This is a long-standing source of discussion and confusion for broker-based APIs, and there are many differing approaches and opinions: documenting both ends, documenting only from the provider's point of view, or only from the consumer's point of view.

And it varies wildly how different teams and orgs manage this... This is why I propose supporting both approaches.

we rely on the underlying API descriptions to express what is possible

Even if both ends are described in their respective APIs, these APIs don’t necessarily describe "what is possible"; they simply describe what a specific application is doing.

For instance, consider an "Order Updates Channel." The existence of an API documenting the "publish" operation does not necessarily mean any other application should be publishing to, or writing a workflow for, that channel.


  • The officially recommended approach from the core team for AsyncAPI v3 is for each application to document all the operations it performs on each channel, so following this recommendation both ends would be documented.

This means that if a new application wants to consume a message that is already documented, it should write a new AsyncAPI definition to document that operation (which is internal and not an actual API), and doing so requires a cross-document $ref to the original definition, which may sit behind authentication.
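
To make this concrete, here is a sketch of what such a consumer-side AsyncAPI v3 definition could look like. The application title, the channel and operation names, and the URL are all hypothetical.

```yaml
asyncapi: 3.0.0
info:
  title: Order Analytics Consumer  # hypothetical consuming application
  version: 1.0.0
channels:
  orderUpdates:
    # cross-document reference to the provider's channel (hypothetical URL)
    $ref: 'https://example.com/orders-asyncapi.yaml#/channels/orderUpdates'
operations:
  receiveOrderUpdated:
    action: receive
    channel:
      $ref: '#/channels/orderUpdates'
```

The only new information in this document is the receive operation itself; everything else is borrowed from the provider's definition.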

This approach mixes "public API information" (what others can do with your API) with internal information (what your application does with other apps' APIs). This undermines the use of API definitions for self-service and discovery. And there is no standardized way to differentiate between public APIs and internal ones.

This is problematic, which is why there are different options and approaches.

  • With AsyncAPI v3 there is also the "official" possibility to define only channels and messages (operations being optional), so an organization may choose to have only channels described with AsyncAPI.
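
A minimal sketch of such a channels-and-messages-only AsyncAPI v3 document, with no operations section at all. The channel, message, and payload field names are hypothetical.

```yaml
asyncapi: 3.0.0
info:
  title: Order Updates  # hypothetical
  version: 1.0.0
channels:
  orderUpdates:
    address: orders/updates
    messages:
      orderUpdated:
        $ref: '#/components/messages/OrderUpdated'
components:
  messages:
    OrderUpdated:
      payload:
        type: object
        properties:
          orderId:
            type: string
          status:
            type: string
```

With a document like this there is no operationId for an Arazzo step to point at, which is the case the channel+action combination is meant to cover.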

OR is it an undisputed given that the channels always express both the ability to send and receive?

I don't think this is an "undisputed given". I believe there are cases where teams/orgs publish messages to a channel that cannot be subscribed to (e.g. publishing to a message broker via an HTTP bridge, or an internal process that dispatches messages from one channel to another). Although I have never seen this personally, I think in these particular cases there would be an API definition for the "reciprocal" action, otherwise it would not be discoverable/usable. (What would be the use of a channel that can only be published to or only subscribed to?)


My worry is that if we only support one way, we may be leaving valid options/approaches behind.

ivangsa avatar Nov 26 '24 14:11 ivangsa

The 'human-in-the-loop' issue (#353) should perhaps be considered as part of any proposals here too.

frankkilcommins avatar Jun 25 '25 16:06 frankkilcommins

Hi, I haven't had the time to champion this feature. If anyone is interested in taking it over, feel free to pick it up! Otherwise, I think it is better to close this issue until someone can drive it forward.

ivangsa avatar Jun 27 '25 06:06 ivangsa

Still looking into it. We may reopen later, but thank you for all that you did so far with it.

kevinduffey avatar Jun 27 '25 08:06 kevinduffey

I'm going to reopen as this is something we want to drive forward :-) Just a timing challenge. Thanks for your initial inputs into this @ivangsa

frankkilcommins avatar Jun 27 '25 09:06 frankkilcommins

I mentioned this in the human-in-the-loop issue, but I think it might work here as well: we could start to work on this using the Arazzo extension mechanism to avoid breaking/changing the spec at this time. I came up with 4 possible scenarios for async use; there are likely more.

1. Fire and Forget Pattern

Description: Send an asynchronous message without waiting for any response. The workflow continues immediately after sending.

Use Cases:

  • Event notifications (user signed up, order placed)
  • Logging/auditing events
  • Triggering background processes
  • Broadcasting updates to multiple consumers

Key Characteristics:

  • Non-blocking execution
  • No response expected
  • No correlation needed
  • Fastest pattern, lowest complexity

Example Scenario: After creating an order, send an "OrderCreated" event to notify inventory, shipping, and analytics services, then immediately proceed to show confirmation to the user.

  arazzo: 1.0.1
  info:
    title: Async Order Processing
    version: 1.0.0
  sourceDescriptions:
    - name: orderAsyncAPI
      url: https://example.com/asyncapi.yaml
      type: openapi  # Still use openapi type for now
      x-is-asyncapi: true  # Extension to indicate AsyncAPI
      x-asyncapi-version: "3.0.0"

  workflows:
    - workflowId: processOrder
      steps:
        - stepId: sendOrderEvent
          operationId: placeholder  # Required field, ignored when x-async used
          x-async-operation:
            type: send
            channel: "orders/created"
            operationId: "$sourceDescriptions.orderAsyncAPI.publishOrderCreated"
            fire-and-forget: true
          requestBody:
            contentType: application/json
            payload:
              orderId: $inputs.orderId
              timestamp: $inputs.timestamp
          # No successCriteria needed for fire-and-forget
          onSuccess:
            - name: continue
              type: goto
              stepId: nextStep

2. Async Request with Deferred Response Pattern

Description: Send an asynchronous request, continue with other work, then wait for the response at a later step in the workflow.

Use Cases:

  • Long-running operations (file processing, report generation)
  • External service integrations with variable response times
  • Operations where you can do useful work while waiting
  • Optimizing workflow performance by parallelizing independent tasks

Key Characteristics:

  • Correlation required between request and response
  • Flexible positioning of wait step
  • Can timeout if response doesn't arrive
  • Allows workflow optimization

Example Scenario: Submit a fraud check request, continue processing the order details, then wait for fraud check results before final approval.

steps:
    - stepId: sendOrderRequest
      operationId: placeholder
      x-async-operation:
        type: send
        channel: "orders/requests"
        correlationId: "$inputs.orderId"
        x-correlation-key: "orderId"  # Field in message to match
      requestBody:
        contentType: application/json
        payload:
          orderId: $inputs.orderId
          items: $inputs.items
      outputs:
        correlationId: $x-async.correlationId

    - stepId: doOtherWork
      operationId: someRestApiCall
      # ... regular sync operations

    - stepId: waitForOrderConfirmation
      operationId: placeholder
      x-async-operation:
        type: receive
        channel: "orders/confirmations"
        timeout: 30  # seconds
        correlation:
          id: $steps.sendOrderRequest.outputs.correlationId
          match: "$message.body.orderId"
      successCriteria:
        - condition: $x-async.messageReceived == true
        - condition: $message.body.status == 'confirmed'
      outputs:
        confirmation: $message.body
        receivedAt: $message.timestamp

3. Parallel Aggregation Pattern

Description: Send multiple async requests in parallel, then collect and aggregate all responses before proceeding.

Use Cases:

  • Gathering data from multiple services
  • Price comparisons across vendors
  • Availability checks across locations
  • Building composite responses from microservices

Key Characteristics:

  • Multiple simultaneous async operations
  • Wait strategies (all/any/quorum)
  • Timeout handling for slow responders
  • Result aggregation logic

Example Scenario: Check product availability across 5 warehouses simultaneously, collect all responses within 10 seconds, then select the optimal fulfillment location.

  steps:
    # Each async operation is its own step with a tag
    - stepId: checkInventory
      operationId: placeholder
      x-async-operation:
        type: send
        channel: "inventory/check"
        correlationId: "inv-${{inputs.orderId}}"
        x-track-for-aggregation: "availability-check"
      requestBody:
        contentType: application/json
        payload:
          sku: $inputs.productSku
          quantity: $inputs.requestedQty
          orderId: $inputs.orderId

    - stepId: checkWarehouse2
      operationId: placeholder
      x-async-operation:
        type: send
        channel: "warehouse2/check"
        correlationId: "wh2-${{inputs.orderId}}"
        x-track-for-aggregation: "availability-check"
      requestBody:
        contentType: application/json
        payload:
          sku: $inputs.productSku
          quantity: $inputs.requestedQty
          orderId: $inputs.orderId

    - stepId: checkWarehouse3
      operationId: placeholder
      x-async-operation:
        type: send
        channel: "warehouse3/check"
        correlationId: "wh3-${{inputs.orderId}}"
        x-track-for-aggregation: "availability-check"
      requestBody:
        contentType: application/json
        payload:
          sku: $inputs.productSku
          quantity: $inputs.requestedQty
          orderId: $inputs.orderId

    # Aggregate all operations with matching tag
    - stepId: aggregateAvailability
      operationId: placeholder
      x-async-aggregate:
        wait-for-group: "availability-check"
        timeout: 10
        strategy: "all"  # or "any" or "quorum: 2"
        channels:
          - channel: "inventory/response"
            correlationMatch: "inv-${{inputs.orderId}}"
            outputAs: "mainInventory"
          - channel: "warehouse2/response"
            correlationMatch: "wh2-${{inputs.orderId}}"
            outputAs: "warehouse2"
          - channel: "warehouse3/response"
            correlationMatch: "wh3-${{inputs.orderId}}"
            outputAs: "warehouse3"
      successCriteria:
        - condition: $x-async-aggregate.completed == true
      outputs:
        # Raw responses from each source
        mainInventory: $x-async-aggregate.responses.mainInventory
        warehouse2Stock: $x-async-aggregate.responses.warehouse2
        warehouse3Stock: $x-async-aggregate.responses.warehouse3

        # Computed aggregate values
        totalAvailable: |
          ${{
            x-async-aggregate.responses.mainInventory.available +
            x-async-aggregate.responses.warehouse2.available +
            x-async-aggregate.responses.warehouse3.available
          }}

        # Find best warehouse (most stock)
        bestWarehouse: |
          ${{
            x-async-aggregate.responses.mainInventory.available >= 
              Math.max(x-async-aggregate.responses.warehouse2.available, 
                      x-async-aggregate.responses.warehouse3.available) 
              ? "main" 
              : x-async-aggregate.responses.warehouse2.available >= 
                x-async-aggregate.responses.warehouse3.available 
                ? "warehouse2" 
                : "warehouse3"
          }}

        # Detailed availability array
        availabilityDetails:
          - location: "main"
            available: $x-async-aggregate.responses.mainInventory.available
            leadTime: $x-async-aggregate.responses.mainInventory.leadTimeDays
          - location: "warehouse2"
            available: $x-async-aggregate.responses.warehouse2.available
            leadTime: $x-async-aggregate.responses.warehouse2.leadTimeDays
          - location: "warehouse3"
            available: $x-async-aggregate.responses.warehouse3.available
            leadTime: $x-async-aggregate.responses.warehouse3.leadTimeDays

        # Metadata about the aggregation
        aggregationMetadata:
          completedAt: $x-async-aggregate.completedAt
          responsesReceived: $x-async-aggregate.receivedCount
          timedOut: $x-async-aggregate.timedOut
          failedCorrelations: $x-async-aggregate.failed

    # Use the aggregated results
    - stepId: selectFulfillmentLocation
      operationId: reserveInventory
      parameters:
        - name: warehouse
          in: query
          value: $steps.aggregateAvailability.outputs.bestWarehouse
        - name: quantity
          in: query
          value: $inputs.requestedQty
      requestBody:
        contentType: application/json
        payload:
          orderId: $inputs.orderId
          sku: $inputs.productSku
          availabilitySnapshot: $steps.aggregateAvailability.outputs.availabilityDetails

4. Post-Workflow Callback Pattern

Description: Initiate async operations that will complete after the workflow ends, with results delivered via webhooks or other callback mechanisms.

Use Cases:

  • Long-running background jobs (hours/days)
  • Human-in-the-loop processes
  • External system integrations
  • Decoupled processing pipelines

Key Characteristics:

  • Workflow completes before async operation
  • Requires callback endpoint registration
  • External correlation/tracking needed
  • Most complex pattern

Example Scenario: Submit a video for processing, immediately return a tracking ID to the user, then send webhook notifications when transcoding completes, thumbnails are generated, and the video is published.

workflows:
  - workflowId: longRunningProcess
    x-async-callbacks:
      enabled: true
      correlationStore: "redis://correlation-service"  # Optional external service
      webhookBase: "$inputs.callbackUrl"

    steps:
      - stepId: startAsyncProcess
        operationId: placeholder
        x-async-operation:
          type: send
          channel: "processing/start"
          trackingId: "$inputs.trackingId"
          x-callback-registration:
            events:
              - event: "processing.completed"
                webhook: "$inputs.callbackUrl/completed"
                filter: "$message.trackingId == $inputs.trackingId"
              - event: "processing.failed"
                webhook: "$inputs.callbackUrl/failed"
                filter: "$message.trackingId == $inputs.trackingId"
        requestBody:
          contentType: application/json
          payload:
            trackingId: $inputs.trackingId
            data: $inputs.processData
        outputs:
          trackingId: $inputs.trackingId
          callbacksRegistered: $x-async.callbacks

      - stepId: returnTrackingInfo
        operationId: placeholder
        x-final-response:
          immediate: true  # Don't wait for async completion
        outputs:
          trackingId: $steps.startAsyncProcess.outputs.trackingId
          status: "processing"
          callbackUrls: $steps.startAsyncProcess.outputs.callbacksRegistered

kevinduffey avatar Jul 09 '25 17:07 kevinduffey

Based on the conversation happening within #367, here's a draft proposal of specification changes to add initial support for AsyncAPI v3.0 to Arazzo v1.1.0

Taking the examples in #367 into consideration, here's a proposal outlining the specification changes to support AsyncAPI in v1.1.0. I've also provided an updated example below which complies with this proposal. If we're in general agreement, I'll update PR #367 to reflect the proposal.

Summary of the specification changes to add AsyncAPI support (not exhaustive)

Source Description Object

type

Updated Allowed Values:

  • openapi
  • arazzo
  • asyncapi (new)

Description/Rationale:
The addition of asyncapi enables referencing AsyncAPI 3.0.0 definitions and allows workflows to model message-driven systems in addition to traditional request/response patterns.

Runtime Expressions

$message

Type: object
Available In: Steps with kind: asyncapi and action: receive
Description:
Provides access to the body and headers of a message received via an AsyncAPI-defined channel. This runtime expression enables event-driven workflows to assert success or extract outputs from message data.

Example:

successCriteria:
  - condition: $message.payload != null
  - condition: $message.header.correlationId == 'abc123'

$elapsedTime (optional - nice to have)

Type: integer
Available In: After a step completes
Description:
Represents the total time (in milliseconds) that a step took to execute. Can be used in successCriteria and onFailure.criteria to enforce nuanced timeout behaviour or performance related onFailure / onSuccess behaviour.

Example:

successCriteria:
  - condition: $elapsedTime < 5000

Step Object

kind

Type: string
Allowed Values: openapi, asyncapi, workflow
Description:
Indicates the type of step. Required when the document contains multiple sourceDescriptions of different types. Enables correct interpretation of fields like operationId, action, etc. We'll clearly explain to implementors how to infer this if omitted and what the defaults should be. The general thinking is that this is good moving forward, but we can't make it mandatory in a minor release. It should be present for those looking to express async types of steps.

action

Type: string
Allowed Values: send, receive
Required If: kind is asyncapi
Description:
Indicates whether the step is sending (publishing) or receiving (subscribing to) a message on a channel described in an AsyncAPI document.
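
A short sketch of how kind and action could combine in one workflow. The source names and operation ids follow the updated example at the end of this comment; the step ids are hypothetical, and leaving action off the openapi step is an assumption based on the "Required If" rule above.

```yaml
steps:
  - stepId: publishOrder       # hypothetical step
    kind: asyncapi
    action: send
    operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
  - stepId: awaitConfirmation  # hypothetical step
    kind: asyncapi
    action: receive
    operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
  - stepId: getOrder
    kind: openapi              # request/response step, so no action
    operationId: $sourceDescriptions.OrderApi.getOrder
```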

timeout

Type: integer
Format: milliseconds
Description:
Defines the maximum allowed execution time for a step in milliseconds. If the step does not complete within the specified duration, it is considered a failure. The default behavior upon timeout is to terminate the workflow, equivalent to using an end failure action.

Example:

timeout: 5000

Timeout behavior can be overridden by defining an onFailure block with criteria based on $elapsedTime. This would allow retry behaviour etc. if needed.

Example:

onFailure:
  - name: retryTimeout
    type: retry
    retryLimit: 3
    retryAfter: 1000
    criteria:
      - condition: $elapsedTime >= 5000

correlationId

Type: string (runtime expression or literal value)
Description:
Used in asyncapi steps to associate messages across send/receive operations. Typically references an ID passed in the message header or payload to correlate requests and responses.

Example:

correlationId: $inputs.CreateOrder.orderRequestId

dependsOn

Type: string array
Description:
Specifies a list of step identifiers that must complete (or be waited for) before the current step can begin execution. This enables modelling of explicit execution dependencies within a workflow.

Note about forking: we are leaning towards not having an explicit fork property in asyncapi kind steps. Instead, we can assume that any such step with action: receive is by default non-blocking (or asynchronous) in nature. Other steps can leverage dependsOn to ensure the joining type of behaviour.

Example:

dependsOn:
  - $steps.ConfirmOrder

Step Execution Semantics

A step is considered successful only when all successCriteria are satisfied. If any condition fails, the step is deemed to have failed, and onFailure logic (if defined) is evaluated and executed.

There is no dedicated timeout field. Instead, timeout behavior must be expressed using $elapsedTime within the successCriteria.

Example:

successCriteria:
  - condition: $statusCode == 200
  - condition: $elapsedTime < 5000

onFailure:
  - name: handleTimeout
    type: end
    criteria:
      - condition: $elapsedTime >= 5000

Updated Example:

arazzo: 1.1.0
info:
  title: Workflow for placing an Order
  version: 1.0.0
sourceDescriptions:
  - name: OrderApi
    url: ./openapi/order.yaml
    type: openapi
  - name: AsyncOrderApi
    url: ./asyncapi/order.yaml
    type: asyncapi
workflows:
  - workflowId: PlaceOrder
    inputs:
      required:
        - CreateOrder
      type: object
      properties:
        CreateOrder:
          required:
            - orderRequestId
            - productId
            - quantity
          type: object
          properties:
            orderRequestId:
              type: string
            productId:
              type: integer
            quantity:
              type: integer
    steps:
      - kind: asyncapi
        stepId: ConfirmOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: receive # Non Blocking Step by default
        timeout: 6000
        correlationId: $inputs.CreateOrder.orderRequestId
        successCriteria:
          - condition: $message.payload != null
        outputs:
          orderId: $message.body.orderId

      - kind: asyncapi
        stepId: CreateOrder
        operationId: $sourceDescriptions.AsyncOrderApi.PlaceOrder
        action: send
        parameters:
          - name: orderRequestId
            in: header
            value: $inputs.CreateOrder.orderRequestId
        requestBody:
          payload:
            productId: $inputs.CreateOrder.productId
            quantity: $inputs.CreateOrder.quantity

      - stepId: GetOrderDetails
        operationId: $sourceDescriptions.OrderApi.getOrder
        dependsOn:
          - $steps.ConfirmOrder
        parameters:
          - name: orderId
            in: path
            value: $steps.ConfirmOrder.outputs.orderId
        successCriteria:
          - condition: $statusCode == 200
components: {}

frankkilcommins avatar Oct 29 '25 22:10 frankkilcommins