DatagramSocketChannel to provide a channel for each remote address

Open pandaGaume opened this issue 6 years ago • 9 comments

Hi all, I'm working on a CoAP UDP reliability handler. The DatagramSocketChannel does NOT provide a context for each connection, which also means that it is "single threaded" over ONE EventLoop. What is currently the best option for implementing custom behavior that emulates a "per-connection, thread-safe" channel, the way TCPSocketChannel works (without reinventing the wheel :-))? PS: this question was already asked a long time ago on Netty [netty/netty/issues/344]

pandaGaume avatar Jan 10 '19 10:01 pandaGaume

Hi all, it seems my issue is not very popular... Anyway, I am continuing to investigate and have an idea to submit.

If we change the SocketDatagramChannel a bit (or override it) by adding an unsafe class to it, we can keep track of the connections in a Dictionary<EndPoint, IChannel>. It could be something like:

    public class MultiplexerSocketDatagramChannel : SocketDatagramChannel
    {
        static readonly Action<object, object> WriteInboundCallback = (c,m) => ((IChannel)c).Pipeline.FireChannelRead(m);
        IDictionary<EndPoint, IChannel> connections;

        public MultiplexerSocketDatagramChannel(Socket socket)
            : base(socket)
        {
            connections = new Dictionary<EndPoint, IChannel>();
        }
        protected override IChannelUnsafe NewUnsafe() => new MultiplexerSocketDatagramChannelUnsafe(this);

        sealed class MultiplexerSocketDatagramChannelUnsafe : AbstractSocketUnsafe
        {
            readonly List<object> readBuf = new List<object>();

            public MultiplexerSocketDatagramChannelUnsafe(MultiplexerSocketDatagramChannel channel)
                  : base(channel)
            {
            }

            new MultiplexerSocketDatagramChannel Channel => (MultiplexerSocketDatagramChannel)this.channel;

            public override void FinishRead(SocketChannelAsyncOperation operation)
            {
                Contract.Assert(this.channel.EventLoop.InEventLoop);

                MultiplexerSocketDatagramChannel ch = this.Channel;
                IChannelConfiguration config = ch.Configuration;
                IChannelPipeline pipeline = ch.Pipeline;
                IRecvByteBufAllocatorHandle allocHandle = ch.Unsafe.RecvBufAllocHandle;
                allocHandle.Reset(config);

                bool closed = false;
                Exception exception = null;
                try
                {
                    try
                    {
                        do
                        {
                            int localRead = ch.DoReadMessages(this.readBuf);
                            if (localRead == 0)
                            {
                                break;
                            }
                            if (localRead < 0)
                            {
                                closed = true;
                                break;
                            }
                            allocHandle.IncMessagesRead(localRead);

                        } while (allocHandle.ContinueReading());

                    }
                    catch (Exception e)
                    {
                        exception = e;
                    }

                    int size = this.readBuf.Count;
                    for (int i = 0; i < size; i++)
                    {
                        if( this.readBuf[i] is DatagramPacket packet)
                        {
                            IChannel channel;

                            if (!ch.connections.TryGetValue(packet.Sender,out channel))
                            {
                                // this is where we prepare the channel
                                channel = this.PrepareChannel(packet.Sender);
                                
                                // this will call the initializer
                                pipeline.FireChannelRead(channel);

                                // then we register the channel
                                ch.connections.Add(packet.Sender, channel);
                            }

                            if ( channel != null)
                            {
                                IEventLoop eventLoop = channel.EventLoop;
                                eventLoop.Execute(WriteInboundCallback, channel, packet);
                            }
                        }
                    }
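                    this.readBuf.Clear();
                    allocHandle.ReadComplete();
                    pipeline.FireChannelReadComplete();

                    // Surface a read failure instead of silently dropping it.
                    if (exception != null)
                    {
                        pipeline.FireExceptionCaught(exception);
                    }
                }
                finally
                {
                    // Nothing is cleaned up or re-armed here yet; `closed` is also unused.
                }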
            }

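            IChannel PrepareChannel(EndPoint sender)
            {
                // Not implemented in this post: build the per-remote channel
                // (pipeline, context, handlers) for `sender` here.
                throw new NotImplementedException();
            }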
        }
    }
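
The routing idea in the snippet above can also be shown without any DotNetty types. The following is only a minimal sketch under stated assumptions: `RemoteSession` and `DatagramDemultiplexer` are hypothetical names, not DotNetty API, and a chained `Task` stands in for the per-channel event loop. Each remote `EndPoint` gets its own session on first sight, and work for one remote is queued so that it never runs concurrently with itself.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;

// Hypothetical type: one instance per remote address (a UDP "connection").
public sealed class RemoteSession
{
    readonly object gate = new object();
    Task tail = Task.CompletedTask;

    public EndPoint Remote { get; }

    // Payloads handled so far, in arrival order for this remote.
    public List<byte[]> Received { get; } = new List<byte[]>();

    public RemoteSession(EndPoint remote) => this.Remote = remote;

    // Queues the payload behind everything already queued for this remote,
    // so the handler body never runs concurrently for the same remote.
    public Task Enqueue(byte[] payload)
    {
        lock (this.gate)
        {
            this.tail = this.tail.ContinueWith(
                _ => this.Received.Add(payload),
                TaskScheduler.Default);
            return this.tail;
        }
    }
}

// Hypothetical type: looks up (or creates) the session for each sender.
public sealed class DatagramDemultiplexer
{
    readonly ConcurrentDictionary<EndPoint, RemoteSession> sessions =
        new ConcurrentDictionary<EndPoint, RemoteSession>();

    public int SessionCount => this.sessions.Count;

    // Intended to be called from the single receive loop for every datagram.
    public Task Dispatch(EndPoint sender, byte[] payload)
    {
        RemoteSession session = this.sessions.GetOrAdd(sender, ep => new RemoteSession(ep));
        return session.Enqueue(payload);
    }

    public bool TryGetSession(EndPoint remote, out RemoteSession session) =>
        this.sessions.TryGetValue(remote, out session);
}
```

Different remotes can be processed in parallel on the thread pool, while datagrams from the same remote stay ordered. This relies on `IPEndPoint` comparing by address and port, so two instances for the same remote map to the same session.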

pandaGaume avatar Mar 08 '19 15:03 pandaGaume

What is meant by PrepareChannel?

iamkisly avatar Apr 27 '19 21:04 iamkisly

Preparing the channel means setting up the pipeline, context, handlers and so on for each "connection". This is what a TCP server channel does when it receives a new incoming connection.

pandaGaume avatar Apr 29 '19 14:04 pandaGaume

Share please

iamkisly avatar Dec 25 '19 13:12 iamkisly

@pandaGaume I am very interested in having this feature. Have you made any progress or have an update for us?

tlf30 avatar May 07 '20 21:05 tlf30

@tlf30 My company has stopped using DotNetty; we moved to another pipeline built on TPL Dataflow over .NET Core. I will take a quick look to see whether I kept a copy of this work (which we never put into production) and, if so, I will let you know.

pandaGaume avatar May 08 '20 09:05 pandaGaume

For an implementation in java: I got this working with no modifications: https://github.com/Shevchik/UdpServerSocketChannel

I then used this to get object decoding working on the client (only required on the client; object decoding on the server already works): https://github.com/apache/camel/blob/master/components/camel-netty/src/main/java/org/apache/camel/component/netty/codec/DatagramPacketObjectDecoder.java

I am doing ObjectEncoding/Decoding with it.

tlf30 avatar May 08 '20 14:05 tlf30

> @tlf30 My company has stopped using DotNetty; we moved to another pipeline built on TPL Dataflow over .NET Core. I will take a quick look to see whether I kept a copy of this work (which we never put into production) and, if so, I will let you know.

I have been stuck on this issue for a while; if you could share the complete code, it would be appreciated.

sekkit avatar May 13 '20 02:05 sekkit

Netty has EpollDatagramChannel, which with the SO_REUSEPORT option can bind the same port more than once, so these channels can be scheduled across multiple threads. DotNetty, however, does not support this.

SunneeYang avatar Apr 20 '22 09:04 SunneeYang