Engine: improve overall fanout management

Open thiell opened this issue 3 years ago • 0 comments

Issue #488 and the broken test found in that ticket made me realize that a worker with no streams, once registered in the engine, still "uses" one fanout slot. This delays other potential engine clients and, when the fanout is low, can make the engine loop exit prematurely, as in the test below. The cause is that pending clients do not retain the run loop: evlooprefcnt is only increased when clients register, not before. With the current code (as of 1.8-1.9), the test case below exits prematurely and the event handlers are not called at all.
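To illustrate the mechanism, here is a minimal toy model of a fanout-limited run loop. It is only a sketch and not ClusterShell's actual engine code: `ToyClient`, `ToyEngine`, `demo` and every attribute except the name `evlooprefcnt` are hypothetical, and it assumes that each started client takes one fanout slot and that only registered streams retain the loop.

```python
from collections import deque


class ToyClient:
    """Hypothetical engine client; `streams` is how many streams it registers."""

    def __init__(self, name, streams):
        self.name = name
        self.streams = streams


class ToyEngine:
    """Toy model of a fanout-limited engine (NOT ClusterShell's real code)."""

    def __init__(self, fanout):
        self.fanout = fanout
        self.running = []       # clients currently holding a fanout slot
        self.pending = deque()  # clients waiting for a free slot
        self.evlooprefcnt = 0   # only registered streams retain the loop
        self.started = []       # names of clients that actually started

    def add(self, client):
        self.pending.append(client)

    def _start_pending(self):
        # Assumption: a slot is consumed per client, with or without streams.
        while self.pending and len(self.running) < self.fanout:
            client = self.pending.popleft()
            self.running.append(client)
            self.evlooprefcnt += client.streams
            self.started.append(client.name)

    def run(self):
        self._start_pending()
        # Pending clients do not keep this loop alive, only streams do.
        while self.evlooprefcnt > 0:
            # Simulate one running client with streams finishing its work.
            client = next(c for c in self.running if c.streams > 0)
            self.running.remove(client)
            self.evlooprefcnt -= client.streams
            self._start_pending()
        return self.started


def demo(fanout):
    """Register one streamless client, then three clients with two streams each."""
    engine = ToyEngine(fanout)
    engine.add(ToyClient("idle", 0))
    for name in ("n1", "n2", "n3"):
        engine.add(ToyClient(name, 2))
    started = engine.run()
    return started, len(engine.pending)
```

In this model, `demo(1)` starts only the streamless client and returns with three clients still pending, while `demo(2)` starts all four. That mirrors the symptom described above, under the stated assumptions.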

I am opening this ticket to track re-working the engine to improve its management of active streams, clients, and fanout. This is not super high priority, as it is an edge case with weird workers, but it would still be nice to fix in the future.

test case:

    def test_010_worker_with_no_streams_vs_fanout(self):
        """test StreamWorker with no streams vs engine fanout"""
        # Revealed by GH #488

        class TestH(EventHandler):
            def __init__(self):
                self.start_count = 0
                self.close_count = 0
                self.pickup_count = 0
                self.hup_count = 0

            def ev_start(self, worker):
                self.start_count += 1

            def ev_pickup(self, worker, node):
                self.pickup_count += 1

            def ev_hup(self, worker, node, rc):
                self.hup_count += 1

            def ev_close(self, worker, timedout):
                if not timedout:
                    self.close_count += 1

        task = task_self()
        task.set_info('debug', True)
        fanout = task.info("fanout")
        try:
            task.set_info("fanout", 1)
            self.assertEqual(len(task._engine._clients), 0)

            # under low fanout, register a StreamWorker with no streams
            worker = StreamWorker(handler=None)
            task.schedule(worker)

            eh = TestH()
            task.shell("/bin/sleep 0.11", handler=eh, key="n1")
            task.shell("/bin/sleep 0.12", handler=eh, key="n2")
            task.shell("/bin/sleep 0.13", handler=eh, key="n3")

            task.run()

            self.assertEqual(eh.start_count, 3)
            self.assertEqual(eh.pickup_count, 3)
            self.assertEqual(eh.hup_count, 3)
            self.assertEqual(eh.close_count, 3)
            self.assertEqual(len(task._engine._clients), 1)
            worker.abort()
            self.assertEqual(len(task._engine._clients), 0)
        finally:
            task.set_info("fanout", fanout)

thiell, Nov 25 '22 18:11