cyclonedds-python
[Question] About the usage of shared memory in docker
Hi,
I'm trying to set up the environment and everything else needed to transfer messages using shared memory. First of all, here is my image class:
Image class
from dataclasses import dataclass
import numpy as np
from cyclonedds.idl import IdlStruct
from cyclonedds.idl.types import array, uint8, uint64


@dataclass
class MainCameraImage(IdlStruct):
    img: array[uint8, 4147200]
    shape: array[uint64, 3]

    @classmethod
    def from_numpy(cls, img):
        assert len(img.shape) == 3
        seq_img = img.tobytes()
        return MainCameraImage(seq_img, tuple(img.shape))

    def to_numpy(self):
        shape = self.shape
        return np.frombuffer(self.img, dtype=np.uint8).reshape(shape)
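For context, a quick local round-trip check of this struct could look like the sketch below; the 1280x1080x3 frame shape is an assumption that happens to match the 4147200-element array.

```python
# Hypothetical sanity check (uses the imports above); 1280 * 1080 * 3 == 4147200,
# the fixed array size declared in MainCameraImage.
frame = np.random.randint(0, 256, (1280, 1080, 3), dtype=np.uint8)
msg = MainCameraImage.from_numpy(frame)
assert np.array_equal(msg.to_numpy(), frame)
```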
Note that the image array has a fixed size. Then I implement the publisher and subscriber:
Publisher
import cv2
import time
import numpy as np
import fire
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.internal import dds_infinity
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from dds_data_structures import MainCameraImage, Image, CameraState, DeviceState


def stream(dev=0, resize_to=None, output_topic_names=['camera_images', 'camera_images_vis', 'camera_images_clr'],
           data_type='MainCameraImage', cap_gstreamer=True):
    data_type = MainCameraImage
    participant = DomainParticipant(0)
    writers = []
    # unknown, disabled, enabled, error
    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    for topic_name in output_topic_names:
        topic_out = Topic(participant, topic_name, data_type)
        writers.append(DataWriter(participant, topic_out, qos))
    frame = np.random.randint(0, 256, (1280, 1080, 3), dtype=np.uint8)
    print(frame.shape, frame.dtype)
    while True:
        time_start = time.time()
        ret, frame = 1, frame
        if not ret:
            print('Error: Unable to read frame')
            break
        if resize_to is not None:
            frame = cv2.resize(frame, resize_to[::-1])
        for writer in writers:
            writer.write(data_type.from_numpy(frame))
        loop_time = time.time() - time_start
        print(f'Streaming Camera with topic {output_topic_names[0]}, {loop_time=}', end='\r')


if __name__ == '__main__':
    fire.Fire(stream)
Subscriber
import cv2
import time
import numpy as np
import fire
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.internal import dds_infinity
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from dds_data_structures import MainCameraImage


def receive(topic_name='camera_images'):
    data_type = MainCameraImage
    participant = DomainParticipant(0)
    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    topic_in = Topic(participant, topic_name, data_type)
    reader = DataReader(participant, topic_in, qos)
    while True:
        time_start = time.time()
        img = reader.read()
        if len(img) == 0:
            continue
        img = img[0].to_numpy()
        loop_time = time.time() - time_start
        print(f'Received img {img.shape=}, {loop_time=}', end='\r')


if __name__ == '__main__':
    fire.Fire(receive)
I've checked the requirements for the QoS of both publisher and subscriber here (as far as I can see, they should be identical).
Next, I run the iceoryx RouDi in one docker container with `--net=host --ipc=host -v /dev:/dev -v /tmp:/tmp`. I also set the environment variable CYCLONEDDS_URI to point to the config file. I'm using the default config file:
CycloneDDS URI config
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
<Domain Id="any">
<General>
<Interfaces>
<NetworkInterface autodetermine="true" priority="default" multicast="default" />
</Interfaces>
<AllowMulticast>default</AllowMulticast>
</General>
<SharedMemory>
<Enable>true</Enable>
<LogLevel>info</LogLevel>
</SharedMemory>
</Domain>
</CycloneDDS>
Finally, I run the above scripts in docker containers with the same net and ipc options. They work fine and the subscriber receives messages from the publisher. What I'm trying to understand:
- How can I make sure that messages are being sent via shared memory?
- `ipcs -m` shows only firefox and vs code processes (I've checked the PIDs via `ps -p`). Should my subscriber and publisher be there if everything works as expected?
I've changed the Cyclone config a bit to enable tracing. When I start the scripts, they report their iceoryx addresses. Publisher:
1692100063.009583 [0] python3: Current process name for iceoryx is iceoryx_rt_71_1692100063009382776
1692100063.010857 [0] python3: My iceoryx address: 16/[a8:a1:59:71:3e:95:00:00:00:00:00:00:00:00:00:00]:0
Subscriber:
1692100056.575285 [0] python3: Current process name for iceoryx is iceoryx_rt_74_1692100056575087207
1692100056.576568 [0] python3: My iceoryx address: 16/[a8:a1:59:71:3e:95:00:00:00:00:00:00:00:00:00:00]:0
When I interrupt them, iceoryx reports the following:
unified_pipeline_dev-roudi-1 | 2023-08-15 14:48:30.929 [Warning]: Application iceoryx_rt_74_1692100056575087207 not responding (last response 1534 milliseconds ago) --> removing it
unified_pipeline_dev-roudi-1 | 2023-08-15 14:48:31.630 [Warning]: Application iceoryx_rt_71_1692100063009382776 not responding (last response 1502 milliseconds ago) --> removing it
Does it mean that everything works as expected? If yes, why don't I see my processes in ipcs?
From a look at your code I think it should be happy to use Iceoryx. I should try it, but ...
In Unix/Linux there are multiple ways of creating shared memory, and System V shared memory (which `ipcs -m` reports on) is only one of them, and it is actually the less Unix-y way of doing it. The other tricks involve `mmap`:

- `mmap` a file with the `MAP_SHARED` option
- `mmap` some (possibly anonymous) block, then `fork`
- like the first, but instead passing a file descriptor via a unix-domain socket to another process that can then map the same file.
Maybe some other trick exists, a lot has happened in recent years and I've not tried to follow it all, but these are the classic ones. I'm pretty sure Iceoryx uses the third.
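For illustration, the first mechanism is easy to sketch in Python (a toy example, not what Iceoryx does internally; the file name is made up): any two processes that `mmap` the same file with `MAP_SHARED` see each other's writes.

```python
import mmap
import os

# Hypothetical example: create/size a backing file and map it MAP_SHARED.
# Any other process that maps the same file sees the bytes written here.
path = "/dev/shm/demo_region"  # made-up name; /dev/shm is tmpfs-backed on Linux
fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o600)
os.ftruncate(fd, 4096)
buf = mmap.mmap(fd, 4096, mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)
buf[:5] = b"hello"
```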
You can see it if you look at the VM maps, either by `cat`ing the right file in `/proc` or by using `pmap`. If you do that, you'll find the memory. That much is obvious from the trace and the roudi output.
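For example, a quick way to do that from a script (a Linux-only sketch; the PID is hypothetical) is to scan `/proc/<pid>/maps` for mappings of the Iceoryx segments:

```python
# Print mappings that point at files under /dev/shm (where the Iceoryx
# segments such as iceoryx_mgmt live), roughly what pmap would show.
pid = 37288  # hypothetical PID of the publisher or subscriber
with open(f"/proc/{pid}/maps") as maps:
    for line in maps:
        if "/dev/shm/" in line or "iceoryx" in line:
            print(line, end="")
```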
To find out if it really uses Iceoryx for this data one option is to dig deeper in the Cyclone traces, it will tell you in the discovery output which addresses it uses and there you can find whether it uses Iceoryx. Or, you can use the Iceoryx introspection tool, it will show you what's going on there.
Finally, in a case like this where it is basically everything-via-iceoryx or everything-over-the-network, looking at network statistics will also work. `netstat` or something like `top` should do fine. Indeed, even `top -H` (which shows individual threads) is often already sufficient: just look for threads named `recv`/`recvUC`/`recvMC`. If none of those take substantial time while you're pushing a lot of large samples through, then it is not sending them over the network.
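If you prefer scripting it over eyeballing `top -H`, a sketch like the following (Linux-only, hypothetical PID) sums per-thread CPU time so the `recv*` threads can be compared with the rest:

```python
import os

pid = 37288  # hypothetical PID of the publisher or subscriber
tick = os.sysconf("SC_CLK_TCK")
for tid in os.listdir(f"/proc/{pid}/task"):
    with open(f"/proc/{pid}/task/{tid}/stat") as f:
        raw = f.read()
    # The thread name is the parenthesised field; utime/stime follow later.
    name = raw[raw.index("(") + 1:raw.rindex(")")]
    fields = raw[raw.rindex(")") + 2:].split()
    cpu_seconds = (int(fields[11]) + int(fields[12])) / tick  # utime + stime
    print(f"{name:16s} {cpu_seconds:8.2f} s")
```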
Thank you for such a detailed reply!
Suppose the PID of one of my processes is 37288 (publisher or subscriber, I guess it does not matter). I've checked `pmap -x 37288`, and it outputs:
37288: python3 /root/vloginov/image_ping.py
I've also checked `top -H`: for `recvUC` it shows a TIME+ of about 1:49, whereas TIME+ for the python process is about 11:36. Is this the ratio you are writing about?
Finally, in `netstat -p` I do not see any of my processes. I think this is the strongest signal that I'm not using the network, is that true?
However, the thing that concerns me the most is that when I stop iceoryx and rerun both scripts without the CYCLONEDDS_URI variable, the loop time is the same, and I also do not see my scripts in `netstat -p`. Am I doing something wrong?
bump this
Sorry that you had to bump it ...
1:49 for `recvUC` sounds like it might not be using Iceoryx, that's quite a bit of CPU time. Python is generally good at consuming CPU cycles, so the python process taking a fair bit of time is probably simply application code executing.
However, the thing that concerns me the most is that when I stop the iceoryx and rerun both scripts without CYCLONEDDS_URI variable, the loop time is the same, also, I do not see my scripts in the netstat -p. Am I doing something wrong?
That's suspicious. A lot of the DDS work happens in the background, so if the Python application is simply taking a lot of time to process the data, it is possible that its execution time is almost independent of the work done in DDS regardless of whether it uses Iceoryx or the loopback interface.
I don't know exactly what Linux outputs with `pmap -x` (I can still remember the Solaris output, but that's of no use to you!) and I don't know exactly what Linux does in `netstat -p`, and trying it out on macOS is quite meaningless. I'll try to get some help 🙂
hi @morkovka1337!
I have something to say. First, it's good to see the iox-roudi debug output, it can tell us more than just the `info` log. To enable it you can use the following command:
iox-roudi -l debug
The output will contain information about app registration, like this:
Reserving 22740232 bytes in the shared memory [iceoryx_mgmt]
[ Reserving shared memory successful ]
2023-10-16 14:47:03.299 [ Debug ]: Registered memory segment 0x7f6164ed9000 with size 22740232 to id 1
Reserving 149264720 bytes in the shared memory [root]
[ Reserving shared memory successful ]
2023-10-16 14:47:03.350 [ Debug ]: Roudi registered payload data segment 0x7f615c07f000 with size 149264720 to id 2
RouDi is ready for clients
And after you start your pub/sub example, it will provide the following information:
2023-10-16 14:48:38.859 [ Debug ]: Registered new application iceoryx_rt_140_1697467718858636401
2023-10-16 14:48:38.860 [ Debug ]: Created new ConditionVariable for application iceoryx_rt_140_1697467718858636401
2023-10-16 14:48:39.619 [ Debug ]: Registered new application iceoryx_rt_139_1697467719618867611
2023-10-16 14:48:39.620 [ Debug ]: Created new ConditionVariable for application iceoryx_rt_139_1697467719618867611
For now we know that `roudi` works and also reserves segments in `shm`. Let's check the `shm` usage with the following command:
lsof -r1 /dev/shm/ | grep python3
As you can see, there are roudi-reserved segments used by the `python3` processes with PIDs 73782 and 73811.
python3 73782 root mem REG 0,25 149264720 3291 /dev/shm/root
python3 73782 root mem REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
python3 73782 root 6u REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
python3 73782 root 7u REG 0,25 149264720 3291 /dev/shm/root
python3 73811 root mem REG 0,25 149264720 3291 /dev/shm/root
python3 73811 root mem REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
python3 73811 root 6u REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
python3 73811 root 7u REG 0,25 149264720 3291 /dev/shm/root
Let's also check the following command:
lsof -r1 /dev/shm/ | grep iox-roudi
As you can see, the same reserved segments are also used by the `iox-roudi` process.
iox-roudi 73617 root mem REG 0,25 149264720 3291 /dev/shm/root
iox-roudi 73617 root mem REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
iox-roudi 73617 root 4u REG 0,25 22740232 3290 /dev/shm/iceoryx_mgmt
iox-roudi 73617 root 5u REG 0,25 149264720 3291 /dev/shm/root
But wait, let's check which processes are associated with the PIDs that use the shared memory.
pmap -x 73811
73811: python3 213-docker/s_sub.py
pmap -x 73782
73782: python3 213-docker/s_pub.py
From here we can already determine that we are using `iceoryx` for communication, because if you repeat all these steps with shared memory disabled in the `cyclonedds.xml` config file, you will not see the same output.
About `recvUC`: I am more interested in a different column than TIME+ in the output of the `top -H` command (small messages at a not-so-high rate don't really take much CPU). Let's compare the SHM column of the python processes with and without `iceoryx`.
~$ top -H| grep python3 # with shm -> | SHM |
73782 root 20 0 1841604 96988 39840 R 99.9 0.3 2:40.20 python3
73811 root 20 0 1832428 103272 39360 R 94.4 0.3 2:39.77 python3
73811 root 20 0 1832428 103524 39360 R 99.9 0.3 2:42.80 python3
73782 root 20 0 1837600 91696 39840 R 99.3 0.3 2:43.21 python3
~$ top -H| grep python3 # without shm ->| SHM |
74350 root 20 0 1591340 96548 39680 R 99.9 0.3 0:13.32 python3
74377 root 20 0 1582120 102960 39360 R 94.4 0.3 0:10.94 python3
74377 root 20 0 1582120 101448 39360 R 99.9 0.3 0:13.97 python3
74350 root 20 0 1587292 92356 39680 R 99.7 0.3 0:16.33 python3
Thank you for such a detailed analysis of my situation! I had previously checked the output of roudi and saw the lines above about registering the memory segments. I will also do the rest of the steps and write here about my results.
@Splinter1984 I've checked all the proposed steps and can fully reproduce your output.
BTW, regarding the situation where the send/receive loop time is the same with and without roudi: I've tried to run the containers without setting the CYCLONEDDS_URI variable. The result surprisingly became an order of magnitude faster (loop time ~0.003 sec vs ~0.05 sec when using the CYCLONEDDS_URI var).
Note that neither iox-roudi nor lsof registers any usage of shared memory by the python3 processes. However, `top -H | grep python3` shows relatively the same SHM occupation (~48k & 17k for writer and reader, respectively):
662001 root 20 0 1833824 113572 47716 R 99.9 0.1 2:11.01 python3
662064 root 20 0 1569168 76640 17316 S 18.8 0.1 0:25.96 python3
662001 root 20 0 1833824 113592 47716 R 99.7 0.1 2:14.02 python3
662064 root 20 0 1569168 75848 17316 R 21.2 0.1 0:26.60 python3
662001 root 20 0 1833824 113524 47644 R 99.9 0.1 2:17.03 python3
662064 root 20 0 1569168 79760 17268 R 20.6 0.1 0:27.22 python3
I've realised that previously I was checking the wrong loop time. What I was actually checking was the serialization time, both over the network and over shared memory. Instead, I've modified my reader, so the function now looks like the following:
def receive(topic_name='test_cyclonedds_shm'):
    data_type = MainCameraImage
    participant = DomainParticipant(0)
    qos = Qos(
        # liveliness: automatic
        # deadline: dds_infinity
        # reliability: reliable
        # durability: volatile
        # history: keep last
        Policy.Liveliness.Automatic(lease_duration=duration(seconds=10)),
        Policy.Deadline(deadline=dds_infinity),
        Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1)),
        Policy.Durability.Volatile,
        Policy.History.KeepLast(1)
    )
    # qos = Qos(Policy.EntityName("test reader"))
    topic_in = Topic(participant, topic_name, data_type)
    reader = DataReader(participant, topic_in, qos)
    n = 1_000
    avg_time = 0
    i = 0
    img = reader.take_one(timeout=duration(seconds=1))
    while i < n:
        img = reader.take_one(timeout=duration(seconds=1))
        diff_time = datetime.datetime.now() - datetime.datetime.fromtimestamp(int(img.sample_info.source_timestamp // 10**9))
        avg_time += diff_time.microseconds
        i += 1
    print(f'avg transmit time {avg_time / n / 1000} ms')
So I run it with `CYCLONEDDS_URI=.... python3 reader.py` and with just `python3 reader.py`. In both cases the transmission time is about 500 ms. Afaik `sample_info.source_timestamp` is the sender timestamp, so subtracting the source timestamp from the current time should give me the send time. Am I measuring correctly now?
Hi @morkovka1337.
If you want to measure something that looks like the transfer time, you need to create a `Listener` with an `on_data_available` event registration, and record the time at which the `data_available` trigger fires. Then you can compare this time with `source_timestamp`; this number will be more representative.
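A minimal sketch of that approach, assuming cyclonedds-python's `Listener` with an `on_data_available(reader)` callback, a nanosecond-resolution `source_timestamp`, and default QoS for brevity, could look roughly like this:

```python
import time

from cyclonedds.core import Listener
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from dds_data_structures import MainCameraImage


class LatencyListener(Listener):
    def on_data_available(self, reader):
        # Timestamp the trigger as early as possible, then compare it with
        # the writer-side source_timestamp of the sample that just arrived.
        now_ns = time.time_ns()
        for sample in reader.take():
            delay_ms = (now_ns - sample.sample_info.source_timestamp) / 1e6
            print(f'transfer latency ~ {delay_ms:.3f} ms')


participant = DomainParticipant(0)
topic = Topic(participant, 'test_cyclonedds_shm', MainCameraImage)
reader = DataReader(participant, topic, listener=LatencyListener())

while True:
    time.sleep(1)  # data is handled in the listener callback
```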
@Splinter1984 thank you, I will try it