Is it possible to unbuffer sys.stdin for a node's shell process?

Open olafayomi opened this issue 1 year ago • 2 comments

Hi, I'm running into a problem with the exaBGP daemon in IPMininet. In my IPMininet topology, I've configured one of the nodes with exaBGP. The exaBGP daemon has two processes, announce-routes and receive-routes, which run the Python scripts sender.py and receiver.py respectively. sender.py announces routes to the exaBGP peer, while receiver.py receives and parses all the BGP messages generated.

sender.py looks somewhat like this

import sys
import time

if __name__ == '__main__':
    while True:
        # get route
        # Each API command is terminated by a newline so exaBGP can read it as one line.
        sys.stdout.write('neighbor 172.16.2.20 announce attribute next-hop self med 150 origin incomplete nlri 100.30.0.0/16 100.40.0.0/16\n')
        sys.stdout.flush()
        time.sleep(0.1)
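
One thing that may be worth ruling out, stated as an assumption rather than a confirmed diagnosis: the topology script later in this thread sets 'ack':'true' in the exaBGP api environment. To my understanding of ExaBGP 4.x, with acknowledgements enabled each API command is answered with a short status line on the helper's stdin, and sender.py above never reads its stdin. If that is what happens here, a sender could drain pending acknowledgements between announcements. The sketch below shows a non-blocking drain using select; the helper name drain_available and the b'done\n' payload are illustrative, and the demo runs on a local pipe rather than on exaBGP.

```python
import os
import select


def drain_available(fd, chunk_size=4096):
    """Return whatever bytes are readable on fd right now, without blocking."""
    data = b''
    while True:
        # A timeout of 0 makes select() poll instead of waiting.
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            break
        chunk = os.read(fd, chunk_size)
        if not chunk:  # EOF: the writer closed its end
            break
        data += chunk
    return data


# Demo on a local pipe; the payload is a stand-in for acknowledgement lines.
read_end, write_end = os.pipe()
os.write(write_end, b'done\ndone\n')
print(drain_available(read_end))  # bytes that were pending
print(drain_available(read_end))  # nothing left, returns immediately
os.close(write_end)
os.close(read_end)
```

In a sender loop, a call such as drain_available(sys.stdin.fileno()) could be placed right after sys.stdout.flush(). Setting 'ack' to 'false' would be another way to test the same hypothesis.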

receiver.py looks somewhat like this

import json
import os
import sys
import time

if __name__ == '__main__':
    sys.stderr = open('/tmp/exabgp-receiver.log', 'a+')
    # buffering=1 requests line buffering in text mode, not a fully unbuffered stream
    unbuffered_stdin = os.fdopen(sys.stdin.fileno(), 'r', buffering=1)
    while True:
        line = unbuffered_stdin.readline().strip()
        try:
            jsonline = json.loads(line)
        except json.decoder.JSONDecodeError as e:
            sys.stderr.write("Error\n")
        else:
            # process and write actual BGP message
            sys.stderr.write(str(jsonline) + '\n')
        sys.stderr.flush()

If I run the network/topology and trigger a link failure about four times, which causes a burst of BGP updates and withdrawals, the receiver.py script hangs and does not process any further messages. However, if I run the topology with IPCLI() and run exabgp from the shell of the host with xterm, the script does not hang. This makes me think there is something buffering sys.stdin and sys.stdout when the topology is not run in interactive CLI mode. Is there a way to turn off buffering or work around this? Has this been encountered before?
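
For reference, here is a minimal sketch of reading lines from a file descriptor with no Python-level buffering at all, using os.read directly. It is not a confirmed fix for the hang described above. The function name read_lines_unbuffered and the JSON payloads in the demo are illustrative only. Unlike a bare readline() loop, it stops at end-of-file: readline() returns an empty string once the writer closes the pipe, so a `while True` loop around it keeps spinning.

```python
import os


def read_lines_unbuffered(fd, chunk_size=4096):
    """Yield complete lines from fd as soon as their newline arrives.

    os.read() returns as soon as any data is available, so no extra
    buffering layer sits between the pipe and the caller.
    """
    pending = b''
    while True:
        chunk = os.read(fd, chunk_size)
        if not chunk:  # EOF: the writer closed its end of the pipe
            break
        pending += chunk
        while b'\n' in pending:
            line, pending = pending.split(b'\n', 1)
            yield line.decode('utf-8', errors='replace')
    if pending:  # trailing data without a final newline
        yield pending.decode('utf-8', errors='replace')


# Demo on a local pipe; the payloads are made-up stand-ins for BGP messages.
read_end, write_end = os.pipe()
os.write(write_end, b'{"type": "update"}\n{"type": "keepalive"}\n')
os.close(write_end)
for line in read_lines_unbuffered(read_end):
    print(line)
os.close(read_end)
```

In receiver.py, the same generator could be driven with sys.stdin.fileno() in place of the demo pipe.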

IPMininet version is 0.9; Python version is 3.6.

olafayomi, May 19 '23 07:05

Hello,

> This makes me think there is something buffering sys.stdin and sys.stdout when the topology is not run in interactive CLI mode

Not that I know of. In any case, buffering is not permanent: the buffer would eventually be flushed.

Would you mind sharing the script that you use to start the topology? With and without IPCLI, are the command generated by IPMininet (you can get it with ps -aux | grep <exec name>) and the command that you run in the shell identical? Are the configurations (IPMininet generates them in /tmp) identical as well?
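
A minimal sketch of the configuration comparison, assuming both configurations have been read into strings. The helper name diff_configs and the sample option lines are hypothetical and are not real exaBGP settings; only difflib from the standard library is used.

```python
import difflib


def diff_configs(generated_text, manual_text):
    """Return unified-diff lines between two configuration texts."""
    return list(difflib.unified_diff(
        generated_text.splitlines(),
        manual_text.splitlines(),
        fromfile='generated',
        tofile='manual',
        lineterm='',
    ))


# Hypothetical configuration contents, for illustration only.
generated = "option-a true\noption-b 1\n"
manual = "option-a true\noption-b 2\n"

for line in diff_configs(generated, manual):
    print(line)
```

An empty result means the two texts are line-for-line identical. The same function can be applied to the two command lines captured from ps.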

Also, did you make sure that the BGP updates/withdrawals actually arrive at exaBGP when it is not launched from IPCLI? This might be a connectivity issue.

jadinm, May 22 '23 08:05

Here is an example of my topology script. I configure the topology and the nodes and start an application that talks to ExaBGP. Once the nodes are configured, I trigger a link failure and restore the link a number of times. This causes a large number of BGP update messages to be generated and sent to ExaBGP. What's interesting is that, for the first three link failure and recovery events, ExaBGP is fine, but after the fourth one it hangs and does not process any further messages. When I run the topology script with IPCLI and then run ExaBGP from the shell of the host in xterm, I can trigger a link failure and restore the link as many times as I want, and ExaBGP neither stops nor hangs. To the best of my knowledge, I use the configurations generated by IPMininet when running ExaBGP from the shell.

import time
from datetime import datetime

from ipmininet.iptopo import IPTopo
from ipmininet.cli import IPCLI
from ipmininet.ipnet import IPNet
from ipmininet.router.config.ospf import OSPFRedistributedRoute
from ipmininet.srv6 import enable_srv6
from ipmininet.router.config import BGP, ebgp_session, set_rr, AccessList, \
     AF_INET6, AF_INET, BorderRouterConfig, RouterConfig, OSPF, OSPF6, \
     bgp_peering, ExaBGPDaemon, STATIC, StaticRoute, CLIENT_PROVIDER, SHARE
from ipmininet.link import IPLink


class TestTopo(IPTopo):
  
    def build(self, *args, **kwargs):
        # Add all routers

        GsASr1 = self.bgp('GsASr1')
        Tp1ASr1 = self.bgp('Tp1ASr1')
        Tp1ASr2 = self.bgp('Tp1ASr2')
        Tp1ASr3 = self.bgp('Tp1ASr3')
        Tp2ASr1 = self.bgp('Tp2ASr1')
        Tp2ASr2 = self.bgp('Tp2ASr2')
        Tp3ASr1 = self.bgp('Tp3ASr1')
        Tp3ASr2 = self.bgp('Tp3ASr2')
        Tp4ASr1 = self.bgp('Tp4ASr1')
        Tp5ASr1 = self.bgp('Tp5ASr1')

        AS1R1 = self.bgp('AS1R1')
        AS2R1 = self.bgp('AS2R1')
        AS3R1 = self.bgp('AS3R1')
        AS4R1 = self.bgp('AS4R1')
        AS5R1 = self.bgp('AS5R1')
        AS6R1 = self.bgp('AS6R1')
        AS7R1 = self.bgp('AS7R1')
        AS8R1 = self.bgp('AS8R1')
        AS9R1 = self.bgp('AS9R1')
        AS10R1 = self.bgp('AS10R1')
       

        Sw1Tp1 = self.addSwitch('Sw1Tp1')
        Sw2Tp2 = self.addSwitch('Sw2Tp2')
        Sw3Tp3 = self.addSwitch('Sw3Tp3')
  
        Tp1ASr1Sw1 = self.addLink(Tp1ASr1, Sw1Tp1)
        Tp1ASr1Sw1[Tp1ASr1].addParams(ip=("100::1/48",))
        Tp1ASr2Sw1 = self.addLink(Tp1ASr2, Sw1Tp1)
        Tp1ASr2Sw1[Tp1ASr2].addParams(ip=("100::2/48",))
        Tp1ASr3Sw1 = self.addLink(Tp1ASr3, Sw1Tp1)
        Tp1ASr3Sw1[Tp1ASr3].addParams(ip=("100::3/48",))

        # Add controller and ExaBGP speaker node
        Tp1ASctlr = self.addRouter("Tp1ASctlr", config=RouterConfig)
        Tp1ASctlrSw1 = self.addLink(Tp1ASctlr, Sw1Tp1)
        Tp1ASctlrSw1[Tp1ASctlr].addParams(ip=("100::4/48",))
        Tp1ASctlr.addDaemon(ExaBGPDaemon, env = { 'api' : {'cli':'true', 'encoder':'json',
                                                       'ack':'true', 'pipename':'\'exabgp\'',
                                                       'respawn':'true','chunk':1,
                                                       'terminate':'false'},
                                              'bgp' : {'openwait' : 60},
                                              'cache': {'attributes':'true', 'nexthops':'true'},
                                              'daemon': {'daemonize':'false', 'drop':'true', 
                                                         'pid': '\'\'', 'umask':'\'0o137\'', 
                                                         'user':'nobody'},
                                              'log': {'all':'true','configuration':'true','daemon':'true',
                                                      'message':'true','destination':'stdout',
                                                      'enable':'true','level':'INFO','network':'true',
                                                      'packets':'false','parser':'true',
                                                      'processes':'true','reactor':'true',
                                                      'rib':'false','routes':'true','short':'false',
                                                      'timers':'false'},
                                              'pdb': {'enable':'false'},
                                              'profile': { 'enable':'false', 'file':'\'\''},
                                              'reactor': {'speed':'1.0'},
                                              'tcp': {'acl':'false', 'bind':'', 'delay':0,
                                                      'once':'false', 'port': 179}
                                            }, passive=False )

        lTp1Tp2 = self.addLink(Tp1ASr2, Tp2ASr1)
        lTp1Tp2[Tp1ASr2].addParams(ip=("1002::100/48",))
        lTp1Tp2[Tp2ASr1].addParams(ip=("1002::200/48",))

        Tp2ASr1Sw2 = self.addLink(Tp2ASr1, Sw2Tp2)
        Tp2ASr1Sw2[Tp2ASr1].addParams(ip=("200::1/48",))
        Tp2ASr2Sw2 = self.addLink(Tp2ASr2, Sw2Tp2)
        Tp2ASr2Sw2[Tp2ASr2].addParams(ip=("200::2/48",))

        lTp1Tp3 = self.addLink(Tp1ASr3, Tp3ASr1)
        lTp1Tp3[Tp1ASr3].addParams(ip=("1003::100/48",))
        lTp1Tp3[Tp3ASr1].addParams(ip=("1003::300/48",))

        Tp3ASr1Sw3 = self.addLink(Tp3ASr1, Sw3Tp3)
        Tp3ASr1Sw3[Tp3ASr1].addParams(ip=("300::1/48",))
        Tp3ASr2Sw3 = self.addLink(Tp3ASr2, Sw3Tp3)
        Tp3ASr2Sw3[Tp3ASr2].addParams(ip=("300::2/48",))

       
        Server = self.addHost('Server')
        lGsR = self.addLink(Server, GsASr1)
        lGsR[Server].addParams(ip=("55::1/48",))
       
        lGsR[GsASr1].addParams(ip=("55::2/48",))


        for i in range(1, 11):
            exec(f"gCl{i} = self.addHost('gCl{i}')")
            exec(f"gClink{i} = self.addLink(AS{i}R1, gCl{i})")
            ip = f"2001:df{str(i).zfill(2)}::2/48"
            exec(f"gClink{i}[AS{i}R1].addParams(ip=('{ip}',))")
            ip = f"2001:df{str(i).zfill(2)}::1/48"
            exec(f"gClink{i}[gCl{i}].addParams(ip=('{ip}',))")

        self.addLinks((GsASr1, Tp1ASr1), (Tp2ASr2, Tp4ASr1),
                      (Tp3ASr2, Tp4ASr1),(GsASr1, Tp5ASr1),
                      (Tp5ASr1, Tp4ASr1))
                      
        link_delay = 10

        for i in range(1, 11):
            link = self.addLink(Tp4ASr1, eval("AS{}R1".format(i)),
                                delay="{}ms".format(link_delay/2))
            link_delay += 0.05

        self.addAS(55, (GsASr1,))
        self.addAS(100, (Tp1ASr1, Tp1ASr2, Tp1ASr3, Tp1ASctlr))
        self.addAS(200, (Tp2ASr1, Tp2ASr2))
        self.addAS(300, (Tp3ASr1, Tp3ASr2))
        self.addAS(400, (Tp4ASr1,))
        self.addAS(500, (Tp5ASr1,))

        for i in range(1, 11):  # only AS1R1..AS10R1 are defined above
            exec(f"self.addAS(i, (AS{i}R1,))")

        bgp_peering(self, Tp1ASr1, Tp1ASctlr) 
        bgp_peering(self, Tp1ASr2, Tp1ASctlr)
        bgp_peering(self, Tp1ASr3, Tp1ASctlr)

        bgp_peering(self, Tp2ASr1, Tp2ASr2)
        bgp_peering(self, Tp3ASr1, Tp3ASr2)
        
        # Set ACL and prefer one path over the other
        acl4 = AccessList(name='all', entries=('any',), family='ipv4')
        acl = AccessList(name='all6', entries=('any',), family='ipv6')
    
        Tp1ASr3.get_config(BGP).set_local_pref(150, from_peer=Tp3ASr1,
                                               matching=(acl4,acl))

        Tp1ASr2.get_config(BGP).set_local_pref(450, from_peer=Tp2ASr1,
                                               matching=(acl4,acl))

        ebgp_session(self, GsASr1, Tp1ASr1, link_type=CLIENT_PROVIDER)
        ebgp_session(self, GsASr1, Tp5ASr1, link_type=CLIENT_PROVIDER)

        ebgp_session(self, Tp1ASr2, Tp2ASr1)
        ebgp_session(self, Tp1ASr3, Tp3ASr1)

        # Prefer return path from clients via Tp3 or Tp2
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp2ASr2,
                                               matching=(acl4,acl))
        Tp4ASr1.get_config(BGP).set_local_pref(100, from_peer=Tp3ASr2,
                                               matching=(acl4,acl))
        ebgp_session(self, Tp4ASr1, Tp2ASr2) 
        ebgp_session(self, Tp4ASr1, Tp3ASr2)
        Tp5ASr1.get_config(BGP).deny(to_peer=GsASr1,
                                     matching=(acl4,acl))
        ebgp_session(self, Tp4ASr1, Tp5ASr1)


        for i in range(1, 11):  # only AS1R1..AS10R1 are defined above
            exec(f"ebgp_session(self, AS{i}R1, Tp4ASr1, link_type=CLIENT_PROVIDER)")

        super().build(*args, **kwargs)

    def post_build(self, net):
        for n in net.hosts + net.routers:
            enable_srv6(n)
        super().post_build(net)

    def bgp(self, name):
        r = self.addRouter(name, config=RouterConfig)
        r.addDaemon(BGP,  address_families=(
            AF_INET(redistribute=('connected',)),
            AF_INET6(redistribute=('connected',))))
        return r


class PARNet(IPNet):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
      

    def modifyLink(self, node1, node2, delay="2ms", bw=None, max_queue_size=None, **opts):

        src_params = opts.get("params1", {})
        dst_params = opts.get("params2", {})        
        src_delay = src_params.get("delay")
        src_loss = src_params.get("loss")
        src_max_queue = src_params.get("max_queue_size")
        
        dst_delay = dst_params.get("delay")
        dst_loss = dst_params.get("loss")
        dst_max_queue = dst_params.get("max_queue_size")
        
        for sw in self.switches:
            src_link = node2.connectionsTo(sw)
            dst_link = node1.connectionsTo(sw)
            if src_link and dst_link:
                break

        src_int, _ = src_link[0]
        dst_int, _ = dst_link[0]

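        src_delay = src_delay or delay
        src_loss = src_loss or 0
        # Apply the same defaults to the destination side
        dst_delay = dst_delay or delay
        dst_loss = dst_loss or 0

        src_int.config(delay=src_delay, max_queue_size=src_max_queue, loss=src_loss)
        dst_int.config(delay=dst_delay, max_queue_size=dst_max_queue, loss=dst_loss)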


if __name__ == '__main__':
    net = PARNet(topo=TestTopo(), use_v4=False)
    try:
        net.start()
        tp2_delay =  35.0
       
        tp3_delay = 45.0
        
            
        net.modifyLink(net["Tp2ASr1"], net["Tp2ASr2"],
                       params1={"delay": "{}ms".format(tp2_delay)},
                       params2={"delay": "{}ms".format(tp2_delay)})
        net.modifyLink(net["Tp3ASr1"], net["Tp3ASr2"],
                       params1={"delay": "{}ms".format(tp3_delay)},
                       params2={"delay": "{}ms".format(tp3_delay)})
      
        # Start a controller application that talks to ExaBGP here.
        net["Tp1ASctlr"].cmd("python Controller.py /home/ubuntu/config.yaml &> controller.log &")

        time.sleep(60)
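        fail_count = 0
        # start_time was not defined in the snippet; assumed to mark the start of the failure loop
        start_time = time.time()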
        while (time.time() - start_time) < 950:
            time.sleep(120)
            if fail_count < 1:
                fail_links = [("Tp3ASr2", "Tp4ASr1")]
                link_down = net.runFailurePlan(fail_links)
                time.sleep(100)
                net.restoreIntfs(link_down)
            fail_count += 1
            print("Fail count is %s" %fail_count)
            time.sleep(60)
        print("%s: End Experiment" % (str(datetime.now())))
    finally:
        net.stop()

olafayomi, Jun 07 '23 01:06