ipmininet
ipmininet copied to clipboard
Is it possible to unbuffer sys.stdin for a node's shell process?
Hi,
I'm experiencing a problem with the ExaBGP daemon in IPMininet. In my IPMininet topology, I've configured one of the nodes with ExaBGP. The ExaBGP daemon has two processes, `announce-routes` and `receive-routes`, which run the Python scripts `sender.py` and `receiver.py` respectively. `sender.py` announces routes to the ExaBGP peer, while `receiver.py` receives, parses and processes the BGP messages that are generated.
`sender.py` looks somewhat like this:
"""ExaBGP API process that repeatedly announces a fixed set of routes."""
import sys
import time

# Announcement re-sent on every iteration (placeholder for real route selection).
ANNOUNCEMENT = ('neighbor 172.16.2.20 announce attribute next-hop self med 150 '
                'origin incomplete nlri 100.30.0.0/16 100.40.0.0/16')


def send_command(command, out=None):
    """Write one ExaBGP API command to *out* (default: ``sys.stdout``).

    ExaBGP reads API commands line by line, so the command is terminated with
    a newline and flushed immediately; without the newline consecutive
    commands would run together and never be parsed.
    """
    if out is None:
        # Resolved at call time so a later reassignment of sys.stdout is honoured.
        out = sys.stdout
    out.write(command + '\n')
    out.flush()


if __name__ == '__main__':
    # NOTE(review): the topology enables 'ack' in the ExaBGP api section; if
    # ExaBGP writes acknowledgements to this process's stdin, nothing here reads
    # them. Confirm whether they need draining so that pipe cannot fill up.
    while True:
        # get route
        send_command(ANNOUNCEMENT)
        time.sleep(0.1)
`receiver.py` looks somewhat like this:
"""ExaBGP API process that logs every JSON message ExaBGP sends it."""
import sys
import time
import json
import os


def format_message(line):
    """Return the log entry for one (already stripped) line read from ExaBGP.

    A line that decodes as JSON is logged as ``str()`` of the decoded object;
    anything else is logged as the literal ``Error``. The entry always ends
    with a newline.
    """
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        return "Error\n"
    # process and write actual BGP message
    return str(message) + '\n'


if __name__ == '__main__':
    # Send stderr (including tracebacks of uncaught exceptions) to a log file.
    sys.stderr = open('/tmp/exabgp-receiver.log', 'a+')
    # buffering=1 selects *line* buffering for a text stream; it does not make
    # reads unbuffered. readline() on a pipe still returns as soon as one full
    # line is available.
    stdin_lines = os.fdopen(sys.stdin.fileno(), 'r', buffering=1)
    while True:
        raw = stdin_lines.readline()
        if not raw:
            # EOF: the writing side closed the pipe. readline() would keep
            # returning '' forever, so stop instead of busy-looping and
            # flooding the log with "Error" lines.
            break
        sys.stderr.write(format_message(raw.strip()))
        sys.stderr.flush()
If I run the topology and trigger a link failure about four times, which causes a burst of BGP updates and withdrawals, the `receiver.py` script hangs and does not process any more messages. However, if I run the topology with `IPCLI()` and start ExaBGP from the host's shell in an xterm, the script does not hang. This makes me think that something is buffering `sys.stdin` and `sys.stdout` when the topology is not run in interactive CLI mode. Is there a way to turn off this buffering or work around it? Has anyone encountered this before?
IPMininet version is 0.9, python version 3.6
Hello,
This makes me think there is something buffering sys.stdin and sys.stdout when the topology is not run in interactive CLI mode
Not that I know of. In any case, buffering is not permanent: the buffer would eventually be flushed.
Would you mind sharing the script that you use to start the topology, both with and without IPCLI?
Are the command generated by IPMininet (you can get it with `ps -aux | grep <exec name>`) and the command that you run in the shell identical? Are the configurations (IPMininet generates them in `/tmp`) identical as well?
Also, did you make sure that the BGP updates/withdrawals actually reach ExaBGP when it is not launched from IPCLI? This might be a connectivity issue.
Here is an example of my topology script. I configure the topology and the nodes, and I start an application that talks to ExaBGP. Once the nodes are configured, I trigger a link failure and restore the link a number of times. This causes a large number of BGP update messages to be generated and sent to ExaBGP. Interestingly, ExaBGP is fine for the first three link failure and recovery events, but after the fourth one it hangs and does not process any more messages. When I run the topology script with `IPCLI` and then run ExaBGP from the host's shell in an xterm, I can fail and restore the link as many times as I want, and ExaBGP keeps running without hanging.
To the best of my knowledge, I use the configurations generated by IPMininet when running ExaBGP from the shell.
import time
from datetime import datetime

from ipmininet.iptopo import IPTopo
from ipmininet.cli import IPCLI
from ipmininet.ipnet import IPNet
from ipmininet.router.config.ospf import OSPFRedistributedRoute
from ipmininet.srv6 import enable_srv6
from ipmininet.router.config import BGP, ebgp_session, set_rr, AccessList, \
    AF_INET6, AF_INET, BorderRouterConfig, RouterConfig, OSPF, OSPF6, \
    bgp_peering, ExaBGPDaemon, STATIC, StaticRoute, CLIENT_PROVIDER, SHARE
from ipmininet.link import IPLink
class TestTopo(IPTopo):
    """Multi-AS BGP topology with an ExaBGP speaker inside AS 100.

    AS 100 (Tp1ASr1-3 plus the ExaBGP node Tp1ASctlr) is a customer of
    AS 55 and peers with AS 200 and AS 300, which reach AS 400 (Tp4ASr1).
    AS 400 serves ``n_client_as`` stub ASes numbered 1..n, each with one
    router ``AS<i>R1`` and one client host ``gCl<i>``.
    """

    # Number of stub ASes hanging off Tp4ASr1. Every loop over the stub ASes
    # uses this one value (the original looped to 50 in some places while only
    # creating 10 routers, which raised NameError on AS11R1). Client prefixes
    # are 2001:df<i zero-padded to 2 digits>::/48, so keep this <= 99.
    n_client_as = 10

    def build(self, *args, **kwargs):
        # Add all routers
        GsASr1 = self.bgp('GsASr1')
        Tp1ASr1 = self.bgp('Tp1ASr1')
        Tp1ASr2 = self.bgp('Tp1ASr2')
        Tp1ASr3 = self.bgp('Tp1ASr3')
        Tp2ASr1 = self.bgp('Tp2ASr1')
        Tp2ASr2 = self.bgp('Tp2ASr2')
        Tp3ASr1 = self.bgp('Tp3ASr1')
        Tp3ASr2 = self.bgp('Tp3ASr2')
        Tp4ASr1 = self.bgp('Tp4ASr1')
        Tp5ASr1 = self.bgp('Tp5ASr1')
        # Stub-AS routers AS1R1..AS<n>R1, kept in a list instead of
        # exec/eval-created locals (those rely on locals() write-through,
        # which no longer works inside functions as of Python 3.13).
        as_routers = [self.bgp(f'AS{i}R1')
                      for i in range(1, self.n_client_as + 1)]

        Sw1Tp1 = self.addSwitch('Sw1Tp1')
        Sw2Tp2 = self.addSwitch('Sw2Tp2')
        Sw3Tp3 = self.addSwitch('Sw3Tp3')

        Tp1ASr1Sw1 = self.addLink(Tp1ASr1, Sw1Tp1)
        Tp1ASr1Sw1[Tp1ASr1].addParams(ip=("100::1/48",))
        Tp1ASr2Sw1 = self.addLink(Tp1ASr2, Sw1Tp1)
        Tp1ASr2Sw1[Tp1ASr2].addParams(ip=("100::2/48",))
        Tp1ASr3Sw1 = self.addLink(Tp1ASr3, Sw1Tp1)
        Tp1ASr3Sw1[Tp1ASr3].addParams(ip=("100::3/48",))

        # Add controller and ExaBGP speaker node
        Tp1ASctlr = self.addRouter("Tp1ASctlr", config=RouterConfig)
        Tp1ASctlrSw1 = self.addLink(Tp1ASctlr, Sw1Tp1)
        Tp1ASctlrSw1[Tp1ASctlr].addParams(ip=("100::4/48",))
        # NOTE(review): ExaBGP is told to log everything ('all': 'true') to
        # 'stdout'. If the daemon is started with its stdout on a pipe that
        # nobody reads (Mininet's Node.popen defaults to stdout=PIPE - confirm
        # how IPMininet launches daemons), a burst of BGP updates could fill
        # the pipe buffer and block ExaBGP. That would match a hang that does
        # not happen when ExaBGP runs in an xterm. Consider 'syslog' or a file
        # as destination, or less verbose logging.
        Tp1ASctlr.addDaemon(ExaBGPDaemon, env={
            'api': {'cli': 'true', 'encoder': 'json',
                    'ack': 'true', 'pipename': '\'exabgp\'',
                    'respawn': 'true', 'chunk': 1,
                    'terminate': 'false'},
            'bgp': {'openwait': 60},
            'cache': {'attributes': 'true', 'nexthops': 'true'},
            'daemon': {'daemonize': 'false', 'drop': 'true',
                       'pid': '\'\'', 'umask': '\'0o137\'',
                       'user': 'nobody'},
            'log': {'all': 'true', 'configuration': 'true', 'daemon': 'true',
                    'message': 'true', 'destination': 'stdout',
                    'enable': 'true', 'level': 'INFO', 'network': 'true',
                    'packets': 'false', 'parser': 'true',
                    'processes': 'true', 'reactor': 'true',
                    'rib': 'false', 'routes': 'true', 'short': 'false',
                    'timers': 'false'},
            'pdb': {'enable': 'false'},
            'profile': {'enable': 'false', 'file': '\'\''},
            'reactor': {'speed': '1.0'},
            'tcp': {'acl': 'false', 'bind': '', 'delay': 0,
                    'once': 'false', 'port': 179},
        }, passive=False)

        lTp1Tp2 = self.addLink(Tp1ASr2, Tp2ASr1)
        lTp1Tp2[Tp1ASr2].addParams(ip=("1002::100/48",))
        lTp1Tp2[Tp2ASr1].addParams(ip=("1002::200/48",))
        Tp2ASr1Sw2 = self.addLink(Tp2ASr1, Sw2Tp2)
        Tp2ASr1Sw2[Tp2ASr1].addParams(ip=("200::1/48",))
        Tp2ASr2Sw2 = self.addLink(Tp2ASr2, Sw2Tp2)
        Tp2ASr2Sw2[Tp2ASr2].addParams(ip=("200::2/48",))

        lTp1Tp3 = self.addLink(Tp1ASr3, Tp3ASr1)
        lTp1Tp3[Tp1ASr3].addParams(ip=("1003::100/48",))
        lTp1Tp3[Tp3ASr1].addParams(ip=("1003::300/48",))
        Tp3ASr1Sw3 = self.addLink(Tp3ASr1, Sw3Tp3)
        Tp3ASr1Sw3[Tp3ASr1].addParams(ip=("300::1/48",))
        Tp3ASr2Sw3 = self.addLink(Tp3ASr2, Sw3Tp3)
        Tp3ASr2Sw3[Tp3ASr2].addParams(ip=("300::2/48",))

        Server = self.addHost('Server')
        lGsR = self.addLink(Server, GsASr1)
        lGsR[Server].addParams(ip=("55::1/48",))
        lGsR[GsASr1].addParams(ip=("55::2/48",))

        # One client host per stub AS, on 2001:df<NN>::/48 (::2 router, ::1 host).
        for i, as_router in enumerate(as_routers, start=1):
            client = self.addHost(f'gCl{i}')
            client_link = self.addLink(as_router, client)
            group = str(i).zfill(2)
            client_link[as_router].addParams(ip=(f"2001:df{group}::2/48",))
            client_link[client].addParams(ip=(f"2001:df{group}::1/48",))

        self.addLinks((GsASr1, Tp1ASr1), (Tp2ASr2, Tp4ASr1),
                      (Tp3ASr2, Tp4ASr1), (GsASr1, Tp5ASr1),
                      (Tp5ASr1, Tp4ASr1))

        # Slightly different delay on each Tp4ASr1 <-> stub-AS link.
        link_delay = 10
        for as_router in as_routers:
            self.addLink(Tp4ASr1, as_router,
                         delay="{}ms".format(link_delay / 2))
            link_delay += 0.05

        self.addAS(55, (GsASr1,))
        self.addAS(100, (Tp1ASr1, Tp1ASr2, Tp1ASr3, Tp1ASctlr))
        self.addAS(200, (Tp2ASr1, Tp2ASr2))
        self.addAS(300, (Tp3ASr1, Tp3ASr2))
        self.addAS(400, (Tp4ASr1,))
        self.addAS(500, (Tp5ASr1,))
        for asn, as_router in enumerate(as_routers, start=1):
            self.addAS(asn, (as_router,))

        bgp_peering(self, Tp1ASr1, Tp1ASctlr)
        bgp_peering(self, Tp1ASr2, Tp1ASctlr)
        bgp_peering(self, Tp1ASr3, Tp1ASctlr)
        bgp_peering(self, Tp2ASr1, Tp2ASr2)
        bgp_peering(self, Tp3ASr1, Tp3ASr2)

        # Set ACL and prefer one path over the other
        acl4 = AccessList(name='all', entries=('any',), family='ipv4')
        acl = AccessList(name='all6', entries=('any',), family='ipv6')
        Tp1ASr3.get_config(BGP).set_local_pref(150, from_peer=Tp3ASr1,
                                               matching=(acl4, acl))
        Tp1ASr2.get_config(BGP).set_local_pref(450, from_peer=Tp2ASr1,
                                               matching=(acl4, acl))
        ebgp_session(self, GsASr1, Tp1ASr1, link_type=CLIENT_PROVIDER)
        ebgp_session(self, GsASr1, Tp5ASr1, link_type=CLIENT_PROVIDER)
        ebgp_session(self, Tp1ASr2, Tp2ASr1)
        ebgp_session(self, Tp1ASr3, Tp3ASr1)

        # Prefer return path from clients via Tp3 or Tp2
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp2ASr2,
                                               matching=(acl4, acl))
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp3ASr2,
                                               matching=(acl4, acl))
        ebgp_session(self, Tp4ASr1, Tp2ASr2)
        ebgp_session(self, Tp4ASr1, Tp3ASr2)
        Tp5ASr1.get_config(BGP).deny(to_peer=GsASr1,
                                     matching=(acl4, acl))
        ebgp_session(self, Tp4ASr1, Tp5ASr1)
        for as_router in as_routers:
            ebgp_session(self, as_router, Tp4ASr1, link_type=CLIENT_PROVIDER)

        super().build(*args, **kwargs)

    def post_build(self, net):
        """Enable SRv6 on every host and router once the network exists."""
        for n in net.hosts + net.routers:
            enable_srv6(n)
        super().post_build(net)

    def bgp(self, name):
        """Add a router running BGP that redistributes connected v4/v6 routes."""
        r = self.addRouter(name, config=RouterConfig)
        r.addDaemon(BGP, address_families=(
            AF_INET(redistribute=('connected',)),
            AF_INET6(redistribute=('connected',))))
        return r
class PARNet(IPNet):
    """IPNet with a helper to reconfigure traffic control on a switched link."""

    def modifyLink(self, node1, node2, delay="2ms", bw=None,
                   max_queue_size=None, **opts):
        """Re-apply delay/loss/queue settings on the link node1 <-> node2.

        Both nodes must be attached to a common switch. The first interface
        each node has toward the first such switch is reconfigured.

        :param node1: first endpoint; its interface gets ``params1``.
        :param node2: second endpoint; its interface gets ``params2``.
        :param delay: delay used for a side whose params omit ``"delay"``.
        :param bw: bandwidth passed to both interfaces' ``config()``
            (previously accepted but ignored).
        :param max_queue_size: queue size used for a side whose params omit
            ``"max_queue_size"`` (previously accepted but ignored).
        :param opts: may contain ``params1`` / ``params2`` dicts with optional
            ``"delay"``, ``"loss"`` and ``"max_queue_size"`` keys.
        :raises ValueError: if node1 and node2 share no switch.

        Fixed relative to the earlier version:
        - node2's side used node1's max_queue_size.
        - node2's side got no default delay/loss.
        - ``params1`` was applied to node2's interface.
        """
        params1 = opts.get("params1", {})
        params2 = opts.get("params2", {})
        intf1, intf2 = self._switched_intfs(node1, node2)
        for intf, params in ((intf1, params1), (intf2, params2)):
            queue = params.get("max_queue_size")
            if queue is None:
                queue = max_queue_size
            intf.config(delay=params.get("delay") or delay,
                        bw=bw,
                        max_queue_size=queue,
                        loss=params.get("loss") or 0)

    def _switched_intfs(self, node1, node2):
        """Return (node1_intf, node2_intf) toward the first switch both reach."""
        for sw in self.switches:
            links1 = node1.connectionsTo(sw)
            links2 = node2.connectionsTo(sw)
            if links1 and links2:
                # connectionsTo() yields (own_intf, peer_intf) pairs.
                return links1[0][0], links2[0][0]
        raise ValueError("%s and %s are not attached to a common switch"
                         % (node1, node2))
if __name__ == '__main__':
    # Experiment driver: start the network, delay the intra-AS links of AS 200
    # and AS 300, start the controller, then periodically fail and restore the
    # Tp3ASr2 <-> Tp4ASr1 link until ~950 s have elapsed.
    # (Indentation of the loop body is reconstructed from the flattened paste.)
    net = PARNet(topo=TestTopo(), use_v4=False)
    try:
        net.start()
        tp2_delay = 35.0
        tp3_delay = 45.0
        net.modifyLink(net["Tp2ASr1"], net["Tp2ASr2"],
                       params1={"delay": "{}ms".format(tp2_delay)},
                       params2={"delay": "{}ms".format(tp2_delay)})
        net.modifyLink(net["Tp3ASr1"], net["Tp3ASr2"],
                       params1={"delay": "{}ms".format(tp3_delay)},
                       params2={"delay": "{}ms".format(tp3_delay)})
        # Start a controller application that talks to ExaBGP here.
        net["Tp1ASctlr"].cmd("python Controller.py /home/ubuntu/config.yaml &> controller.log &")
        # start_time was read below but never set (NameError). Assumed here to
        # start when the controller is launched - adjust if needed.
        start_time = time.time()
        time.sleep(60)
        fail_count = 0
        while (time.time() - start_time) < 950:
            time.sleep(120)
            # NOTE(review): `fail_count < 1` injects a failure only on the first
            # iteration; raise the bound to repeat the fail/restore cycle.
            if fail_count < 1:
                fail_links = [("Tp3ASr2", "Tp4ASr1")]
                link_down = net.runFailurePlan(fail_links)
                time.sleep(100)
                net.restoreIntfs(link_down)
                fail_count += 1
                print("Fail count is %s" % fail_count)
            time.sleep(60)
        print("%s: End Experiment" % (str(datetime.now())))
    finally:
        net.stop()