faust
faust copied to clipboard
Unable to use `schema` on `Table`
Checklist
- [ x ] I have included information about relevant versions
- [ x ] I have verified that the issue persists when using the
master
branch of Faust.
Steps to reproduce
When using the Table
apply a schema parameter and try to pass in a custom serde
Expected behavior
The custom Schema
passed to the Table
would be used rather than the default json
value serializer.
Actual behavior
Default value serializer was used
Full traceback
Traceback (most recent call last):
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/agents/agent.py", line 674, in _execute_actor
await coro
File "/Users/aaron.gonzales/development/work/kahless/backend/elset-filter-service/elset_filter_service/kafka/elset_stream_service.py", line 47, in process_elset
self._filtered_elset_materialized_view[satellite_catalog_number] = filtered_elset.kahless_filtered_elset
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/mode/utils/collections.py", line 656, in __setitem__
self.on_key_set(key, value)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/tables/table.py", line 77, in on_key_set
fut = self.send_changelog(self.partition_for_key(key), key, value)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/tables/base.py", line 268, in send_changelog
return self.changelog_topic.send_soon(
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/topics.py", line 222, in send_soon
fut = self.as_future_message(
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/channels.py", line 270, in as_future_message
final_value, open_headers = self.prepare_value(
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/channels.py", line 705, in prepare_value
return schema.dumps_value(
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/schemas.py", line 129, in dumps_value
payload = app.serializers.dumps_value(
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/registry.py", line 180, in dumps_value
return dumps(serializer, value)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 359, in dumps
return get_codec(codec).dumps(obj) if codec else obj
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 224, in dumps
obj = cast(Codec, node)._dumps(obj)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 261, in _dumps
return want_bytes(_json.dumps(s))
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 191, in dumps
return json_dumps(obj, cls=cls, **dict(_JSON_DEFAULT_KWARGS, **kwargs))
File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/__init__.py", line 234, in dumps
return cls(
File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 160, in default
return callback(o)
File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 148, in on_default
raise TypeError(f"JSON cannot serialize {type(o).__name__!r}: {o!r}")
Versions
- Python version: 3.9.10
- Faust version: 0.8.4
- Operating system: macOS Monterey
- Kafka version: 2.8.0
- RocksDB version (if applicable)