Using a thread in on_event makes processing slower
Describe the bug
I am exploring best practices for processing high-frequency input data without depending on a predefined tick frequency. In ROS, the spin method polls to keep the node running, and its sampling rate is lower than that of a while loop running inside a thread's run method. The same holds for Dora, since the tick plays a role similar to spin. So I created a thread that processes the webcam data in a while loop, to compare performance. However, the visual output shows high latency. I expected results similar to the original example, which has no noticeable latency.
This test is based on examples/python-dataflow. The operator's on_event is now just a trigger that starts the thread. Everything is wrapped inside the thread's while loop, so it does not depend on the on_event frequency and instead runs as fast as the loop allows. If the thread has already been started, subsequent on_event calls skip starting it.
To Reproduce
Dora 0.3.4, Ubuntu 22.04. Replace webcam.py with the version below, and replace dataflow.yml as well.
import os
import threading
import time

import cv2
import numpy as np
from dora import DoraStatus

CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
video_capture = cv2.VideoCapture(CAMERA_INDEX)
font = cv2.FONT_HERSHEY_SIMPLEX
start = time.time()


class Operator:
    def __init__(self) -> None:
        self.frame = None
        self.stop_thread = False
        self.video_thread = None

    def capture_video(self, event, send_output):
        # Runs in the worker thread. `event` is the event that started the
        # thread; it is not refreshed inside the loop.
        start = time.time()
        while not self.stop_thread and time.time() - start < 50:
            ret, frame = video_capture.read()
            if not ret:
                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(
                    frame,
                    "No Webcam was found at index %d" % (CAMERA_INDEX),
                    (int(30), int(30)),
                    font,
                    0.75,
                    (255, 255, 255),
                    2,
                    1,
                )
            self.frame = frame
            # Wait next dora_input
            event_type = event["type"]
            if event_type == "INPUT":
                if self.frame is not None:
                    try:
                        send_output(
                            "image",
                            cv2.imencode(".jpg", self.frame)[1].tobytes(),
                            event["metadata"],
                        )
                    except Exception as e:
                        print(f"Failed to send output: {e}")
            elif event_type == "STOP":
                self.stop_thread = True
                break
            else:
                print("received unexpected event:", event_type)
                break
        video_capture.release()

    def on_event(
        self,
        event,
        send_output,
    ) -> DoraStatus:
        # Start video capture in a separate thread if not already running
        if self.video_thread is None or not self.video_thread.is_alive():
            self.video_thread = threading.Thread(target=self.capture_video, args=(event, send_output))
            self.video_thread.start()
        return DoraStatus.CONTINUE  # Continue processing events
Here is the dataflow.yml, which I changed to use an operator:
nodes:
  - id: webcam
    operator:
      python: ./webcam.py
      inputs:
        tick:
          source: dora/timer/millis/50
          queue_size: 1000
      outputs:
        - image

  - id: object_detection
    custom:
      source: ./object_detection.py
      inputs:
        image: webcam/image
      outputs:
        - bbox

  - id: plot
    custom:
      source: ./plot.py
      inputs:
        image: webcam/image
        bbox: object_detection/bbox
To start the program, run:
dora start dataflow.yml --name my-dataflow
FYI, we're going to deprecate operators, so please use the node API in the future.
In the meantime, it seems that you're spawning a new thread at each event, which is going to add some overhead.
So it is not surprising that the threaded version is slower than the version without a thread.
I wouldn't recommend using threading in general, as many Python objects, such as those from numpy and pandas, are not thread-safe. If you do use it, make sure you know what you're doing and protect shared state with mutex locks.
Also, threading can run into GIL contention.
When in doubt, I would recommend using multiple dora Python nodes.
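As an illustration of the mutex advice, here is a minimal sketch of sharing the latest frame between a capture thread and a reader through a lock. `LatestValue` and `producer` are hypothetical names introduced for this example; they are not part of dora or OpenCV, and the strings stand in for real frames.

```python
import threading


class LatestValue:
    """Hold the most recent value written by a producer thread.

    Hypothetical helper for illustration; not part of dora or OpenCV.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._version = 0  # incremented on every write

    def set(self, value):
        # Update value and version together so readers never see a mix.
        with self._lock:
            self._value = value
            self._version += 1

    def get(self):
        """Return (version, value) as one consistent pair."""
        with self._lock:
            return self._version, self._value


def producer(holder, count):
    # Stand-in for the capture loop: writes `count` fake frames.
    for i in range(count):
        holder.set(f"frame-{i}")


holder = LatestValue()
worker = threading.Thread(target=producer, args=(holder, 100))
worker.start()
worker.join()
version, value = holder.get()
print(version, value)
```

A reader can compare the version number with the last one it handled to tell whether a new frame has arrived, instead of resending the same frame on every loop iteration.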
I would recommend trying:
import os
import threading
import time

import cv2
import numpy as np
from dora import DoraStatus

CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
video_capture = cv2.VideoCapture(CAMERA_INDEX)
font = cv2.FONT_HERSHEY_SIMPLEX
start = time.time()


class Operator:
    def __init__(self) -> None:
        self.frame = None
        self.stop_thread = False
        self.video_thread = None

    def capture_video(self, event, send_output):
        start = time.time()
        while not self.stop_thread and time.time() - start < 50:
            ret, frame = video_capture.read()
            if not ret:
                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(
                    frame,
                    "No Webcam was found at index %d" % (CAMERA_INDEX),
                    (int(30), int(30)),
                    font,
                    0.75,
                    (255, 255, 255),
                    2,
                    1,
                )
            self.frame = frame
            # Wait next dora_input
            event_type = event["type"]
            if event_type == "INPUT":
                if self.frame is not None:
                    try:
                        send_output(
                            "image",
                            cv2.imencode(".jpg", self.frame)[1].tobytes(),
                            event["metadata"],
                        )
                    except Exception as e:
                        print(f"Failed to send output: {e}")
            elif event_type == "STOP":
                self.stop_thread = True
                break
            else:
                print("received unexpected event:", event_type)
                break
        video_capture.release()

    def on_event(
        self,
        event,
        send_output,
    ) -> DoraStatus:
        # Start video capture in a separate thread if not already running
        if self.video_thread is None or not self.video_thread.is_alive():
            self.video_thread = threading.Thread(target=self.capture_video, args=(event, send_output))
            self.video_thread.start()
        return DoraStatus.CONTINUE  # Continue processing events
op = Operator()
while True:
    # "metadata" is included because capture_video reads event["metadata"]
    op.on_event({"type": "INPUT", "metadata": {}}, print)
This runs without dora, so any slowdown you measure is the overhead of the threading mechanism itself.
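To isolate thread start-up cost from the camera and JPEG encoding, a smaller timing sketch can help. It compares calling a function directly with starting and joining a new thread for every call. `work`, `time_direct`, and `time_thread_per_call` are hypothetical names for this example, and the workload is an arbitrary stand-in, so the absolute numbers will differ by machine.

```python
import threading
import time


def work():
    # Stand-in for one iteration of per-event processing.
    return sum(range(1000))


def time_direct(n):
    """Total seconds to call work() n times on the current thread."""
    start = time.perf_counter()
    for _ in range(n):
        work()
    return time.perf_counter() - start


def time_thread_per_call(n):
    """Total seconds to run work() n times, one new thread per call."""
    start = time.perf_counter()
    for _ in range(n):
        t = threading.Thread(target=work)
        t.start()
        t.join()
    return time.perf_counter() - start


N = 200
direct = time_direct(N)
threaded = time_thread_per_call(N)
print(f"direct:          {direct / N * 1e6:.1f} us per call")
print(f"thread per call: {threaded / N * 1e6:.1f} us per call")
```

The difference between the two figures approximates the cost of creating, starting, and joining one thread per call on your machine.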
Thanks. The threaded code above started fine after I rebooted the PC. I am trying to explore how the ROS Multi-Threaded Executor scenario could be used in DORA. Some existing third-party modules are provided as-is and need to run in a threaded environment with multiple callbacks (a callback group in ROS). Anyway, this is not a high priority. If a threading model is needed in dora, users can still achieve it with native Python threads, at their own risk.
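For reference, the "multiple callbacks on native Python threads" idea can be sketched with the standard library alone. This is only an illustration under assumptions: `lidar_callback`, `camera_callback`, and `run_callbacks` are hypothetical names, the callbacks are assumed to be independent of each other, and results are funneled through a thread-safe queue so that a single thread does the forwarding. Whether dora's `send_output` may be called from other threads is not established in this thread.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def lidar_callback(sample):
    # Hypothetical third-party callback.
    return ("lidar", sample * 2)


def camera_callback(sample):
    # Hypothetical third-party callback.
    return ("camera", sample + 1)


def run_callbacks(samples, max_workers=2):
    """Run both callbacks for every sample on a small thread pool."""
    results = queue.Queue()  # thread-safe hand-off to the calling thread

    def run(callback, sample):
        results.put(callback(sample))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for sample in samples:
            pool.submit(run, lidar_callback, sample)
            pool.submit(run, camera_callback, sample)
    # Leaving the `with` block waits for all submitted callbacks.

    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


outputs = run_callbacks([1, 2, 3])
print(sorted(outputs))
```

The order in which results arrive depends on thread scheduling, so the example sorts them before printing.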