
Error while running copy job in standalone mode

Open · akshatthakar opened this issue on Jun 08 '17 · 0 comments

I am running the streamx master branch against an Apache Kafka 0.10 server and trying to copy Kafka topic messages to S3. I am getting the error below when starting the connector. I suspect the S3 bucket and key names are not being passed properly to the underlying filesystem APIs. Please let me know if any Hadoop configuration property needs to be added (hdfs-site.xml is included below).

Command to start:
./connect-standalone.sh /app/kafka_2.11-0.10.2.1/config/connect-standalone.properties /app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/connector1.properties

Connector properties (connector1.properties):
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=test
topics.dir=test
logs.dir=logs
flush.size=3
s3.url=s3://platform.com/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner

Error:
Caused by: java.io.IOException: / doesn't exist
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy41.retrieveINode(Unknown Source)
    at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:165)
    at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:154)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
    at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:68)
    at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:374)
    at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:174)

Worker configuration (connect-standalone.properties):
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/kafkadata/kafka/connect.offsets

offset.flush.interval.ms=10000

hdfs-site.xml:
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>secret</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>secret</value>
  </property>
</configuration>
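
For reference: org.apache.hadoop.fs.s3.S3FileSystem, as configured above, is Hadoop's legacy block-based S3 store. It keeps its own INode metadata inside the bucket and reports "/ doesn't exist" (as in the stack trace) when pointed at a bucket it has not formatted itself. One possible direction, which I have not verified against this setup, is Hadoop's native S3 filesystem (s3n). The sketch below assumes Hadoop 2.x property names and that s3.url in connector1.properties would also need to change to an s3n:// URL; whether streamx expects s3, s3n, or s3a here is an assumption on my part.

hdfs-site.xml (unverified sketch):
<configuration>
  <!-- assumption: use the native S3 filesystem instead of the block-based one -->
  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>secret</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>secret</value>
  </property>
</configuration>

With this, the connector property would become s3.url=s3n://platform.com/data/rawdata (again, an assumption, not a confirmed fix).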
