Execution hangs on communication between Sync/Async ProcessModels

Open · gkarray opened this issue 3 years ago · 1 comment

Objective of issue: Several use cases involving communication between Processes implementing the LoihiProtocol and AsyncProtocol fail.

Lava version:

  • [ ] 0.5.0 (feature release)
  • [ ] 0.4.1 (bug fixes)
  • [x] 0.4.0 (current version)
  • [ ] 0.3.0
  • [ ] 0.1.2

I'm submitting a ...

  • [x] bug report
  • [ ] feature request
  • [ ] documentation request

Current behavior:

Each use case below runs a Sender Process connected to a Receiver Process. The ProcessModel of each Process implements either LoihiProtocol or AsyncProtocol, and each experiment uses either the RunSteps or the RunContinuous run condition. With RunSteps, the number of steps is set to 10. With RunContinuous, we sleep for 10 seconds after the call to sender.run() and then call sender.stop().

We get the following behaviors:

  1. RunSteps/LoihiProtocol Sender/LoihiProtocol Receiver : Execution finishes. Sender prints "SENT" 10 times, Receiver prints "RECEIVED" 10 times.
  2. RunSteps/AsyncProtocol Sender/LoihiProtocol Receiver : Execution hangs in sender.run() call. Sender prints "SENT" multiple times, Receiver prints "RECEIVED" 10 times.
  3. RunSteps/LoihiProtocol Sender/AsyncProtocol Receiver : Execution hangs in sender.run() call. Sender prints "SENT" 10 times, Receiver prints "RECEIVED" 10 times.
  4. RunSteps/AsyncProtocol Sender/AsyncProtocol Receiver : Execution hangs in sender.run() call. Sender prints "SENT" multiple times (indefinitely), Receiver prints "RECEIVED" multiple times (indefinitely).
  5. RunContinuous/LoihiProtocol Sender/LoihiProtocol Receiver : Execution finishes. Sender prints "SENT" multiple times (for 10 seconds), Receiver prints "RECEIVED" multiple times (for 10 seconds).
  6. RunContinuous/AsyncProtocol Sender/LoihiProtocol Receiver : Non-deterministic; execution sometimes finishes, sometimes hangs in sender.stop() call. Sender prints "SENT" multiple times (for 10 seconds), Receiver prints "RECEIVED" multiple times (for 10 seconds).
  7. RunContinuous/LoihiProtocol Sender/AsyncProtocol Receiver : Non-deterministic; execution sometimes finishes, sometimes hangs in sender.stop() call. Sender prints "SENT" multiple times (for 10 seconds), Receiver prints "RECEIVED" multiple times (for 10 seconds).
  8. RunContinuous/AsyncProtocol Sender/AsyncProtocol Receiver : Execution finishes. Sender prints "SENT" multiple times (for 10 seconds), Receiver prints "RECEIVED" multiple times (for 10 seconds).

PS: Use cases 5 and 8 may also be non-deterministic. However, execution finished every time I ran them.
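
The eight use cases above are the full 2 x 2 x 2 combination of run condition, Sender protocol, and Receiver protocol. As a quick way to regenerate the numbering, a minimal sketch (plain Python, no Lava import; the helper name use_cases is made up for illustration) could look like this:

```python
from itertools import product

RUN_CONDITIONS = ("RunSteps", "RunContinuous")
PROTOCOLS = ("LoihiProtocol", "AsyncProtocol")


def use_cases():
    """Return (number, run condition, sender protocol, receiver protocol)
    tuples in the same order as the numbered list in this issue."""
    cases = []
    # product() varies its last argument fastest. In the issue's list the
    # Sender protocol changes fastest, so it is placed last here.
    combos = product(RUN_CONDITIONS, PROTOCOLS, PROTOCOLS)
    for number, (condition, receiver, sender) in enumerate(combos, start=1):
        cases.append((number, condition, sender, receiver))
    return cases


for number, condition, sender, receiver in use_cases():
    print(f"{number}. {condition}/{sender} Sender/{receiver} Receiver")
```

The printed lines follow the same "run condition/Sender/Receiver" pattern used in the lists of this report.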

Expected behavior:

  1. RunSteps/LoihiProtocol Sender/LoihiProtocol Receiver : As current behavior.
  2. RunSteps/AsyncProtocol Sender/LoihiProtocol Receiver : If this is not allowed, an error should be raised. If this is allowed, execution should finish. Sender should print "SENT" multiple times, Receiver should print "RECEIVED" 10 times.
  3. RunSteps/LoihiProtocol Sender/AsyncProtocol Receiver : If this is not allowed, an error should be raised. If this is allowed, execution should finish. Sender should print "SENT" 10 times, Receiver should print "RECEIVED" 10 times.
  4. RunSteps/AsyncProtocol Sender/AsyncProtocol Receiver : This shouldn't be allowed. An error should be raised. (RunSteps with only AsyncProtocol ProcessModels feels incorrect)
  5. RunContinuous/LoihiProtocol Sender/LoihiProtocol Receiver : As current behavior. However, the current behavior may actually be non-deterministic, and hangs may simply not have shown up in my runs.
  6. RunContinuous/AsyncProtocol Sender/LoihiProtocol Receiver : Behavior should be deterministic. In all cases, execution should finish. Sender should print "SENT" multiple times (for 10 seconds), Receiver should print "RECEIVED" multiple times (for 10 seconds).
  7. RunContinuous/LoihiProtocol Sender/AsyncProtocol Receiver : Behavior should be deterministic. In all cases, execution should finish. Sender should print "SENT" multiple times (for 10 seconds), Receiver should print "RECEIVED" multiple times (for 10 seconds).
  8. RunContinuous/AsyncProtocol Sender/AsyncProtocol Receiver : As current behavior. However, the current behavior may actually be non-deterministic, and hangs may simply not have shown up in my runs.
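
To make the "an error should be raised" expectation concrete, here is a rough sketch of the kind of up-front check this report asks for. It is not Lava code: check_configuration, UnsupportedConfigurationError, and the allow_mixed flag are hypothetical names, and protocols and run conditions are passed as plain strings only to keep the example self-contained.

```python
class UnsupportedConfigurationError(ValueError):
    """Hypothetical error type for this sketch; not part of Lava."""


def check_configuration(run_condition, sender_protocol, receiver_protocol,
                        allow_mixed=True):
    """Raise early instead of hanging when a combination is not supported.

    allow_mixed is an assumption: the issue leaves open whether mixing
    LoihiProtocol and AsyncProtocol ProcessModels should be allowed.
    """
    protocols = {sender_protocol, receiver_protocol}

    # Expected behavior 4: RunSteps with only AsyncProtocol ProcessModels.
    if run_condition == "RunSteps" and protocols == {"AsyncProtocol"}:
        raise UnsupportedConfigurationError(
            "RunSteps with only AsyncProtocol ProcessModels is not supported"
        )

    # Expected behaviors 2 and 3: raise only if mixing is ruled out.
    if len(protocols) > 1 and not allow_mixed:
        raise UnsupportedConfigurationError(
            "mixing LoihiProtocol and AsyncProtocol ProcessModels "
            "is not supported"
        )


# Use case 1 passes the check; use case 4 is rejected before anything runs.
check_configuration("RunSteps", "LoihiProtocol", "LoihiProtocol")
try:
    check_configuration("RunSteps", "AsyncProtocol", "AsyncProtocol")
except UnsupportedConfigurationError as err:
    print(f"rejected: {err}")
```

Where such a check would live in Lava (compiler, runtime, or RunConfig) is left open here.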

Steps to reproduce:

  • Run the code below.

Related code:

sender.py

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import OutPort, InPort

from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.ports import PyOutPort, PyInPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyAsyncProcessModel, PyLoihiProcessModel

import numpy as np

HEIGHT = 5
WIDTH = 5


class Sender(AbstractProcess):
    def __init__(self) -> None:
        super().__init__()

        # Same (HEIGHT, WIDTH) order as the array sent by the ProcessModels
        out_shape = (HEIGHT, WIDTH)

        self.out_port = OutPort(shape=out_shape)


@implements(proc=Sender, protocol=AsyncProtocol)
@requires(CPU)
class AsyncSender(PyAsyncProcessModel):
    out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, int)

    def run_async(self):
        while True:
            self.out_port.send(np.ones((HEIGHT, WIDTH)))
            print("SENT")

            if self.check_for_stop_cmd():
                return


@implements(proc=Sender, protocol=LoihiProtocol)
@requires(CPU)
class SyncSender(PyLoihiProcessModel):
    out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, int)

    def run_spk(self):
        self.out_port.send(np.ones((HEIGHT, WIDTH)))
        print("SENT")

receiver.py

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import OutPort, InPort

from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.ports import PyOutPort, PyInPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyAsyncProcessModel, PyLoihiProcessModel

HEIGHT = 5
WIDTH = 5


class Receiver(AbstractProcess):
    def __init__(self) -> None:
        super().__init__()

        # Same (HEIGHT, WIDTH) order as the array sent by the Sender
        in_shape = (HEIGHT, WIDTH)

        self.in_port = InPort(shape=in_shape)


@implements(proc=Receiver, protocol=AsyncProtocol)
@requires(CPU)
class AsyncReceiver(PyAsyncProcessModel):
    in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, int)

    def run_async(self):
        while True:
            self.in_port.recv()
            print("RECEIVED")

            if self.check_for_stop_cmd():
                return


@implements(proc=Receiver, protocol=LoihiProtocol)
@requires(CPU)
class SyncReceiver(PyLoihiProcessModel):
    in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, int)

    def run_spk(self):
        self.in_port.recv()
        print("RECEIVED")

main.py

from lava.magma.core.run_configs import RunConfig
from lava.magma.core.run_conditions import RunContinuous, RunSteps

from sender import Sender, SyncSender, AsyncSender
from receiver import Receiver, SyncReceiver, AsyncReceiver

import logging
import time

NUM_STEPS = 10
SLEEP_TIME = 10


# Minimal RunConfig for these tests: maps each Process class to an
# explicitly chosen ProcessModel
class MapCfg(RunConfig):
    def __init__(self, proc_model_map):
        super().__init__(custom_sync_domains=None, loglevel=logging.WARNING)
        self.proc_model_map = proc_model_map

    def select(self, proc, proc_models):
        return self.proc_model_map[proc.__class__]


def run_steps_test(proc_map):
    sender = Sender()
    receiver = Receiver()
    sender.out_port.connect(receiver.in_port)

    condition = RunSteps(num_steps=NUM_STEPS)
    run_cfg = MapCfg(proc_model_map=proc_map)

    print("BEGIN RUN")

    sender.run(condition, run_cfg)

    print("END RUN")
    print("BEGIN STOP")

    sender.stop()

    print("END STOP")


def run_continuous_test(proc_map):
    sender = Sender()
    receiver = Receiver()
    sender.out_port.connect(receiver.in_port)

    condition = RunContinuous()
    run_cfg = MapCfg(proc_model_map=proc_map)

    print("BEGIN RUN")

    sender.run(condition, run_cfg)

    print("END RUN")
    print("BEGIN SLEEP")

    time.sleep(SLEEP_TIME)

    print("END SLEEP")
    print("BEGIN STOP")

    sender.stop()

    print("END STOP")


if __name__ == "__main__":
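    # Keep exactly one use case active and comment out all the others.
    # As posted, all eight calls run in sequence, so execution never gets
    # past the first use case that hangs.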

    # Use case 1
    run_steps_test({
        Sender: SyncSender,
        Receiver: SyncReceiver
    })
    # Use case 2
    run_steps_test({
        Sender: AsyncSender,
        Receiver: SyncReceiver
    })
    # Use case 3
    run_steps_test({
        Sender: SyncSender,
        Receiver: AsyncReceiver
    })
    # Use case 4
    run_steps_test({
        Sender: AsyncSender,
        Receiver: AsyncReceiver
    })

    # Use case 5
    run_continuous_test({
        Sender: SyncSender,
        Receiver: SyncReceiver
    })
    # Use case 6
    run_continuous_test({
        Sender: AsyncSender,
        Receiver: SyncReceiver
    })
    # Use case 7
    run_continuous_test({
        Sender: SyncSender,
        Receiver: AsyncReceiver
    })
    # Use case 8
    run_continuous_test({
        Sender: AsyncSender,
        Receiver: AsyncReceiver
    })
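
Because several use cases hang rather than fail, it can help to run each one in a child process with a timeout so that a hang is reported instead of blocking the terminal. The sketch below uses only the Python standard library. The helper name classify_run is made up for illustration, and the commented-out call assumes main.py has been changed to take the use case number as a command-line argument, which the listing above does not do.

```python
import subprocess
import sys


def classify_run(command, timeout_s):
    """Run command in a child process and report how it ended.

    Returns "FINISHED" on exit code 0, "FAILED" on any other exit code,
    and "HANG" if the child is still running after timeout_s seconds.
    """
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout_s
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the direct child before re-raising.
        return "HANG"
    return "FINISHED" if completed.returncode == 0 else "FAILED"


# Stand-in commands so the sketch runs without Lava installed.
print(classify_run([sys.executable, "-c", "print('ok')"], timeout_s=30))
print(classify_run(
    [sys.executable, "-c", "import time; time.sleep(30)"], timeout_s=1
))

# Hypothetical: assumes main.py reads the use case number from sys.argv[1].
# print(classify_run([sys.executable, "main.py", "2"], timeout_s=60))
```

The timeout only kills the direct child process. Any further processes started by that child may need separate cleanup.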

Other information: N/A

gkarray · Aug 22 '22 16:08

@ysingh7 Have you had a chance to review this?

tim-shea · Jan 17 '24 18:01