rosidl_python
rosidl_python copied to clipboard
byte[] AKA IDL sequence<octet> fields required to be sequence of bytes instances instead of bytes instance
Bug report
Required Info:
- Operating System:
- focal
- Installation type:
- source
- Version or commit hash:
- master
- DDS implementation:
- n/a
- Client library (if applicable):
- rclpy
Steps to reproduce issue
from test_msgs.msg import UnboundedSequences
msg = UnboundedSequences()
msg.byte_values = b'ffff'
Expected behavior
A bytes
instance should be accepted, as it is the type documented in the design doc.
http://design.ros2.org/articles/idl_interface_definition.html
Actual behavior
>>> from test_msgs.msg import UnboundedSequences
>>> msg = UnboundedSequences()
>>> msg.byte_values = msg.byte_values = b'ffff'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/sloretz/ws/ros2/install/test_msgs/lib/python3.8/site-packages/test_msgs/msg/_unbounded_sequences.py", line 466, in byte_values
assert \
AssertionError: The 'byte_values' field must be a set or sequence and each value of type 'bytes'
It only accepts a sequence of bytes instances
msg.byte_values = (b'ff', b'ff')
Additional information
I noticed while reviewing #129
I ran into this issue as well. Thought I'd make a note about also supporting bytearray
objects (https://docs.python.org/3/library/stdtypes.html#bytearray)
And for anyone wanting a quick work around:
le_bytes = b'ffff'
msg.bytes_field = tuple(bytes([a]) for a in le_bytes)
And for returning a list of byte objects into a single byte object:
data = [b'ff', b'ff']
concat_bytes = bytes([ab for a in data for ab in a])
It's a bit awkward. Can the python IDL be changed so maybe a byte[] message field would yield bytearray instead of List[bytes]?
This issue is still present in ROS2 Humble (using ros:humble
docker image) and a script like the following:
import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import ByteMultiArray
from custom_msgs.idl import ByteResult
"""
module custom_msgs {
module idl {
struct ByteResult {
sequence<octet> data;
string error;
};
};
};
"""
class MinimalExample(Node):
def __init__(self):
super().__init__("minimal_example")
self.subscriber = self.create_subscription(
Image,
"input_topic",
self.callback,
10
)
self.publisher = self.create_publisher(
ByteResult,
"output_topic",
10
)
def callback(self, msg):
image = np.frombuffer(msg.data, np.uint8).reshape(msg.height, msg.width, -1)
result_data = image # replace with your actual processing code
output_msg = ByteResult()
output_msg.data = [bytes([b]) for b in result_data] # bytearray and class bytes not supported; this line is *EXTREMELY* slow
self.publisher.publish(output_msg)
def main():
rclpy.init()
node = MinimalExample()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()