streamx
Error while running copy job in standalone mode
I am running the streamx master branch against an Apache Kafka 0.10 broker and trying to copy Kafka topic messages to S3. I get the error below when starting the connector. I suspect the S3 bucket and key names are not being passed correctly to the underlying filesystem APIs. Please let me know if any additional Hadoop configuration property needs to be added (my hdfs-site.xml is included below).
Command to start:
./connect-standalone.sh /app/kafka_2.11-0.10.2.1/config/connect-standalone.properties /app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/connector1.properties
Connector properties (connector1.properties):
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=test
topics.dir=test
logs.dir=logs
flush.size=3
s3.url=s3://platform.com/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner
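For reference on the bucket/key suspicion, here is a minimal sketch (plain Java, not connector code; the class name is made up for illustration) of how the s3.url value above is split: Hadoop's FileSystem layer treats the URI authority as the bucket and the path as the key prefix, and picks the implementation class from fs.s3.impl in the hdfs-site.xml shown further down.

import java.net.URI;

public class S3UrlParts {
    public static void main(String[] args) {
        // s3.url from connector1.properties; Hadoop's FileSystem layer treats the
        // URI authority as the bucket name and the path as the key prefix
        URI url = URI.create("s3://platform.com/data/rawdata");
        System.out.println("scheme = " + url.getScheme()); // s3
        System.out.println("bucket = " + url.getHost());   // platform.com
        System.out.println("key    = " + url.getPath());   // /data/rawdata
    }
}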
Error:
Caused by: java.io.IOException: / doesn't exist
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:165)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:68)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:374)
at io.confluent.connect.hdfs.DataWriter.
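One way to isolate this: reproduce the failing mkdirs call directly against the Hadoop filesystem, outside of Kafka Connect, using the same hadoop-conf directory. This is only a diagnostic sketch (the class name and the exact subdirectory the sink would create are assumptions); if it fails with the same "/ doesn't exist" IOException, the problem is in the Hadoop S3 filesystem configuration rather than in the connector itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MkdirsRepro {
    public static void main(String[] args) throws Exception {
        // load the same configuration the connector gets via hadoop.conf.dir
        Configuration conf = new Configuration();
        conf.addResource(new Path(
            "/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf/hdfs-site.xml"));

        // s3.url from connector1.properties plus the topics.dir the sink writes under
        // (the exact subdirectory is an assumption for this test)
        Path dir = new Path("s3://platform.com/data/rawdata/test");
        FileSystem fs = dir.getFileSystem(conf);

        // same FileSystem.mkdirs call that S3Storage.mkdirs makes in the stack trace above
        System.out.println("mkdirs returned: " + fs.mkdirs(dir));
    }
}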
Worker configuration (connect-standalone.properties):
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/kafkadata/kafka/connect.offsets
offset.flush.interval.ms=10000
hdfs-site.xml:
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>secret</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>secret</value>
  </property>
</configuration>
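To check that these values are actually what the worker sees, a small hedged sketch (class name is made up; loading hdfs-site.xml directly from the hadoop.conf.dir path is an assumption about how the connector resolves it):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ConfCheck {
    public static void main(String[] args) {
        // load the same hdfs-site.xml that hadoop.conf.dir points at
        Configuration conf = new Configuration();
        conf.addResource(new Path(
            "/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf/hdfs-site.xml"));

        System.out.println("fs.s3.impl = " + conf.get("fs.s3.impl"));
        System.out.println("access key set = " + (conf.get("fs.s3.awsAccessKeyId") != null));
        System.out.println("secret key set = " + (conf.get("fs.s3.awsSecretAccessKey") != null));
    }
}

If fs.s3.impl prints as null, the file is not being picked up at all. Note also that org.apache.hadoop.fs.s3.S3FileSystem is Hadoop's block-based S3 filesystem, which expects its own block/INode metadata to already exist in the bucket, whereas org.apache.hadoop.fs.s3native.NativeS3FileSystem (configured with the fs.s3n.* keys) reads and writes plain S3 objects; that difference may be worth checking given the retrieveINode failure above.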