how to create one polymorphic model

Open KallieLev opened this issue 2 years ago • 2 comments

I want to be able to process one event that can have multiple schemas depending on its type. For example:

import faust

class Event(faust.Record, serializer="json"):
    track_id: str

class InsertDataEvent(Event):
    file_path: str
    type: str = "INSERT"

class CreateTableEvent(Event):
    table_name: str
    type: str = "CREATE_TABLE"

class PolyEvent(faust.Record, polymorphic_fields=True):
    ...

When I process the event, I want it to be parsed automatically according to its type and validated against the leaf class (i.e. CreateTableEvent or InsertDataEvent).

ingestion_topic = app.topic("my-topic", key_type=str, value_type=PolyEvent)

@app.agent(channel=ingestion_topic)
async def process(stream) -> None:
    async for value in stream.events():
        event = value.value
        if event.type == "INSERT":
            insert_data(event.file_path)
        if event.type == "CREATE_TABLE":
            create_table(event.table_name)

Is this feasible? How would I write PolyEvent in that case?

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Versions

  • Python version - 3.11.0
  • Faust version - 0.10.3

KallieLev, Apr 02 '23 08:04

Faust doesn't support this feature yet. I need to work on https://github.com/faust-streaming/faust/issues/429 before addressing this.

wbarnha, Apr 02 '23 15:04

@KallieLev

I'm not sure whether my workaround fits your needs.

I have a topic that carries multiple event types, handled with an approach inspired by the faust.Schema 'AutoDetect' example. The customised faust.Schema is assigned to the agent subscribed to that topic.

In practice, I have a MutableMapping that stores each event class name as the key and that class's codec as the value.

I use the message header to store the class name when messages are produced.

When a message arrives at the agent, it extracts the class name from the message header, looks up the matching codec, and generates a serializer dynamically on the fly.
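
The dispatch idea can be sketched in plain Python. This is only an illustration of the header-based lookup, not Faust's API: the header name EventType, the REGISTRY mapping, and the register/encode/decode helpers are all hypothetical names, and the registry stores the class itself rather than a codec to keep the sketch short. Dataclasses and JSON stand in for faust.Record and its serializers.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List, Tuple, Type

# Hypothetical header name; any key agreed between producer and consumer works.
TYPE_HEADER = "EventType"

# Registry: event class name -> event class (a codec could be stored instead).
REGISTRY: Dict[str, Type] = {}

Headers = List[Tuple[str, bytes]]


def register(cls):
    """Class decorator that records the class under its own name."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class InsertDataEvent:
    track_id: str
    file_path: str


@register
@dataclass
class CreateTableEvent:
    track_id: str
    table_name: str


def encode(event) -> Tuple[bytes, Headers]:
    """Producer side: serialize the payload and put the class name in a header."""
    payload = json.dumps(asdict(event)).encode("utf-8")
    headers = [(TYPE_HEADER, type(event).__name__.encode("utf-8"))]
    return payload, headers


def decode(payload: bytes, headers: Headers):
    """Consumer side: look up the class named in the header and build it."""
    name = dict(headers)[TYPE_HEADER].decode("utf-8")
    cls = REGISTRY[name]  # raises KeyError for an unknown event type
    return cls(**json.loads(payload))  # raises TypeError on missing or extra fields
```

In a Faust application, the decode step would presumably live inside the customised faust.Schema described above, since that is where the message headers are available; the agent can then branch on isinstance(event, CreateTableEvent) instead of comparing a type string.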

mozTheFuzz, Apr 03 '23 07:04