kafka-connect-elastic-sink
Kafka Connect Elastic sink connector with just-in-time index/delete behaviour.
Kafka Connect Elastic Sink Connector
The default Elastic sink connector and its open-source alternatives read data from a Kafka topic and index or delete it according to the startup configuration.
This custom connector instead reads that decision from the data itself.
That is,
- If the data has a "status" field set to "insert", the connector sends an index request.
- If the data has a "status" field set to "delete", the connector sends a delete request.
The name of the "status" flag field is configurable.
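The per-record decision described above can be sketched as follows. This is a minimal illustration, not the connector's actual code. It assumes the record value arrives as a schemaless java.util.Map, which is what JsonConverter produces when value.converter.schemas.enable=false. The class name FlagRouter and the Action enum are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: choose an index or delete action from a configurable
 * flag field in a schemaless record value (a Map, as JsonConverter yields
 * when schemas are disabled). Not the connector's real implementation.
 */
public class FlagRouter {

    /** Hypothetical action type; the real connector may model this differently. */
    public enum Action { INDEX, DELETE }

    private final String flagField; // e.g. "status", taken from flag.field

    public FlagRouter(String flagField) {
        this.flagField = flagField;
    }

    public Action route(Map<String, Object> value) {
        Object flag = value.get(flagField);
        if ("insert".equals(flag)) {
            return Action.INDEX;   // "insert" -> send an index request
        }
        if ("delete".equals(flag)) {
            return Action.DELETE;  // "delete" -> send a delete request
        }
        // Any other (or missing) flag value is rejected rather than guessed.
        throw new IllegalArgumentException(
            "Field '" + flagField + "' must be \"insert\" or \"delete\", got: " + flag);
    }

    public static void main(String[] args) {
        FlagRouter router = new FlagRouter("status");
        Map<String, Object> record = new HashMap<>();
        record.put("status", "delete");
        System.out.println(router.route(record)); // prints DELETE
    }
}
```

Rejecting unknown flag values keeps a typo such as "delet" from silently turning into an index request.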
About the record
Your record must be a JSON string that includes these fields:
- flag: determines the behaviour; its value should be "insert" or "delete". The field name is configurable via the flag.field property (e.g. status in the example configuration below).
  - You can also change the "insert" and "delete" values in the Constants.java file.
- payload: the JSON data to send to Elastic. The field name is configurable via the data.array property (e.g. dataList in the example configuration below). The payload can be an array or an object.
  - Your data in dataList must include an id field. You can change the id field name in Constants.java.
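The payload rules above (array or object, every entry carrying an id) can be illustrated with the sketch below. It normalizes the payload into a list of documents. This is again an illustration under assumptions: the value is taken to be a schemaless Map or List produced by JsonConverter, "dataList" and "id" are just the defaults mentioned above, and PayloadExtractor and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: turn the configured payload field (array or object)
 * into a list of documents, checking that each one carries an id.
 */
public class PayloadExtractor {

    public static List<Map<String, Object>> extractDocuments(
            Map<String, Object> value, String dataField, String idField) {
        Object payload = value.get(dataField);
        List<Map<String, Object>> docs = new ArrayList<>();
        if (payload instanceof Map) {
            // A single JSON object is treated as a one-element batch.
            docs.add(toDocument(payload, idField));
        } else if (payload instanceof Collection) {
            // A JSON array becomes one document per element.
            for (Object item : (Collection<?>) payload) {
                docs.add(toDocument(item, idField));
            }
        } else {
            throw new IllegalArgumentException(
                "Field '" + dataField + "' must hold a JSON array or object");
        }
        return docs;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> toDocument(Object item, String idField) {
        if (!(item instanceof Map)) {
            throw new IllegalArgumentException("Payload entries must be JSON objects");
        }
        Map<String, Object> doc = (Map<String, Object>) item;
        if (doc.get(idField) == null) {
            // Without an id the document cannot be addressed for index/delete.
            throw new IllegalArgumentException("Payload entry is missing '" + idField + "'");
        }
        return doc;
    }
}
```

Treating a single object as a one-element list lets the rest of the sink handle both payload shapes with the same loop.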
Example Configuration
elastic.url=elasticsearch
name=ElasticSinkConnector
topics=first_topic
tasks.max=1
type.name=targettype
connector.class=com.skynyrd.kafka.ElasticSinkConnector
elastic.port=9200
index.name=targetindex
flag.field=status
data.array=dataList
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
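To see how the connection-related properties above fit together, here is a sketch that loads them with java.util.Properties, checks that they are present, and composes an index URL. Treating these keys as mandatory, and assuming plain HTTP with elastic.url holding only a host name (as in the example), are assumptions. The class SinkConfigCheck is hypothetical and is not how the connector necessarily reads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch: validate the example sink properties and build an index URL. */
public class SinkConfigCheck {

    // Keys taken from the example configuration; treating them all as
    // required is an assumption of this sketch.
    static final List<String> REQUIRED = List.of(
        "elastic.url", "elastic.port", "index.name", "type.name", "flag.field", "data.array");

    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        for (String key : REQUIRED) {
            String v = props.getProperty(key);
            if (v == null || v.isBlank()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        return props;
    }

    /** Assumes plain HTTP and that elastic.url is a bare host name. */
    public static String indexUrl(Properties props) {
        int port = Integer.parseInt(props.getProperty("elastic.port").trim());
        return "http://" + props.getProperty("elastic.url").trim()
            + ":" + port + "/" + props.getProperty("index.name").trim();
    }

    public static void main(String[] args) {
        String example = String.join("\n",
            "elastic.url=elasticsearch", "elastic.port=9200", "index.name=targetindex",
            "type.name=targettype", "flag.field=status", "data.array=dataList");
        System.out.println(indexUrl(load(example))); // prints http://elasticsearch:9200/targetindex
    }
}
```

Here elasticsearch is the host name of the Elastic container as seen from inside the Docker Compose network, not from your host machine.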
Run with a local Kafka cluster and Elastic instance
You need Docker and Docker Compose to run and test it locally.
- Clone this repo
- Run mvn clean and mvn install
- Change the absolute path in volumes under kafka-cluster in docker-compose.yml
- Run docker-compose up
- Wait a bit and open the Connect console: http://localhost:3030/kafka-connect-ui/#/cluster/fast-data-dev/select-connector
- Select ElasticSinkConnector in the bottom right of the page. (It appears only if the volume is set correctly in the compose file.)
- For logs: http://localhost:3030/logs/connect-distributed.log
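As an alternative to selecting the connector in the UI, you can usually create a connector through the standard Kafka Connect REST API by sending POST /connectors with a JSON body containing "name" and "config". The sketch below builds such a request from configuration entries. The class and method names are hypothetical. The URL http://localhost:8083 assumes the Connect worker's default REST port is published to your machine, which depends on your docker-compose.yml.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: build a Kafka Connect "create connector" REST request. */
public class ConnectorRegistration {

    /** Quotes a string as a JSON literal, escaping quotes, backslashes and control chars. */
    public static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds {"name": ..., "config": {...}} with keys in sorted order. */
    public static String body(String name, Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
            if (!first) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    /** POST <connectUrl>/connectors with a JSON body; sending it is left to the caller. */
    public static HttpRequest createRequest(String connectUrl, String name, Map<String, String> config) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body(name, config)))
            .build();
    }
}
```

You could send the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Connect typically answers 201 Created when the connector is registered. The config map would hold the properties from the example configuration above, such as connector.class, topics and flag.field.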
Curious about how I implemented this repository? Here is my Medium post.
TODOs
- Missing tests.