asyncio icon indicating copy to clipboard operation
asyncio copied to clipboard

Can't receive replies to multicast UDP packets

Open gpjt opened this issue 8 years ago • 6 comments

It doesn't appear to be possible to receive replies to multicast UDP messages. Server-side multicast does work, but only with a bit of extra config.

Given the following (working) server code, adapted from the UDP Echo example:

import asyncio
import logging
import socket
import struct

# NOTE: despite the "BROADCAST" prefix these describe a multicast endpoint.
BROADCAST_PORT = 1910  # UDP port the server socket is bound to
BROADCAST_ADDR = "239.255.255.250"  # IPv4 multicast group the server joins


class MulticastServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that acknowledges every packet it receives.

    Deriving from ``asyncio.DatagramProtocol`` provides no-op defaults for
    the callbacks this class does not override (notably ``connection_lost``).
    The datagram transport invokes those callbacks, so a protocol lacking
    them fails with ``AttributeError`` when the transport is closed or a
    socket error is reported.
    """

    def connection_made(self, transport):
        """Keep a reference to the transport so replies can be sent."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log the datagram and send an ASCII acknowledgement back to *addr*.

        The reply embeds ``repr(data)``, which is always ASCII-safe for
        ``bytes`` input, so the ``encode("ascii")`` call cannot fail on
        arbitrary payloads.
        """
        print('Received {!r} from {!r}'.format(data, addr))
        data = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(data, addr))
        self.transport.sendto(data, addr)

    def error_received(self, exc):
        """Report a send/receive error raised by the underlying socket."""
        print('Error received:', exc)



# Script entry: join the multicast group on a hand-built socket and serve
# acknowledgements forever.
loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# The reuse options only influence bind() when they are set beforehand, so
# configure them before binding the port.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', BROADCAST_PORT))
# The membership request is the 4-byte group address followed by a 4-byte
# interface address. '=I' forces a standard-size, unpadded 32-bit field;
# a native 'L' is 8 bytes (plus alignment padding) on LP64 platforms.
group = socket.inet_aton(BROADCAST_ADDR)
mreq = group + struct.pack('=I', socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

# Hand the pre-configured socket to asyncio instead of letting it create one.
listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)

loop.run_forever()
loop.close()

...the following non-asyncio client code sends a broadcast packet and correctly receives the responses:

import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"


# Blocking client: send the first command-line argument to the multicast
# group, then print every reply until no datagram arrives for 3 seconds.
# The context manager closes the socket on exit.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.settimeout(3)
    # Multicast TTL of 1, passed as a single packed byte.
    sock.setsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack('b', 1)
    )

    sent = sock.sendto(
        sys.argv[1].encode("ascii"),
        (BROADCAST_ADDR, BROADCAST_PORT),
    )

    while True:
        try:
            data, server = sock.recvfrom(1024)
        except socket.timeout:
            # A quiet period of one full timeout ends the collection loop.
            break
        print("Reply from {}: {!r}".format(server, data))

However, the following code, which I'm pretty sure is the asyncio equivalent, sends out the multicast packet correctly but never receives a response:

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"



class DiscoveryClientProtocol:
    """Client protocol that sends one request and prints every reply.

    The request payload is the first command-line argument; the event loop
    given to the constructor is stopped when the connection is lost.
    """

    def __init__(self, loop):
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Configure the transport's socket, then send the request."""
        self.transport = transport
        raw_sock = transport.get_extra_info('socket')
        raw_sock.settimeout(3)
        # Multicast TTL of 1, passed as a single packed byte.
        raw_sock.setsockopt(
            socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack('b', 1)
        )

        # No destination is passed here: the transport's own remote address
        # is relied upon.
        payload = sys.argv[1].encode("ascii")
        transport.sendto(payload)

    def datagram_received(self, data, addr):
        """Print a reply; the socket stays open for further responses."""
        print("Reply from {}: {!r}".format(addr, data))

    def error_received(self, exc):
        """Print an error reported by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Announce the closed socket and stop the event loop."""
        print("Socket closed, stop the event loop")
        self.loop.stop()


loop = asyncio.get_event_loop()


def build_protocol():
    """Protocol factory handed to the event loop."""
    return DiscoveryClientProtocol(loop)


# Let asyncio create the socket itself, targeting the multicast group as the
# endpoint's remote address.
target = (BROADCAST_ADDR, BROADCAST_PORT)
transport, protocol = loop.run_until_complete(
    loop.create_datagram_endpoint(build_protocol, remote_addr=target)
)
loop.run_forever()
transport.close()
loop.close()

gpjt avatar Jan 04 '17 16:01 gpjt

Hi, I haven't looked in depth, but this:

    sock.settimeout(3)

puts the socket in blocking mode. You must call sock.settimeout(0) or sock.setblocking(False).

Martiusweb avatar Jan 05 '17 12:01 Martiusweb

I get the same effect if I remove that line, though :-( Likewise if I replace it with sock.settimeout(0) or with sock.setblocking(False)

gpjt avatar Jan 05 '17 12:01 gpjt

Any news on this? I seem to be running into the same issue.

jnises avatar Jan 25 '17 10:01 jnises

I experience the same problem. What I found out: the asyncio server works if there is a non-asyncio server running in parallel ...

Julius2342 avatar Jan 28 '17 16:01 Julius2342

I've been able to reproduce your issue using this code.

The problem is related to asyncio connecting to the broadcast address. Quoting this post:

Now, the main problem is that once you connect() a UDP socket, that effectively establishes a one-to-one relationship, such that you can only send out data to that one address, AND you can only receive data from that one address... So, anything that arrives that is NOT from the address you connect()'d to is dropped... And, of course, nothing that arrives will actually appear to be FROM the broadcast address (either the limited or subnet one)... Instead, it'll be from the actual host IP that really sent it...

The following patch fixes the issue:

diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index 0174375..5b1256e 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -828,7 +828,8 @@ def create_datagram_endpoint(self, protocol_factory,
                     if local_addr:
                         sock.bind(local_address)
                     if remote_addr:
-                        yield from self.sock_connect(sock, remote_address)
+                        if not allow_broadcast:
+                            yield from self.sock_connect(sock, remote_address)
                         r_addr = remote_address
                 except OSError as exc:
                     if sock is not None:
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index ed2b4d7..02082c8 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -1044,7 +1044,7 @@ def sendto(self, data, addr=None):
             # Attempt to send it right away first.
             try:
                 if self._address:
-                    self._sock.send(data)
+                    self._sock.sendto(data, self._address)
                 else:
                     self._sock.sendto(data, addr)
                 return

There might be a better way though.

vxgmichel avatar Feb 09 '17 16:02 vxgmichel

I am not sure of the current status of this discussion, but for the sake of those stumbling upon this thread...

asyncio can handle this. Just create your own socket and pass it on to create_datagram_endpoint; asyncio will leave your socket alone. Also, you must specify the address and port in self.transport.sendto. Here is the client (slightly modified to support IPv6 multicast, just in case someone is interested):

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
# IPv6 alternative for the group address:
#BROADCAST_ADDR = "ff0e::10"

class DiscoveryClientProtocol:
    """Client protocol that sends one multicast request and prints replies.

    Args:
        loop: Event loop to stop once the connection is lost.
        addr: Multicast group address (IPv4 or IPv6 literal) to send to.

    The request payload is the first command-line argument.
    """

    def __init__(self, loop, addr):
        self.loop = loop
        self.transport = None
        self.addr = addr

    def connection_made(self, transport):
        """Set the multicast hop limit, then send the request to the group."""
        self.transport = transport
        sock = self.transport.get_extra_info('socket')
        # No socket timeout is set here: asyncio drives the socket in
        # non-blocking mode, and the socket object exposed by current
        # transports rejects any non-zero timeout with ValueError.
        addrinfo = socket.getaddrinfo(self.addr, None)[0]
        if addrinfo[0] == socket.AF_INET: # IPv4
            ttl = struct.pack('@i', 1)
            sock.setsockopt(socket.IPPROTO_IP,
                socket.IP_MULTICAST_TTL, ttl)
        else:
            ttl = struct.pack('@i', 2)
            sock.setsockopt(socket.IPPROTO_IPV6,
                socket.IPV6_MULTICAST_HOPS, ttl)

        # The destination is passed explicitly for every datagram.
        self.transport.sendto(sys.argv[1].encode("ascii"), (self.addr,BROADCAST_PORT))

    def datagram_received(self, data, addr):
        """Print a reply; the socket stays open for further responses."""
        print("Reply from {}: {!r}".format(addr, data))
        # Don't close the socket as we might get multiple responses.

    def error_received(self, exc):
        """Print an error reported by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Announce the closed socket and stop the event loop."""
        print("Socket closed, stop the event loop")
        self.loop.stop()


loop = asyncio.get_event_loop()

# Create the UDP socket by hand, in the address family of the group address,
# and pass it to asyncio as-is (no remote address is given).
family = socket.getaddrinfo(BROADCAST_ADDR, None)[0][0]
sock = socket.socket(family, socket.SOCK_DGRAM)


def build_protocol():
    """Protocol factory handed to the event loop."""
    return DiscoveryClientProtocol(loop,BROADCAST_ADDR)


transport, protocol = loop.run_until_complete(
    loop.create_datagram_endpoint(build_protocol, sock=sock)
)
loop.run_forever()
transport.close()
loop.close()

So as to cover the IPv6 bits, here is the server side

import asyncio
import logging
import socket
import struct

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
# IPv6 alternative for the group address:
#BROADCAST_ADDR = "ff0e::10"

class MulticastServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that acknowledges every packet it receives.

    Deriving from ``asyncio.DatagramProtocol`` provides no-op defaults for
    the callbacks this class does not override (notably ``connection_lost``).
    The datagram transport invokes those callbacks, so a protocol lacking
    them fails with ``AttributeError`` when the transport is closed or a
    socket error is reported.
    """

    def connection_made(self, transport):
        """Keep a reference to the transport so replies can be sent."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log the datagram and send an ASCII acknowledgement back to *addr*."""
        print('Received {!r} from {!r}'.format(data, addr))
        data = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(data, addr))
        self.transport.sendto(data, addr)

    def error_received(self, exc):
        """Report a send/receive error raised by the underlying socket."""
        print('Error received:', exc)

loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

# Resolve the group once; its address family decides between the IPv4 and
# IPv6 code paths below.
family, _, _, _, sockaddr = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
sock = socket.socket(family, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
group_bin = socket.inet_pton(family, sockaddr[0])
sock.bind(('', BROADCAST_PORT))
# The membership request is the packed group address followed by a 32-bit
# interface field (INADDR_ANY for IPv4, 0 for IPv6).
if family == socket.AF_INET: # IPv4
    level = socket.IPPROTO_IP
    join_option = socket.IP_ADD_MEMBERSHIP
    interface = struct.pack('=I', socket.INADDR_ANY)
else:
    level = socket.IPPROTO_IPV6
    join_option = socket.IPV6_JOIN_GROUP
    interface = struct.pack('@I', 0)
mreq = group_bin + interface
sock.setsockopt(level, join_option, mreq)



# Hand the pre-configured socket to asyncio instead of letting it create one.
transport, protocol = loop.run_until_complete(
    loop.create_datagram_endpoint(MulticastServerProtocol, sock=sock)
)

loop.run_forever()
loop.close()

I think this issue is a non-issue and should therefore be closed.

frawau avatar Aug 23 '17 08:08 frawau