asyncio
asyncio copied to clipboard
Can't receive replies to multicast UDP packets
It doesn't appear to be possible to receive replies to multicast UDP messages. Server-side multicast does work, but only with a bit of extra config.
Given the following (working) server code, adapted from the UDP Echo example:
import asyncio
import logging
import socket
import struct
# UDP port the server binds to and the multicast group it joins.
BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"


class MulticastServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that acknowledges every datagram it receives.

    Deriving from ``asyncio.DatagramProtocol`` supplies no-op
    ``error_received`` and ``connection_lost`` callbacks, which this class
    does not define itself but which a datagram transport may invoke.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* so replies can be sent later."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Print the incoming datagram and send an acknowledgement to *addr*.

        The reply is the ASCII encoding of ``"I received <repr of data>"``.
        """
        print('Received {!r} from {!r}'.format(data, addr))
        reply = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(reply, addr))
        self.transport.sendto(reply, addr)
loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

# The multicast membership is configured on a hand-built socket, which is
# then handed to asyncio through the ``sock`` argument.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Address-reuse options must be set before bind() to have any effect.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):  # the constant is not defined everywhere
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', BROADCAST_PORT))

# Membership request: 4-byte group address followed by a 4-byte interface
# address.  '=I' pins the second field to 4 bytes without padding, whereas
# native 'L' can be wider and aligned.  INADDR_ANY is 0, so byte order is
# irrelevant here.
group = socket.inet_aton(BROADCAST_ADDR)
mreq = group + struct.pack('=I', socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server.
    pass
finally:
    # Release the socket and the loop however run_forever() ended.
    transport.close()
    loop.close()
...the following non-asyncio client code sends a broadcast packet and correctly receives the responses:
import socket
import struct
import sys
# Destination port and multicast group of the discovery request.
BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"

# The ``with`` block closes the socket on every exit path.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    # recvfrom() raises socket.timeout after 3 seconds without a reply.
    sock.settimeout(3)
    ttl = struct.pack('b', 1)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

    # The request payload is the first command-line argument.
    sock.sendto(
        sys.argv[1].encode("ascii"),
        (BROADCAST_ADDR, BROADCAST_PORT),
    )

    # Several servers may answer; keep reading until the timeout fires.
    while True:
        try:
            data, server = sock.recvfrom(1024)
        except socket.timeout:
            break
        print("Reply from {}: {!r}".format(server, data))
However, the following code, which I'm pretty sure is the asyncio equivalent, sends out the multicast packet correctly but never receives a response:
import asyncio
import socket
import struct
import sys
# Destination port and multicast group of the discovery request.
BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"


class DiscoveryClientProtocol:
    """Client protocol from the failing reproduction in this report.

    It sends the first command-line argument as soon as the endpoint is
    ready and prints every reply.  The runtime behaviour is intentionally
    left as reported, because the surrounding discussion refers to it.
    """

    def __init__(self, loop):
        """Store the event loop that ``connection_lost`` will stop."""
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        """Set the multicast TTL on the socket and send the request."""
        self.transport = transport
        sock = self.transport.get_extra_info('socket')
        # NOTE(review): the replies in this thread point out that a non-zero
        # timeout puts the socket in blocking mode; the reporter sees the
        # same failure with or without this line.
        sock.settimeout(3)
        ttl = struct.pack('b', 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
        # No destination is given: this relies on the endpoint having been
        # created with ``remote_addr``.
        self.transport.sendto(sys.argv[1].encode("ascii"))

    def datagram_received(self, data, addr):
        """Print one reply."""
        print("Reply from {}: {!r}".format(addr, data))
        # Don't close the socket as we might get multiple responses.

    def error_received(self, exc):
        """Print an error reported by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Stop the event loop once the transport has been closed."""
        print("Socket closed, stop the event loop")
        self.loop.stop()
loop = asyncio.get_event_loop()
# NOTE(review): ``remote_addr`` is kept on purpose because this is the failing
# reproduction.  The diagnosis later in this thread is that asyncio connects
# the UDP socket to this address, so replies coming from the responders' own
# addresses are dropped.
connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop),
    remote_addr=(BROADCAST_ADDR, BROADCAST_PORT),
)
transport, protocol = loop.run_until_complete(connect)
try:
    loop.run_forever()
finally:
    # Release the socket and the loop even if run_forever() is interrupted.
    transport.close()
    loop.close()
Hi, I haven't looked in depth, but this:
sock.settimeout(3)
puts the socket in blocking mode. You must call sock.settimeout(0)
or sock.setblocking(False)
.
I get the same effect if I remove that line, though :-( Likewise if I replace it with sock.settimeout(0)
or with sock.setblocking(False)
Any news on this? I seem to be running into the same issue.
I'm experiencing the same problem. What I found is that the asyncio server works if there is a non-asyncio server running in parallel ...
I've been able to reproduce your issue using this code.
The problem is related to asyncio connecting to the broadcast address. Quoting this post:
Now, the main problem is that once you connect() a UDP socket, that effectively establishes a one-to-one relationship, such that you can only send out data to that one address, AND you can only receive data from that one address... So, anything that arrives that is NOT from the address you connect()'d to is dropped... And, of course, nothing that arrives will actually appear to be FROM the broadcast address (either the limited or subnet one)... Instead, it'll be from the actual host IP that really sent it...
The following patch fixes the issue:
diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index 0174375..5b1256e 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -828,7 +828,8 @@ def create_datagram_endpoint(self, protocol_factory,
if local_addr:
sock.bind(local_address)
if remote_addr:
- yield from self.sock_connect(sock, remote_address)
+ if not allow_broadcast:
+ yield from self.sock_connect(sock, remote_address)
r_addr = remote_address
except OSError as exc:
if sock is not None:
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index ed2b4d7..02082c8 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -1044,7 +1044,7 @@ def sendto(self, data, addr=None):
# Attempt to send it right away first.
try:
if self._address:
- self._sock.send(data)
+ self._sock.sendto(data, self._address)
else:
self._sock.sendto(data, addr)
return
There might be a better way though.
I am not sure of the current status of this discussion, but for the sake of those stumbling upon this thread...
asyncio can handle this. Just create your own socket and pass it to create_datagram_endpoint; asyncio will leave your socket alone. You must also specify the address and port in self.transport.sendto. Here is the client (slightly modified to support IPv6 multicast, just in case someone is interested):
import asyncio
import socket
import struct
import sys
# Destination port and multicast group of the discovery request.
BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
# IPv6 alternative: BROADCAST_ADDR = "ff0e::10"


class DiscoveryClientProtocol:
    """Send one discovery request to a multicast group and print the replies.

    The request payload is the first command-line argument; it is sent to
    ``(addr, BROADCAST_PORT)`` as soon as the endpoint is ready.
    """

    def __init__(self, loop, addr):
        """Store the loop to stop on close and the multicast group *addr*."""
        self.loop = loop
        self.transport = None
        self.addr = addr

    def connection_made(self, transport):
        """Set the multicast TTL / hop limit and send the request.

        The socket is left in the non-blocking mode asyncio needs, so no
        socket timeout is set here.  The option family is taken from the
        socket itself, which avoids a blocking ``getaddrinfo`` call inside
        an event-loop callback.
        """
        self.transport = transport
        sock = transport.get_extra_info('socket')
        if sock.family == socket.AF_INET:  # IPv4
            ttl = struct.pack('@i', 1)
            sock.setsockopt(socket.IPPROTO_IP,
                            socket.IP_MULTICAST_TTL, ttl)
        else:  # treated as IPv6
            hops = struct.pack('@i', 2)
            sock.setsockopt(socket.IPPROTO_IPV6,
                            socket.IPV6_MULTICAST_HOPS, hops)
        # The socket is not connected, so the destination must be explicit.
        transport.sendto(sys.argv[1].encode("ascii"),
                         (self.addr, BROADCAST_PORT))

    def datagram_received(self, data, addr):
        """Print one reply."""
        print("Reply from {}: {!r}".format(addr, data))
        # Don't close the socket as we might get multiple responses.

    def error_received(self, exc):
        """Print an error reported by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Stop the event loop once the transport has been closed."""
        print("Socket closed, stop the event loop")
        self.loop.stop()
loop = asyncio.get_event_loop()

# Create the socket ourselves, with the address family of the multicast
# group, and pass it in via ``sock`` rather than using ``remote_addr``.
addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
sock = socket.socket(addrinfo[0], socket.SOCK_DGRAM)

connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop, BROADCAST_ADDR),
    sock=sock,
)
transport, protocol = loop.run_until_complete(connect)
try:
    loop.run_forever()
finally:
    # Release the socket and the loop even if run_forever() is interrupted.
    transport.close()
    loop.close()
So as to cover the IPv6 bits, here is the server side
import asyncio
import logging
import socket
import struct
# UDP port the server binds to and the multicast group it joins.
BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
# IPv6 alternative: BROADCAST_ADDR = "ff0e::10"


class MulticastServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that acknowledges every datagram it receives.

    Deriving from ``asyncio.DatagramProtocol`` supplies no-op
    ``error_received`` and ``connection_lost`` callbacks, which this class
    does not define itself but which a datagram transport may invoke.
    """

    def connection_made(self, transport):
        """Keep a reference to *transport* so replies can be sent later."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Print the incoming datagram and send an acknowledgement to *addr*.

        The reply is the ASCII encoding of ``"I received <repr of data>"``.
        """
        print('Received {!r} from {!r}'.format(data, addr))
        reply = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(reply, addr))
        self.transport.sendto(reply, addr)
loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

# The address family (IPv4 or IPv6) follows from the multicast group.
addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
family = addrinfo[0]
sock = socket.socket(family, socket.SOCK_DGRAM)

# Address-reuse options are set before bind().
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):  # the constant is not defined everywhere
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
# The bind is the same for both families, so it is done once here.
sock.bind(('', BROADCAST_PORT))

# Join the group: packed group address followed by the interface selector.
group_bin = socket.inet_pton(family, addrinfo[4][0])
if family == socket.AF_INET:  # IPv4
    mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
else:  # treated as IPv6
    mreq = group_bin + struct.pack('@I', 0)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server.
    pass
finally:
    # Release the socket and the loop however run_forever() ended.
    transport.close()
    loop.close()
I think this is a non-issue and it should therefore be closed.