stop consume functionality

Open Adarsh-jaiss opened this issue 1 year ago • 3 comments

I have created a stopConsume method with a self parameter, as referred to in issue #139.

def stopConsume(self):
    if self.t_consume is not None:
        self.t_consume.cancel()
        self.t_consume = None

In this code:

  • The stopConsume method is added to the Consumer class. This method will cancel the task responsible for consuming messages, effectively stopping the automatic message consumption.

  • In the __consume method, we added a condition to check whether the consumer task (t_consume) is still running. If not, it will break out of the loop, ensuring that the consumption stops when stopConsume is called.
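To illustrate the cancellation behaviour described above, here is a minimal, self-contained sketch. It assumes t_consume is an asyncio task created with asyncio.create_task, which the thread does not confirm. SketchConsumer, its _consume loop, and the polls counter are hypothetical stand-ins, not the memphis.py Consumer.

```python
import asyncio


class SketchConsumer:
    """Hypothetical stand-in for illustration; not the memphis.py Consumer."""

    def __init__(self):
        self.t_consume = None
        self.polls = 0  # counts loop iterations so the demo can observe activity

    async def _consume(self):
        # Assumed shape of a polling loop: runs until the task is cancelled.
        while True:
            self.polls += 1
            await asyncio.sleep(0.01)

    def consume(self):
        # Assumption: the consume loop runs as an asyncio task.
        self.t_consume = asyncio.create_task(self._consume())

    def stopConsume(self):
        # Same body as the method proposed in this issue.
        if self.t_consume is not None:
            self.t_consume.cancel()
            self.t_consume = None


async def demo():
    consumer = SketchConsumer()
    consumer.consume()
    await asyncio.sleep(0.05)

    task = consumer.t_consume  # keep a reference before it is cleared
    consumer.stopConsume()
    # Wait for the cancellation to be delivered to the task.
    await asyncio.gather(task, return_exceptions=True)

    polls_at_stop = consumer.polls
    await asyncio.sleep(0.05)
    # Report: task cancelled, loop ran at least once, loop did not run again.
    return task.cancelled(), polls_at_stop > 0, consumer.polls == polls_at_stop


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch, cancel() raises CancelledError inside the loop at its next await, so the loop ends without an extra running-check. Whether the real __consume needs the additional condition depends on how it handles that exception.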

Adarsh-jaiss avatar Aug 03 '23 06:08 Adarsh-jaiss

There are also a few lint issues

idanasulin2706 avatar Aug 03 '23 09:08 idanasulin2706

@Adarsh-jaiss How did you test this? Can you please describe your test plan?

As for the test plan for this stopConsume method, we can follow this approach:

  1. Create an instance of the Consumer class and define a callback function to pass to the consume method.
  2. Call the consume method with the callback function to start consuming events.
  3. Call the stopConsume method to stop consumption, then verify that it has stopped by checking that the callback function is no longer being invoked.

Here's the unittest file:

import unittest
from memphis.consumer import Consumer

class ConsumerTestCase(unittest.TestCase):
    def test_stopConsume(self):
        consumer = Consumer(connection=None, station_name='my_station', consumer_name='my_consumer', consumer_group='my_group', pull_interval_ms=1000, batch_size=100, batch_max_time_to_wait_ms=5000, max_ack_time_ms=1000)
        
        # Define a callback function
        def callback(messages, error, context):
            pass
        
        # Start consuming events
        consumer.consume(callback)
        
        # Stop consuming events
        consumer.stopConsume()
        
        # Verify that the consumption has stopped
        # You can add your own assertions here
        
        # Optionally, verify that the t_consume task is None
        self.assertIsNone(consumer.t_consume)

if __name__ == '__main__':
    unittest.main()
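The assertion left open in the test above (that the callback is no longer invoked) could be checked roughly as follows. This is only a sketch: FakeConsumer and its _loop method are hypothetical stand-ins that mimic a task-based consume loop, so the test can run without a Memphis connection. It does not exercise the real Consumer class.

```python
import asyncio
import unittest


class FakeConsumer:
    """Hypothetical stand-in; the real Consumer would need a live connection."""

    def __init__(self, interval_s=0.01):
        self.interval_s = interval_s
        self.t_consume = None

    async def _loop(self, callback):
        # Assumed behaviour: invoke the callback once per polling interval.
        while True:
            callback([], None, {})
            await asyncio.sleep(self.interval_s)

    def consume(self, callback):
        self.t_consume = asyncio.create_task(self._loop(callback))

    def stopConsume(self):
        # Same body as the method proposed in this issue.
        if self.t_consume is not None:
            self.t_consume.cancel()
            self.t_consume = None


class StopConsumeSketchTest(unittest.IsolatedAsyncioTestCase):
    async def test_callback_stops_after_stopConsume(self):
        calls = 0

        def callback(messages, error, context):
            nonlocal calls
            calls += 1

        consumer = FakeConsumer()
        consumer.consume(callback)
        await asyncio.sleep(0.05)
        self.assertGreater(calls, 0)  # consumption was actually running

        consumer.stopConsume()
        calls_at_stop = calls
        await asyncio.sleep(0.05)

        # No further invocations after stopping, and the task handle is cleared.
        self.assertEqual(calls, calls_at_stop)
        self.assertIsNone(consumer.t_consume)
```

Saved to a file, this can be run with python -m unittest. IsolatedAsyncioTestCase supplies the running event loop that asyncio.create_task requires.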

Please correct me and suggest any changes, since I am a beginner and am trying to learn new things by doing :)

Adarsh-jaiss avatar Aug 04 '23 08:08 Adarsh-jaiss

@Adarsh-jaiss Did you run the unit test you wrote? Did it run successfully? The best way to learn is to try things -- you won't break anything. :)

rnowling-memphis avatar Aug 04 '23 19:08 rnowling-memphis