How can a DORA subscriber node classify and process four types of data simultaneously when it receives them?
I'm currently using if statements that dispatch on the name of each data stream (the input id), but it isn't working the way I envisioned. Here is part of my code:
```python
def on_input(
    self,
    dora_input: dict,
    send_output: Callable[[str, bytes], None],
):
    if dora_input["id"] == "color_image":
        self.color_frame = (
            dora_input["value"]
            .to_numpy()
            .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        )
    if dora_input["id"] == "depth_image":
        self.depth_frame = np.frombuffer(
            dora_input["value"].to_numpy(), dtype=np.uint8
        ).reshape((480, 640))
    if dora_input["id"] == "infra1_image":
        self.infra1_frame = np.frombuffer(
            dora_input["value"].to_numpy(), dtype=np.uint8
        ).reshape((480, 640))
    if dora_input["id"] == "infra2_image":
        self.infra2_frame = np.frombuffer(
            dora_input["value"].to_numpy(), dtype=np.uint8
        ).reshape((480, 640))

    # Color stream
    resized_color_image = np.ascontiguousarray(self.color_frame, np.uint8)
    writer.write(resized_color_image)
    resized_color_image = cv2.resize(resized_color_image, (800, 600))

    # Depth stream
    resized_depth_image = self.depth_frame.astype(np.uint16) * (65535 // 255)
    resized_depth_image = cv2.applyColorMap(
        cv2.convertScaleAbs(resized_depth_image, alpha=0.03), cv2.COLORMAP_JET
    )
    writer.write(resized_depth_image)
    resized_depth_image = cv2.resize(resized_depth_image, (800, 600))

    # Left infrared stream
    resized_infra1_image = self.infra1_frame.astype(np.uint16) * (65535 // 255)
    writer.write(resized_infra1_image)
    resized_infra1_image = cv2.resize(resized_infra1_image, (480, 640))

    # Right infrared stream
    resized_infra2_image = self.infra2_frame.astype(np.uint16) * (65535 // 255)
    writer.write(resized_infra2_image)
    resized_infra2_image = cv2.resize(resized_infra2_image, (480, 640))

    # top_row = np.hstack((resized_color_image, resized_depth_image))
    # bottom_row = np.hstack((resized_infra1_image, resized_infra2_image))
    # combined_frame = np.vstack((top_row, bottom_row))

    if not NO_DISPLAY:
        cv2.imshow("D435i_image", resized_color_image)
        cv2.waitKey(1)

    self.last_time = time.time()
    return DoraStatus.CONTINUE
```
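For clarity, this is the dispatch-by-id pattern I am aiming for. Below is a minimal sketch of it; the per-id shapes and the uint8 depth assumption are mine and may not match what webcam_D435i actually sends:

```python
import cv2
import numpy as np

from dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

# Expected shape per input id. The depth/infra shapes assume flat uint8
# buffers; if the publisher sends 16-bit depth, this table must change.
EXPECTED_SHAPES = {
    "color_image": (CAMERA_HEIGHT, CAMERA_WIDTH, 3),
    "depth_image": (CAMERA_HEIGHT, CAMERA_WIDTH),
    "infra1_image": (CAMERA_HEIGHT, CAMERA_WIDTH),
    "infra2_image": (CAMERA_HEIGHT, CAMERA_WIDTH),
}


class Operator:
    def __init__(self):
        self.frames = {}

    def on_event(self, dora_event, send_output):
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(self, dora_input, send_output):
        input_id = dora_input["id"]
        shape = EXPECTED_SHAPES.get(input_id)
        if shape is None:
            return DoraStatus.CONTINUE

        buffer = dora_input["value"].to_numpy()
        if buffer.size != int(np.prod(shape)):
            # A size mismatch usually means the dtype assumption is wrong.
            print(f"{input_id}: {buffer.size} elements, expected {np.prod(shape)}")
            return DoraStatus.CONTINUE

        # Store and display only the stream that triggered this call,
        # instead of touching all four frames on every input.
        self.frames[input_id] = buffer.reshape(shape)
        cv2.imshow(input_id, self.frames[input_id])
        cv2.waitKey(1)
        return DoraStatus.CONTINUE
```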
Can you share your dataflow description yaml?
Of course, my dataflow description YAML is as follows:
```yaml
nodes:
  - id: webcam_D435i
    operator:
      python: ../webcam_D435i.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - color_image
        - depth_image
        - infra1_image
        - infra2_image
        - imu_data
        - depth_to_color_extrinsics
    env:
      DEVICE_INDEX: 0

  - id: plot_D435i
    operator:
      python: ./plot_D435i_test.py
      inputs:
        color_image: webcam_D435i/color_image
        depth_image: webcam_D435i/depth_image
        infra1_image: webcam_D435i/infra1_image
        infra2_image: webcam_D435i/infra2_image
```
What is the issue you're facing?
I want to receive four data streams and visualize them at the same time. When I tested the four data types one by one, the color images could be visualized, but the depth input never received any data. However, it works again when I subscribe only to the depth data and visualize it with a single node.
I was not able to reproduce your error with a basic webcam operator.
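One thing you could check on your side is whether the depth messages reach your subscriber at all: count the messages per input id and print each buffer's length before any reshape. A minimal sketch, assuming the same `dora_input` dict as in your operator; `log_input` is a hypothetical helper you would call at the top of `on_input`:

```python
from collections import Counter

# Per-input-id message counter (hypothetical debugging helper, not part of
# your operator). A missing "depth_image" or a size mismatch -- for example
# a 16-bit depth frame arriving as twice as many uint8 values as 480 * 640 --
# shows up immediately in the printed output.
counts = Counter()


def log_input(dora_input: dict) -> None:
    buffer = dora_input["value"].to_numpy()
    counts[dora_input["id"]] += 1
    print(
        f"{dora_input['id']}: message #{counts[dora_input['id']]}, "
        f"{buffer.size} elements"
    )
```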
My dataflow:
```yaml
nodes:
  - id: webcam
    operator:
      python: webcam.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - image_1
        - image_2
        - image_3

  - id: object_detection
    operator:
      python: object_detection.py
      inputs:
        image: webcam/image_1
      outputs:
        - bbox

  - id: plot
    operator:
      python: plot.py
      inputs:
        image_1: webcam/image_1
        image_2: webcam/image_2
        image_3: webcam/image_3
        bbox: object_detection/bbox
```
My webcam operator:
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from typing import Callable, Optional
import os

import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Sending image from webcam to the dataflow
    """

    def __init__(self):
        self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
        self.start_time = time.time()
        self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
        self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

    def on_event(
        self,
        dora_event: str,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        match dora_event["type"]:
            case "INPUT":
                ret, frame = self.video_capture.read()
                if ret:
                    frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
                ## Push an error image in case the camera is not available.
                else:
                    frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                    cv2.putText(
                        frame,
                        "No Webcam was found at index %d" % (CAMERA_INDEX),
                        (int(30), int(30)),
                        font,
                        0.75,
                        (255, 255, 255),
                        2,
                        1,
                    )
                send_output(
                    "image_1",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_2",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
                send_output(
                    "image_3",
                    pa.array(frame.ravel()),
                    dora_event["metadata"],
                )
            case "STOP":
                print("received stop")
            case other:
                print("received unexpected event:", other)

        if time.time() - self.start_time < 20:
            return DoraStatus.CONTINUE
        else:
            return DoraStatus.STOP

    def __del__(self):
        self.video_capture.release()
```
My plot operator:
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from typing import Callable, Optional

import cv2
import numpy as np
import pyarrow as pa
from utils import LABELS

from dora import DoraStatus

pa.array([])

CI = os.environ.get("CI")

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

font = cv2.FONT_HERSHEY_SIMPLEX


class Operator:
    """
    Plot image and bounding box
    """

    def __init__(self):
        self.image_1 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_2 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.image_3 = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        self.bboxs = []
        self.bounding_box_messages = 0
        self.image_messages = 0

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Put image and bounding box on cv2 window.

        Args:
            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
            dora_input["data"] (bytes): Bytes message of the dora_input
            send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
                Function for sending output to the dataflow:
                - First argument is the `output_id`
                - Second argument is the data as either bytes or `pa.UInt8Array`
                - Third argument is dora metadata dict
                  e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
        """
        if dora_input["id"] == "image_1":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_1 = frame
            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_2":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_2 = frame
            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "image_3":
            frame = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()  # copy the image because we want to modify it below
            )
            self.image_3 = frame
            self.image_messages += 1
            print("received " + str(self.image_messages) + " images")
        elif dora_input["id"] == "bbox" and len(self.image_1) != 0:
            bboxs = dora_input["value"].to_numpy().view(np.float32)
            self.bboxs = np.reshape(bboxs, (-1, 6))
            self.bounding_box_messages += 1
            print("received " + str(self.bounding_box_messages) + " bounding boxes")

        for bbox in self.bboxs:
            [
                min_x,
                min_y,
                max_x,
                max_y,
                confidence,
                label,
            ] = bbox
            cv2.rectangle(
                self.image_1,
                (int(min_x), int(min_y)),
                (int(max_x), int(max_y)),
                (0, 255, 0),
                2,
            )
            cv2.putText(
                self.image_1,
                LABELS[int(label)] + f", {confidence:0.2f}",
                (int(max_x), int(max_y)),
                font,
                0.75,
                (0, 255, 0),
                2,
                1,
            )

        if CI != "true":
            cv2.imshow("frame_1", self.image_1)
            cv2.imshow("frame_2", self.image_2)
            cv2.imshow("frame_3", self.image_3)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                return DoraStatus.STOP

        return DoraStatus.CONTINUE

    def __del__(self):
        cv2.destroyAllWindows()
```
This works without issue.
OK, thanks, I will give it a try.
Closing this as it seems solved.