rosidl_python icon indicating copy to clipboard operation
rosidl_python copied to clipboard

byte[] AKA IDL sequence<octet> fields required to be sequence of bytes instances instead of bytes instance

Open sloretz opened this issue 3 years ago • 3 comments

Bug report

Required Info:

  • Operating System:
    • focal
  • Installation type:
    • source
  • Version or commit hash:
    • master
  • DDS implementation:
    • n/a
  • Client library (if applicable):
    • rclpy

Steps to reproduce issue

from test_msgs.msg import UnboundedSequences
msg = UnboundedSequences()
msg.byte_values = b'ffff'

Expected behavior

A bytes instance should be accepted, as it is the type documented in the design doc.

http://design.ros2.org/articles/idl_interface_definition.html

Actual behavior

>>> from test_msgs.msg import UnboundedSequences
>>> msg = UnboundedSequences()
>>> msg.byte_values = msg.byte_values = b'ffff'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/sloretz/ws/ros2/install/test_msgs/lib/python3.8/site-packages/test_msgs/msg/_unbounded_sequences.py", line 466, in byte_values
    assert \
AssertionError: The 'byte_values' field must be a set or sequence and each value of type 'bytes'

It only accepts a sequence of bytes instances

msg.byte_values = (b'ff', b'ff')

Additional information

I noticed while reviewing #129

sloretz avatar Jun 08 '21 23:06 sloretz

I ran into this issue as well. Thought I'd make a note about also supporting bytearray objects (https://docs.python.org/3/library/stdtypes.html#bytearray)

And for anyone wanting a quick work around:

le_bytes = b'ffff'
msg.bytes_field = tuple(bytes([a]) for a in le_bytes)

And for returning a list of byte objects into a single byte object:

data = [b'ff', b'ff']
concat_bytes = bytes([ab for a in data for ab in a])

It's a bit awkward. Can the python IDL be changed so maybe a byte[] message field would yield bytearray instead of List[bytes]?

russkel avatar Jul 05 '21 04:07 russkel

This issue is still present in ROS2 Humble (using ros:humble docker image) and a script like the following:

import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import ByteMultiArray
from custom_msgs.idl import ByteResult

"""
module custom_msgs {
  module idl {
    struct ByteResult {
      sequence<octet> data;
      string error;
    };
  };
};
"""

class MinimalExample(Node):
    def __init__(self):
        super().__init__("minimal_example")
        self.subscriber = self.create_subscription(
            Image,
            "input_topic",
            self.callback,
            10
        )
        self.publisher = self.create_publisher(
            ByteResult,
            "output_topic",
            10
        )

    def callback(self, msg):
        image = np.frombuffer(msg.data, np.uint8).reshape(msg.height, msg.width, -1)
        result_data = image  # replace with your actual processing code

        output_msg = ByteResult()
        output_msg.data = [bytes([b]) for b in result_data]  # bytearray and class bytes not supported; this line is *EXTREMELY* slow
        self.publisher.publish(output_msg)

def main():
    rclpy.init()
    node = MinimalExample()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

dav-ell avatar Feb 29 '24 18:02 dav-ell