geeteventbus icon indicating copy to clipboard operation
geeteventbus copied to clipboard

An inprocess eventbus for highly concurrent Python applications

geeteventbus

An eventbus for concurrent programming

geeteventbus is a library that allows publish-subscribe-style communication. There is no need for the components to register to each-other. It is inspired by a Java library, Guava eventbus from Google. But it is not exactly same as the Guava eventbus library.

  • geeteventbus simplifies handling events from publishers and subscribers.
  • publisher and subscribers don't need to create threads to concurrently process the events.
  • the eventbus can be synchronus, where the events are delivered from the same thread posting the events
  • events can be delivered to subscibers in the same order they are posted
  • subscribers may be declared as thread-safe, in that case same subscriber may be invoked concurrently for processing multiple events
  • events for which there are no subscribers are registered yet are simply discared by the eventbus.
  • the eventbus is not to be used for inter process communication. Publishers and subsribers must run on the same process

Basic working

  1. We create an eventbus

    .. code:: python

     from geeteventbus.eventbus import eventbus
     eb = eventbus()
    

    This will create an eventbus with the defaults. The default eventbus will have below characteristics:

     1) the maximum queued event limit is set to 10000
     2) number of executor thread is 8
     3) the subscribers will be called asynchronously
     4) subscibers are treated as thread-safe and hence same subscribers may be invoked simultaneously  on different threads
    
  2. Create a subsclass of subscriber and override the process method. Create an object of this class and register it to the eventbus for receiving messages with certain topics:

    .. code:: python

     from geeteventbus.subscriber import subscriber
     from geeteventbus.eventbus import eventbus
     from geeteventbus.event import event
    
     class mysubscriber(subscriber):
         def process(self, eventobj):
             if not isinstance(eventobj, event):
                 print('Invalid object type is passed.')
                 return
             topic = eventobj.get_topic()
             data = eventobj.get_data()
             print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
     subscr = mysubscriber()
     eb.register_consumer(subscr, 'an_important_topic')
    
  3. Post some events to the eventbus with the topic "an_important_topic".

    .. code:: python

     from geeteventbus.event import event
    
     eobj1 = event('an_important_topic', 'This is some data for the event 1')
     eobj2 = event('an_important_topic', 'This is some data for the event 2')
     eobj3 = event('an_important_topic', 'This is some data for the event 3')
     eobj3 = event('an_important_topic', 'This is some data for the event 4')
    
     eb.post(eobj1)
     eb.post(eobj2)
     eb.post(eobj3)
     eb.post(eobj4)
    
  4. We may gracefully shutdown the eventbus before exiting the process

    .. code:: python

     eb.shutdown()
    

The complete example is below:

.. code:: python
    
    from time import sleep
    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event

    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    
    eb = eventbus()
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
    

    eobj1 = event('an_important_topic', 'This is some data for the event 1')
    eobj2 = event('an_important_topic', 'This is some data for the event 2')
    eobj3 = event('an_important_topic', 'This is some data for the event 3')
    eobj4 = event('an_important_topic', 'This is some data for the event 4')

    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)

    eb.shutdown()
    sleep(2)

A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself to an eventbus for receiving events for the counters(topics). A set of producers update the values for the counters and post events describing the counter to the eventbus:

.. code:: python

    from threading import Lock, Thread
    from time import sleep, time
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    from geeteventbus.subscriber import subscriber
    from random import randint


    class counter_aggregator(subscriber, Thread):
        '''
        Aggregator for a set of counters. Multiple threads updates the counts which
        are aggregated by this class and output the aggregated value periodically.
        '''
        def __init__(self, counter_names):
            Thread.__init__(self)
            self.counter_names = counter_names
            self.locks = {}
            self.counts = {}
            self.keep_running = True
            self.collect_times = {}
            for counter in counter_names:
                self.locks[counter] = Lock()
                self.counts[counter] = 0
                self.collect_times[counter] = time()

        def process(self, eobj):
            '''
            Process method calls with the event object eobj. eobj has the counter name as the topic
            and an int count as the value for the counter.
            '''
            counter_name = eobj.get_topic()
            if counter_name not in self.counter_names:
                return
            count = eobj.get_data()
            with self.locks[counter_name]:
                self.counts[counter_name] += count

        def stop(self):
            self.keep_running = False

        def __call__(self):
            '''
            Keep outputing the aggregated counts every 2 seconds
            '''
            while self.keep_running:
                sleep(2)
                for counter_name in self.counter_names:
                    with self.locks[counter_name]:
                        print('Change for counter %s = %d, in last %f secs' % (counter_name,
                              self.counts[counter_name], time() - self.collect_times[counter_name]))
                        self.counts[counter_name] = 0
                        self.collect_times[counter_name] = time()
            print('Aggregator exited')


    class count_producer:
        '''
        Producer for counters. Every 0.02 seconds post the "updated" value for a
        counter randomly
        '''
        def __init__(self, counters, ebus):
            self.counters = counters
            self.ebus = ebus
            self.keep_running = True
            self.num_counter = len(counters)

        def stop(self):
            self.keep_running = False

        def __call__(self):
            while self.keep_running:
                ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
                ebus.post(ev)
                sleep(0.02)
            print('producer exited')

    if __name__ == '__main__':
        ebus = eventbus()
        counters = ['c1', 'c2', 'c3', 'c4']
        subcr = counter_aggregator(counters)
        producer = count_producer(counters, ebus)
        for counter in counters:
            ebus.register_consumer(subcr, counter)
        threads = []
        i = 30
        while i > 0:
            threads.append(Thread(target=producer))
            i -= 1

        aggregator_thread = Thread(target=subcr)
        aggregator_thread.start()
        for thrd in threads:
            thrd.start()
        sleep(20)
        producer.stop()
        subcr.stop()
        sleep(2)
        ebus.shutdown()