DotNetty icon indicating copy to clipboard operation
DotNetty copied to clipboard

Problems with MQTT server sending data

Open Holo-k opened this issue 2 years ago • 2 comments

After my device sends some basic environment information to the server, the data the MQTT server sends back also contains the data the client sent earlier.

Here is the sample code:

 /// <summary>
 /// Minimal server-side MQTT channel handler: prints the payload of PUBLISH packets
 /// and answers CONNECT, SUBSCRIBE, UNSUBSCRIBE, PINGREQ and DISCONNECT control packets.
 /// </summary>
 class MqttHandler : SimpleChannelInboundHandler<Packet>
    {
        public MqttHandler() { }

        /// <summary>
        /// Dispatches a non-PUBLISH control packet and writes the matching response, if any.
        /// </summary>
        /// <param name="context">Channel context used to write the response.</param>
        /// <param name="msg">The decoded MQTT packet.</param>
        /// <returns>A task that completes once the response (if any) has been written and flushed.</returns>
        private async Task ProcessMqttMsgAsync(IChannelHandlerContext context, Packet msg)
        {
            // The `when` guards replace unchecked `as` casts: a packet whose runtime type does
            // not match its PacketType falls through to `default` instead of passing null on.
            switch (msg.PacketType)
            {
                case PacketType.CONNECT when msg is ConnectPacket connectPacket:
                    // Derive SessionPresent from the client's CleanSession flag instead of
                    // hard-coding `true`, which is invalid for clean-session clients.
                    await ProcessConnectAsync(context, connectPacket);
                    break;

                case PacketType.PUBACK:
                case PacketType.PUBCOMP:
                case PacketType.PUBREC:
                case PacketType.PUBREL:
                    // Acknowledgement flow is not implemented; these packets are ignored.
                    break;

                case PacketType.SUBSCRIBE when msg is SubscribePacket subscribePacket:
                    await context.WriteAndFlushAsync(SubAckPacket.InResponseTo(subscribePacket, QualityOfService.ExactlyOnce));
                    break;

                case PacketType.UNSUBSCRIBE when msg is UnsubscribePacket unsubscribePacket:
                    await context.WriteAndFlushAsync(UnsubAckPacket.InResponseTo(unsubscribePacket));
                    break;

                case PacketType.PINGREQ:
                    await context.WriteAndFlushAsync(PingRespPacket.Instance);
                    break;

                case PacketType.DISCONNECT:
                    // The client asked to end the session; release the connection.
                    await context.CloseAsync();
                    break;

                default:
                    break;
            }
        }

        /// <summary>
        /// Accepts the connection by replying with a CONNACK built from <paramref name="connectPacket"/>.
        /// </summary>
        /// <param name="context">Channel context used to write the CONNACK.</param>
        /// <param name="connectPacket">The CONNECT packet received from the client.</param>
        private async Task ProcessConnectAsync(IChannelHandlerContext context, ConnectPacket connectPacket)
        {
            await context.WriteAndFlushAsync(CreateMqttConnectionAck(ConnectReturnCode.Accepted, connectPacket));
        }

        /// <summary>
        /// Builds the CONNACK for a CONNECT packet.
        /// </summary>
        /// <param name="returnCode">Return code to report to the client.</param>
        /// <param name="connectPacket">The CONNECT packet being answered.</param>
        /// <returns>A CONNACK whose SessionPresent flag is the negation of the client's CleanSession flag.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="connectPacket"/> is null.</exception>
        private ConnAckPacket CreateMqttConnectionAck(ConnectReturnCode returnCode, ConnectPacket connectPacket)
        {
            // Fail here rather than returning null, which the caller would write to the channel.
            if (connectPacket == null)
                throw new ArgumentNullException(nameof(connectPacket));

            // NOTE(review): this handler stores no session state, so reporting SessionPresent = true
            // for CleanSession = false clients may still be inaccurate - confirm the intended behavior.
            return new ConnAckPacket { ReturnCode = returnCode, SessionPresent = !connectPacket.CleanSession };
        }

        /// <summary>
        /// Handles one decoded MQTT packet: PUBLISH payloads are printed to the console,
        /// every other packet is dispatched to <see cref="ProcessMqttMsgAsync"/>.
        /// </summary>
        /// <param name="context">Channel context of the connection.</param>
        /// <param name="message">The decoded MQTT packet.</param>
        protected override void ChannelRead0(IChannelHandlerContext context, Packet message)
        {
            if (message is PublishPacket publishPacket)
            {
                // NOTE(review): no PUBACK/PUBREC is sent here; clients publishing above QoS 0
                // may expect one - confirm against the required QoS handling.
                Console.WriteLine(publishPacket.Payload.ToString(encoding: Encoding.UTF8));
                return;
            }

            // This method cannot await, so attach a continuation that observes failures;
            // otherwise a faulted write would vanish silently.
            ProcessMqttMsgAsync(context, message).ContinueWith(
                faulted =>
                {
                    Console.WriteLine(faulted.Exception);
                    context.CloseAsync();
                },
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }

        /// <summary>
        /// Process entry point: starts the MQTT server and blocks until it has finished running.
        /// </summary>
        public static void Main()
        {
            Task serverTask = RunServerAsync();
            serverTask.Wait();
        }
       

        /// <summary>
        /// Binds an MQTT server to TCP port 1883, keeps it running until a line is read
        /// from the console, then closes the listening channel and shuts down both event loop groups.
        /// </summary>
        /// <returns>A task that completes after the server has been shut down.</returns>
        public static async Task RunServerAsync()
        {
            IChannel boundChannel = null;

            var bossGroup = new MultithreadEventLoopGroup(1);
            var workerGroup = new MultithreadEventLoopGroup();

            try
            {
                var bootstrap = new ServerBootstrap();
                bootstrap.Group(bossGroup, workerGroup);

                bootstrap.Channel<TcpServerSocketChannel>();

                bootstrap.Option(ChannelOption.SoBacklog, 100)
                        .Option(ChannelOption.SoKeepalive, false)
                        .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                        {
                            // Each accepted connection gets its own decoder and handler instance.
                            IChannelPipeline pipeline = channel.Pipeline;
                            pipeline.AddLast(MqttEncoder.Instance, new MqttDecoder(true, 64 * 1024), new MqttHandler());
                        }
                        ));

                boundChannel = await bootstrap.BindAsync(1883);

                Console.WriteLine("mqtt server is start");

                // Keep the server alive until the operator presses Enter.
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine("mqtt server failue");
                // Print the cause as well; the message alone gives no way to diagnose a failure.
                Console.WriteLine(ex);
            }
            finally
            {
                try
                {
                    // boundChannel stays null if binding failed.
                    if (boundChannel != null)
                    {
                        await boundChannel.CloseAsync();
                    }
                }
                finally
                {
                    // Always shut the event loop groups down, even if closing the channel throws.
                    await Task.WhenAll(
                        bossGroup.ShutdownGracefullyAsync(),
                        workerGroup.ShutdownGracefullyAsync());
                }
            }
        }
    }

Holo-k avatar Oct 29 '22 10:10 Holo-k

https://github.com/linfx/MqttFx

MqttFx is a mqtt v3.1.1 client using DotNetty

linfx avatar Nov 23 '22 09:11 linfx

https://github.com/linfx/MqttFx

MqttFx is a mqtt v3.1.1 client using DotNetty

谢谢. 不过我这边是作为mqtt broker server, MqttFx也能作为broker? 之前大佬使用DotNetty作为broker, 好多坑, 除了这个发送的数据会把客户端发过来的一起发过去外, 最麻烦的还是引用次数为0报错的问题, 暂时还没有发现那里出了问题

Holo-k avatar Dec 05 '22 09:12 Holo-k