openmrn
High-performance hub component for dealing with many sockets and high throughput
This PR adds DirectHub, which is an alternative implementation to the Hub/CanHub/HubDeviceSelect and GridConnect bridge infrastructure we have today.
The following documentation (and some more) is in utils/DirectHub.md in the PR as well.
DirectHub design
DirectHub is a high performance router component that is suited to do the forwarding of packets to multiple receivers with minimal CPU and latency overhead.
It specifically addresses three performance issues with the traditional CanHub / Dispatcher infrastructure:
- DirectHub is zero-copy when forwarding packets between sockets. There is a buffer which is filled with a ::read on the source socket, and then the DirectHub passes around a reference to this buffer all the way to the output object, which then ::write's it to the output socket.
- CanHub and GcTcpHub as they operate together perform separate GridConnect formatting for every port. When data passes from one port in to three others out, there would be one parsing and three separate GridConnect rendering calls. DirectHub uses a single GridConnect representation and passes around that representation. Only one parsing is done when a CanIf needs a struct can_frame.
- DirectHub performs inline calls to the ports when forwarding the packet, while CanHub/GcTcpHub allocates a new copy of the buffer which then gets enqueued separately for each port's queue, on separate StateFlows. This means that the Executor is spinning a lot less for DirectHub, therefore the context switching overhead is much smaller. (note 1)
As a future expansion, DirectHub by design will allow routing packets across multiple interface types (e.g. CAN, GridConnect and native-TCP), applying packet filtering, and admission control / fair queueing for multiple traffic sources.
(note 1): There is a conceptual problem in Buffer<T>* in that it conflates
two different but equally important characteristics of data flow. A Buffer<T>
is reference-counted, and it can be queued. However, while different owners may
hold separate references (to the same memory), only one owner is allowed to
enqueue a Buffer<T> into a Q, QList, or StateFlowWithQueue. This is
because there is only one QMember pointer in the BufferBase. The result of
this conflation is that when a Dispatcher or a Hub / CanHub sends the
same data to multiple different ports or flows, it needs to actually create a
separate copy for each one of them, and taking a reference is not sufficient.
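The effect described in (note 1) can be illustrated with a minimal sketch. `Node` and `IntrusiveQueue` below are hypothetical stand-ins, not the OpenMRN `BufferBase` or `Q` classes; they only assume a reference count plus a single intrusive link. Taking a second reference is harmless, but enqueueing the same object into a second queue overwrites the only link and damages the first queue.

```cpp
#include <cassert>

// Hypothetical stand-in: a reference-counted object with exactly one
// intrusive link, mirroring the single QMember pointer in the text.
struct Node
{
    int refCount = 1;
    Node *next = nullptr;
};

// Minimal intrusive FIFO that threads its entries through Node::next.
struct IntrusiveQueue
{
    Node *head = nullptr;
    Node *tail = nullptr;

    void push(Node *n)
    {
        n->next = nullptr;
        if (tail)
        {
            tail->next = n;
        }
        else
        {
            head = n;
        }
        tail = n;
    }

    int length() const
    {
        int count = 0;
        for (Node *n = head; n; n = n->next)
        {
            ++count;
        }
        return count;
    }
};

// Puts `shared` and `other` into queue a, then puts `shared` into queue b as
// well. Returns how many entries queue a can still reach afterwards.
inline int demo_shared_enqueue()
{
    Node shared, other;
    IntrusiveQueue a, b;
    a.push(&shared);
    a.push(&other);    // a: shared -> other
    ++shared.refCount; // a second owner takes a reference: no problem
    b.push(&shared);   // resets shared.next, cutting `other` off from a
    return a.length();
}
```

In this sketch queue `a` ends up with one reachable entry instead of two, which is why a hub built on queueable buffers has to make one copy per output.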
Theory of operation
Entry flow and threading model
In order to make the router have as little overhead as possible, almost everything about the router should be happening inline instead of asynchronously / via queueing. Virtual function calls are okay, but StateFlowWithQueue operations should be avoided.
Inline calls mean that there is a difference in threading concept: most of the time we use the thread of the caller. When concurrent calls are performed, we have to hold one of those calls until the other is complete.
Upon an entry call (after the admission controller, see later) we want to first check if the router is idle. If yes, we should grab a lock and start processing the message inline. If the router is busy, we should queue the incoming caller. To allow for both of these, the entry call doesn't actually give us a message, we get a callback instead that we'll invoke. The sender renders the actual message in that callback.
After processing the message, the router goes back to idle if the queue of held callers is found to be empty.
If the queue is non-empty, that means that a different thread called the router
while we were sending a message on the current thread. We notice this in the
on_done() method of the service. In this case the router remains busy and the
queue front is taken out for processing. The queue front is always an
Executable and it will be scheduled on the Service's executor (effectively
yielding), while the inline caller's thread gets released.
A consequence is that the caller's send callback may be called either on the caller's thread inline, or on the Service's thread, sometime later after the caller signaled the intention of sending something to the DirectHub.
A special case of this threading model is that when the caller runs on the same executor as the DirectHub, then the actual send callback is guaranteed to happen on that executor. This is the typical case on a single-processor OpenMRN application.
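A minimal sketch of this idle/busy logic follows. `MiniRouter` is a hypothetical miniature, not the DirectHub implementation: callers hand in a callback, the router runs it inline when idle, parks it when busy, and `on_done()` moves the next parked caller to a stand-in executor queue. The lock only guards the busy flag and the pending queue; the stand-in executor is assumed to be drained from a single thread.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical miniature of the entry logic described above.
class MiniRouter
{
public:
    using Callback = std::function<void()>;

    // Runs `cb` inline if the router is idle, otherwise parks it.
    void enqueue_send(Callback cb)
    {
        {
            std::lock_guard<std::mutex> l(lock_);
            if (busy_)
            {
                pending_.push_back(std::move(cb));
                return;
            }
            busy_ = true;
        }
        cb(); // inline, on the caller's thread
    }

    // Called by the sender once its message has been routed.
    void on_done()
    {
        Callback next;
        {
            std::lock_guard<std::mutex> l(lock_);
            if (pending_.empty())
            {
                busy_ = false; // back to idle
                return;
            }
            next = std::move(pending_.front());
            pending_.pop_front();
        }
        // Stay busy and yield: the next caller runs on the executor.
        executor_.push_back(std::move(next));
    }

    // Stand-in for the Service's executor loop.
    void run_executor()
    {
        while (!executor_.empty())
        {
            Callback cb = std::move(executor_.front());
            executor_.pop_front();
            cb();
        }
    }

private:
    std::mutex lock_;
    bool busy_ = false;
    std::deque<Callback> pending_;
    std::deque<Callback> executor_;
};

// Sender B arrives while sender A is being processed inline.
inline std::vector<std::string> demo_router()
{
    MiniRouter r;
    std::vector<std::string> log;
    r.enqueue_send([&] {
        log.push_back("A start");
        r.enqueue_send([&] {
            log.push_back("B");
            r.on_done();
        });
        log.push_back("A end");
        r.on_done();
    });
    log.push_back("A returned");
    r.run_executor();
    return log;
}
```

In the demo, A's callback runs inline and returns to its caller before B's callback runs; B is only invoked later from the executor loop.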
Entry API
The Entry API defines how to send traffic to the DirectHub. It is defined by
DirectHubInterface<T> and MessageAccessor<T> in DirectHub.hxx.
This is an integrated API that will internally consult the admission controller (not implemented, see later). There are three possible outcomes of an entry call:
- admitted and execute inline
- admitted but queued
- not admitted, blocked asynchronously. (this doesn't happen today)
When we queue or block the caller, a requirement is to not block the caller's thread. This is necessary to allow Executors and StateFlows sending traffic to the DirectHub.
When blocked, the best solution is to queue Executables (these are queueable). So we put them into a queue, and we put the next one onto the executor (yield) whenever we're ready, which is typically when the current packet's processing and routing is done.
If we're idle (available) to process the packet upon the entry call, we want to run it inline by calling run() on the Executable from the caller's thread.
In other words, assuming the caller is a StateFlow, the inline execution just
means that we run() the executable instead of notify()'ing it.
The syntax to prepare for both of these from a calling StateFlow (any
StateFlowBase):
    Action have_message() {
        // Makes the next run() go to fill_request(), but does not put *this onto
        // the executor.
        wait_and_call(STATE(fill_request));
        // Will cause run() to be called now or later.
        target->enqueue_send(this);
        // Ensures we do not disturb state_ or the notification.
        return wait();
    }

    Action fill_request() {
        target->mutable_message()->set_...; // fills message buffer
        target->do_send();
        // Should not be call_immediately() because threading is not certain at
        // this point.
        return yield_and_call(STATE(something_next));
    }
There is a slightly more complicated sequence of states to do if the yield at the end is undesired. The actual implementation of gridconnect / socket read flows use this more complicated mechanism to process multiple gridconnect frames that might have come with a single TCP packet.
Exit API
The Exit API defines how the DirectHub sends traffic to the ports. It is
defined by DirectHubPort<T> and the same MessageAccessor<T> in
DirectHub.hxx.
Since we are trying to make as much of the DirectHub processing happen inline, the exit API is synchronous. The exit target is responsible for any queueing that needs to happen. This is very much like the current FlowInterface<>.
The exit API does not by definition hand a ref of the payload to the ports. If
they need one, they should take one inline. However, unlike with
FlowInterface<T>, they cannot put the Buffer pointer they receive into a
queue. If they need to queue, they have to allocate a new QMember
somewhere. See (note 1) in the introduction on the significant difference that
this makes.
It is guaranteed that at least one ref is held for the duration of the call, and the caller (the hub) will release that ref sometime after the exit call has returned.
The exit call gets an accessor instead of a sequence of parameters. The memory for the accessor is owned by the hub, and allows the target to inquire the necessary parameters. The accessor is assumed to be available only during the exit call and after the exit call has returned the accessor will be reused for other messages. This is harmonized with the entry API where we are not queueing data but we are queueing data sources, which then fill in the data when we are ready for them to do so.
API:
    class DirectHubPort
    {
        void send(MessageAccessor *message);
    };

    class MessageAccessor
    {
        HubSource *source_;
        HubSource *dst_;
        BarrierNotifiable *done_;
        bool isFlush_;
        // For string typed hubs we have a BufferPtr<> data_ with a skip_ and size_
        // encapsulated in a class:
        LinkedDataPtr<uint8_t[]> payload_;
        // For arbitrary hubs we have a reference to a buffer:
        BufferPtr<T> payload_;
    };
An important aspect is that the MessageAccessor is a constant sized object. The real payload is always kept as a reference to a Buffer that was allocated by the sender object. Output ports are allowed / encouraged to hold on to references to this Buffer, which allows the zero-copy operation.
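The zero-copy fan-out can be sketched with `std::shared_ptr` standing in for the Buffer reference. All names below (`Payload`, `Accessor`, `Port`, `demo_fan_out`) are hypothetical and only illustrate the ownership pattern: the accessor holds one reference, each port takes its own reference inline during the exit call, and the bytes themselves are never copied.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a reference to a shared, immutable payload.
using Payload = std::shared_ptr<const std::vector<uint8_t>>;

// Constant-sized accessor: it carries a reference, never the bytes.
struct Accessor
{
    Payload payload;
};

struct Port
{
    // Stand-in for the port's own output queue.
    std::vector<Payload> held;

    // Exit call: take a reference inline if the data is needed later.
    void send(Accessor *m)
    {
        held.push_back(m->payload);
    }
};

struct FanOutResult
{
    long refsAfterHubRelease;
    bool sameMemory;
};

inline FanOutResult demo_fan_out()
{
    Accessor msg;
    msg.payload = std::make_shared<std::vector<uint8_t>>(16, uint8_t(0x55));
    Port a, b, c;
    for (Port *p : {&a, &b, &c})
    {
        p->send(&msg);
    }
    msg.payload.reset(); // the hub releases its reference after the calls
    FanOutResult r;
    r.refsAfterHubRelease = a.held[0].use_count();
    r.sameMemory = a.held[0].get() == b.held[0].get() &&
        b.held[0].get() == c.held[0].get();
    return r;
}
```

After the hub drops its own reference, three references remain (one per port), all pointing at the same memory.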
Runner
The hub has at most one current message at any point in time (zero if the hub is idle, one if the hub is busy). This is the message that is being sent by the port that was last executed. The MessageAccessor is owned by the runner, and accessed by the port during the entry API to fill in payload and message parameters, and passed on to the ports as part of the exit API. There is no queue of messages.
The runner is not a classic StateFlow, because of the lack of this queue. The
runner only manages the concurrency and queueing of the senders. After the
designated sender fills in the message in the MessageAccessor, the runner is
informed that it shall process the packet. This happens without yielding, by an
inline call to do_send() on the DirectHubInterface.
Internally, do_send() performs the iteration over output ports, calling all
the exit APIs synchronously. Once this is complete, the message gets cleared,
which releases the leftover reference owned by the DirectHub. Then the service
is informed that it may now look for additional callers
(DirectHubService::on_done()) that may have been enqueued. If none there, the
hub goes idle. For an inline caller, the control returns to the caller, and it
may attempt to send another packet. This allows a single caller to send a
sequence of messages without needing to yield or spin on an executor.
When we give the packet to an output port, that operation should never block the router. We should rather block the incoming port than the router. It's the job of the incoming admission controller to hold back; in the absence of that the limit on the number and byte length of the buffers makes the data source hold back.
Output buffering
For TCP based output ports (both gridconnect-CAN-TCP and native TCP, but not gridconnect-USB) we want to ensure that the number of kernel calls is much less than the number of GridConnect packets that are being sent. This is essential in keeping the costs low, especially on the CC32xx platform, where each kernel call effectively turns into a packet sent to the network.
The DirectHub gets one call and one iteration for each GridConnect packet.
The mechanism that the legacy HubDevice infrastructure used is to create a BufferPort, which internally reassembles these packets into larger buffers whenever they come within a certain period of time. This results in data copies unfortunately.
The DirectHub<uint8_t[]> infrastructure approaches this differently. Instead of
copying the input data into a separate buffer, it attempts to recognize when
the input data came from the same source and used consecutive bytes of the same
buffer. This is accomplished by comparing the Buffer references and offset/size
values of consecutive calls (see LinkedDataBufferPtr::try_append_from() in
DataBuffer.hxx). When two packets came from consecutive bytes of a single
input buffer, then the two references are united into a single reference with a
longer size. So long as the calls of the DirectHub are without yield, this
works until the entire input buffer is reassembled into a single output buffer,
which will be then written with a single ::write() call to the socket.
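The merging idea can be sketched as follows. This is not the code of LinkedDataBufferPtr::try_append_from(); `Slice`, `try_append` and `count_writes` are hypothetical names, and `std::shared_ptr` stands in for the Buffer reference. Two slices are united only when they refer to the same buffer and the second starts exactly where the first ends.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a buffer reference with skip_ / size_.
struct Slice
{
    std::shared_ptr<std::vector<uint8_t>> buf;
    size_t skip;
    size_t size;

    // Extends *this over `o` if `o` continues the same buffer without a gap.
    bool try_append(const Slice &o)
    {
        if (buf != o.buf || skip + size != o.skip)
        {
            return false;
        }
        size += o.size;
        return true;
    }
};

// Counts how many write calls an output port would issue for a sequence of
// packets that arrive without a yield in between.
inline int count_writes(const std::vector<Slice> &packets)
{
    int writes = 0;
    Slice pending{};
    for (const Slice &p : packets)
    {
        if (pending.buf && pending.try_append(p))
        {
            continue; // merged, nothing to flush yet
        }
        if (pending.buf)
        {
            ++writes; // flush what was accumulated so far
        }
        pending = p;
    }
    if (pending.buf)
    {
        ++writes;
    }
    return writes;
}
```

Three packets cut from consecutive bytes of one input buffer collapse into a single write in this sketch, while packets that alternate between two input buffers are written one by one.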
While this mechanism is rather limited, it solves the high-throughput problem: when an input client is sending a datagram or stream with a large number of CAN frames, a single 1460-byte read succeeds from the input socket, then a sequence of sends happens through the DirectHub without yielding. On the output there will be one write for almost all of the data, except a partial GridConnect packet which had to be held until the next read.
Since the output object keeps the reference to the input buffer, the input port's read flow will not observe the memory released until the output write has completed. Since the input port has a limited number of such buffers, this creates effective back-pressure on the input port not reading too much data into memory.
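A minimal sketch of this back-pressure, assuming a fixed-count pool (the `FixedPool` class is hypothetical and much simpler than the OpenMRN buffer pools): a buffer only returns to the pool when the last reference is dropped, so the reader cannot get a new buffer while output ports still hold the old ones.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical pool with a fixed number of 1460-byte input buffers. The pool
// must outlive every buffer it hands out.
class FixedPool
{
public:
    using BufferRef = std::shared_ptr<std::vector<uint8_t>>;

    explicit FixedPool(size_t count)
        : free_(count)
    {
    }

    // Returns an empty reference when all buffers are still in use.
    BufferRef try_alloc()
    {
        if (free_ == 0)
        {
            return nullptr;
        }
        --free_;
        // The custom deleter runs when the last reference goes away.
        return BufferRef(new std::vector<uint8_t>(1460),
            [this](std::vector<uint8_t> *p) {
                delete p;
                ++free_;
            });
    }

    size_t free_count() const
    {
        return free_;
    }

private:
    size_t free_;
};
```

For example, with a pool of two buffers, a third `try_alloc()` fails until both the reader and every output port have released their references to one of the buffers. In a real flow the reader would wait asynchronously instead of polling.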
Message representation for untyped data in transit
See (note 1) in the introduction for background about the difference between reference counted objects and queueable objects (QMembers). Specifically, it is important to separate the queuing concept from the shared ownership of the data payload. This is because the same data payload might be owned by multiple output ports, but if the queue next pointer is tied to the buffer, then we cannot effectively share.
Generally, all data during transit is represented in BufferPtr<T>. This is a reference to an input buffer, but is not queueable. DirectHub does not use data queues internally, so that's OK.
For untyped / string data, we need to ensure that we keep the length of the contents as well. However, we don't generally know the length of the contents until the read has happened and we have processed the incoming data on the source port.
To avoid having to copy data, we perform a single longer read into a large buffer (typically 1460 bytes, the usual payload size of a TCP segment over Ethernet), then we segment this into individual messages. Each such message will have a reference to the longer buffer, and an offset and a length attribute (called skip_ and size_).
A particular case to be handled is when one message reaches beyond the end of one such buffer and into the beginning of the next buffer. It could also happen that a message is longer than 1460 bytes.
For this purpose we keep BufferBase objects linked to each other using the
next_ pointers. The queue created by the next_ pointers means that the data
payload continues in the next buffer. This is different from the
StateFlowWithQueue infrastructure, and generally the Q and QList classes,
where the next_ pointer means that there is another data item (a different
message) waiting to be processed by the same StateFlow later.
The implementation of this mechanism is in LinkedDataBufferPtr in
utils/DataBuffer.hxx.
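A minimal sketch of the "next pointer means continuation" idea, using hypothetical types (`Chunk`, `LinkedSlice`) rather than the real LinkedDataBufferPtr: a message is described by a head chunk, a skip within that chunk, and a total size that may run on into the following chunks. The `flatten()` helper copies the bytes only to make the result easy to inspect; it assumes `skip` is not larger than the head chunk.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical buffer chunk; `next` continues the same payload.
struct Chunk
{
    std::vector<uint8_t> bytes;
    std::shared_ptr<Chunk> next;
};

// One message: starts `skip` bytes into `head` and covers `size` bytes in
// total, possibly reaching into the chunks linked behind `head`.
struct LinkedSlice
{
    std::shared_ptr<Chunk> head;
    size_t skip;
    size_t size;
};

inline std::shared_ptr<Chunk> make_chunk(const std::string &s)
{
    auto c = std::make_shared<Chunk>();
    c->bytes.assign(s.begin(), s.end());
    return c;
}

// Walks the chain and collects the message bytes (copying, for display only).
inline std::string flatten(const LinkedSlice &s)
{
    std::string out;
    size_t offset = s.skip;
    size_t remaining = s.size;
    for (const Chunk *c = s.head.get(); c && remaining; c = c->next.get())
    {
        size_t avail = c->bytes.size() - offset;
        size_t n = std::min(avail, remaining);
        out.append(
            reinterpret_cast<const char *>(c->bytes.data()) + offset, n);
        remaining -= n;
        offset = 0; // only the head chunk is entered with a skip
    }
    return out;
}
```

For example, if the first chunk holds `:X1;:X2` and the linked chunk holds `34N;`, a slice with skip 4 and size 7 describes the message `:X234N;`, which starts in one buffer and ends in the next.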
Some earlier notes:
BufferBase has the ability to do refcounting and shared ownership. It is possible to have a BufferBase that has untyped payload (i.e., just bytes). However, the BufferBase needs to know the amount of bytes as input; we cannot trim the BufferBase's size field down to the actual number of bytes read, or else we lose memory, because after freeing, the buffer will not be returned to the bucket of the right size. An alternative possibility is to have a buffer pool that generates a single size buffer so everything is returned to the same queue. Then size can be adjusted to the exact number of bytes read. This might utilize a proxy buffer pool that takes buffers of a given size from the main buffer pool and then returns them there upon freeing, resetting the size to have them land in the appropriate bucket.
As an alternative, shared_ptr<string> is a standard template library solution
to the same problem. However, shared_ptr<string> causes two memory
allocations for payloads that are longer than 16 bytes, and it has a minimum of
36 bytes length (+payload length+4 if more than 16 bytes).
Note that the input message could be split between two shared buffer ownerships. This means that the queue entry needs two buffer pointers, an offset, and a length. We could use the buffer base next pointers to string up buffers that have data from the same message, even if it's more than two. This way we only need one buffer pointer. We have to assume that the respective bytes always go together.
It might make sense to support appending another message to the end of the buffers. This would be especially true if the last buffer pointer is just partially used up, and thus the bufferptr at the end of the string of buffers is the same as the incoming next buffer.
Input segmentation
When data arrives from the socket to be read, we will allocate a shareable
buffer, then execute the asynchronous read. As the read completes, the input
data will be passed on to the segmenter. The goal of the segmenter is to find
the boundary of the message, for gridconnect the : ... ; delimiter, and on
native OpenLCB-TCP the binary length of the message. Then the message can be
passed on to routing.
It is possible that during segmentation we start with one ref of a buffer, and output two independent refs of the same buffer. This happens if a single kernel read ends up with more than one message, which is rather typical in GridConnect-TCP, but possibly even in GridConnect-USB.
It is also possible that the segmenter will retain a ref of the last read buffer, waiting for the completion of the message that is present therein.
We must keep reading bytes from the hardware until the segmenter is happy to send at least one packet onwards. Only thereafter should we send the packet (or consult the admission controller). It is essential that a partial packet must never be sent to the hub, because it is not guaranteed that we get the completion of that packet before another port might try to send a different packet. We cannot interleave data from different packets; that would produce unparseable output.
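A minimal sketch of a GridConnect segmenter follows. `GcSegmenter` is a hypothetical class, and it simplifies in two ways that should not be read as the behavior of the real implementation: it copies bytes into strings instead of keeping buffer references, and it drops any bytes found between a `;` and the next `:`. It does show the two properties described above: one read can yield several messages, and a partial message is retained until the next read completes it.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, copying segmenter for ":...;" delimited messages.
class GcSegmenter
{
public:
    // Feeds the bytes of one read; returns every message completed by it.
    std::vector<std::string> feed(const std::string &data)
    {
        std::vector<std::string> out;
        for (char c : data)
        {
            if (c == ':')
            {
                // Start of a message (also resynchronizes after garbage).
                partial_.assign(1, c);
                inMessage_ = true;
            }
            else if (inMessage_)
            {
                partial_.push_back(c);
                if (c == ';')
                {
                    out.push_back(partial_);
                    partial_.clear();
                    inMessage_ = false;
                }
            }
            // Bytes outside a message, such as newlines, are dropped here.
        }
        return out;
    }

    // True if an incomplete message is being held for the next read.
    bool has_partial() const
    {
        return inMessage_;
    }

private:
    std::string partial_;
    bool inMessage_ = false;
};
```

Feeding `:X195B4123N01;\n:X195B4` yields one complete message and leaves a partial one pending; feeding `123N02;` afterwards completes the second message.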
Implementation note:
There are several things that have to happen on the ingress port, and the order in which we do these matters:
- allocate a BarrierNotifiable* for accompanying the buffer.
- allocate a (1460-byte) buffer for the ::read call.
- perform the ::read
- call the segmenter (which might result in additional buffers needed and additional ::read calls to be made)
- (not implemented yet) consult the admission controller on whether we are allowed to send.
- send the message to the hub.
The above list is the current order. There is one suboptimal part, which is that we allocate a buffer earlier than when we know that there is data to read from the fd or socket. We could theoretically wait until the fd is selectable for read, and only then perform the buffer allocation. With the admission controller this will get even more complicated.
Legacy connection
We have two reasons to interact with a legacy CanHub:
- Running an OpenMRN IfCan and a Node requires this object to communicate with the external world.
- Interacting with a hardware CAN controller via SocketCan or via OpenMRN native CAN controller drivers can be done via struct can_frame today, and the implementation is in HubDeviceSelect<struct can_frame>.
To support these use-cases, there is a legacy bridge, which connects a
GridConnect typed DirectHub to a CanHub. It takes care of parsing the
GridConnect messages in one direction, formatting them in the other direction,
and bridges the differences between the APIs.
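The parsing and formatting work the bridge has to do can be sketched as below. This is not the OpenMRN GridConnect code; `Frame`, `gc_format` and `gc_parse` are hypothetical names, and the sketch assumes extended (29-bit) data frames written as `:X`, eight hex digits of identifier, `N`, up to eight data bytes in hex, and `;`. Standard and remote frames are not handled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical stand-in for struct can_frame (extended data frames only).
struct Frame
{
    uint32_t id;
    uint8_t dlc;
    uint8_t data[8];
};

// Renders a frame as GridConnect text, e.g. ":X195B4123N01AB;".
inline std::string gc_format(const Frame &f)
{
    char buf[16];
    std::snprintf(buf, sizeof(buf), ":X%08XN",
        static_cast<unsigned>(f.id & 0x1FFFFFFFu));
    std::string out(buf);
    for (uint8_t i = 0; i < f.dlc && i < 8; ++i)
    {
        std::snprintf(buf, sizeof(buf), "%02X",
            static_cast<unsigned>(f.data[i]));
        out += buf;
    }
    out += ';';
    return out;
}

inline int hex_val(char c)
{
    if (c >= '0' && c <= '9') return c - '0';
    if (c >= 'A' && c <= 'F') return c - 'A' + 10;
    if (c >= 'a' && c <= 'f') return c - 'a' + 10;
    return -1;
}

// Parses the text form back into a frame; returns false on malformed input.
inline bool gc_parse(const std::string &s, Frame *f)
{
    // Layout: ":X" + 8 id digits + "N" + 2 digits per data byte + ";"
    if (s.size() < 12 || s[0] != ':' || s[1] != 'X' || s[10] != 'N' ||
        s.back() != ';')
    {
        return false;
    }
    size_t digits = s.size() - 12;
    if (digits % 2 != 0 || digits > 16)
    {
        return false;
    }
    uint32_t id = 0;
    for (size_t i = 2; i < 10; ++i)
    {
        int v = hex_val(s[i]);
        if (v < 0) return false;
        id = (id << 4) | static_cast<uint32_t>(v);
    }
    f->id = id;
    f->dlc = static_cast<uint8_t>(digits / 2);
    for (size_t i = 0; i < f->dlc; ++i)
    {
        int hi = hex_val(s[11 + 2 * i]);
        int lo = hex_val(s[12 + 2 * i]);
        if (hi < 0 || lo < 0) return false;
        f->data[i] = static_cast<uint8_t>((hi << 4) | lo);
    }
    return true;
}
```

A bridge built along these lines would call the parser once for traffic going from the DirectHub to the CanHub and the formatter once for traffic in the other direction, instead of once per port.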
When many CAN frames are generated consecutively, they typically get rendered into a single text buffer. However, they don't typically get sent off without a yield in between.
@balazsracz I observed one odd crash with this hub implementation while using a UWT-50, any ideas?
02:26:35.202: CanDatagramWriteFlow: No datagram response arrived from destination 090099020126.
02:26:35.203: MemoryConfig: Failed to send response datagram. error code 81000
02:26:48.360: heap:116.69kB (max block:30.99kB), PSRAM:1867.40kB (max block:1855.99kB), mainBufferPool:22.05kB, tasks:15
02:27:03.365: heap:116.69kB (max block:30.99kB), PSRAM:1829.58kB (max block:1791.99kB), mainBufferPool:22.05kB, tasks:15
02:27:06.624: Incoming connection from 10.0.0.180, fd 53.
02:27:07.599: 0x3fce1118: Error writing to fd 51: (104) Connection reset by peer
02:27:18.369: heap:114.89kB (max block:30.99kB), PSRAM:1863.57kB (max block:1823.99kB), mainBufferPool:30.41kB, tasks:15
02:27:18.900: 0x3fce3dcc: Error reading from fd 53: (128) Socket is not connected
Assertion failed in file /scratch/trains/OpenMRNIDF/src/executor/AsyncNotifiableBlock.cpp line 62: assert(--max)
assert failed: virtual AsyncNotifiableBlock::~AsyncNotifiableBlock() AsyncNotifiableBlock.cpp:62 (0)
Backtrace: 0x4037a2d6:0x3fcdb590 0x40381d15:0x3fcdb5b0 0x4038e721:0x3fcdb5d0 0x42097e5e:0x3fcdb700 0x42091ac3:0x3fcdb730 0x42077d4d:0x3fcdb760 0x42077736:0x3fcdb790 0x42015cef:0x3fcdb7c0
0x4037a2d6: panic_abort at IDF/components/esp_system/panic.c:472
0x40381d15: esp_system_abort at IDF/components/esp_system/port/esp_system_chip.c:93
0x4038e721: __assert_func at IDF/components/newlib/assert.c:81
0x42097e5e: AsyncNotifiableBlock::~AsyncNotifiableBlock() at OpenMRNIDF/src/executor/AsyncNotifiableBlock.cpp:66 (discriminator 1)
0x42091ac3: DirectHubPortSelect::DirectHubReadFlow::~DirectHubReadFlow() at OpenMRNIDF/src/utils/DirectHub.cpp:267 (discriminator 2)
(inlined by) DirectHubPortSelect::~DirectHubPortSelect() at OpenMRNIDF/src/utils/DirectHub.cpp:558 (discriminator 2)
(inlined by) DirectHubPortSelect::~DirectHubPortSelect() at OpenMRNIDF/src/utils/DirectHub.cpp:558 (discriminator 2)
(inlined by) DirectHubPortSelect::flow_exit(bool) at OpenMRNIDF/src/utils/DirectHub.cpp:812 (discriminator 2)
(inlined by) DirectHubPortSelect::flow_exit(bool) at OpenMRNIDF/src/utils/DirectHub.cpp:788 (discriminator 2)
(inlined by) DirectHubPortSelect::write_flow_exit() at OpenMRNIDF/src/utils/DirectHub.cpp:781 (discriminator 2)
(inlined by) DirectHubPortSelect::report_and_exit() at OpenMRNIDF/src/utils/DirectHub.cpp:719 (discriminator 2)
0x42077d4d: StateFlowBase::run() at OpenMRNIDF/src/executor/StateFlow.cpp:63 (discriminator 4)
(inlined by) StateFlowBase::run() at OpenMRNIDF/src/executor/StateFlow.cpp:58 (discriminator 4)
0x42077736: ExecutorBase::entry() at OpenMRNIDF/src/executor/Executor.cpp:324
0x42015cef: OSThread::inherit() at OpenMRNIDF/src/os/OS.hxx:100
(inlined by) Executor<1u>::thread_body() at OpenMRNIDF/src/executor/Executor.hxx:345
The flow was initiated by the UWT-50 sending a request for loco 4014 and it seems to have lost connection or otherwise failed to reply to the SNIP/FDI portion of the assignment. The UWT-50 became unresponsive so I restarted it and then it lost connection (weak signal from AP I suspect, AP is a Comcast/Xfinity device). When it disconnected the cleanup of the DirectHubPortSelect seems to have triggered one too many notifications.
IDF is v5.2.1, OpenMRNIDF is the "directhub" branch which is based on the 5.1.0 branch with this PR integrated for testing.