
kafka-connect from mysql to hbase

sunnn opened this issue 8 years ago • 7 comments

Hi, is there any idea on how to ingest data from MySQL to HBase through Confluent? If so, could you shed some light on it? @ewencp @wushujames

sunnn avatar Aug 25 '16 04:08 sunnn

@sunnn You probably just need to tie a couple of connectors together. For MySQL -> Kafka you could use Confluent's JDBC connector, which uses the standard JDBC interface to support a variety of relational databases, or Debezium's CDC connector, which can read directly from the MySQL binlog. For Kafka -> HBase, you could use this HBase sink connector listed on the Connector Hub.
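For illustration, here is a minimal sketch of what the MySQL-side standalone config might look like; the connection URL, column name, and credentials are placeholders, and the matching HBase sink config would use whatever property names that third-party connector documents:

```
# jdbc-source.properties -- connection details and column name below are
# placeholders; adjust them for your database.
name=kafka-mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=root&password=secret
mode=incrementing
incrementing.column.name=id
topic.prefix=test_jdbc_
```

You would then run both connectors in one standalone worker, e.g. connect-standalone worker.properties jdbc-source.properties hbase-sink.properties, with the sink config pointed at the topics the source produces.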

ewencp avatar Aug 25 '16 04:08 ewencp

Yeah, I have tried doing it. The problem arises when I start the Kafka Connect worker -- I get the following error:

```
[2016-08-25 03:59:00,223] INFO Created connector kafka-cdc-hbase (org.apache.kafka.connect.cli.ConnectStandalone:82)
[2016-08-25 03:59:00,225] ERROR Thread WorkerSinkTask-kafka-cdc-hbase-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
        at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-kafka-cdc-hbase-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
        at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-08-25 03:59:00,230] INFO Source task Thread[WorkerSourceTask-kafka-mysql-jdbc-0,5,main] finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:342)
[2016-08-25 03:59:01,474] WARN Error while fetching metadata with correlation id 0 : {test_jdbc_users=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
```

Could you help me out with this issue? @ewencp

sunnn avatar Aug 25 '16 05:08 sunnn

It seems like there could be multiple issues at play here.

The first is the java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V message. This looks like you might have mismatched versions of the Connect and Kafka libraries -- because of some concerns around using specific collection types instead of the generic Collection type, Kafka made a backwards-incompatible change to the consumer APIs to address this. Normally I wouldn't expect to see this issue in Connect, since users usually install a single Kafka bundle that includes everything (either from Apache or Confluent) and run everything with matching versions. How did you install everything, and was there anything non-standard about your process?
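To make the mismatch concrete, the failing call is source-compatible but binary-incompatible across the two versions -- a small sketch (broker address is a placeholder; the topic name is borrowed from the log above):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "check");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Compiled against 0.9.x jars, this binds to subscribe(List, ...);
        // in 0.10.x the method takes a Collection instead, so 0.9-compiled
        // bytecode run against 0.10 jars throws exactly the
        // NoSuchMethodError shown above.
        consumer.subscribe(Arrays.asList("test_jdbc_users"));
        consumer.close();
    }
}
```

Recompiling and running against one matching bundle makes the error go away.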

The LEADER_NOT_AVAILABLE message suggests a topic might not exist. But in this case, I'm guessing you are seeing that message simply because of topic auto-creation -- if you are relying on the broker auto-creating the topic when the connector first produces data, then this warning is expected and transient.
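If you want to avoid the warning, you can also pre-create the topic before starting the connectors; with the tooling from the Kafka distribution that looks something like this (ZooKeeper address is a placeholder):

```
# Pre-create the topic so the first produce never races topic auto-creation.
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test_jdbc_users
```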

ewencp avatar Aug 25 '16 05:08 ewencp

I used the Vagrant machine provided for the example at http://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect and installed HBase on it the same way Hive was set up in that machine, then started Kafka. But the Confluent version I used was CP 3.0.0, while the HBase connector, I suspect, may be built for CP 2.0.0 from the link you provided above.

sunnn avatar Aug 25 '16 06:08 sunnn

@sunnn Yes, simply because of the timing, that repository uses a CP 2.0 version. I have not tested this, but you can probably just substitute the 3.0 version for the 2.0 version in the vagrant/deb.sh script to get a more up-to-date version of CP to test with in that VM.
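I haven't checked the script's exact contents, but assuming deb.sh pins Confluent's apt repository by a versioned path, the edit would be along these lines (a hypothetical sketch, not a tested change):

```
# Hypothetical: swap the versioned Confluent repo path in vagrant/deb.sh.
# Verify against the actual lines in the script before running.
sed -i 's|packages.confluent.io/deb/2.0|packages.confluent.io/deb/3.0|g' vagrant/deb.sh
```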

ewencp avatar Aug 25 '16 06:08 ewencp

Was the HBase link you gave for the CP 2.0 or the 3.0 version? Could you specify? @ewencp

sunnn avatar Aug 25 '16 06:08 sunnn

@sunnn Great question! It is a third-party connector (connectors are federated, so anyone can develop them), but it looks like it might currently be targeted at CP 2.0/Kafka 0.9.0. The changes required for newer versions of Kafka/CP are very minimal, so a small PR or request to the developers would likely get it updated for the most recent version. (They may even be up to date and have just forgotten to update their README -- I'd suggest posting over on their repository to find out.)

ewencp avatar Aug 25 '16 06:08 ewencp