stream.take does not decrement events_active

Open jabonte opened this issue 5 years ago • 3 comments

Steps to reproduce

Use stream.take in an agent and watch the events_active counter reported by app.monitor.

#!/usr/bin/env python3
import random
import ssl
import faust

def main_loop():
    app = get_faust()
    topic = app.topic(
        'test-topic-stream-take',
        internal=True,
    )

    @app.task
    async def on_started():
        print('TEST started')
        await topic.maybe_declare()

    @app.timer(interval=1.0)
    async def faust_timer_1s(app):
        monitor = app.monitor.asdict()
        print("events_active   = %s" % monitor['events_active'])

    @app.timer(interval=5.0)
    async def faust_timer_5s(app):
        count = random.randint(1, 10)
        print("Sending %s values" % count)
        for value in range(count):
            # print("Sending %s"%value)
            await topic.send(value=value)

    @app.agent(topic)
    async def app_agent_task_Rx(stream):
        #async for value in stream:
        #    print(value)
        async for values in stream.take(5, within=10):
            print(f'RECEIVED {len(values)}: {values}')

    app.main()


def get_faust():
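    # TLS context with certificate verification disabled (testing only).
    # check_hostname must be off before verify_mode can be set to CERT_NONE.
    context = ssl.SSLContext()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    broker_credentials = faust.SASLCredentials(
        ssl_context=context,
        mechanism='PLAIN',
        username='<USER>',  # placeholder
        password='<PASSWORD>',  # placeholder
    )
    broker = '<BROKER>'  # placeholder broker URL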
    app = faust.App(
        'stream_take_events',
        broker=broker,
        broker_credentials=broker_credentials,
        broker_check_crcs=False,
        stream_wait_empty=False,
    )
    return app


if __name__ == '__main__':
    main_loop()

Expected behavior

events_active is decremented each time stream.take returns a batch.

Actual behavior

events_active keeps incrementing and is never decremented.
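
To make the symptom easier to confirm than by reading the printed values, the samples taken in the 1-second timer could be recorded and checked for any decrease. The following is only a minimal sketch: `CounterWatch` and its methods are hypothetical names introduced here for illustration and are not part of Faust.

```python
from typing import List


class CounterWatch:
    """Hypothetical helper (not part of Faust): records successive
    samples of a counter such as events_active."""

    def __init__(self) -> None:
        self.samples: List[int] = []

    def record(self, value: int) -> None:
        # Store one sample, e.g. monitor['events_active'] from the timer.
        self.samples.append(value)

    def ever_decreased(self) -> bool:
        # True if any sample is lower than the one recorded before it.
        return any(b < a for a, b in zip(self.samples, self.samples[1:]))

    def net_growth(self) -> int:
        # Difference between the last and first sample (0 if no samples).
        return self.samples[-1] - self.samples[0] if self.samples else 0
```

In the reproduction script, `faust_timer_1s` could call `watch.record(monitor['events_active'])` on a module-level `watch = CounterWatch()`. If `ever_decreased()` is still false after many batches have been received, that matches the behavior reported here.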

Versions

Python: CPython 3.6.3 (Linux x86_64)
faust: 1.10.4
aiokafka: 1.1.6
aiohttp: 3.6.2

jabonte, Sep 09 '20 07:09

@jabonte Can you check whether all the messages available in Kafka are actually being consumed when you use the take method? I ask because of this issue: https://github.com/robinhood/faust/issues/656

sivasai-quartic, Sep 24 '20 10:09

@sivasai-quartic It looks like all the messages are being consumed from Kafka.

I also checked whether the problem exists when I use an app.channel() instead of an app.topic(). With a channel, the event counter is also not decremented, but for some reason the message counter becomes negative.

jabonte, Sep 25 '20 07:09

Is there any update on this issue?

enavarro222, Aug 02 '22 08:08