kafka-connect-fs
Files not being processed
Your connector is probably one of the best-documented connectors available, and I really appreciate the work you have put into it. Unfortunately, I am having a problem getting it to process files.
The configurations are all provided below. My understanding is that when I add files to "/data", I should see their contents in my topic ("sample-topic"), but they are not being processed.
Any ideas are appreciated!
The property file is set up as suggested (I am only using text files, located in "/data"):
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data
topic=sample-topic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=*
policy.batch_size=0
policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
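(For reference, Connect runs in distributed mode in the compose file below, so registration goes through the Connect REST API; this is a sketch of the same settings translated from the property file to JSON, assuming the default REST port:)

curl -sX POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "FsSourceConnector",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "file:///data",
    "topic": "sample-topic",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.recursive": "true",
    "policy.regexp": "*",
    "policy.batch_size": "0",
    "policy.cleanup": "none",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader",
    "file_reader.batch_size": "0"
  }
}'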
I have built the connector Docker image and updated it for Confluent Platform v6.2.0:
FROM confluentinc/cp-kafka-connect-base:6.2.0
ARG PROJECT_VERSION
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
COPY ./staging/mmolimar-kafka-connect-fs-${PROJECT_VERSION}.zip /tmp/kafka-connect-fs.zip
RUN confluent-hub install --no-prompt /tmp/kafka-connect-fs.zip
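(The build itself is roughly the following, run from a context containing the staging zip; the tag matches the image name in the compose file below, and the version value matches the 1.3.0 shown in the plugin listing:)

docker build --build-arg PROJECT_VERSION=1.3.0 -t kafka-connect-fs:latest .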
I can see the connector (the first in the list):
curl -sX GET http://localhost:8083/connector-plugins | grep FsSourceConnector
[{"class":"com.github.mmolimar.kafka.connect.fs.FsSourceConnector","type":"source","version":"1.3.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"6.2.0-ccs"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"6.2.0-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
And I have made minor changes to the docker-compose file (updated to 6.2.0 and added a volume on the connector container mapping to "/data", which I confirmed is visible to Connect):
version: '3'
services:
  cp-zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  cp-kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - cp-zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
  cp-schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - cp-zookeeper
      - cp-kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  connect-fs:
    image: kafka-connect-fs:latest
    container_name: connect
    depends_on:
      - cp-kafka
      - cp-schema-registry
    ports:
      - "8083:8083"
      - "8000:8000"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      KAFKA_OPTS: "-agentlib:jdwp=transport=dt_socket,server=y,address=8000,suspend=n"
    volumes:
      - ${PROJECT_DIR}/data:/data
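(For anyone reproducing this, a quick way to confirm the mount is to list the directory from inside the container, using the container_name from the compose file:)

docker exec connect ls -l /data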
Now that everything has started successfully, I have created the topic ("sample-topic") named in the properties file and moved files into "/data", but nothing appears in "sample-topic".
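(For what it's worth, a consumer invocation along these lines should show the records; since the value converter is Avro, this sketch uses the Avro console consumer shipped in the schema-registry image rather than the plain one, with the container and listener names taken from the compose file above:)

docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic sample-topic \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081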
Hi @ericbroda! Thanks for your kind words.
What filenames do you have in that directory? Have you tried another policy, such as the Sleepy or Cron policy? What do you see in the connector logs?
Thank you for the quick response!
Regarding your questions:
- I have only two files in the "/data" directory: "file1.txt" and "file2.txt"
- I have not yet tried the other policies; I was hoping the default one would suffice.
- There is nothing in the connector logs other than the message that Connect has started.
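(In case it helps surface more detail: this version of Connect exposes an admin endpoint for raising a logger's level at runtime, so the connector's activity can be logged without a restart; a sketch, assuming the default REST port:)

curl -sX PUT -H "Content-Type: application/json" \
  -d '{"level": "DEBUG"}' \
  http://localhost:8083/admin/loggers/com.github.mmolimar.kafka.connect.fs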
Can you try another policy, and also change the policy regexp config to policy.regexp=^.*\.txt$?
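For example, a minimal variant of the policy section might look like the sketch below; the SleepyPolicy class name comes from this connector, while the sleep key and the 10-second interval are illustrative values to double-check against the docs. (If I read the docs correctly, SimplePolicy scans the URIs only once at startup, so files added afterwards are never picked up, whereas SleepyPolicy re-polls on an interval.)

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=10000
policy.regexp=^.*\.txt$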