kafka-connect-storage-common icon indicating copy to clipboard operation
kafka-connect-storage-common copied to clipboard

S3 Source custom partition Caused by: NoSuchMethodException: after creation

Open artemio77 opened this issue 3 years ago • 11 comments

Hi, I implement a custom partitioner for the sink and source s3 connector. Partitioner: package io.confluent.connect.storage.partitioner;

import java.util.List; import java.util.Locale; import java.util.Map; import org.apache.kafka.connect.sink.SinkRecord; import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory;

public class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

// Connector config key: when true, field partitions are written as "name=value" path segments.
private static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
// Default used when the key is absent from the connector config.
private static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
// Built in init(); extracts the configured record fields for the partition path.
private PartitionFieldExtractor partitionFieldExtractor;

// Public no-arg constructor is required: Kafka Connect instantiates partitioners
// reflectively from the "partitioner.class" config and then calls init/configure.
public FieldAndTimeBasedPartitioner() {
}

/**
 * Initializes the time-based partitioning (via the superclass) and builds the
 * field extractor from the "partition.field.name" and
 * "partition.field.format.path" connector config entries.
 *
 * @param partitionDurationMs duration of a time partition in milliseconds
 * @param pathFormat          joda-time path format (e.g. 'year'=YYYY/'month'=MM)
 * @param locale              locale used for time formatting
 * @param timeZone            time zone used for time formatting
 * @param config              raw connector configuration map
 */
@SuppressWarnings("unchecked")
protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
    super.init(partitionDurationMs, pathFormat, locale, timeZone, config);

    final List<String> fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
    // Connect usually delivers config values as Strings, but the fallback default here
    // is a Boolean and JSON configs may carry a real boolean. The original
    // (String) cast on getOrDefault threw ClassCastException in those cases;
    // normalize through String.valueOf so every shape parses safely.
    final Object rawFormatPath =
        config.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT);
    final boolean formatPath = (rawFormatPath instanceof Boolean)
        ? (Boolean) rawFormatPath
        : Boolean.parseBoolean(String.valueOf(rawFormatPath));

    this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath);
}

/**
 * Encodes the partition path for a record as {@code <field-part>/<time-part>},
 * where the field part comes from the configured record fields and the time
 * part from the superclass's time-based encoding.
 *
 * @param sinkRecord  record being written
 * @param nowInMillis wall-clock time used by Wallclock-style timestamp extractors
 * @return the combined partition path segment
 */
public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

    // TRACE, not INFO: this runs for every single record and would flood logs at INFO.
    log.trace("Encoded partition : {}", partition);

    return partition;
}

/**
 * Encodes the partition path for a record as {@code <field-part>/<time-part>}.
 * Overload without an explicit wall-clock time; the superclass derives the
 * timestamp from its configured extractor.
 *
 * @param sinkRecord record being written
 * @return the combined partition path segment
 */
public String encodePartition(final SinkRecord sinkRecord) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

    // TRACE, not INFO: this runs for every single record and would flood logs at INFO.
    log.trace("Encoded partition : {}", partition);

    return partition;
}

/**
 * Extracts the configured field values from a record and joins them into a
 * partition path segment, optionally as {@code fieldName=value} pairs.
 */
public static class PartitionFieldExtractor {

    private static final String DELIMITER_EQ = "=";
    private static final Logger log = LoggerFactory.getLogger(PartitionFieldExtractor.class);

    // When true, each segment is rendered as "fieldName=value"; otherwise just "value".
    private final boolean formatPath;
    private final List<String> fieldNames;

    PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) {
        this.fieldNames = fieldNames;
        this.formatPath = formatPath;
    }

    /**
     * Builds the field-based portion of the partition path for the given record.
     *
     * @param record record whose value must be a Struct or a Map
     * @return path segments for each configured field, separated by the directory delimiter
     * @throws PartitionException if the record value is neither a Struct nor a Map
     */
    public String extract(final ConnectRecord<?> record) {
        final Object value = record.value();
        final StringBuilder builder = new StringBuilder();
        for (final String fieldName : this.fieldNames) {
            if (builder.length() != 0) {
                builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
            }
            if (value instanceof Struct || value instanceof Map) {
                final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);
                if (formatPath) {
                    builder.append(String.join(DELIMITER_EQ, fieldName, partitionField));
                } else {
                    builder.append(partitionField);
                }
            } else {
                // Guard against a null value (e.g. a tombstone record): calling
                // value.getClass() directly would throw NPE here and mask the
                // intended PartitionException.
                log.error("Value is not of Struct or Map type. type {}",
                    value == null ? "null" : value.getClass());
                throw new PartitionException("Error encoding partition.");
            }
        }
        return builder.toString();
    }

}

} When I create sink s3 connector everuthing is fine it working correct: { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "3", "topics": "test-s3-sink", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "20", "s3.region": "us-west-2", "s3.proxy.url": "http://localstack:4566", "s3.bucket.name": "confluent-kafka-connect-s3-testing", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.compatibility": "NONE", "value.converter.schemas.enable": "true", "partitioner.class": "com.kafka.connect.extention.FieldAndTimeBasedPartitioner", "partition.field.name": "customerId,channelType,channelId", "partition.field.format.path": true, "partition.duration.ms": 86400000, "path.format": "'year'=YYYY/'month'=MM/'day'=dd", "locale": "US", "timezone": "UTC", "timestamp.extractor": "RecordField", "timestamp.field": "messageDate" } } } But when i try create source s3 connector with this partitioner I got exception after creation { "name": "s3-source", "config": { "confluent.license": "", "connector.class": "io.confluent.connect.s3.source.S3SourceConnector", "confluent.topic.bootstrap.servers": "kafka01.internal-service:9092", "s3.region": "us-west-2", "s3.proxy.url": "http://localstack:4566", "s3.bucket.name": "confluent-kafka-connect-s3-testing", "confluent.topic.replication.factor": 1, "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081", "schema.compatibility": "NONE", "value.converter.schemas.enable": "true", 
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner", "partition.field.name": "customerId,channelType,channelId", "partition.field.format.path": true, "partition.duration.ms": 86400000, "path.format": "'year'=YYYY/'month'=MM/'day'=dd", "locale": "US", "timezone": "UTC", "timestamp.extractor": "RecordField", "timestamp.field": "messageDate" } } Exception: [2022-05-30 21:33:00,018] ERROR WorkerConnector{id=s3-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector) 2022-05-30T21:33:00.023992047Z org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner 2022-05-30T21:33:00.024020005Z at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667) 2022-05-30T21:33:00.024032949Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90) 2022-05-30T21:33:00.024044101Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83) 2022-05-30T21:33:00.024054699Z at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72) 2022-05-30T21:33:00.024118151Z at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184) 2022-05-30T21:33:00.024226395Z at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209) 2022-05-30T21:33:00.024239719Z at 
org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348) 2022-05-30T21:33:00.024250041Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331) 2022-05-30T21:33:00.024260690Z at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140) 2022-05-30T21:33:00.024271458Z at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117) 2022-05-30T21:33:00.024283104Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 2022-05-30T21:33:00.024293380Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 2022-05-30T21:33:00.024303275Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 2022-05-30T21:33:00.024313394Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 2022-05-30T21:33:00.024323897Z at java.base/java.lang.Thread.run(Thread.java:829) 2022-05-30T21:33:00.029909345Z [2022-05-30 21:33:00,028] ERROR [Worker clientId=connect-1, groupId=kafka-connect] Failed to start connector 's3-source' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 2022-05-30T21:33:00.029993514Z org.apache.kafka.connect.errors.ConnectException: Failed to start connector: s3-source 2022-05-30T21:33:00.030013367Z at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$25(DistributedHerder.java:1461) 2022-05-30T21:33:00.030028022Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:334) 2022-05-30T21:33:00.030039736Z at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140) 2022-05-30T21:33:00.030051017Z at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117) 2022-05-30T21:33:00.030061715Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 2022-05-30T21:33:00.030072480Z at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 2022-05-30T21:33:00.030084369Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 2022-05-30T21:33:00.030095914Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 2022-05-30T21:33:00.030107039Z at java.base/java.lang.Thread.run(Thread.java:829) 2022-05-30T21:33:00.030118727Z Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector s3-source to state STARTED 2022-05-30T21:33:00.030130358Z ... 8 more 2022-05-30T21:33:00.030140662Z Caused by: org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner 2022-05-30T21:33:00.030181168Z at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667) 2022-05-30T21:33:00.030193293Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90) 2022-05-30T21:33:00.030204530Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83) 2022-05-30T21:33:00.030218397Z at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72) 2022-05-30T21:33:00.030229561Z at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184) 2022-05-30T21:33:00.030240106Z at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209) 2022-05-30T21:33:00.030250930Z at 
org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348) 2022-05-30T21:33:00.030261898Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331) 2022-05-30T21:33:00.030272834Z ... 7 more

I don't know why I got this exception, because the constructor is present in the partitioner.

artemio77 avatar May 30 '22 21:05 artemio77

Are you using the same connect worker for both connectors and this class is available on the CLASSPATH?

OneCricketeer avatar Jun 22 '22 21:06 OneCricketeer

@artemio77 is this issue resolved? I am also getting the same problem no matter which partitioner I use, i.e. default, time-based, or daily.

For me also s3 sink connector is working fine, but not s3 source connector

any pointers would be helpful

yarlagaddag avatar Oct 20 '22 00:10 yarlagaddag

This connector doesn't have field and time partitioner. Therefore, it wouldn't be found, by default.

Regarding the original post, seems like it was working?

When I create the sink S3 connector everything is fine, it is working correctly

... com.kafka.connect.extention.FieldAndTimeBasedPartitioner

It's not clear what this class is, since Kafka Connect packages start with org.apache.kafka, and all the included partitioners, including the code shown, start with io.confluent

OneCricketeer avatar Oct 20 '22 00:10 OneCricketeer

@OneCricketeer what are the valid partitioners for this connector?

yarlagaddag avatar Oct 20 '22 00:10 yarlagaddag

@yarlagaddag They are all here, but switch the branch/tag to the version you've installed https://github.com/confluentinc/kafka-connect-storage-common/tree/master/partitioner/src/main/java/io/confluent/connect/storage/partitioner

OneCricketeer avatar Oct 20 '22 00:10 OneCricketeer

Thank you @OneCricketeer . So there is TimeBasedPartitioner, and this is what I used earlier after looking into the above

yarlagaddag avatar Oct 20 '22 00:10 yarlagaddag

Got the same issue for DefaultPartitioner as well org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.DefaultPartitioner.(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.DefaultPartitioner\n\tat io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:686)\n\tat io.confluent.connect.cloud.storage.source.StorageSourceTask.start(StorageSourceTask.java:75)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:230)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n" },

yarlagaddag avatar Oct 20 '22 00:10 yarlagaddag

Right. If you want to use both fields and timestamp, you'll need to checkout PR #251, then replace all the connect-storage-common JARs on your connect servers

OneCricketeer avatar Oct 20 '22 00:10 OneCricketeer

alright.. I am using just the default one and still ran into the same issue, not sure what else is causing this issue.. stack trace is posted above

yarlagaddag avatar Oct 20 '22 00:10 yarlagaddag

I'd guess it has something to do with reflection looking up the class name, then calling configure method since all the partitioners themselves have been tested since ac6daadfb4d49b02def0cac3b55b5baf3d866c8a

OneCricketeer avatar Oct 20 '22 01:10 OneCricketeer

Hmm, ok. thanks @OneCricketeer

yarlagaddag avatar Oct 20 '22 01:10 yarlagaddag