kafka-connect-storage-common
io.confluent.connect.storage.errors.PartitionException: Error encoding partition
I added my custom partitioner, and when I write to the topic I see the following error:
```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:559)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:315)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner$PartitionFieldExtractor.extract(FieldAndTimeBasedPartitioner.java:153)
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner.encodePartition(FieldAndTimeBasedPartitioner.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:205)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:537)
	... 10 more
```
Connector JSON config (posted via curl):
```sh
for i in "${!KCONNECT_NODES[@]}"; do
  curl ${KCONNECT_NODES[$i]}/connectors -XPOST -H 'Content-type: application/json' -H 'Accept: application/json' -d '{
    "name": "connect-s3-sink-'$i'",
    "config": {
      "topics": "events",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": 4,
      "flush.size": 100,
      "rotate.schedule.interval.ms": "-1",
      "rotate.interval.ms": "-1",
      "s3.region": "eu-west-1",
      "s3.bucket.name": "byob-raw",
      "s3.compression.type": "gzip",
      "topics.dir": "topics",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "partitioner.class": "com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner",
      "partition.duration.ms": "3600000",
      "path.format": "YYYY-MM-dd",
      "locale": "US",
      "timezone": "UTC",
      "schema.compatibility": "NONE",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "timestamp.extractor": "Record",
      "partition.field": "appId"
    }
  }'
done
```
I don't think this is an issue with the repo.
Can you please show your partitioner?
Did you use the code in #73?
You should also try debugging your own code and/or writing more unit tests:
https://stackoverflow.com/questions/45717658/what-is-a-simple-effective-way-to-debug-custom-kafka-connectors
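For example, you can exercise the partitioner entirely outside Connect and step through it in an IDE. Here is a minimal sketch of such a harness; the class name, generics, and config keys are assumptions taken from your stack trace and connector config, not from your actual code:

```java
package com.accelerator.kafka.connect;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical debug harness: configures the partitioner the way the S3 sink
// would and feeds it one hand-built record.
public class PartitionerDebugHarness {

  public static void main(String[] args) {
    FieldAndTimeBasedPartitioner<String> partitioner = new FieldAndTimeBasedPartitioner<>();

    // Keys mirror the connector JSON above; the value types are what the
    // framework would normally hand to configure() (assumption).
    Map<String, Object> config = new HashMap<>();
    config.put("partition.field", "appId");
    config.put("partition.duration.ms", 3600000L);
    config.put("path.format", "YYYY-MM-dd");
    config.put("locale", "US");
    config.put("timezone", "UTC");
    config.put("timestamp.extractor", "Record");
    config.put("directory.delim", "/");
    partitioner.configure(config);

    // Schemaless value, i.e. the Map shape JsonConverter produces with
    // schemas.enable=false.
    Map<String, Object> value = new HashMap<>();
    value.put("appId", "my-app");

    SinkRecord record = new SinkRecord("events", 0, null, null, null, value, 0L,
        System.currentTimeMillis(), TimestampType.CREATE_TIME);

    // Put a breakpoint inside encodePartition()/extract() and step through.
    System.out.println(partitioner.encodePartition(record));
  }
}
```

If that reproduces the PartitionException, you can see exactly which branch of your extractor the record falls into.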
@cricket007 thank you for the link. I tried to debug and it still throws the same error. Here is the code it complains about:

```java
public String encodePartition(SinkRecord sinkRecord) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord);
    final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
    return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}
```
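And here is the `PartitionFieldExtractor` it delegates to, slightly paraphrased (my copy may differ in details); the `throw` is the `FieldAndTimeBasedPartitioner.java:153` frame from the trace:

```java
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;

import io.confluent.connect.storage.errors.PartitionException;

// Paraphrased sketch of the static inner class in FieldAndTimeBasedPartitioner.
class PartitionFieldExtractor {

  private final String fieldName;

  PartitionFieldExtractor(String fieldName) {
    this.fieldName = fieldName;
  }

  String extract(ConnectRecord<?> record) {
    Object value = record.value();
    if (value instanceof Struct) {
      // Records that carry a Connect schema (Avro, JSON with schemas.enable=true, ...)
      return (String) ((Struct) value).get(fieldName);
    } else if (value instanceof Map) {
      // Schemaless records (plain JSON with schemas.enable=false)
      return (String) ((Map<?, ?>) value).get(fieldName);
    } else {
      // Anything else (raw String, byte[], null, ...) ends up here.
      throw new PartitionException("Error encoding partition.");
    }
  }
}
```

So it throws whenever the record value is neither a `Struct` nor a `Map`.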
I am not sure whether I should change something in the JSON config or whether something else is causing the problem.
> I don't think this is an issue with the repo.
> Can you please show your partitioner?
> Did you use the code in #73?
yes
Are you able to run the unit tests?
Can you fork that branch and run a build with your own test that produces the exception?
Sorry, I used the code from https://github.com/canelmas/kafka-connect-field-and-time-partitioner, not the one in #73
Are you sure?
`com.canelmas.kafka.connect` (that repo) does not match `com.accelerator.kafka.connect` (your stack trace)
@cricket007 I changed the package name and updated the config to match. My Java class has the line `package com.accelerator.kafka.connect;`.
@cricket007 that's interesting. I ran `mvn test` and this is the outcome:
```
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest
Tests run: 0, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.006 sec - in com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
```
No tests ran... I was asking you to write one of your own with a sample of a record that you expect to get partitioned.
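For instance (a sketch only; the package and class names come from your stack trace and test output, and the config keys from your connector JSON, so adjust as needed):

```java
package com.accelerator.test.kafka.connect;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;

import com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner;

public class FieldAndTimeBasedPartitionerTest {

  @Test
  public void encodesPartitionFromFieldAndRecordTimestamp() {
    FieldAndTimeBasedPartitioner<String> partitioner = new FieldAndTimeBasedPartitioner<>();

    // Same keys as the connector JSON; parsed value types are an assumption.
    Map<String, Object> config = new HashMap<>();
    config.put("partition.field", "appId");
    config.put("partition.duration.ms", 3600000L);
    config.put("path.format", "YYYY-MM-dd");
    config.put("locale", "US");
    config.put("timezone", "UTC");
    config.put("timestamp.extractor", "Record");
    config.put("directory.delim", "/");
    partitioner.configure(config);

    // A record with a schema, carrying the field the partitioner reads.
    Schema schema = SchemaBuilder.struct().field("appId", Schema.STRING_SCHEMA).build();
    Struct value = new Struct(schema).put("appId", "my-app");

    // 2020-04-01T00:00:00Z as the record (CreateTime) timestamp.
    SinkRecord record = new SinkRecord("events", 0, null, null, schema, value, 0L,
        1585699200000L, TimestampType.CREATE_TIME);

    String encoded = partitioner.encodePartition(record);
    Assert.assertTrue("partition path should contain the appId value: " + encoded,
        encoded.contains("my-app"));
  }
}
```

If a record shaped like your real data passes this, the partitioner is fine and the problem is in the connector or converter config; if it throws the same PartitionException, you can now reproduce it locally.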
How can I run such a test? Is there any documentation available?
Tests are part of the repo... You have used Maven before, right?
https://github.com/confluentinc/kafka-connect-storage-common/tree/master/core/src/test
It's not clear what you've cloned or built if tests aren't running for you
I followed the instructions for building the common storage:
https://github.com/confluentinc/kafka-connect-storage-common/wiki/FAQ
And this is the new pom I'm using to run the test:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.accelerator.kafka</groupId>
  <artifactId>kafka-connect-storage-partitioner</artifactId>
  <name>kafka-connect-storage-partitioner</name>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <repositories>
    <repository>
      <id>confluent</id>
      <url>http://packages.confluent.io/maven/</url>
    </repository>
  </repositories>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-connect-storage-partitioner</artifactId>
      <version>5.4.1</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>2.0.0-alpha1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-connect-storage-common</artifactId>
      <version>5.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.0.0-M4</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.19</version>
        <dependencies>
          <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-surefire-provider</artifactId>
            <version>1.0.0-M4</version>
          </dependency>
          <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.0.0-M4</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>
</project>
```
But you don't need to do this. You can fork this repo as-is, then add your own partitioner and tests.