confluent-kafka-python
How to call producer.flush() in a Spark map function?
I have a Spark job which reads data from S3 and publishes it to Kafka using 'confluent-kafka-python'. Below is the code I am using; I am just testing by publishing a few hundred messages. If I add producer.flush() after every produce it works fine, but that makes every call synchronous.
But when Spark runs this piece of code on a worker node for a given partition of data, how can I invoke producer.flush() once at the end, or periodically per batch? Mainly curious to know the right way to do this with PySpark.
Publish part:
producer.produce(topic=kafka_topic, key=key_data_dict, value=value_data_dict,
on_delivery=delivery_report)
producer.poll(0)
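The delivery_report callback referenced above is not shown in my snippet; a minimal sketch of what it might look like (illustrative only) is:

def delivery_report(err, msg):
    # Called once per message from poll()/flush(); err is a KafkaError or None.
    if err is not None:
        print('Delivery failed for key {}: {}'.format(msg.key(), err))
    else:
        print('Delivered to {} [{}] @ offset {}'.format(
            msg.topic(), msg.partition(), msg.offset()))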
Producer init part (I am using the Singleton pattern to create one producer per partition/task on the worker node):
key_avro_schema = .....
value_avro_schema = .....
avro_key_serializer = AvroSerializer(key_avro_schema,
schema_registry_client)
avro_val_serializer = AvroSerializer(value_avro_schema,
schema_registry_client)
producer_conf = {'bootstrap.servers': 'localhost:9092',
'key.serializer': avro_key_serializer,
'value.serializer': avro_val_serializer,
'queue.buffering.max.messages': 100,
'queue.buffering.max.ms': 10,
'batch.num.messages': 50}
producer = SerializingProducer(producer_conf)
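For reference, a minimal sketch of the singleton mentioned above (assuming the producer_conf built as shown; get_producer is an illustrative name, not part of the library) is:

from confluent_kafka import SerializingProducer

_producer = None

def get_producer():
    # Lazily create one producer per executor process and reuse it
    # across the partitions/tasks that process runs.
    global _producer
    if _producer is None:
        _producer = SerializingProducer(producer_conf)
    return _producer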
How to reproduce
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.6.0
- [ ] Apache Kafka broker version:
- [ ] Client configuration: {...}
- [x] Operating system: Linux/Mac
- [ ] Provide client logs (with 'debug': '..' as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
Hi @hegdemahendra, thanks for asking.
For the flush() method: it waits until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. Refer to: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L4560.
If you create a producer per partition of data, you just need to call flush() before destroying each producer: https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/kafkatest/verifiable_producer.py#L149
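In PySpark terms, a minimal sketch (assuming the get_producer() helper and delivery_report callback sketched above, and that df, kafka_topic, and the row fields exist; all names are illustrative) that flushes once per partition would be:

def publish_partition(rows):
    # Runs once per partition on a worker node; reuse one producer per executor.
    producer = get_producer()
    for row in rows:
        # key/value must be dicts matching the Avro schemas configured above.
        producer.produce(topic=kafka_topic,
                         key=row['key'],
                         value=row['value'],
                         on_delivery=delivery_report)
        # Serve delivery callbacks for previously produced messages.
        producer.poll(0)
    # Block until everything queued/in-flight for this partition is delivered.
    producer.flush()

df.rdd.foreachPartition(publish_partition)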
Closing; please reopen if the issue persists.