confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Unable to import cimpl

Open mauliksoneji opened this issue 6 years ago • 7 comments

Description

We are running trying to write to kafka from a spark job. We are using confluent-kafka to write the data. We get data as RDD and when we try to do: partition.flush() it throws exception saying: ImportError: No module named 'cimpl'

The contents of confluent-kafka that we have installed are:

/usr/local/lib/python3.5/dist-packages/confluent_kafka# ls
__init__.py  __pycache__  admin  avro  cimpl.cpython-35m-x86_64-linux-gnu.so  kafkatest

How to reproduce

pip3 install confluent-kafka

I tried doing these as well: pip3 install confluent_kafka pip3 install confluent-kafka==0.11.6

    def write_rdd(self, rdd, topic):
        def send_to_kafka(partition):
            p = Producer({
                'bootstrap.servers': self.brokers
            })

            i = 0
            for data in partition:
                p.produce(topic, data)
                if i % 100 == 0:
                    p.poll(0)
                if i % 1000 == 0:
                    p.flush() // Here is where it throws error
                i = i + 1
            p.flush()

        rdd.foreachPartition(send_to_kafka)

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] Apache Kafka broker version: 0.11.0.0
  • [ ] Client configuration: {...}
  • [ ] Operating system: Debian 9.9 stretch
  • [ ] Provide client logs (with 'debug': '..' as necessary)
<module 'src.jobs.kafka_writer' from '/tmp/write_customer_summary_to_kafka_gofood_92470344/jobs.zip/src/jobs/kafka_writer/__init__.py'>
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.5/pickle.py", line 408, in dump
    self.save(obj)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 744, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 798, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 801, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 774, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 801, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 297, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 814, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.5/pickle.py", line 845, in _batch_setitems
    save(v)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 391, in save_global
    __import__(modname)
ImportError: No module named 'cimpl'
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

mauliksoneji avatar Jul 22 '19 12:07 mauliksoneji