dora icon indicating copy to clipboard operation
dora copied to clipboard

How a DORA subscriber node simultaneously classifies and processes four types of data when they receive them

Open Latitude9527 opened this issue 1 year ago • 6 comments

I'm currently using the if statement, which is processed by judging the data flow character name, but it doesn't seem to be working, not working the way I envisioned, here is part of my code

 def on_input
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes], None],
    ):

        if "color_image" == dora_input["id"]:
            self.color_frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
            )
            

        if "depth_image" == dora_input["id"]:
            self.depth_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                             dtype=np.uint8).reshape((480, 640))


        if "infra1_image" == dora_input["id"]:
            self.infra1_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                              dtype=np.uint8).reshape((480, 640))


        if "infra2_image" == dora_input["id"]:
            self.infra2_frame = np.frombuffer(dora_input['value'].to_numpy(),
                                              dtype=np.uint8).reshape((480, 640))


        #彩色数据流
        resized_color_image = np.ascontiguousarray(self.color_frame, np.uint8)
        writer.write(resized_color_image)
        resized_color_image = cv2.resize(resized_color_image, (800, 600))

        # 深度数据流
        resized_depth_image = (self.depth_frame.astype(np.uint16) * (65535 // 255))
        resized_depth_image = cv2.applyColorMap(cv2.convertScaleAbs(resized_depth_image, alpha=0.03), cv2.COLORMAP_JET)
        writer.write(resized_depth_image)
        resized_depth_image = cv2.resize(resized_depth_image, (800, 600))

        #左红外数据流
        resized_infra1_image = (self.infra1_frame.astype(np.uint16) * (65535 // 255))
        writer.write(resized_infra1_image)
        resized_infra1_image = cv2.resize(resized_infra1_image, (480, 640))

        # 右红外数据流
        resized_infra2_image = (self.infra2_frame.astype(np.uint16) * (65535 // 255))
        writer.write(resized_infra2_image)
        resized_infra2_image = cv2.resize(resized_infra2_image, (480, 640))

        # top_row = np.hstack((resized_color_image, resized_depth_image))
        # bottom_row = np.hstack((resized_infra1_image, resized_infra2_image))
        # combined_frame = np.vstack((top_row, bottom_row))
        if not NO_DISPLAY:
            cv2.imshow("D435i_image", resized_color_image)
            cv2.waitKey(1)
        self.last_time = time.time()

        return DoraStatus.CONTINUE

Latitude9527 avatar Aug 03 '23 02:08 Latitude9527

Can you share your dataflow description yaml?

haixuanTao avatar Aug 03 '23 07:08 haixuanTao

Can you share your dataflow description yaml? of course, my codes in dataflow description yaml are as follows:

nodes:
  - id: webcam_D435i
    operator:
      python: ../webcam_D435i.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - color_image
        - depth_image
        - infra1_image
        - infra2_image
        - imu_data
        - depth_to_color_extrinsics

    env:
      DEVICE_INDEX: 0

  - id: plot_D435i
    operator:
      python: ./plot_D435i_test.py
      inputs:
        color_image: webcam_D435i/color_image
        depth_image: webcam_D435i/depth_image
        infra1_image: webcam_D435i/infra1_image
        infra2_image: webcam_D435i/infra2_image

edit: use ``` instead of '''

Latitude9527 avatar Aug 03 '23 07:08 Latitude9527

What is the issue you're facing?

haixuanTao avatar Aug 03 '23 08:08 haixuanTao

I want to receive four data streams and visualize them at the same time.When I received four types of data for visualization testing one by one, I found that color images could be visualized, while depth images could not accept data. However, it works again when I subscribe only to deep data and visualize with a single node

Latitude9527 avatar Aug 03 '23 08:08 Latitude9527

I was not able to reproduce your error with a basic webcam operator.

My dataflow

nodes:
  - id: webcam
    operator:
      python: webcam.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - image_1
        - image_2
        - image_3

  - id: object_detection
    operator:
      python: object_detection.py
      inputs:
        image: webcam/image_1
      outputs:
        - bbox

  - id: plot
    operator:
      python: plot.py
      inputs:
        image_1: webcam/image_1
        image_2: webcam/image_2
        image_3: webcam/image_3
        bbox: object_detection/bbox

My webcam operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from typing import Callable, Optional

import os
import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

    def on_event(
        self,
        dora_event: str,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        match dora_event["type"]:
            case "INPUT":
                ret, frame = self.video_capture.read()
                if ret:
                    frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))

                ## Push an error image in case the camera is not available.
                else:
                    frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                    cv2.putText(
                        frame,
                        "No Webcam was found at index %d" % (CAMERA_INDEX),
                        (int(30), int(30)),
                        font,
                        0.75,
                        (255, 255, 255),
                        2,
                        1,
                    )

                send_output(
                    "image_1",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_2",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_3",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
            case "STOP":
                print("received stop")
            case other:
                print("received unexpected event:", other)

        if time.time() - self.start_time < 20:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()

My plot operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from typing import Callable, Optional

import cv2
import numpy as np
import pyarrow as pa
from utils import LABELS

from dora import DoraStatus

pa.array([])

CI = os.environ.get("CI")
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Plot image and bounding box
    """

    def __init__(self):
        self.image_1 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_2 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_3 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.bboxs = []
        self.bounding_box_messages = 0
        self.image_messages = 0

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Put image and bounding box on cv2 window.

        Args:
            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
            dora_input["data"] (bytes): Bytes message of the dora_input
            send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
                Function for sending output to the dataflow:
                - First argument is the `output_id`
                - Second argument is the data as either bytes or `pa.UInt8Array`
                - Third argument is dora metadata dict
                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
        """
        if dora_input["id"] == "image_1":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_1 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_2":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_2 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_3":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_3 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")

        elif dora_input["id"] == "bbox" and len(self.image_1) != 0:
            bboxs = dora_input["value"].to_numpy().view(np.float32)
            self.bboxs = np.reshape(bboxs, (-1, 6))

            self.bounding_box_messages += 1
            print("received " + str(self.bounding_box_messages) + " bounding boxes")

        for bbox in self.bboxs:
            [
                min_x,
                min_y,
                max_x,
                max_y,
                confidence,
                label,
            ] = bbox
            cv2.rectangle(
                self.image_1,
                (int(min_x), int(min_y)),
                (int(max_x), int(max_y)),
                (0, 255, 0),
                2,
            )

            cv2.putText(
                self.image_1,
                LABELS[int(label)] + f", {confidence:0.2f}",
                (int(max_x), int(max_y)),
                font,
                0.75,
                (0, 255, 0),
                2,
                1,
            )

        if CI != "true":
            cv2.imshow("frame_1", self.image_1)
            cv2.imshow("frame_2", self.image_2)
            cv2.imshow("frame_3", self.image_3)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                return DoraStatus.STOP

        return DoraStatus.CONTINUE

    def __del__(self):
        cv2.destroyAllWindows()

This works without issue.

haixuanTao avatar Aug 03 '23 12:08 haixuanTao

I was not able to reproduce your error with a basic webcam operator.

My dataflow

nodes:
  - id: webcam
    operator:
      python: webcam.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - image_1
        - image_2
        - image_3

  - id: object_detection
    operator:
      python: object_detection.py
      inputs:
        image: webcam/image_1
      outputs:
        - bbox

  - id: plot
    operator:
      python: plot.py
      inputs:
        image_1: webcam/image_1
        image_2: webcam/image_2
        image_3: webcam/image_3
        bbox: object_detection/bbox

My webcam operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from typing import Callable, Optional

import os
import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

    def on_event(
        self,
        dora_event: str,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        match dora_event["type"]:
            case "INPUT":
                ret, frame = self.video_capture.read()
                if ret:
                    frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))

                ## Push an error image in case the camera is not available.
                else:
                    frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                    cv2.putText(
                        frame,
                        "No Webcam was found at index %d" % (CAMERA_INDEX),
                        (int(30), int(30)),
                        font,
                        0.75,
                        (255, 255, 255),
                        2,
                        1,
                    )

                send_output(
                    "image_1",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_2",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_3",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
            case "STOP":
                print("received stop")
            case other:
                print("received unexpected event:", other)

        if time.time() - self.start_time < 20:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()

My plot operator:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from typing import Callable, Optional

import cv2
import numpy as np
import pyarrow as pa
from utils import LABELS

from dora import DoraStatus

pa.array([])

CI = os.environ.get("CI")
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Plot image and bounding box
    """

    def __init__(self):
        self.image_1 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_2 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_3 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.bboxs = []
        self.bounding_box_messages = 0
        self.image_messages = 0

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Put image and bounding box on cv2 window.

        Args:
            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
            dora_input["data"] (bytes): Bytes message of the dora_input
            send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
                Function for sending output to the dataflow:
                - First argument is the `output_id`
                - Second argument is the data as either bytes or `pa.UInt8Array`
                - Third argument is dora metadata dict
                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
        """
        if dora_input["id"] == "image_1":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_1 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_2":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_2 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_3":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_3 = frame

            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")

        elif dora_input["id"] == "bbox" and len(self.image_1) != 0:
            bboxs = dora_input["value"].to_numpy().view(np.float32)
            self.bboxs = np.reshape(bboxs, (-1, 6))

            self.bounding_box_messages += 1
            print("received " + str(self.bounding_box_messages) + " bounding boxes")

        for bbox in self.bboxs:
            [
                min_x,
                min_y,
                max_x,
                max_y,
                confidence,
                label,
            ] = bbox
            cv2.rectangle(
                self.image_1,
                (int(min_x), int(min_y)),
                (int(max_x), int(max_y)),
                (0, 255, 0),
                2,
            )

            cv2.putText(
                self.image_1,
                LABELS[int(label)] + f", {confidence:0.2f}",
                (int(max_x), int(max_y)),
                font,
                0.75,
                (0, 255, 0),
                2,
                1,
            )

        if CI != "true":
            cv2.imshow("frame_1", self.image_1)
            cv2.imshow("frame_2", self.image_2)
            cv2.imshow("frame_3", self.image_3)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                return DoraStatus.STOP

        return DoraStatus.CONTINUE

    def __del__(self):
        cv2.destroyAllWindows()

This works without issue.

ok,thanks,i will have a try

Latitude9527 avatar Aug 04 '23 01:08 Latitude9527

Closing this as it seems solved.

haixuanTao avatar Aug 31 '24 04:08 haixuanTao