pycyphal
[WIP] Moving Cyphal/UDP to multicast
This MR is based on this proposal discussed on the OpenCyphal forum.
In short, the changes can be broken down into three pieces:
1. Datagram header format
Current:
uint8 version = 0
uint8 priority
void16
uint32 frame_index_eot
uint64 transfer_id
void64
Proposal:
uint8 version = 1 # UPDATE
uint8 priority
uint16 source_node_id # NEW
uint32 frame_index_eot
uint64 transfer_id
void64
Note: the version will be bumped to 1; however, no backward-compatibility measures are taken (since the protocol is still in development).
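The sketch below shows one way to pack and unpack the proposed version-1 header with Python's struct module. Two details are assumptions and are not stated in the proposal itself. First, the byte order is little-endian. Second, the end-of-transfer flag occupies the most significant bit of frame_index_eot. Both match the test vectors quoted later in this thread, where frame index 1 with end-of-transfer appears as \x01\x00\x00\x80. The names pack_header and unpack_header are hypothetical and are not pycyphal API.

```python
import struct
import typing

# Proposed v1 layout: version, priority, source_node_id, frame_index_eot, transfer_id, void64.
# "<" = little-endian without padding (assumption); "8x" writes the trailing void64 as zero bytes.
HEADER = struct.Struct("<BBHIQ8x")
HEADER_VERSION = 1
EOT_MASK = 1 << 31  # assumption: end-of-transfer flag is the MSB of frame_index_eot


def pack_header(
    priority: int, source_node_id: int, frame_index: int, end_of_transfer: bool, transfer_id: int
) -> bytes:
    if not 0 <= frame_index < EOT_MASK:
        raise ValueError(f"Invalid frame index: {frame_index}")
    frame_index_eot = frame_index | (EOT_MASK if end_of_transfer else 0)
    return HEADER.pack(HEADER_VERSION, priority, source_node_id, frame_index_eot, transfer_id)


def unpack_header(data: bytes) -> typing.Tuple[int, int, int, bool, int]:
    """Returns (priority, source_node_id, frame_index, end_of_transfer, transfer_id)."""
    version, priority, source_node_id, frame_index_eot, transfer_id = HEADER.unpack_from(data)
    if version != HEADER_VERSION:
        raise ValueError(f"Unsupported header version: {version}")
    frame_index = frame_index_eot & (EOT_MASK - 1)
    end_of_transfer = bool(frame_index_eot & EOT_MASK)
    return priority, source_node_id, frame_index, end_of_transfer, transfer_id
```

For example, pack_header(7, 6, 1, True, 54321) yields a 24-byte header that begins with b"\x01\x07\x06\x00".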
2. Message
Current:
      fixed        reserved
     (9 bits)      (3 bits)
     ________          _
    /        \        / \
    11101111.0ddddddd.000sssss.ssssssss
    \__/      \_____/    \____________/
  (4 bits)   (7 bits)      (13 bits)
    IPv4     subnet-ID     subject-ID
 multicast    \_______________________/
   prefix             (23 bits)
              collision-free multicast
                 addressing limit of
                Ethernet MAC for IPv4
Proposal:
      fixed   message reserved
     (9 bits) select. (3 bits)
     ________   res.|  _
    /        \     vv / \
    11101111.0ddddd00.000sssss.ssssssss
    \__/      \___/      \____________/
  (4 bits)  (5 bits)       (13 bits)
    IPv4    subnet-ID      subject-ID
 multicast    \_______________________/
   prefix             (23 bits)
              collision-free multicast
                 addressing limit of
                Ethernet MAC for IPv4
3. Service
Current: regular unicast
Proposal:
      fixed   service
     (9 bits) selector
     ________   res.|
    /        \     vv
    11101111.0ddddd01.nnnnnnnn.nnnnnnnn
    \__/      \___/   \_______________/
  (4 bits)  (5 bits)      (16 bits)
    IPv4    subnet-ID      node-ID
 multicast    \_______________________/
   prefix             (23 bits)
              collision-free multicast
                 addressing limit of
                Ethernet MAC for IPv4
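The three layouts above reduce to simple bit arithmetic. The sketch below implements the proposed message and service mappings and the inverse mapping. The function and constant names are hypothetical and do not reproduce pycyphal's _endpoint_mapping.py. As a sanity check, two addresses that appear later in this thread decode cleanly under this layout: 239.52.2.100 (subject 612) and 239.53.0.111 (node 111). Both resolve to subnet-ID (domain-ID) 13. The layout was revised later in the discussion, so the sketch reflects only the proposal as drawn here.

```python
import ipaddress
import typing

# Proposed layout (bit 31 is the address MSB): 11101111.0 ddddd R S . xxxxxxxx.xxxxxxxx
#   d = 5-bit subnet-ID, R = reserved (0), S = selector (0 = message, 1 = service)
MULTICAST_PREFIX = 0xEF00_0000  # fixed 9 bits: 11101111.0
PREFIX_MASK = 0xFF80_0000       # covers the fixed 9 bits
DOMAIN_ID_SHIFT = 18
DOMAIN_ID_MAX = 2**5 - 1
DOMAIN_ID_MASK = DOMAIN_ID_MAX << DOMAIN_ID_SHIFT
SERVICE_SELECTOR = 1 << 16
SUBJECT_ID_MASK = 2**13 - 1
NODE_ID_MASK = 2**16 - 1


def _check(name: str, value: int, maximum: int) -> None:
    if not 0 <= value <= maximum:
        raise ValueError(f"Invalid {name}: {value}")


def message_multicast_group(domain_id: int, subject_id: int) -> ipaddress.IPv4Address:
    _check("domain-ID", domain_id, DOMAIN_ID_MAX)
    _check("subject-ID", subject_id, SUBJECT_ID_MASK)
    return ipaddress.IPv4Address(MULTICAST_PREFIX | (domain_id << DOMAIN_ID_SHIFT) | subject_id)


def service_multicast_group(domain_id: int, node_id: int) -> ipaddress.IPv4Address:
    _check("domain-ID", domain_id, DOMAIN_ID_MAX)
    _check("node-ID", node_id, NODE_ID_MASK)
    return ipaddress.IPv4Address(
        MULTICAST_PREFIX | (domain_id << DOMAIN_ID_SHIFT) | SERVICE_SELECTOR | node_id
    )


def parse_multicast_group(address: ipaddress.IPv4Address) -> typing.Tuple[int, bool, int]:
    """Inverse mapping: returns (domain_id, is_service, subject_id or node_id)."""
    value = int(address)
    if (value & PREFIX_MASK) != MULTICAST_PREFIX:
        raise ValueError(f"Not a Cyphal multicast group: {address}")
    domain_id = (value & DOMAIN_ID_MASK) >> DOMAIN_ID_SHIFT
    is_service = bool(value & SERVICE_SELECTOR)
    return domain_id, is_service, value & (NODE_ID_MASK if is_service else SUBJECT_ID_MASK)
```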
TODO
- [x] Datagram
- [ ] Message
- [ ] Service
- [ ] Add anonymous mode
- [ ] Update documentation
Question: Is anonymous listening possible or not? There seems to be some contradiction:
pycyphal/transport/udp/__init__.py:20:
The concept of anonymous transfer is not defined for Cyphal/UDP
vs
pycyphal/transport/udp/__init__.py:302
>>> tr_1 = pycyphal.transport.udp.UDPTransport('127.9.15.254', local_node_id=None) # Anonymous is only for listening.
Question: Does it make sense to move source_node_id to class Frame (defined in pycyphal/transport/commons/high_overhead_transport/_frame.py)?
Is anonymous listening possible or not? There seems to be some contradiction:
There is no contradiction: "The concept of anonymous transfer is not defined for Cyphal/UDP" means that you can't send anonymous transfers, but you can still create anonymous nodes that cannot communicate (they can only listen).
Does it make sense to move source_node_id to class Frame (defined in pycyphal/transport/commons/high_overhead_transport/_frame.py)?
Probably not (yet) because you would have to update other transports dependent on this class.
Ok, thanks.
"Datagram" still needs some work, will ping you when a review is needed.
Does the passing of the local_node_id as source_node_id look right? (see last commit)
I have updated the unit test to take source_node_id into account.
Functional tests (run pytest from pycyphal/transport/udp):

Integration tests (pytest -k udp):

Can you review these changes? I think the first step of implementing changes related to "Datagram" is done.
The changes look good to me. Later on, we may want to enable anonymous transfers for UDP as I wrote on the forum; it should be a cheap change to introduce. The relevant part of the source code can be found by searching for "In Cyphal/UDP, the anonymous mode is somewhat bolted-on."
The text and diagrams in the module docstring at _udp.py will also need to be updated, perhaps later.
Nice work so far ;)
Later on, we may want to enable anonymous transfers for UDP as I wrote on the forum; it should be a cheap change to introduce.
The text and diagrams in the module docstring at _udp.py will also need to be updated, perhaps later.
Added to todo-list 👍
@pavel-kirienko
Could you review these changes? I just want to make sure that this part is correct (pycyphal/transport/udp/_ip) before I start updating the larger code base/unit tests.
Also some questions:
- service_data_specifier_to_multicast_group and message_data_specifier_to_multicast_group are pretty similar, so should I just combine them? Instead of 4 functions in _endpoint_mapping.py, this would result in 2. (Equating MESSAGE_ID and SERVICE_ID into one variable NODE_ID would also help in this regard.)
- I renamed most of the constants used in _endpoint_mapping.py; let me know if this change makes sense.
Nice progress, but there seems to be a problem with a subject-/service-/node-ID mixup. Please take another look at section 4.1.1 "Transport model" of the Specification.
To publish a message on subject S, we send a multicast datagram to the multicast group whose address is computed as:
And the destination UDP port is set to 16383.
To send a request or response on service X to node N, we send a multicast datagram to the multicast group whose address is computed as:
And the destination UDP port is set to (16384 + X * 2 + (is_response)).
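Here is the port arithmetic above as a minimal code sketch. The 9-bit service-ID range (0 to 511) comes from the Cyphal specification. The helper names are hypothetical. A later comment in this thread notes that the service port was eventually fixed to a single number, so this mapping applies only to the scheme being discussed at this point.

```python
import typing

SUBJECT_PORT = 16383
SERVICE_PORT_BASE = 16384
SERVICE_ID_MASK = 2**9 - 1  # Cyphal service-IDs are 9 bits wide (0..511)


def service_udp_port(service_id: int, is_response: bool) -> int:
    """Destination port for a request (even offset) or response (odd offset) on the given service."""
    if not 0 <= service_id <= SERVICE_ID_MASK:
        raise ValueError(f"Invalid service-ID: {service_id}")
    return SERVICE_PORT_BASE + service_id * 2 + int(is_response)


def parse_service_udp_port(port: int) -> typing.Tuple[int, bool]:
    """Inverse mapping: returns (service_id, is_response)."""
    offset = port - SERVICE_PORT_BASE
    if not 0 <= offset <= SERVICE_ID_MASK * 2 + 1:
        raise ValueError(f"Not a service port: {port}")
    return offset // 2, bool(offset % 2)
```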
Could you please update your branch to sync up with master?
Could you confirm the changes are correct? I think I have addressed the issues.
Regarding pycyphal/transport/udp/_ip/_v4.py:
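import socket
import typing
import pycyphal.transport
from ._endpoint_mapping import (
    SUBJECT_PORT,
    message_data_specifier_to_multicast_group,
    service_data_specifier_to_multicast_group,
    service_data_specifier_to_udp_port,
)

class IPv4SocketFactory(SocketFactory):
    def __init__(self, domain_id: int) -> None:
        ...  # (body omitted)

    def make_output_socket(
        self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
    ) -> socket.socket:
        # General setup
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        s.setblocking(False)
        s.bind((str(self._local), 0))  # QUESTION: What local IP address to bind to? (Does it need to be bound at all?)
        if isinstance(data_specifier, pycyphal.transport.MessageDataSpecifier):
            remote_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
            remote_port = SUBJECT_PORT
        elif isinstance(data_specifier, pycyphal.transport.ServiceDataSpecifier):
            remote_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id, data_specifier)
            remote_port = service_data_specifier_to_udp_port(data_specifier)
        else:
            raise ValueError(f"Unsupported data specifier: {data_specifier}")
        # Connect
        s.connect((str(remote_ip), remote_port))
        return s

    def make_input_socket(
        self,
        remote_node_id: typing.Optional[int],  # CHANGE: need remote_node_id for service
        data_specifier: pycyphal.transport.DataSpecifier,
    ) -> socket.socket:
        # General setup
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        s.setblocking(False)
        if isinstance(data_specifier, pycyphal.transport.MessageDataSpecifier):
            multicast_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
            multicast_port = SUBJECT_PORT
        elif isinstance(data_specifier, pycyphal.transport.ServiceDataSpecifier):
            multicast_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id, data_specifier)
            multicast_port = service_data_specifier_to_udp_port(data_specifier)
        else:
            raise ValueError(f"Unsupported data specifier: {data_specifier}")
        # Bind (multicast traffic is only delivered once the group is also joined via IP_ADD_MEMBERSHIP)
        s.bind((str(multicast_ip), multicast_port))
        return s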
Look right?
@pavel-kirienko
I think it's almost finished.
The last change that needs to be addressed, is mainly related to udp/_socket_reader.py.
class SocketReader contains the following note:
This class is the solution to the UDP demultiplexing problem. The objective is to read data from the supplied socket, parse it, and then forward it to interested listeners.
Why can't we ask the operating system to do this for us? Because there is no portable way of doing this (except for multicast sockets). Even on GNU/Linux, there is a risk of race conditions, but I'll spare you the details. Those who care may read this: https://stackoverflow.com/a/54156768/1007777.
This seems to suggest that some significant overhaul/simplification can be done here. Can you give some pointers?
Some smaller questions:
pycyphal/transport/udp/_tracer.py:160:
if ip_destination.is_multicast:
    if udp_packet.destination_port == SUBJECT_PORT:
        # Message packet
        dst_nid = None  # Broadcast
        data_spec = multicast_group_to_message_data_specifier(ip_source, ip_destination)
    else:
        # Service packet
        data_spec = udp_port_to_service_data_specifier(udp_packet.destination_port)
        # QUESTION: Correct to use DOMAIN_ID_MASK here? (or make a separate function for this?)
        domain_id = (int(ip_destination) & DOMAIN_ID_MASK) >> 18
        dst_nid = service_multicast_group_to_node_id(domain_id, ip_destination)
tests/transport/udp/ip/v4.py:53/81:
msg_i = fac.make_input_socket(None, MessageDataSpecifier(612))
test_msg_o.sendto(b"Seagull", ("239.52.2.100", SUBJECT_PORT))
time.sleep(1) ##QUESTION: BlockingIOError: [Errno 35] Resource temporarily unavailable
rx = msg_i.recvfrom(1024)
assert rx[0] == b"Seagull"
assert rx[1][0] == "127.0.0.1" # Same address we just bound to.
This extra sleep(1) wasn't necessary before, however after some changes it started failing with the above error. Any remarks on this?
This seems to suggest that some significant overhaul/simplification can be done here. Can you give some pointers?
Yes, indeed, as you have correctly guessed, it should be possible to get rid of the SocketReader or at least simplify it. There may be an arbitrary number of sockets connected to the same multicast endpoint and the OS should perform demultiplexing correctly for you. Maybe for now, in the interest of minimizing the scope of this changeset, we should keep the socket reader in place and consider removing it later.
QUESTION: Correct to use DOMAIN_ID_MASK here? (or make a separate function for this?)
I am not sure what you are going to use the domain-ID here for. It doesn't seem to be needed to parse the frame, unless I am missing something.
This extra sleep(1) wasn't necessary before, however after some changes it started failing with the above error. Any remarks on this?
The kernel works in mysterious ways, what else can I say? Since this is just a test, it is fine to simply keep the sleep in place.
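If the fixed sleep ever makes the test slow or flaky, one common alternative is to wait until the socket becomes readable using select.select() with a bounded timeout, and only then call recvfrom(). This is a suggestion, not what the thread settled on. A minimal sketch follows; recv_with_timeout is a hypothetical helper.

```python
import select
import socket
import typing


def recv_with_timeout(
    sock: socket.socket, timeout: float, bufsize: int = 1024
) -> typing.Tuple[bytes, typing.Any]:
    """Waits until the (non-blocking) socket is readable, then reads one datagram.

    Raises TimeoutError if nothing arrives within `timeout` seconds.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        raise TimeoutError(f"No datagram received within {timeout} s")
    return sock.recvfrom(bufsize)
```

In the test above, time.sleep(1) followed by msg_i.recvfrom(1024) would become rx = recv_with_timeout(msg_i, 1.0). The BlockingIOError itself is the expected behaviour of a non-blocking socket whose receive queue is still empty.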
There is some discussion on the forum that you should be aware of: https://forum.opencyphal.org/t/cyphal-udp-architectural-issues-caused-by-the-dependency-between-the-nodes-ip-address-and-its-identity/1765/41
There's nothing major but it seems like we'll have to shuffle some bits around the header and the IP address, start using one common UDP port number for all traffic and discriminate services based on a dedicated service-ID field in the header instead of UDP ports, and also possibly add a header checksum. All of these changes seem quite minor in comparison to what you've already implemented here.
@pavel-kirienko
First I re-wrote pycyphal/transport/udp/_socket_reader.py like this:
def _dispatch_frame(
    self, timestamp: Timestamp, source_ip_address: _IPAddress, frame: typing.Optional[UDPFrame]
) -> None:
    # Do not accept datagrams emitted by the local node itself. Do not update the statistics either.
    external = self._anonymous or (source_ip_address != self._local_ip_address)
    if not external:
        return
    # Process the datagram. This is where the actual demultiplexing takes place.
    # The node-ID mapper will return None for datagrams coming from outside of our Cyphal subnet.
    handled = False
    source_node_id = None
    if frame is not None:
        # if source_ip_address is part of our Cyphal subnet
        if (DOMAIN_ID_MASK & int(source_ip_address)) == (DOMAIN_ID_MASK & int(self._local_ip_address)):
            source_node_id = frame.source_node_id
        # if source_ip_address is not part of our Cyphal subnet, source_node_id is None
        else:
            source_node_id = None
Now I'm starting to suspect that this is not how it's meant to be. Instead it should be:
class SocketReader needs a new variable domain_id:
def __init__(
    self,
    sock: socket.socket,
    local_ip_address: _IPAddress,
    domain_id: int,
    anonymous: bool,
    statistics: SocketReaderStatistics,
):
    self._domain_id = domain_id
def _dispatch_frame becomes:
def _dispatch_frame(
    self, timestamp: Timestamp, source_ip_address: _IPAddress, frame: typing.Optional[UDPFrame]
) -> None:
    # Do not accept datagrams emitted by the local node itself. Do not update the statistics either.
    external = self._anonymous or (source_ip_address != self._local_ip_address)
    if not external:
        return
    # Process the datagram. This is where the actual demultiplexing takes place.
    # The node-ID mapper will return None for datagrams coming from outside of our Cyphal subnet.
    handled = False
    source_node_id = None
    if frame is not None:
        # if source_ip_address is part of our Cyphal subnet
        if self._domain_id == (DOMAIN_ID_MASK & int(source_ip_address)):
            source_node_id = frame.source_node_id
        # if source_ip_address is not part of our Cyphal subnet, source_node_id is None
        else:
            source_node_id = None
I'm not sure if source_ip_address is the multicast address here?
Note to self: replace subnet with domain_id, to avoid further confusion.
Now I'm starting to suspect that this is not how it's meant to be
You are correct in suspecting this!
I'm not sure if source_ip_address is the multicast address here?
In this new design, unicast IP addresses are no longer relevant at all. Any node can operate on any domain-ID with any node-ID regardless of its identity on the IP layer. Parameters like the source_ip_address should no longer be used anywhere except the socket factory (where it is needed only to initialize the socket correctly, this is done once per socket). The socket reader now only needs to read the datagram and pass it along regardless of the source address or the local IP address.
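Under that design, the reader reduces to a loop that drains the non-blocking socket and forwards every successfully parsed frame, without looking at the sender's IP address. The sketch below assumes exactly that. The parameters parse and deliver are hypothetical stand-ins for UDPFrame parsing and for the session's frame handler.

```python
import socket
import typing

T = typing.TypeVar("T")


def drain_socket(
    sock: socket.socket,
    parse: typing.Callable[[bytes], typing.Optional[T]],
    deliver: typing.Callable[[T], None],
    max_datagram_size: int = 65535,
) -> int:
    """Reads every datagram currently queued on a non-blocking socket and forwards the parsed frames.

    The sender address is deliberately ignored: the multicast group and port identify the traffic,
    and the node identity comes from the Cyphal header. Datagrams for which parse() returns None
    are dropped. Returns the number of frames delivered.
    """
    delivered = 0
    while True:
        try:
            data, _sender = sock.recvfrom(max_datagram_size)
        except BlockingIOError:
            return delivered  # the receive queue is empty
        frame = parse(data)
        if frame is not None:
            deliver(frame)
            delivered += 1
```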
@pavel-kirienko
Can you check this _socket_reader implementation?
Main changes:
- Removed self._local_ip_address
- Removed self._anonymous
- Changed SocketReaderStatistics:
# Old
accepted_datagrams: typing.Dict[int, int] = dataclasses.field(default_factory=dict)
dropped_datagrams: typing.Dict[typing.Union[_IPAddress, int], int] = dataclasses.field(default_factory=dict)
# New
accepted_datagrams: typing.Dict[typing.Optional[int], int] = dataclasses.field(default_factory=dict)
dropped_datagrams: typing.Dict[typing.Optional[int], int] = dataclasses.field(default_factory=dict)
# Keys:
#   None: invalid node-ID (for dropped_datagrams)
#   None: anonymous frame (for accepted_datagrams)
#   int: node-ID
Concerning the keys: use None for anonymous frames? (currently 0xffff but easy fix)
(Ignore the QUESTIONs in the code.)
Unit tests _unittest_socket_reader and _unittest_socket_reader_endpoint_reuse are running successfully, but need some additional changes to better reflect the new class structure.
I suspect that this updated socket reader is going to incite false node-ID collision warnings from the application layer because if a node both publishes and subscribes to some subject, it will receive its own publications. There should be some check in place that would discard incoming traffic where the source node-ID equals the local node-ID, I think.
Concerning the keys: use 0xffff for anonymous frames or None?
None. We need to keep in mind the difference between the high-layer concept (which is the lack of a node-ID) and its manifestation on the wire (which is 0xFFFF).
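Both points above fit into two small helpers. The first translates the wire value 0xFFFF into None at the boundary. The second discards frames whose source node-ID equals the local node-ID, so a node does not receive its own publications. A minimal sketch follows; the names are hypothetical.

```python
import typing

ANONYMOUS_NODE_ID_ON_WIRE = 0xFFFF  # how "no node-ID" is encoded in the header's source_node_id field


def source_node_id_from_wire(raw: int) -> typing.Optional[int]:
    """Maps the wire representation to the high-level concept: None means anonymous."""
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"Invalid source node-ID field: {raw}")
    return None if raw == ANONYMOUS_NODE_ID_ON_WIRE else raw


def source_node_id_to_wire(node_id: typing.Optional[int]) -> int:
    """Maps None to 0xFFFF; a real node-ID must be below the reserved value."""
    if node_id is None:
        return ANONYMOUS_NODE_ID_ON_WIRE
    if not 0 <= node_id < ANONYMOUS_NODE_ID_ON_WIRE:
        raise ValueError(f"Invalid node-ID: {node_id}")
    return node_id


def is_own_transfer(source_node_id: typing.Optional[int], local_node_id: typing.Optional[int]) -> bool:
    """True if a received frame was emitted by the local node itself and should be discarded."""
    return source_node_id is not None and source_node_id == local_node_id
```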
@pavel-kirienko
I'm having this issue in _unittest_udp_transport_ipv4:
I'm trying to send a service request from client_requester to server_listener.
At first I defined server_listener as follows:
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta # remote_node_id is None
)
This makes sense from the perspective that a service input session needs to accept calls from all nodes.
However, the test fails because there is no subscription to the multicast address 239.53.0.111; therefore, client_requester is not receiving any of the sent frames.
# from pycyphal/transport/udp/_ip/_v4.py:
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id)
    multicast_port = service_data_specifier_to_udp_port(data_specifier)
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        ...
Then I thought maybe it makes sense for server_listener to be defined as follows:
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta # remote_node_id is 111
)
111 is its own node_id, so it is basically subscribing itself to the multicast address where the frames are sent.
Then it does receive the frames, however they are discarded since the source_node_id (222 in this case) does not match the subscribed node_id 111:
# from pycyphal/transport/udp/_socket_reader.py
# Handle non-anonymous frames
if source_node_id is not None:
    # Each frame is sent to the promiscuous listener (None) and to the selective listener (source_node_id).
    # We parse the frame before invoking the listener in order to avoid the double parsing workload.
    for key in (None, source_node_id):
        try:
            callback = self._listeners[key]
        except LookupError:
            pass
        else:
            handled = True
            try:
                callback(timestamp, frame)
            except Exception as ex:  # pragma: no cover
                _logger.exception("%r: Unhandled exception in the listener for node-ID %r: %s", self, key, ex)
I'm prone to thinking the first approach is the right one. However, I'm not sure how to setup this promiscuous multicast address correctly, so it receives all the frames that are sent.
The relevant part of _v4.py:
if isinstance(data_specifier, MessageDataSpecifier):
    multicast_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
    multicast_port = SUBJECT_PORT
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        # Binding to the multicast group address is necessary on GNU/Linux: https://habr.com/ru/post/141021/
        s.bind((str(multicast_ip), multicast_port))
    else:
        # Binding to a multicast address is not allowed on Windows, and it is not necessary there. Error is:
        # OSError: [WinError 10049] The requested address is not valid in its context
        s.bind(("", multicast_port))
    try:
        # Note that using INADDR_ANY in IP_ADD_MEMBERSHIP doesn't actually mean "any",
        # it means "choose one automatically"; see https://tldp.org/HOWTO/Multicast-HOWTO-6.html
        # This is why we have to specify the interface explicitly here.
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )
    except OSError as ex:
        s.close()
        if ex.errno in (errno.EADDRNOTAVAIL, errno.ENODEV):
            raise InvalidMediaConfigurationError(
                f"Could not register multicast group membership {multicast_ip} via {self._local_ip_addr} using {s} "
                f"[{errno.errorcode[ex.errno]}]"
            ) from None
        raise  # pragma: no cover
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id)
    multicast_port = service_data_specifier_to_udp_port(data_specifier)
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        s.bind((str(multicast_ip), multicast_port))
    else:
        s.bind(("", multicast_port))
    try:
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )
    except OSError as ex:
        s.close()
        if ex.errno in (errno.EADDRNOTAVAIL, errno.ENODEV):
            raise InvalidMediaConfigurationError(
                f"Could not register multicast group membership {multicast_ip} via {self._local_ip_addr} using {s} "
                f"[{errno.errorcode[ex.errno]}]"
            ) from None
        raise  # pragma: no cover
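The message and service branches above differ only in how the group address and port are computed. The platform-specific bind address and the IP_ADD_MEMBERSHIP request could therefore be factored into shared helpers. A minimal sketch follows, with hypothetical names. The 8-byte membership request is the group address followed by the local interface address, exactly as in the setsockopt() calls above.

```python
import ipaddress


def multicast_bind_address(multicast_ip: ipaddress.IPv4Address, platform: str) -> str:
    """Address to bind a multicast input socket to, mirroring the branches above.

    GNU/Linux and macOS: bind to the group address itself.
    Windows: binding to a multicast address fails (WinError 10049), so bind to INADDR_ANY ("").
    """
    if platform.startswith("linux") or platform.startswith("darwin"):
        return str(multicast_ip)
    return ""


def membership_request(multicast_ip: ipaddress.IPv4Address, local_ip: ipaddress.IPv4Address) -> bytes:
    """struct ip_mreq for IP_ADD_MEMBERSHIP: group address, then local interface address (both big-endian)."""
    if not multicast_ip.is_multicast:
        raise ValueError(f"Not a multicast address: {multicast_ip}")
    return multicast_ip.packed + local_ip.packed
```

Each branch would then reduce to s.bind((multicast_bind_address(multicast_ip, sys.platform), multicast_port)) followed by s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership_request(multicast_ip, self._local_ip_addr)).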
@maksimdrachov
I'm prone to thinking the first approach is the right one.
This is correct.
However, I'm not sure how to setup this promiscuous multicast address correctly, so it receives all the frames that are sent.
The socket factory is not involved in the distinction between selective and promiscuous sessions: regardless of the mode, the socket would be configured identically. The distinction matters here where the socket reader is configured:
https://github.com/OpenCyphal/pycyphal/blob/8273967696d9c250264a252399681ae8d725b962/pycyphal/transport/udp/_udp.py#L340
Observe the logic: when you request a new input session, the UDP transport checks if there's a socket available for this endpoint already (perhaps in a different mode; e.g. there could be a selective session while you want to set up a new promiscuous session with the same data specifier). If no such endpoint is found, a new socket reader is created, otherwise the existing socket reader is used to add a new listener to it.
What you described suggests that the socket factory is operating properly but the socket reader is, apparently, somehow mishandling promiscuous listeners. I would check that.
Looking at the latest proposals from @thirtytwobits, I am actually wondering whether it would be easier to modify the input logic such that there is always exactly one socket dedicated to incoming service transfers (unless the transport is configured in the anonymous mode, in which case it wouldn't be possible to receive service transfers anyway, so no socket is needed). Then you add some kind of basic dispatcher that receives frames from that socket and forwards them to the appropriate listener, much like the SocketReader currently does, but the listener would be identified based not only on the remote address but also on the service-ID.
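The single-socket approach above needs a dispatcher that routes each service frame by service-ID and remote node-ID. Each frame goes to two possible listeners. One is the promiscuous listener, registered with remote node-ID None (e.g. a server). The other is the selective listener whose node-ID matches the source (e.g. a client). A minimal sketch follows; the class and method names are hypothetical and do not mirror pycyphal's SocketReader API. It reproduces the behaviour observed earlier in this thread: a selective listener for node 111 ignores a request from node 222, while a promiscuous listener accepts it.

```python
import typing

Frame = typing.Any
Listener = typing.Callable[[Frame], None]
Key = typing.Tuple[int, typing.Optional[int]]  # (service-ID, remote node-ID or None)


class ServiceDispatcher:
    """Forwards frames from one service socket to listeners keyed by (service-ID, remote node-ID)."""

    def __init__(self) -> None:
        self._listeners: typing.Dict[Key, Listener] = {}

    def add_listener(self, service_id: int, remote_node_id: typing.Optional[int], listener: Listener) -> None:
        key = (service_id, remote_node_id)
        if key in self._listeners:
            raise ValueError(f"Listener already registered for {key}")
        self._listeners[key] = listener

    def remove_listener(self, service_id: int, remote_node_id: typing.Optional[int]) -> None:
        del self._listeners[(service_id, remote_node_id)]

    def dispatch(self, service_id: int, source_node_id: typing.Optional[int], frame: Frame) -> bool:
        """Delivers to the promiscuous and the matching selective listener; True if anyone accepted it."""
        if source_node_id is None:
            return False  # service transfers cannot be anonymous
        handled = False
        for key in ((service_id, None), (service_id, source_node_id)):
            listener = self._listeners.get(key)
            if listener is not None:
                listener(frame)
                handled = True
        return handled
```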
Next, considering that selective listeners aren't really used with subjects --- all subjects are promiscuous (there may be some support for selective subject listeners but this is not really part of the spec so not required) --- we can apply the next obvious simplification, which is to remove the SocketReader completely and simply keep one socket per UDPInputSession of a subject (we may need a new specialization for that, like UDPSubjectInputSession, because the existing subclasses do not own sockets).

All of this amounts to a somewhat deeper refactoring than I would initially be comfortable with but if we accept Scott's suggestions (so far there seems to be no reason not to), then we'd have to do it anyway. If you're available, we could meet next week face2face and discuss this. I'm hoping to bring the discussion around Scott's proposals to some sort of tentative conclusion by then.
@pavel-kirienko
output
Call-flow:
# _udp.py
def get_output_session(
    self, specifier: pycyphal.transport.OutputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPOutputSession
# |
# v
# _udp.py
sock = self._sock_factory.make_output_socket(specifier.remote_node_id, specifier.data_specifier)
# |
# v
# _ip/_v4.py
def make_output_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket
For Message, remote_node_id is always None. The socket is connected to the following multicast address:
remote_ip = message_data_specifier_to_multicast_group(self._subnet_id, data_specifier)
remote_port = SUBJECT_PORT
For Service, remote_node_id can't be None. (A service request always needs to be directed towards a certain remote_node_id). The multicast address is determined as follows:
remote_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
remote_port = service_data_specifier_to_udp_port(data_specifier)
All output sockets are stored in:
# _udp.py
self._output_registry: typing.Dict[pycyphal.transport.OutputSessionSpecifier, UDPOutputSession] = {}
(Each UDPOutputSession has its own socket; sending is done through UDPOutputSession.send())
Removal of SocketReader does not affect this part of the implementation.
input
Call-flow (current implementation):
# _udp.py
def get_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPInputSession:
# |
# v
# _udp.py
def _setup_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> None:
# |
# v
# _udp.py
self._socket_reader_registry[specifier.data_specifier] = SocketReader(
    sock=self._sock_factory.make_input_socket(specifier.remote_node_id, specifier.data_specifier),
    local_node_id=self._local_node_id,
    statistics=self._statistics.received_datagrams.setdefault(
        specifier.data_specifier, SocketReaderStatistics()
    ),
)
# |
# v
# _ip/_v4.py
def make_input_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket:
Call-flow (ported)
# _udp.py
def get_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPInputSession:
# |
# v
# _udp.py
sock = self._sock_factory.make_input_socket(specifier.remote_node_id, specifier.data_specifier)
# |
# v
# _ip/_v4.py
def make_input_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket:
    # adds listeners for relevant subject-ID/service-ID
For Message, remote_node_id is always None. The socket is bound to the following multicast address:
multicast_ip = message_data_specifier_to_multicast_group(self._subnet_id, data_specifier)
multicast_port = SUBJECT_PORT
For Service, remote_node_id is always None. The socket is bound to the following multicast address:
multicast_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
multicast_port = service_data_specifier_to_udp_port(data_specifier)
QUESTION 1: I guess remote_node_id doesn't make a lot of sense in the input case, since both Message and Service are assumed to be receiving from all possible sources?
All input sockets are stored in:
# _udp.py
self._input_registry: typing.Dict[pycyphal.transport.InputSessionSpecifier, UDPInputSession] = {}
(Each UDPInputSession has its own socket; receiving is done through UDPInputSession.receive())
conclusion
Here are some changes that need to be made due to the removal of SocketReader:
- Removal of _socket_reader_registry: sockets are now stored in UDPInputSession.
- All of SocketReader's functionality is moved into UDPInputSession:
  - add_listener()
  - remove_listener()
  - _dispatch_frame(): becomes much simpler due to multicast
- Altogether, due to these changes, input setup/operation will become much closer to output setup/operation.
Multicast addresses are:
z = subject-ID (Message)
Multicast address      | Message, Output | Message, Input | Service, Output | Service, Input
-----------------------+-----------------+----------------+-----------------+---------------
remote_node_id = y     | Not supported   | Not supported  | 239.x.x.y       | Not supported
remote_node_id = None  | 239.x.x.z       | 239.x.x.z      | Not supported   | 239.x.255.255
Two types of UDPInputSession:
- PromiscuousUDPInputSession -> rename to ServiceInputSession?
  - Service, Input
  - Single socket, but multiple listeners as in SocketReader, to be able to handle different service calls.
  - If different subnets, we might have multiple ServiceInputSessions in _input_registry.
- UDPSubjectInputSession -> rename to MessageInputSession?
  - Message, Input
  - Single socket with a single listener; each subject-ID has its own MessageInputSession.
  - If different subnets/subject-IDs, we have multiple MessageInputSessions in _input_registry.
QUESTION 2: No use of SelectiveUDPInputSession? I guess "Not supported" of "Message, Input" is SelectiveUDPInputSession?
QUESTION 3: What with SocketReaderStatistics? Not used in tests/_udp.py so not strictly necessary?
Regarding Scott's changes, I've added them to the TODO list; I will implement them after this Service part is done.
For Service, remote_node_id can't be None. (A service request always needs to be directed towards a certain remote_node_id). The multicast address is determined as follows:
remote_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
remote_port = service_data_specifier_to_udp_port(data_specifier)
This is generally correct but the remote port is now (per Scott's proposal) fixed at 9382 (not sure how the number was chosen but there is probably no difference). It is simpler than it used to be.
For Service, remote_node_id is always None. The socket is bound to the following multicast address: ... QUESTION 1: I guess remote_node_id doesn't make a lot of sense in the input case, since both Message and Service are assumed to be receiving from all possible sources?
This is incorrect because service listeners can be promiscuous (accept service transfers from any remote node) or selective (accept service transfers from a specific remote node); the remote_node_id is None in the former case and a valid integer in the latter.
The promiscuous kind is used with service servers (see pycyphal.presentation.Server) because a server needs to be able to receive a request from any client on the network.
The selective kind is used with service clients (see pycyphal.presentation.Client) because client instances are instantiated for a specific remote endpoint that provides the service. See Presentation.make_client() in the linked docs. The objective of this design is to provide sufficient context to the transport layer implementation to allow the transport to perform required packet filtering at the low level instead of requiring the application to do this at the high level manually.
The above also applies to QUESTION 2.
Each UDPInputSession has its own socket, receiving is done through UDPInputSession.receive()
Good. I think we should drop a few lines in the comments explaining that the UDP demultiplexing problem does not apply in this case because it is only manifested with point-to-point connections, not multicast. I am referring to this piece:
https://github.com/OpenCyphal/pycyphal/blob/8273967696d9c250264a252399681ae8d725b962/pycyphal/transport/udp/_socket_reader.py#L58-L61
I see that you decided to keep one input socket per session. I suppose this is also fine, although some inefficiencies due to data duplication in the kernel might arise compared to the approach I suggested originally, where there would be one input socket responsible for all incoming service transfers, which are then dispatched to the correct UDPInputSession based on the service-ID and the remote node-ID. Perhaps your approach is simpler and the performance issues would never arise --- we'll see. Let's proceed as you implemented it and see if it requires changes in the future.
All SocketReader's functionality, is also moved into UDPInputSession:
If there's a dedicated socket per session, why do we need to add/remove listeners dynamically? I assume the lifetime of a socket matches that of the session that owns it, so there should be no need for dynamic listener configuration. Am I missing something here?
QUESTION 3: What with SocketReaderStatistics? Not used in tests/_udp.py so not strictly necessary?
We can safely drop this for now. This is needed only for high-level diagnostics and it does not directly affect the functionality of the library. For instance, yakut subscribe, yakut call, etc. emit this information at exit.
@maksimdrachov, please either rebase on master or merge it into your branch. There have been important changes related to the CI.
@pavel-kirienko
Can you check udp/_ip? It's changed according to Scott's post.
I added some unit tests here:
- udp/_ip/_endpoint_mapping.py
- udp/_ip/_v4.py
@pavel-kirienko
Re-iterating:
UDPInputSession        | Message                    | Service
-----------------------+----------------------------+---------------------------
remote_node_id = int   | Not supported              | SelectiveUDPInputSession
remote_node_id = None  | PromiscuousUDPInputSession | PromiscuousUDPInputSession
_udp.py:
- get_input_session becomes much closer to get_output_session.
- _setup_input_session/_teardown_input_session are not required anymore due to the much simpler setup procedure.
To make it a bit clearer for myself, this is how I think UDPInputSession needs to look (mirroring UDPOutputSession):
_input.py:
- UDPInputSession:
  - new member self._sock
  - async def receive(self, monotonic_deadline: float) -> typing.Optional[pycyphal.transport.TransferFrom]
  - async def _consume() (current SocketReader functionality, mirroring _emit)
  - def _process_frame(self, timestamp: Timestamp, frame: typing.Optional[UDPFrame]) -> None
  - def _get_reassembler(self, source_node_id: int) -> TransferReassembler (Promiscuous)
  - def _get_reassembler(self, source_node_id: int) -> TransferReassembler (Selective)
  - self._statistics_impl = PromiscuousUDPInputSessionStatistics() (Promiscuous)
  - self._statistics_impl = SelectiveUDPInputSessionStatistics() (Selective)
_output.py:
- async def send(self, transfer: pycyphal.transport.Transfer, monotonic_deadline: float) -> bool
- def construct_frame(index: int, end_of_transfer: bool, payload: memoryview) -> UDPFrame
- async def _emit(self, header_payload_pairs: typing.Sequence[typing.Tuple[memoryview, memoryview]], monotonic_deadline: float) -> typing.Optional[Timestamp]
- self._statistics = pycyphal.transport.SessionStatistics()
Is this correct?
Some smaller questions/remarks:
udp/_ip/_v4.py:
# make_output_socket
elif isinstance(data_specifier, ServiceDataSpecifier):
    s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, self._local_ip_addr.packed)
    s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, IPv4SocketFactory.MULTICAST_TTL)
    remote_ip = service_node_id_to_multicast_group(remote_node_id)
    remote_port = DESTINATION_PORT
# make_input_socket
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_node_id_to_multicast_group(remote_node_id)
    multicast_port = DESTINATION_PORT
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        s.bind((str(multicast_ip), multicast_port))
    else:
        s.bind(("", multicast_port))
    try:
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )
Add a check if ServiceDataSpecifier is Request (for make_input_socket) or Response (for make_output_socket)?
In udp/_ip/_endpoint_mapping.py, we define:
SUBJECT_ID_MASK = 2**15 - 1
however:
# transport/_data_specifier.py
@dataclasses.dataclass(frozen=True)
class MessageDataSpecifier(DataSpecifier):
    SUBJECT_ID_MASK = 2**13 - 1
    subject_id: int
    def __post_init__(self) -> None:
        if not (0 <= self.subject_id <= self.SUBJECT_ID_MASK):
            raise ValueError(f"Invalid subject-ID: {self.subject_id}")
Any comment on this apparent discrepancy?
Is this correct?
Seems so, at least on the high level. I presume the next step would be to drop the no longer needed SocketReader, right?
Add a check if ServiceDataSpecifier is Request (for make_input_socket) or Response (for make_output_socket)?
I don't see what for. The request/response distinction does not seem to be manifested at the socket level, does it?
Any comment on this apparent discrepancy?
It is simply that Scott's multicast group address format reserves more bits for the subject-ID than necessary; that is all. You can see the same discrepancy in the Cyphal/serial header format. Although it seems inconsistent, ultimately, it does not matter, and it enables separate parts of the protocol to evolve independently.
@pavel-kirienko
I presume the next step would be to drop the no longer needed SocketReader, right?
Sure! (Currently leaving it in for reference)
I don't see what for. The request/response distinction does not seem to be manifested at the socket level, does it?
I was thinking maybe it would be good to warn the user if he tries to instantiate an input socket for a Response (which wouldn't make sense?). I guess it doesn't really matter for the actual socket.
It is simply that Scott's multicast group address format reserves more bits for the subject-ID than necessary; that is all.
Ok, I thought maybe this was something overlooked in Scott's proposal.
I've updated the _output_session unit test, there's this small inconsistency I don't get:
- in pycyphal/transport/udp/_frame.py:
assert (
    memoryview(b"\x01\x07\x06\x00\xae\x08A\xc11\xd4\x00\x00\x00\x00\x00\x00\x01\x00\x00\x80\x00\x00\x03<"),
    memoryview(b"e"),
) == UDPFrame(
    priority=Priority.OPTIONAL,
    source_node_id=6,
    destination_node_id=2222,
    snm=True,
    subject_id=None,
    service_id=321,
    rnr=True,
    transfer_id=54321,
    index=1,
    end_of_transfer=True,
    user_data=0,
    payload=memoryview(b"e"),
).compile_header_and_payload()
- in tests/transport/udp/_output_session.py:
assert data_main_b == (
    b"\x01\x07\x06\x00\xae\x08A\xc11\xd4\x00\x00\x00\x00\x00\x00\x01\x00\x00\x80\x00\x00V\x03"
    + b"e"
    + pycyphal.transport.commons.crc.CRC32C.new(b"one", b"two", b"three").value_as_bytes
)
Both check out correctly, however the bytes are not exactly the same (at the end the V and <).
What am I missing? The content is the same, but the header crc is somehow different?