
Kafka returning same set of records even after full records fetched from the table

Open amplifinn-admin opened this issue 1 year ago • 0 comments

Description

Kafka returning same set of records even after full records fetched from the table

  • I am using the Connect REST API for the source connector, with the consumer code in a separate file to consume the messages
  • Once the full table has been fetched, the same data keeps flowing in a loop and never stops

Producer config:

```python
payload = {
    "name": "jdbc-postgres-connector-test3",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "bulk",
        "incrementing.column.name": "_id",
        "topic.prefix": "test",
        "connection.password": "xxxxx",
        "tasks.max": "1",
        "batch.max.rows": "10000",
        "connection.user": "xxxxx",
        "connection.url": "jdbc:postgresql://xx.xx.xx.xx/test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "enable.idempotence": "true",
        "acks": "all",
        "poll.interval.ms": "-1",
        "query": "SELECT * FROM employee",
    },
}
```
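For context: with `"mode": "bulk"`, the JDBC source connector re-executes the query and republishes the entire table on every poll interval by design, which would explain the looping records. A sketch of the same payload switched to incrementing mode, so each row is emitted only once based on the `_id` column (column and table names are taken from the config above; this is an untested suggestion, not a verified fix):

```python
# Sketch: incrementing mode emits only rows whose incrementing column value
# is higher than the last one seen, instead of re-reading the whole table.
payload = {
    "name": "jdbc-postgres-connector-test3",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "incrementing",              # instead of "bulk"
        "incrementing.column.name": "_id",   # must be a monotonically increasing column
        "topic.prefix": "test",
        "connection.url": "jdbc:postgresql://xx.xx.xx.xx/test",
        "connection.user": "xxxxx",
        "connection.password": "xxxxx",
        "tasks.max": "1",
        "query": "SELECT * FROM employee",
    },
}
```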

Consumer code:

```python
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'enable.partition.eof': True,
    'compression.type': 'gzip',
}

consumer = Consumer(conf)
topics = ['test']

consumer.subscribe(topics)
try:
    while True:
        msg = consumer.poll(1)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                break
            else:
                print(msg.error())
                break
        try:
            data = msg.value().decode('utf-8')
            with open('data.txt', 'a') as file:
                file.write(data + '\n')
            consumer.commit(asynchronous=False)
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
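One note on the EOF handling above: with `'enable.partition.eof': True`, `poll()` returns a separate `_PARTITION_EOF` event for each assigned partition, so breaking on the first one can exit before the other partitions are drained. A minimal sketch of per-partition EOF bookkeeping (the `EofTracker` class and its method names are mine, not part of confluent-kafka):

```python
# Sketch: track which partitions have reached EOF and only stop
# once every assigned partition has been fully drained.
class EofTracker:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._eof = set()

    def saw_eof(self, partition):
        """Record that poll() returned _PARTITION_EOF for this partition."""
        self._eof.add(partition)

    def saw_message(self, partition):
        """New data arrived after EOF, so this partition is no longer drained."""
        self._eof.discard(partition)

    def all_done(self):
        return len(self._eof) >= self.num_partitions
```

In the poll loop this would mean calling `tracker.saw_eof(msg.partition())` on the EOF error and breaking only when `tracker.all_done()` is true, rather than on the first EOF event.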

What I need: once all the data has been produced and consumed, stop producing and consuming, because my scenario is a one-time batch load rather than real-time data.
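For a one-time batch load, one option is to pause or delete the connector through the Kafka Connect REST API once the import has finished. A stdlib-only sketch, assuming the default Connect worker endpoint on port 8083 (the helper names are mine; `PUT /connectors/{name}/pause` and `DELETE /connectors/{name}` are standard Connect REST endpoints):

```python
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Connect REST endpoint

def connector_url(name, action=None, base_url=CONNECT_URL):
    # Build the REST path for a connector, optionally with an action suffix.
    url = f"{base_url}/connectors/{name}"
    return f"{url}/{action}" if action else url

def pause_connector(name):
    # PUT /connectors/{name}/pause stops the connector's tasks from polling.
    req = urllib.request.Request(connector_url(name, "pause"), method="PUT")
    return urllib.request.urlopen(req)

def delete_connector(name):
    # DELETE /connectors/{name} removes the connector entirely.
    req = urllib.request.Request(connector_url(name), method="DELETE")
    return urllib.request.urlopen(req)
```

E.g. `pause_connector("jdbc-postgres-connector-test3")` after the consumer has drained the topic.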

How to reproduce

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('2.3.0', 33751040), ('2.3.0', 33751295)
  • [ ] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [x] Operating system: Linux (Ubuntu)
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue: yes

amplifinn-admin avatar Mar 01 '24 12:03 amplifinn-admin