Unable to use `schema` on `Table`

Open Aaronzinhoo opened this issue 2 years ago • 0 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Create a `Table` with the `schema` parameter set to a schema that uses a custom serde, then write a value to the table.

Expected behavior

The custom `Schema` passed to the `Table` should be used instead of the default JSON value serializer.

Actual behavior

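The default JSON value serializer was used instead, and writing to the table raised a `TypeError` while the changelog message was being serialized.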
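
For readers without the project at hand, the failure mode can be sketched with the standard library alone. This is not Faust code and does not reproduce the bug itself. It only illustrates why a plain JSON serializer rejects a custom object while a custom serde accepts it. `FilteredElset`, `default_dumps` and `custom_dumps` are hypothetical names made up for this sketch.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FilteredElset:
    """Hypothetical stand-in for the value written to the table."""
    satellite_catalog_number: int
    line1: str


def default_dumps(value) -> bytes:
    """Stand-in for a default JSON value serializer with no custom handling."""
    return json.dumps(value).encode("utf-8")


def custom_dumps(value: FilteredElset) -> bytes:
    """Hypothetical custom serde: turn the dataclass into a dict first."""
    return json.dumps(asdict(value)).encode("utf-8")


value = FilteredElset(satellite_catalog_number=25544, line1="1 25544U")

# The plain JSON path fails because the object is not a JSON type.
try:
    default_dumps(value)
    default_error = None
except TypeError as exc:
    default_error = exc

# The custom path succeeds because it knows how to encode the object.
encoded = custom_dumps(value)
```

If the schema passed to the table were honored, the second path would be the one taken for values written to the table. The traceback below shows the first path being taken instead.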

Full traceback

Traceback (most recent call last):
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/Users/aaron.gonzales/development/work/kahless/backend/elset-filter-service/elset_filter_service/kafka/elset_stream_service.py", line 47, in process_elset
    self._filtered_elset_materialized_view[satellite_catalog_number] = filtered_elset.kahless_filtered_elset
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/mode/utils/collections.py", line 656, in __setitem__
    self.on_key_set(key, value)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/tables/table.py", line 77, in on_key_set
    fut = self.send_changelog(self.partition_for_key(key), key, value)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/tables/base.py", line 268, in send_changelog
    return self.changelog_topic.send_soon(
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/topics.py", line 222, in send_soon
    fut = self.as_future_message(
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/channels.py", line 270, in as_future_message
    final_value, open_headers = self.prepare_value(
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/channels.py", line 705, in prepare_value
    return schema.dumps_value(
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/schemas.py", line 129, in dumps_value
    payload = app.serializers.dumps_value(
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/registry.py", line 180, in dumps_value
    return dumps(serializer, value)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 359, in dumps
    return get_codec(codec).dumps(obj) if codec else obj
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 224, in dumps
    obj = cast(Codec, node)._dumps(obj)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/serializers/codecs.py", line 261, in _dumps
    return want_bytes(_json.dumps(s))
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 191, in dumps
    return json_dumps(obj, cls=cls, **dict(_JSON_DEFAULT_KWARGS, **kwargs))
  File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/Users/aaron.gonzales/.pyenv/versions/3.9.10/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 160, in default
    return callback(o)
  File "/Users/aaron.gonzales/.pyenv/versions/elset-filter-service/lib/python3.9/site-packages/faust/utils/json.py", line 148, in on_default
    raise TypeError(f"JSON cannot serialize {type(o).__name__!r}: {o!r}")

Versions

  • Python version: 3.9.10
  • Faust version: 0.8.4
  • Operating system: macOS Monterey
  • Kafka version: 2.8.0
  • RocksDB version (if applicable)

Aaronzinhoo, May 27 '22 08:05