Unable to subscribe to topics by pattern
Steps to reproduce
Created a topic specifying `pattern=` with a valid regex that should have matched multiple topics on the Kafka cluster. Running the worker with `-l info` shows everything starting up, but the only topic subscription is to the base assignor topic.
This is a known issue dating back to at least July 25, 2019, as detailed in robinhood/faust: https://github.com/robinhood/faust/issues/390
I suspect, but have not verified, that the issue lies in `faust/faust/transport/drivers/aiokafka:subscribe`, given the comment `XXX pattern does not work :/` from a prior dev.
Still occurs on master
Expected behavior
- Topic is subscribed to all Kafka topics matching the regex
- Topic subscription updates when new topic matching regex is created
- In general, should match behavior documented in aiokafka under 'Topic Subscription by Pattern' here https://aiokafka.readthedocs.io/en/stable/consumer.html
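As a concrete sanity check on that expectation, plain `re` mirrors how aiokafka matches a compiled pattern against topic names from cluster metadata (the topic names here are assumed for illustration, taken from the repro script below):

```python
import re

# The pattern from the repro script below; aiokafka compiles the subscription
# pattern and matches it against topic names learned from cluster metadata.
pattern = re.compile("^MyGreatTopic-.*$")

# Hypothetical metadata listing: the three repro topics plus unrelated ones.
cluster_topics = [
    "MyGreatTopic-1", "MyGreatTopic-2", "MyGreatTopic-3",
    "faust-dev-__assignor-__leader", "other-topic",
]
matched = [t for t in cluster_topics if pattern.match(t)]
print(matched)  # ['MyGreatTopic-1', 'MyGreatTopic-2', 'MyGreatTopic-3']
```

So the pattern itself is fine; the three topics should all be picked up, and new matching topics should be added on metadata refresh.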
Actual behavior
- The topic subscribes to nothing, silently
Full traceback
No traceback - silently fails to do anything
Versions
- Python version = 3.8
- Faust version = v0.2.1
- Operating system = whatever the python:3.8 Docker image uses
- Kafka version = 2.6
- RocksDB version (if applicable) = N/A
```python
import faust


class Greeting(faust.Record):
    from_name: str
    to_name: str


app = faust.App('faust-dev', broker='kafka://broker:29092')

topic = app.topic('MyGreatTopic-1', value_type=Greeting)
topic2 = app.topic('MyGreatTopic-2', value_type=Greeting)
topic3 = app.topic('MyGreatTopic-3', value_type=Greeting)


@app.task
async def create_topics():
    await topic.maybe_declare()
    await topic2.maybe_declare()
    await topic3.maybe_declare()


regex_topic = app.topic(pattern="^MyGreatTopic-.*$", value_type=Greeting)


@app.agent(regex_topic)
async def hello(greetings):
    async for event in greetings.events():
        greeting = event.value
        print(f'{event.message.topic} says: '
              f'Hello from {greeting.from_name} to {greeting.to_name}')


@app.timer(interval=1.0)
async def example_sender(app):
    await topic.send(value=Greeting(from_name='Faust', to_name='you'))
    await topic2.send(value=Greeting(from_name='Faust 2', to_name='you'))
    await topic3.send(value=Greeting(from_name='Faust 3', to_name='you'))


if __name__ == '__main__':
    app.main()
```
I've been doing a bit of research and I think the problem is two-fold:
Faust side: not linking `pattern` to anything:
- When a topic is added to a conductor, a chain of events is set in motion to pull a list of all subscribed topics using the function `Conductor._update_indices()`. This function only reads the `topics` property of each Topic, so `pattern` is completely ignored.
- The list of topics is sent to the Consumer to update the subscriptions via the `aiokafka` `subscribe` call described above, and again the pattern is ignored.
So the `pattern` parameter is accepted and validated on Topic creation and then... never touched again, creating a functionally useless topic.
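A minimal sketch of the flow described above (class and function names are simplified stand-ins, not the actual Faust source): the index-update step only collects explicit topic names, so a pattern-only topic contributes nothing to the subscription.

```python
import re
from typing import Optional, Set


class FakeTopic:
    """Simplified stand-in for faust.Topic: explicit names plus an (ignored) pattern."""

    def __init__(self, topics=(), pattern: Optional[str] = None):
        self.topics = list(topics)
        self.pattern = re.compile(pattern) if pattern else None


def update_indices(channels) -> Set[str]:
    # Mirrors the described behavior: only the `topics` attribute is read,
    # so `pattern` never reaches the consumer's subscribe() call.
    subscribed: Set[str] = set()
    for channel in channels:
        subscribed.update(channel.topics)
    return subscribed


explicit = FakeTopic(topics=["MyGreatTopic-1"])
by_pattern = FakeTopic(pattern="^MyGreatTopic-.*$")
print(update_indices([explicit, by_pattern]))  # {'MyGreatTopic-1'} — pattern ignored
```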
Faust <-> aiokafka connections:
- The current Faust architecture stipulates that 1 AIOKafkaConsumer backs 1 Faust Consumer, which backs 1 Faust Conductor, which backs n Faust Channels/Topics, which back m Faust Streams. aiokafka stipulates that 1 AIOKafkaConsumer can consume either a list of exactly-specified topics or 1 regular expression, but not both. Faust works by continually updating the list of topics on that one consumer.
- Presumably development on regex patterns in Faust ended at this discovery, without altering the documentation, updating any signatures, or doing anything else to indicate that creating a topic with `pattern=` isn't going to work.
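The "either topics or a pattern, but not both" restriction can be sketched as a toy subscription-state model (modeled on the behavior described here, not on aiokafka's actual internals):

```python
class Subscription:
    """Toy model: a consumer is either topic-subscribed or pattern-subscribed."""

    def __init__(self):
        self._topics = None
        self._pattern = None

    def subscribe_topics(self, topics):
        if self._pattern is not None:
            raise RuntimeError(
                "Subscription to topics, partitions and pattern are mutually exclusive")
        self._topics = set(topics)

    def subscribe_pattern(self, pattern):
        if self._topics is not None:
            raise RuntimeError(
                "Subscription to topics, partitions and pattern are mutually exclusive")
        self._pattern = pattern


sub = Subscription()
sub.subscribe_topics({"MyGreatTopic-1"})
try:
    sub.subscribe_pattern("^MyGreatTopic-.*$")
except RuntimeError as exc:
    # Essentially the IllegalStateError Faust's single shared consumer runs into.
    print(exc)
```

Because Faust funnels every table topic, channel, and agent through that one consumer, any explicit topic name anywhere in the app rules out a pattern subscription.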
Some thoughts, then. Pretending to support regex matching for topics (exposing a way to do it, then silently failing and not actually having a way after all) is not good behavior. Things that could be done:
- Remove the passages in the documentation implying it's possible, and raise an exception if a topic is created with `pattern=`, stating that it isn't supported.
- Raise an issue with aiokafka asking whether an AIOKafkaConsumer could use topics and patterns together. I suspect this is a nonstarter, though: I believe the "either topics or regex, but not both" restriction is present in the Java Kafka streams code as well.
- Update the Faust architecture to have 1+ AIOKafkaConsumers back 1 Faust Consumer, and so on. One AIOKafkaConsumer handles explicit topic names as currently works, plus 1 more AIOKafkaConsumer per regex-backed topic. A Faust app with 2 topics specifying `pattern=` and 10 topics with explicit topic names would then have 3 AIOKafkaConsumers.
- Same as above, but instead of splitting each pattern into its own AIOKafkaConsumer, do some "magic" and merge the regexes. This sounds like a terrible idea (trying to merge regexes just sounds like a lesson in frustration and misery), but I could be wrong. For reference, a Topic with multiple explicit names already looks like this:

```python
multiple_topic = Topic(topics=['MyGreatTopic-1', 'MyGreatTopic-2', 'MyGreatTopic-3'],
                       app=app, value_type=Greeting)
```
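To make the multi-consumer option concrete, here is a rough sketch of how topic specs could be partitioned into consumer slots (pure planning logic, no Kafka calls; all names here are hypothetical, not Faust API):

```python
from typing import List, NamedTuple, Optional


class TopicSpec(NamedTuple):
    topics: tuple = ()
    pattern: Optional[str] = None


def plan_consumers(specs: List[TopicSpec]):
    """One consumer for all explicit topic names, plus one per pattern topic."""
    explicit = sorted({name for spec in specs for name in spec.topics})
    consumers = []
    if explicit:
        consumers.append(("topics", explicit))
    for spec in specs:
        if spec.pattern is not None:
            consumers.append(("pattern", spec.pattern))
    return consumers


specs = [TopicSpec(topics=("a", "b")), TopicSpec(pattern="^x-.*$"),
         TopicSpec(pattern="^y-.*$"), TopicSpec(topics=("c",))]
print(len(plan_consumers(specs)))  # 3 consumers: 1 explicit-list + 2 patterns
```

The hard part is not this bookkeeping but everything downstream of it: rebalancing, offset management, and the Conductor all assume a single consumer.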
Adding support for `pattern=` by allowing more than one AIOKafkaConsumer seems like the ideal course of action, but that's a fairly large change to the overall architecture, and I'm not sure what it might do to performance. I'm also not sure why the initial architecture was chosen to begin with.
Any update on this issue, please? Regex-based subscription is a crucial feature in streaming applications; is there any plan to support it?
Just experimented with this. Don't worry, I didn't forget. This is really complicated: if a user wants to use regex patterns with their Faust application, they'll need to make sure separate consumers are used. Otherwise you'll see errors such as:
```
[^---Conductor]: Crashed reason=IllegalStateError('Subscription to topics, partitions and pattern are mutually exclusive: SubscriptionType.AUTO_TOPICS')
```
Note that I added the SubscriptionType part of that log statement manually, for the sake of understanding what's going on here. So if you have any table topics or channels with defined topic names, it'll crash. I'll have some changes committed soon with a rough demo of what I did.
Even if I patch this, I have more bugs to fix:
```
[^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='inserttopicnameherelol', partition=0))
```
Edit: Some changes are in the regex-version branch; hopefully I can resume progress soon.
On further examination, since the majority of Faust functions require topic names to be explicitly declared and share a single Kafka Consumer, it's not possible to support topic and pattern specification simultaneously. You will have to create a separate AIOKafkaConsumer instance with a pattern subscription. See https://aiokafka.readthedocs.io/en/stable/consumer.html#topic-subscription-by-pattern for an example.
The good news is that it's possible to run this Consumer inside your Faust application:
```python
import aiokafka


@app.task
async def consume():
    # dict_containing_your_config is a placeholder for your consumer settings.
    consumer = aiokafka.AIOKafkaConsumer(**dict_containing_your_config)
    # Apparently you need to subscribe before starting and seeking,
    # contrary to what the aiokafka docs depict in examples?
    consumer.subscribe(pattern=r"[0-9a-f]{32}\Z")  # UUID regex pattern for an example
    await consumer.start()
    await consumer.seek_to_beginning()
    try:
        while True:
            for tp, messages in (await consumer.getmany(timeout_ms=1000)).items():
                for msg in messages:
                    print("Consumed msg %s %s %s" % (msg.topic, msg.partition, msg.value))
    finally:
        await consumer.stop()
```
I apologize that I can't provide a more helpful solution in Faust right off the bat. A single application uses a single Kafka Consumer at a time, and I'd need to figure out how to split an application across multiple consumers. You'd think that Faust would already have that functionality, given how it can handle the complexity of simultaneously balancing so many tables, etc., but apparently not.