
io.confluent.connect.storage.errors.PartitionException: Error encoding partition

Adeljoo opened this issue 5 years ago · 14 comments

I added my custom partitioner, and when I write to the topic I see the following error:

```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:559)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:315)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner$PartitionFieldExtractor.extract(FieldAndTimeBasedPartitioner.java:153)
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner.encodePartition(FieldAndTimeBasedPartitioner.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:205)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:537)
	... 10 more
```

Connector config (JSON), posted to each Connect node:

```shell
for i in "${!KCONNECT_NODES[@]}"; do
    curl ${KCONNECT_NODES[$i]}/connectors -XPOST -H 'Content-type: application/json' -H 'Accept: application/json' -d '{
        "name": "connect-s3-sink-'$i'",
        "config": {
            "topics": "events",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max" : 4,
            "flush.size": 100,
            "rotate.schedule.interval.ms": "-1",
            "rotate.interval.ms": "-1",
            "s3.region" : "eu-west-1",
            "s3.bucket.name" : "byob-raw",
            "s3.compression.type": "gzip",
            "topics.dir": "topics",
            "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
            "partitioner.class": "com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner",
            "partition.duration.ms" : "3600000",
            "path.format": "YYYY-MM-dd",
            "locale" : "US",
            "timezone" : "UTC",
            "schema.compatibility": "NONE",
            "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
            "timestamp.extractor": "Record",
            "partition.field" : "appId"
        }
    }'
done
```

Adeljoo avatar Apr 10 '20 12:04 Adeljoo

I don't think this is an issue with the repo.

Can you please show your partitioner?

Did you use the code in #73?

OneCricketeer avatar Apr 10 '20 14:04 OneCricketeer

You should also try debugging your own code and/or writing more unit tests.

https://stackoverflow.com/questions/45717658/what-is-a-simple-effective-way-to-debug-custom-kafka-connectors

OneCricketeer avatar Apr 10 '20 14:04 OneCricketeer

@cricket007 thank you for the link. I tried to debug, and it still throws the same error. Here is the code it complains about:

```java
public String encodePartition(SinkRecord sinkRecord) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord);
    final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);

    return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}
```

I am not sure whether I should change something in the JSON config or whether something else is causing the problem.
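For reference, the stack trace points at the field extractor called on the second line of that snippet (`PartitionFieldExtractor.extract`, FieldAndTimeBasedPartitioner.java:153). The sketch below is not the actual partitioner code: it is a self-contained stand-in (plain `java.util.Map` instead of Connect's `SinkRecord`/`Struct`, hypothetical class and method names) that shows the kind of lookup such an extractor performs and where it can fail:

```java
import java.util.Map;

// Illustrative stand-in for a partition-field extractor. The real extractor
// receives a SinkRecord; here the record value is modelled as a plain Object.
public class FieldExtractorSketch {
    // Returns the partition field value, or null when the value is not a Map
    // or the field is absent -- in the real partitioner this is the situation
    // that surfaces as "PartitionException: Error encoding partition".
    public static String extract(Object recordValue, String fieldName) {
        if (recordValue instanceof Map) {
            Object field = ((Map<?, ?>) recordValue).get(fieldName);
            return field == null ? null : field.toString();
        }
        // A String here usually means the converter did not parse the JSON
        // (e.g. StringConverter configured instead of JsonConverter).
        return null;
    }
}
```

One common cause of this exception, assuming an extractor shaped like the above, is that records reach the partitioner without the configured field (here `appId`), or as unparsed strings because of the `value.converter` setting.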

Adeljoo avatar Apr 14 '20 08:04 Adeljoo

> I don't think this is an issue with the repo.
>
> Can you please show your partitioner?
>
> Did you use the code in #73?

yes

Adeljoo avatar Apr 14 '20 08:04 Adeljoo

Are you able to run the unit tests?

Can you fork that branch and run a build with your own test that produces the exception?

OneCricketeer avatar Apr 14 '20 19:04 OneCricketeer

Sorry, I used the following code: https://github.com/canelmas/kafka-connect-field-and-time-partitioner, not the one in #73.

Adeljoo avatar Apr 15 '20 08:04 Adeljoo

Are you sure?

`com.canelmas.kafka.connect` (that repo) does not match `com.accelerator.kafka.connect` (your stacktrace)

OneCricketeer avatar Apr 15 '20 12:04 OneCricketeer

@cricket007 I changed the package name and also updated the config to match. My Java class has the line `package com.accelerator.kafka.connect;`.

Adeljoo avatar Apr 15 '20 13:04 Adeljoo

@cricket007 that's interesting. I ran `mvn test` and this is the outcome:

```
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest
Tests run: 0, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.006 sec - in com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ---------------------------------------------
```

Adeljoo avatar Apr 15 '20 18:04 Adeljoo

No tests ran... I was asking you to write one of your own, with a sample of a record that you expect to get partitioned.

OneCricketeer avatar Apr 15 '20 20:04 OneCricketeer
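A test along the lines being asked for only needs a sample record and the path you expect back. A minimal, self-contained sketch, with a plain `Map` standing in for `SinkRecord` and a path layout inferred from the connector config above (class, method, and layout are illustrative assumptions, not the partitioner's actual API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical mirror of encodePartition: field value plus formatted record
// timestamp, following the "<appId>/<yyyy-MM-dd>" layout the config implies.
public class PartitionEncodingSketch {
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String encodePartition(Map<String, Object> value, long timestampMs) {
        Object field = value.get("appId");
        if (field == null) {
            // Mirrors the failure mode from the stack trace.
            throw new IllegalStateException("Error encoding partition: no appId");
        }
        return field + "/" + FMT.format(Instant.ofEpochMilli(timestampMs));
    }

    public static void main(String[] args) {
        // Sample record fixture: the kind of input a unit test should feed in.
        System.out.println(encodePartition(Map.of("appId", "app-1"), 1586476800000L));
    }
}
```

A real unit test would do the same with an actual `SinkRecord` (schema, value, timestamp) against the partitioner class, asserting both the happy path and the missing-field case.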

How can I run such a test? Is there any documentation available?

Adeljoo avatar Apr 16 '20 07:04 Adeljoo

Tests are part of the repo... You have used Maven before, right?

https://github.com/confluentinc/kafka-connect-storage-common/tree/master/core/src/test

It's not clear what you've cloned or built if tests aren't running for you

OneCricketeer avatar Apr 17 '20 16:04 OneCricketeer

I followed the instructions for building the common storage repo:

https://github.com/confluentinc/kafka-connect-storage-common/wiki/FAQ

And this is the new pom I'm using to run the test:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.accelerator.kafka</groupId>
    <artifactId>kafka-connect-storage-partitioner</artifactId>
    <name>kafka-connect-storage-partitioner</name>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.6</maven.compiler.source>
        <maven.compiler.target>1.6</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-storage-partitioner</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.0-alpha1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-storage-common</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.0-M4</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <dependencies>
                    <dependency>
                        <groupId>org.junit.platform</groupId>
                        <artifactId>junit-platform-surefire-provider</artifactId>
                        <version>1.0.0-M4</version>
                    </dependency>
                    <dependency>
                        <groupId>org.junit.jupiter</groupId>
                        <artifactId>junit-jupiter-engine</artifactId>
                        <version>5.0.0-M4</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>
</project>
```
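One plausible explanation for the `Tests run: 0` output, assuming the test class is written with JUnit 4's `@org.junit.Test` annotation (the pom pulls in `junit:junit:4.13`): the `junit-platform-surefire-provider` configured above only discovers JUnit 5 (Jupiter) tests unless the vintage engine is also on the plugin path. A sketch of the extra plugin dependency that would let JUnit 4 tests be discovered:

```xml
<!-- Assumption: test methods use JUnit 4's @org.junit.Test. Add this next to
     junit-platform-surefire-provider; the vintage-engine version must match
     the JUnit Platform milestone in use (here the M4 line). -->
<dependency>
    <groupId>org.junit.vintage</groupId>
    <artifactId>junit-vintage-engine</artifactId>
    <version>4.12.0-M4</version>
</dependency>
```

Alternatively, writing the test with `@org.junit.jupiter.api.Test` would match the Jupiter engine already configured.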

Adeljoo avatar Apr 20 '20 07:04 Adeljoo

But you don't need to do this. You can fork this repo, as is, then add your own partitioner and tests

OneCricketeer avatar Apr 20 '20 21:04 OneCricketeer