pulsar-io-kafka
Pulsar IO Kafka Connector
Archived repository
This repository has been archived.
The Pulsar IO Kafka Connector code can now be found at apache/pulsar/pulsar-io/kafka.
Source documentation: https://pulsar.apache.org/docs/io-kafka-source/
Sink documentation: https://pulsar.apache.org/docs/io-kafka-sink/
Pulsar IO Kafka Connector
pulsar-io-kafka is a Pulsar IO Connector for copying data between Kafka and Pulsar.
NOTE: This connector is an enhanced version of the pulsar-io-kafka connector, extended to support schema.
Get started
This section provides a step-by-step example of how to use the Kafka source connector to copy JSON data from a Kafka topic to a Pulsar topic and save the data in Avro format.
Build pulsar-io-kafka connector
- Git clone pulsar-io-kafka. Assume PULSAR_IO_KAFKA_HOME is the home directory for your cloned pulsar-io-kafka repo for the remaining steps.

      $ git clone https://github.com/streamnative/pulsar-io-kafka

- Build the connector in the ${PULSAR_IO_KAFKA_HOME} directory.

      mvn clean install -DskipTests

  After the connector is built successfully, a NAR package is generated under the target directory. This NAR package is the one used in the remaining steps.

      $ ls target/pulsar-io-kafka-2.5.0-SNAPSHOT.nar
      target/pulsar-io-kafka-2.5.0-SNAPSHOT.nar
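If you script the build, it can help to confirm that the NAR exists before moving on. The following is a minimal sketch, not part of the repository: `find_connector_nar` is a hypothetical helper name, and it assumes only that the build writes a file matching `pulsar-io-kafka-*.nar` into the target directory, as the `ls` output above suggests.

```shell
# Hypothetical helper (not shipped with pulsar-io-kafka):
# print the path of the first connector NAR found under the given directory.
# Returns non-zero and prints a hint on stderr if no NAR is present.
find_connector_nar() {
  fcn_target_dir="$1"
  for fcn_nar in "${fcn_target_dir}"/pulsar-io-kafka-*.nar; do
    # When the glob matches nothing it stays literal, so test for a real file.
    if [ -f "${fcn_nar}" ]; then
      printf '%s\n' "${fcn_nar}"
      return 0
    fi
  done
  echo "no pulsar-io-kafka NAR found under ${fcn_target_dir}; did the build succeed?" >&2
  return 1
}

# Example usage after `mvn clean install -DskipTests`:
#   NAR_PATH="$(find_connector_nar "${PULSAR_IO_KAFKA_HOME}/target")" || exit 1
```

Matching on a glob rather than a fixed file name keeps the check working if the version in the NAR name differs from 2.5.0-SNAPSHOT.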
Prepare a config for running pulsar-io-kafka connector
An example YAML config is available at ${PULSAR_IO_KAFKA_HOME}/conf/pulsar-io-kafka.yaml.
This example config copies JSON data from the Kafka topic input-topic to the Pulsar topic output-topic and saves the messages in Avro format.
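Before running the connector, a quick sanity check is to confirm that the config file actually mentions the topic names you intend to use. The sketch below is a plain text search and makes no assumption about the YAML key names; `config_mentions_topics` is a hypothetical helper name, and the config path in the usage comment is the one passed to `--source-config-file` in the localrun step of this guide.

```shell
# Hypothetical helper (not shipped with pulsar-io-kafka):
# succeed only if every given topic name appears somewhere in the config file.
# This is a fixed-string search, not a YAML parser.
config_mentions_topics() {
  cmt_file="$1"
  shift
  for cmt_topic in "$@"; do
    if ! grep -qF -- "${cmt_topic}" "${cmt_file}"; then
      echo "topic '${cmt_topic}' not found in ${cmt_file}" >&2
      return 1
    fi
  done
  return 0
}

# Example usage with the topics from this guide:
#   config_mentions_topics "${PULSAR_IO_KAFKA_HOME}/conf/pulsar-io-kafka.yaml" input-topic output-topic
```

A passing check only shows that the names occur in the file, not that they are set under the right keys, so treat it as a guard against typos rather than as validation.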
Run pulsar-io-kafka connector
- Download the Pulsar 2.4.0 release from the Pulsar website and follow the instructions to start a standalone Pulsar. Assume PULSAR_HOME is the home directory for your Pulsar installation for the remaining steps.

  Example command to start a standalone Pulsar:

      cd ${PULSAR_HOME}
      bin/pulsar standalone

- Download a Kafka release from the Kafka website and follow the instructions to start a Kafka server. Assume KAFKA_HOME is the home directory for your Kafka installation for the remaining steps.

  Example commands to start a Kafka server. Both commands stay in the foreground, so run each one in its own terminal:

      cd ${KAFKA_HOME}
      bin/zookeeper-server-start.sh config/zookeeper.properties
      bin/kafka-server-start.sh config/server.properties

- Create a Kafka topic. Make sure the Kafka topic name is the one you configured in pulsar-io-kafka.yaml.

      bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic input-topic --replication-factor 1 --partitions 4

- Copy the pulsar-io-kafka connector to the ${PULSAR_HOME}/connectors directory.

      cd ${PULSAR_HOME}
      mkdir -p connectors
      cp ${PULSAR_IO_KAFKA_HOME}/target/pulsar-io-kafka-2.5.0-SNAPSHOT.nar connectors/

- Localrun the pulsar-io-kafka connector.

  NOTE: --destination-topic-name is used by the Pulsar IO runtime but not by this pulsar-io-kafka connector. The setting cannot be omitted at the moment, so you can give any unused topic name for now.

      cd ${PULSAR_HOME}
      bin/pulsar-admin sources localrun -a connectors/pulsar-io-kafka-2.5.0-SNAPSHOT.nar --tenant public --namespace default --name test-kafka-source --source-config-file ${PULSAR_IO_KAFKA_HOME}/conf/pulsar-io-kafka.yaml --destination-topic-name test-kafka-source-topic

  Once the connector is running, keep this terminal open during the remaining steps.
- Now use a JSON Kafka producer and an Avro Pulsar consumer to verify that the connector is working as expected.

  Start a JSON Kafka producer to produce 100 messages.

      cd ${PULSAR_IO_KAFKA_HOME}
      bin/kafka-json-producer.sh localhost:9092 input-topic 100

  Start an Avro Pulsar consumer to consume the 100 messages (in Avro format).

      cd ${PULSAR_IO_KAFKA_HOME}
      bin/pulsar-avro-consumer.sh pulsar://localhost:6650 output-topic sub

  You will see output similar to the following in the terminal where you run pulsar-avro-consumer.sh:

      Receive message : key = user-99, value = User(name=user-99, age=990, address=address-99)
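To check that all 100 messages arrived rather than eyeballing the terminal, you can count the matching lines in the consumer output. This is a minimal sketch: `count_received` is a hypothetical helper name, `consumer.log` is an arbitrary file name, and the sketch assumes that every consumed message is printed on its own line containing the text `Receive message`, as in the sample output above.

```shell
# Hypothetical helper (not shipped with pulsar-io-kafka):
# read consumer output on stdin and print how many lines report a received message.
count_received() {
  # grep -c prints 0 but exits non-zero when nothing matches;
  # `|| true` keeps the helper's exit status at 0 in that case.
  grep -c 'Receive message' || true
}

# Example usage: capture the consumer output to a file in one terminal,
#   bin/pulsar-avro-consumer.sh pulsar://localhost:6650 output-topic sub | tee consumer.log
# then count the received messages from another terminal:
#   count_received < consumer.log
```

With the producer command from this guide, the count should reach 100 once the connector has copied every message.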