
Distributed Fragments with Conditions

Open · genevanmeter opened this issue 11 months ago • 4 comments

Please describe your question

I am in the process of porting an existing Holoscan pipeline to a distributed, fragment-based pipeline. One aspect that is causing confusion is how to control the execution of fragments; it appears to be different from single-fragment applications.

The expectations for the application below:

  • Tx emits at 10 Hz (controlled by time.sleep)
  • Rx runs at 2 Hz (controlled by a PeriodicCondition)
  • If Rx is not ready, the message should be dropped (we want to drop extra messages)

Is there a way to specify a PeriodicCondition for a fragment rather than an operator?

Please specify what Holoscan SDK version you are using

latest (2.9.0)

Please add any details about your platform or use case

x86_64 dGPU

import time

from holoscan.core import Application, Fragment, IOSpec

from holoscan.core import (
    Operator, OperatorSpec, ConditionType
)
from holoscan.conditions import PeriodicCondition, CountCondition

class TxOp(Operator):
    def __init__(self, *args, **kwargs):
        self.x = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.output("tx").condition(ConditionType.NONE)

    def compute(self, op_input, op_output, context):
        time.sleep(0.1)  # ~10 Hz emit rate

        print(f"Tx sent {self.x}")
        op_output.emit(self.x, "tx")

        self.x += 1

class RxOp(Operator):
    def __init__(self, *args, **kwargs):
        self.x = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("rx").connector(
            IOSpec.ConnectorType.DOUBLE_BUFFER,
            capacity=1,
            policy=0,  # 0: pop (drop oldest), 1: reject (drop incoming), 2: fault
        ).condition(ConditionType.MESSAGE_AVAILABLE, min_size=1, front_stage_max_size=1)


    def compute(self, op_input, op_output, context):
        msg_in = op_input.receive("rx")

        print(f"Rx Got {msg_in}")


class Fragment1(Fragment):

    def compose(self):
        tx = TxOp(self, CountCondition(self, 100), name="tx_op")  # stop Tx after 100 messages

        self.add_operator(tx)


class Fragment2(Fragment):

    def compose(self):
        rx = RxOp(self, PeriodicCondition(self, 500_000_000), name="rx_op")  # 500 ms period (2 Hz)

        self.add_operator(rx)

class App(Application):

    def compose(self):
        fragment1 = Fragment1(self, name="fragment1")
        fragment2 = Fragment2(self, name="fragment2")

        self.add_flow(fragment1, fragment2, {("tx_op.tx",
                                              "rx_op.rx")})

def main():
    app = App()
    app.run()

if __name__ == "__main__":
    main()

Result

  • Tx sends at 10 Hz
  • Rx processes at 2 Hz but is working through a queue of old messages
  • Tx finishes but Rx keeps running
Tx sent 0
Rx Got 0
Tx sent 1
Tx sent 2
Tx sent 3
Tx sent 4
Rx Got 1
Tx sent 5
Tx sent 6
Tx sent 7
Tx sent 8
Tx sent 9
Rx Got 2
Tx sent 10
Tx sent 11
Tx sent 12

genevanmeter commented on Jan 28, 2025

A single-fragment version, which does perform as expected:

import time

from holoscan.core import Application, Fragment, IOSpec

from holoscan.core import (
    Operator, OperatorSpec, ConditionType
)
from holoscan.conditions import PeriodicCondition, CountCondition

class TxOp(Operator):
    def __init__(self, *args, **kwargs):
        self.x = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.output("tx").condition(ConditionType.NONE)

    def compute(self, op_input, op_output, context):
        time.sleep(0.1)  # ~10 Hz emit rate

        print(f"Tx sent {self.x}")
        op_output.emit(self.x, "tx")

        self.x += 1

class RxOp(Operator):
    def __init__(self, *args, **kwargs):
        self.x = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("rx").connector(
            IOSpec.ConnectorType.DOUBLE_BUFFER,
            capacity=1,
            policy=0,  # 0: pop (drop oldest), 1: reject (drop incoming), 2: fault
        ).condition(ConditionType.MESSAGE_AVAILABLE, min_size=1, front_stage_max_size=1)

    def compute(self, op_input, op_output, context):
        msg_in = op_input.receive("rx")

        print(f"Rx Got {msg_in}")


class App(Application):

    def compose(self):

        tx = TxOp(self, CountCondition(self, 100), name="tx_op")  # stop Tx after 100 messages
        rx = RxOp(self, PeriodicCondition(self, 500_000_000), name="rx_op")  # 500 ms period (2 Hz)


        self.add_flow(tx, rx, {("tx","rx")})

def main():
    app = App()
    app.run()

if __name__ == "__main__":
    main()

genevanmeter commented on Jan 28, 2025

I am now leaning toward this potentially being a bug. Using the ping_distributed example:

  1. Tx has sent message 3 before Rx receives message 1, even though the Tx output should have the default DownstreamMessageAffordableCondition.
  2. Tx message 10 is never processed.
[info] [multi_thread_scheduler.cpp:300] MultiThreadScheduler started worker thread [pool name: default_pool, thread uid: 0]
[info] [ping_tensor_tx.cpp:104] PingTensorTxOp data type = uint8_t
[info] [ping_tensor_tx.cpp:237] Sent message 1
[info] [ping_tensor_tx.cpp:237] Sent message 2
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_tx.cpp:237] Sent message 3
[info] [ping_tensor_rx.cpp:55] rx received message 1: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_tx.cpp:237] Sent message 4
[info] [ping_tensor_rx.cpp:55] rx received message 2: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 3: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 5
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 4: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 6
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 5: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 7
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 6: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 8
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 7: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 9
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 8: Tensor key: 'out', shape: (32, 64)
[info] [ping_tensor_tx.cpp:237] Sent message 10
[info] [ping_tensor_rx.cpp:38] rx received default CUDA stream from port 'in'
[info] [ping_tensor_rx.cpp:55] rx received message 9: Tensor key: 'out', shape: (32, 64)
[info] [multi_thread_scheduler.cpp:425] No entities left to schedule, force stopping
[info] [multi_thread_scheduler.cpp:430] No ready, wait time or wait event jobs. Exiting.
[info] [multi_thread_scheduler.cpp:694] Stopping all async jobs

genevanmeter commented on Jan 31, 2025

> I am now leaning toward this potentially being a bug.

One issue is that, unlike single-fragment applications, distributed applications do NOT have any DownstreamMessageAffordableCondition on output ports that make inter-fragment connections, because that condition type is not supported by the design of the underlying GXF UcxTransmitter/UcxReceiver used to send data between fragments. This is a known issue, but I would have to check whether we are missing public documentation of it.
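
For illustration, the implicit back-pressure that a single-fragment connection gets is roughly what attaching the condition explicitly on the transmitter port would give you. A minimal sketch (the min_size value here is an assumption, and again this is not applied on inter-fragment UCX connections):

# Sketch only: roughly the implicit default of a single-fragment connection.
class TxOpWithBackpressure(TxOp):
    def setup(self, spec: OperatorSpec):
        spec.output("tx").condition(
            ConditionType.DOWNSTREAM_MESSAGE_AFFORDABLE,
            min_size=1,  # assumed: only schedule Tx when the downstream queue has room for 1 message
        )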

Since Holoscan v2.0, the UcxTransmitter also works in an asynchronous transmit mode by default: it pushes the message onto a transmit queue and compute returns immediately, without waiting until the message is actually received by the downstream fragment. Since Holoscan v2.6, that async behavior can be disabled by setting the environment variable HOLOSCAN_UCX_ASYNCHRONOUS=false. Can you run your application with async mode disabled and see what the behavior is in that case?
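
For reference, a minimal sketch of setting this from Python, assuming the variable only needs to be present in the process environment before the application runs (for separately launched worker processes, export it in each worker's shell instead):

import os

# Disable the asynchronous UCX transmit mode (available since Holoscan v2.6)
# before App().run() is called; equivalent to
#     export HOLOSCAN_UCX_ASYNCHRONOUS=false
os.environ["HOLOSCAN_UCX_ASYNCHRONOUS"] = "false"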

Thank you for providing the minimal application code to reproduce the issue. I will try to take a look when I have a chance and see if there are other potential solutions to the issue.

grlee77 commented on Feb 3, 2025

Adding HOLOSCAN_UCX_ASYNCHRONOUS=false produced a different result.

Tx initially runs at 10 Hz, but after Rx receives its second message (i.e. "Rx Got 1") Tx becomes synced to Rx and runs at 2 Hz, with Rx one message behind.

I added timestamp deltas from application start for reference:

0:00:01.360792 Tx sent 0
0:00:01.448240 Rx Got 0
0:00:01.548186 Tx sent 1
0:00:01.648798 Tx sent 2
0:00:01.948379 Rx Got 1
0:00:02.048788 Tx sent 3
0:00:02.448385 Rx Got 2
0:00:02.548800 Tx sent 4
0:00:02.948421 Rx Got 3
0:00:03.048812 Tx sent 5
0:00:03.448389 Rx Got 4
0:00:03.548800 Tx sent 6
0:00:03.948390 Rx Got 5
0:00:04.048799 Tx sent 7
0:00:04.448392 Rx Got 6
0:00:04.548804 Tx sent 8
0:00:04.948383 Rx Got 7
0:00:05.048798 Tx sent 9
0:00:05.448387 Rx Got 8
0:00:05.548801 Tx sent 10

Our initial thought for trying distributed fragments was more for organizational purposes: collecting all the operators for a given task into a fragment and reusing that fragment in different applications. Adding the flows is then simplified to the inputs/outputs of the "fragment", and if the inputs/outputs are common across certain fragments, entire fragments can be swapped out.

We could achieve this by refactoring the single-fragment application.
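
As a rough sketch of that pattern (illustrative only; it just reuses the fragment classes from the example above):

# Two applications sharing the same fragment classes; only the wiring between
# the fragments' exposed operator ports differs per application.
class AppA(Application):
    def compose(self):
        f1 = Fragment1(self, name="fragment1")
        f2 = Fragment2(self, name="fragment2")
        self.add_flow(f1, f2, {("tx_op.tx", "rx_op.rx")})

class AppB(Application):
    def compose(self):
        # Any fragment whose operator exposes a compatible output port could
        # stand in for Fragment1 here without touching Fragment2.
        f1 = Fragment1(self, name="alt_source")
        f2 = Fragment2(self, name="fragment2")
        self.add_flow(f1, f2, {("tx_op.tx", "rx_op.rx")})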

genevanmeter commented on Feb 4, 2025