kafka-connect-storage-common
S3 Source custom partitioner: Caused by NoSuchMethodException after creation
Hi, I implemented a custom partitioner for the S3 sink and source connectors.

Partitioner:

```java
package io.confluent.connect.storage.partitioner;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DataUtils;

public class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

  private static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
  private static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
  private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);

  private PartitionFieldExtractor partitionFieldExtractor;

  public FieldAndTimeBasedPartitioner() {
  }

  @Override
  protected void init(long partitionDurationMs, String pathFormat, Locale locale,
                      DateTimeZone timeZone, Map<String, Object> config) {
    super.init(partitionDurationMs, pathFormat, locale, timeZone, config);
    final List<String> fieldNames =
        (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
    // Kafka Connect passes user-supplied option values through as strings, so we
    // must parse the boolean ourselves. The value may also be our Boolean default
    // (when the key is absent), so avoid a blind String cast, which would throw
    // ClassCastException.
    final Object rawFormatPath = config.getOrDefault(
        PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT);
    final boolean formatPath = rawFormatPath instanceof Boolean
        ? (Boolean) rawFormatPath
        : Boolean.parseBoolean(String.valueOf(rawFormatPath));
    this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath);
  }

  @Override
  public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);
    log.info("Encoded partition : {}", partition);
    return partition;
  }

  @Override
  public String encodePartition(final SinkRecord sinkRecord) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);
    log.info("Encoded partition : {}", partition);
    return partition;
  }

  public static class PartitionFieldExtractor {

    private static final String DELIMITER_EQ = "=";
    private static final Logger log = LoggerFactory.getLogger(PartitionFieldExtractor.class);

    private final boolean formatPath;
    private final List<String> fieldNames;

    PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) {
      this.fieldNames = fieldNames;
      this.formatPath = formatPath;
    }

    public String extract(final ConnectRecord<?> record) {
      final Object value = record.value();
      final StringBuilder builder = new StringBuilder();
      for (final String fieldName : this.fieldNames) {
        if (builder.length() != 0) {
          builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
        }
        if (value instanceof Struct || value instanceof Map) {
          final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);
          if (formatPath) {
            builder.append(String.join(DELIMITER_EQ, fieldName, partitionField));
          } else {
            builder.append(partitionField);
          }
        } else {
          log.error("Value is not of Struct or Map type. type {}", value.getClass());
          throw new PartitionException("Error encoding partition.");
        }
      }
      return builder.toString();
    }
  }
}
```
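As an aside on the string-parsing caveat in `init` above: the same defensive logic can be isolated into a small helper. This is a minimal sketch with hypothetical names and no Connect dependencies, assuming only that config values arrive in a `Map<String, Object>` where user-supplied entries are `String`s:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigParseDemo {

    // Parse a boolean option that may be absent, a String ("true"/"false"),
    // or already a Boolean (e.g. when a code-side default was used).
    static boolean parseBooleanOption(Map<String, Object> config,
                                      String key, boolean defaultValue) {
        Object raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Boolean) {
            return (Boolean) raw;
        }
        return Boolean.parseBoolean(raw.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put("partition.field.format.path", "true"); // Connect-style String value
        if (!parseBooleanOption(cfg, "partition.field.format.path", false)) {
            throw new AssertionError("expected true");
        }
        if (parseBooleanOption(cfg, "missing.key", false)) {
            throw new AssertionError("expected default false");
        }
        System.out.println("ok");
    }
}
```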
When I create the S3 sink connector, everything works correctly:

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "test-s3-sink",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "20",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "com.kafka.connect.extention.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
```
But when I try to create the S3 source connector with this partitioner, I get an exception after creation:

```json
{
  "name": "s3-source",
  "config": {
    "confluent.license": "",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "confluent.topic.bootstrap.servers": "kafka01.internal-service:9092",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "confluent.topic.replication.factor": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
```
Exception:

```
[2022-05-30 21:33:00,018] ERROR WorkerConnector{id=s3-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
2022-05-30T21:33:00.023992047Z org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.
```
I don't understand why I get this exception, because the no-arg constructor is present in the partitioner.
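For context, plugin runtimes generally instantiate a configured class reflectively through its public no-arg constructor, and `NoSuchMethodException` with just the class name is what that lookup throws when the constructor can't be found (or the wrong class is on the classpath). A minimal sketch with hypothetical class names, using only the JDK:

```java
import java.lang.reflect.Constructor;

public class ReflectionDemo {

    // Hypothetical partitioner missing the public no-arg constructor.
    public static class NoDefaultCtorPartitioner {
        public NoDefaultCtorPartitioner(String config) { }
    }

    // Hypothetical partitioner with the public no-arg constructor a
    // reflective loader expects.
    public static class DefaultCtorPartitioner {
        public DefaultCtorPartitioner() { }
    }

    // Mirrors how a plugin runtime typically instantiates a configured class:
    // look up the no-arg constructor, then invoke it.
    static Object newInstance(Class<?> cls) throws ReflectiveOperationException {
        Constructor<?> ctor = cls.getDeclaredConstructor(); // NoSuchMethodException if absent
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(newInstance(DefaultCtorPartitioner.class).getClass().getSimpleName());
        try {
            newInstance(NoDefaultCtorPartitioner.class);
        } catch (NoSuchMethodException e) {
            // Same shape as the error in the log: the class name, no-arg signature.
            System.out.println("NoSuchMethodException: " + e.getMessage());
        }
    }
}
```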
Are you using the same Connect worker for both connectors, and is this class available on the CLASSPATH?
@artemio77 is this issue resolved? I am also getting the same problem no matter which partitioner I use (default, time-based, or daily).
For me, the S3 sink connector also works fine, but the S3 source connector does not.
Any pointers would be helpful.
This connector doesn't include a field-and-time partitioner, so it wouldn't be found by default.
Regarding the original post, it seems like it was working?

> When I create the S3 sink connector, everything works correctly
>
> ...
>
> com.kafka.connect.extention.FieldAndTimeBasedPartitioner

It's not clear what that class is, since Kafka Connect packages start with `org.apache.kafka`, and all the included partitioners, including the code shown, start with `io.confluent`.
@OneCricketeer what are the valid partitioners for this connector?
@yarlagaddag They are all here, but switch the branch/tag to the version you've installed: https://github.com/confluentinc/kafka-connect-storage-common/tree/master/partitioner/src/main/java/io/confluent/connect/storage/partitioner
Thank you @OneCricketeer. So there is TimeBasedPartitioner, and that is what I used earlier after looking into the above.
I got the same issue for DefaultPartitioner as well:

```
org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.DefaultPartitioner.
```
Right. If you want to use both fields and timestamp, you'll need to check out PR #251 and then replace all the connect-storage-common JARs on your Connect servers.
Alright. I am using just the default one and still ran into the same issue; not sure what else is causing it. The stack trace is posted above.
I'd guess it has something to do with reflection looking up the class name and then calling the configure method, since the partitioners themselves have been tested since ac6daadfb4d49b02def0cac3b55b5baf3d866c8a.
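That load-then-configure pattern can be sketched as follows. This is a hedged illustration with hypothetical names, not the connector's actual code; it just shows the two reflective steps where this kind of failure can occur:

```java
import java.util.HashMap;
import java.util.Map;

public class PluginLoadDemo {

    // Stand-in for a partitioner-style plugin interface.
    public interface Partitioner {
        void configure(Map<String, Object> config);
    }

    // Hypothetical plugin with the public no-arg constructor reflection needs.
    public static class DemoPartitioner implements Partitioner {
        private Map<String, Object> config;

        public DemoPartitioner() { }

        @Override
        public void configure(Map<String, Object> config) {
            this.config = config;
        }
    }

    // Resolve the class by name, instantiate it, then configure it.
    static Partitioner load(String className, Map<String, Object> config)
            throws ReflectiveOperationException {
        Class<?> cls = Class.forName(className);                 // fails if not on the CLASSPATH
        Object instance = cls.getDeclaredConstructor().newInstance(); // fails without a no-arg ctor
        Partitioner partitioner = (Partitioner) instance;
        partitioner.configure(config);
        return partitioner;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("partition.field.format.path", "true");
        Partitioner p = load(DemoPartitioner.class.getName(), config);
        System.out.println(p.getClass().getSimpleName());
    }
}
```

The first step explains why a class missing from the worker's plugin path can never be found, and the second explains the `NoSuchMethodException` naming only the class.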
Hmm, ok. thanks @OneCricketeer