stream.take does not decrement events_active
Steps to reproduce
Use stream.take and watch the app.monitor events_active counter.
#!/usr/bin/env python3
import random
import ssl
import faust
def main_loop():
    """Build the Faust app, register the reproduction tasks, and run it.

    Registers four callbacks on the app before handing control to
    ``app.main()``:

    * a startup task that declares the test topic,
    * a 1-second timer that prints the monitor's ``events_active`` value,
    * a 5-second timer that sends a random number (1-10) of messages,
    * an agent that consumes the topic in batches via ``stream.take``.
    """
    app = get_faust()
    topic = app.topic('test-topic-stream-take', internal=True)

    @app.task
    async def on_started():
        print('TEST started')
        await topic.maybe_declare()

    @app.timer(interval=1.0)
    async def faust_timer_1s(app):
        # Print the counter this script exists to observe.
        snapshot = app.monitor.asdict()
        print("events_active = %s" % snapshot['events_active'])

    @app.timer(interval=5.0)
    async def faust_timer_5s(app):
        burst = random.randint(1, 10)
        print("Sending %s values" % burst)
        for payload in range(burst):
            await topic.send(value=payload)

    @app.agent(topic)
    async def app_agent_task_Rx(stream):
        # To consume events one at a time instead of in batches, swap the
        # loop below for: ``async for value in stream: print(value)``.
        async for batch in stream.take(5, within=10):
            print(f'RECEIVED {len(batch)}: values')

    app.main()
def get_faust():
context = ssl.SSLContext()
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
broker_credentials = faust.SASLCredentials(
ssl_context=context,
mechanism='PLAIN',
username=<USER>
password=<PASSWORD>
)
broker = <BROKER>
app = faust.App(
'stream_take_events',
broker=broker,
broker_credentials=broker_credentials,
broker_check_crcs=False,
stream_wait_empty=False,
)
return app
# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    main_loop()
Expected behavior
events_active is decremented when stream.take returns
Actual behavior
events_active keeps incrementing
Versions
Version(s) CPython 3.6.3 (Linux x86_64) faust = v1.10.4 aiokafka=1.1.6 aiohttp=3.6.2
@jabonte Can you check whether all the messages available in Kafka are being consumed by the take method? I ask because of this issue: https://github.com/robinhood/faust/issues/656
@sivasai-quartic It looks like all the messages are being consumed from Kafka.
I also checked whether the problem exists if I use an app.channel() instead of an app.topic(). With a channel the event counter is also not decremented, but for some reason the message counter becomes negative.
Is there any update on this issue?