Message isn't published although logs say it was sent
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I have some Faust streaming code that publishes messages to a Kafka topic. Although the logs report that the message was sent, I can't see it in Kafka using Offset Explorer.
import datetime
from typing import Optional

import faust


class Container(faust.Record, serializer='json', coerce=True):
    vin: str
    container_location: str
    kam: str
    container_timestamp: datetime.datetime  # dates should always be in UTC "timezone"
    request_id: Optional[str] = ""


class ContainerPublisher:
    # [..]
    def __init__(self, config):
        self.config = config
        # default_topic_partitions = 16
        # store = "memory://"
        self.faust_app = faust.App(name, broker=broker_url, logging_config=logging_config, topic_partitions=default_topic_partitions, store=store, web_port=web_port)
        # [..]
        # config.kafka.topic.partitions = 64
        self._containers_topic = self.faust_app.topic(
            config.kafka.topic.name,
            value_type=Container,
            partitions=config.kafka.topic.partitions,
            retention=datetime.timedelta(weeks=4 * 6))

    async def publish_containers(self, vehicles: dict, raw_containers: dict):
        logger.info(f"Publishing {len(raw_containers)} containers to Kafka")
        converted: dict[str, Container] = {k: self._convert_container(vehicles=vehicles, key=k, raw_container=v) for k, v in raw_containers.items()}
        for candidate in converted:
            logger.info(f"Publishing {candidate} container to Kafka")
            value = converted[candidate]
            result = await self._containers_topic.send(key=candidate, value=value, timestamp=value.container_timestamp.timestamp())
        logger.info(f"Finished publishing {len(raw_containers)} containers to Kafka")
Expected behavior
The message should be available in Kafka if it's reported as being sent.
Actual behavior
I can't see the message in Kafka using Offset Explorer.
Full traceback
There is no traceback, but the application log below leads me to believe that the message was recorded as being sent to Kafka:
2022-10-24 15:08:16,234 - [vin] - {MainThread} - [peds.__main__] - INFO - Publishing 1 containers to Kafka
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [peds.__main__] - INFO - Publishing vin_20221024_125153 container to Kafka
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [faust.topics] - DEBUG - send: topic='containers-production' k=b'vin_20221024_125153' v=b'{"vin":"vin","container_location":"vin_20221024_125153.ZIP","kam":"kam","container_timestamp":"2022-10-24T12:51:53","request_id":"","__faust":{"ns":"models.Container.Container"}}' timestamp=1666608713.0 partition=None
2022-10-24 15:08:16,249 - [vin] - {MainThread} - [peds.__main__] - INFO - Finished publishing 1 containers to Kafka
But in Kafka I have no message with that key.
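
One detail in the debug log that may be worth ruling out: the `timestamp` passed to `send()` comes from calling `.timestamp()` on a naive `datetime`, which Python interprets in the host's local time zone rather than UTC. The sketch below uses only the values from the log line above (the variable names are just for illustration) and compares the logged value with what an explicit UTC conversion would give. It does not by itself explain a missing message, but if the topic is being browsed or filtered by time, the record may sit two hours earlier than expected.

```python
from datetime import datetime, timezone

# Values copied from the "send:" debug log line above.
naive = datetime.fromisoformat("2022-10-24T12:51:53")  # no tzinfo, as serialized
logged_ts = 1666608713.0

# Attaching UTC explicitly makes timestamp() independent of the host's local zone.
utc_ts = naive.replace(tzinfo=timezone.utc).timestamp()
offset_hours = (utc_ts - logged_ts) / 3600

print(utc_ts)        # 1666615913.0
print(offset_hours)  # 2.0

# The instant that was actually sent as the record timestamp, expressed in UTC.
print(datetime.fromtimestamp(logged_ts, tz=timezone.utc).isoformat())
# 2022-10-24T10:51:53+00:00
```

If the container timestamps are meant to be UTC, something like `value.container_timestamp.replace(tzinfo=datetime.timezone.utc).timestamp()` in `publish_containers` would avoid the dependency on the server's local zone. This assumes the stored values really are UTC wall-clock times.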

Versions
- Python version 3.9.5
- Faust version 0.9.0
- Operating system: Windows Server 2016 x64
- Kafka version 2.8.1
- No RocksDB
+1 using faust-streaming==0.10.23 + RocksDB; consumer and producer running on macOS (M2 Max), connecting to a Dockerized Kafka 3.3 (Bitnami image) with KRaft.