How to call producer.flush() in a Spark map function?

Open hegdemahendra opened this issue 4 years ago • 1 comments

I have a Spark job that reads data from S3 and publishes it to Kafka using 'confluent-kafka-python'. Below is the code I am using; for now I am only testing with a few hundred messages. If I call producer.flush() after every produce() it works fine, but that makes every publish a synchronous call.

When Spark runs this piece of code on a worker node for a given partition of data, how can I invoke producer.flush() once at the end, or periodically for each batch? I am mainly curious to know the right way to use this with PySpark.

Publish part:

         producer.produce(topic=kafka_topic, key=key_data_dict, value=value_data_dict,
                          on_delivery=delivery_report)
         producer.poll(0)

Producer init part (I am using the singleton pattern to create one producer per partition/task on the worker node):

         key_avro_schema = .....
         value_avro_schema = .....

         avro_key_serializer = AvroSerializer(key_avro_schema,
                                              schema_registry_client)

         avro_val_serializer = AvroSerializer(value_avro_schema,
                                              schema_registry_client)

         producer_conf = {'bootstrap.servers': 'localhost:9092',
                          'key.serializer': avro_key_serializer,
                          'value.serializer': avro_val_serializer,
                          'queue.buffering.max.messages': 100,
                          'queue.buffering.max.ms': 10,
                          'batch.num.messages': 50}

         producer = SerializingProducer(producer_conf)

How to reproduce

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.6.0
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system: Linux/Mac
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

hegdemahendra • Apr 08 '21 17:04

Hi @hegdemahendra, thanks for asking.

The flush() method waits until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance, to make sure all queued and in-flight produce requests are completed before terminating. Refer to: https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L4560.
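
As a rough illustration of "flush before terminating", here is a minimal sketch of a bounded flush. It relies on flush() accepting a timeout in seconds and returning the number of messages still in the queue; the helper name flush_with_deadline and its default timeouts are hypothetical, not part of the library.

```python
import time


def flush_with_deadline(producer, total_timeout_s=30.0, step_s=1.0):
    """Flush repeatedly until the queue is empty or the deadline passes.

    `producer` is assumed to be a confluent_kafka Producer or
    SerializingProducer, whose flush(timeout) returns the number of
    messages still waiting to be delivered.
    Returns the number of undelivered messages (0 means fully drained).
    """
    deadline = time.monotonic() + total_timeout_s
    # Always flush at least once, even if the total timeout is zero.
    remaining = producer.flush(step_s)
    while remaining > 0 and time.monotonic() < deadline:
        remaining = producer.flush(step_s)
    return remaining
```

A non-zero return value means some messages were still queued or in flight when the deadline passed, so the caller can log or fail the task instead of dropping them silently.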

If you create a producer per partition of data, you just need to call flush() before destroying each producer: https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/kafkatest/verifiable_producer.py#L149
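
For the PySpark case in the question, one possible shape is to do the publishing inside a per-partition function, so that produce() stays asynchronous and flush() runs once when the partition's rows are exhausted. The sketch below is an assumption-laden illustration, not an official pattern: publish_partition, make_producer, and the (key, value) row shape are hypothetical names, and the retry on BufferError reflects produce() raising BufferError when the local queue is full (which is likely with 'queue.buffering.max.messages': 100).

```python
def publish_partition(rows, make_producer, topic, on_delivery=None):
    """Publish every row of one partition, then flush once at the end.

    rows          -- iterable of (key, value) pairs (assumed row shape)
    make_producer -- zero-argument callable returning the producer,
                     e.g. a singleton accessor (hypothetical)
    Returns the number of messages handed to the producer.
    """
    producer = make_producer()
    sent = 0
    try:
        for key, value in rows:
            while True:
                try:
                    producer.produce(topic=topic, key=key, value=value,
                                     on_delivery=on_delivery)
                    break
                except BufferError:
                    # Local queue is full: block briefly so delivery
                    # callbacks can run and free space, then retry.
                    producer.poll(1.0)
            # Non-blocking: serves any delivery callbacks that are ready.
            producer.poll(0)
            sent += 1
    finally:
        # Runs once per partition: wait for queued and in-flight messages.
        producer.flush()
    return sent


# Possible wiring in a Spark job (get_producer and kafka_topic are assumed names):
#   rdd.foreachPartition(lambda rows: publish_partition(rows, get_producer, kafka_topic))
```

Using foreachPartition rather than map keeps the flush at a well-defined point, the end of each partition, instead of after every record.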

jliunyu • Mar 21 '22 21:03

Closing; please reopen if the issue persists.

nhaq-confluent • Mar 06 '24 23:03