kafka-connect-storage-cloud
S3 connector `format.class` cannot be configured with custom formatter
Confluent documentation states that it is possible to implement your own Format class and configure it in the format.class property for the S3 connector:

You can also choose to use a custom formatter by implementing the io.confluent.connect.storage.format.Format interface.
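For reference, the interface the docs point at has roughly this shape (a sketch based on the implementation shown further down; the generic parameter names are mine):

public interface Format<C, T> {
  // Produces the writers that serialize SinkRecords into storage objects.
  RecordWriterProvider<C> getRecordWriterProvider();
  // Reads schemas back from storage (unsupported by the S3 connector).
  SchemaFileReader<C, T> getSchemaFileReader();
  // Legacy Hive integration hook.
  @Deprecated
  Object getHiveFactory();
}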
However, I have tried to do this and it hasn't worked for me. I've gotten as far as receiving 500 errors from the /connectors endpoint when posting the connector configuration. Notice that the same configuration, changed only to use one of the bundled formatters (JSON in this case), works perfectly fine.
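For illustration, the configuration being posted has this shape (the class, topic, and bucket names here are placeholders, not the exact payload):

{
  "name": "s3-csv-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "com.example.CsvFormat",
    "flush.size": "1000"
  }
}

Swapping the format.class value for io.confluent.connect.s3.format.json.JsonFormat is the only change needed to make it work.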
Looking into the code, I think I can understand why it doesn't work (though I cannot understand how the docs can state that it should). This is the Connect REST API controller that handles POST /connectors:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L151-L169
public Response createConnector(final @QueryParam("forward") Boolean forward,
                                final @Context HttpHeaders headers,
                                final CreateConnectorRequest createRequest) throws Throwable {
    // Trim leading and trailing whitespaces from the connector name, replace null with empty string
    // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
    // allows null values)
    String name = createRequest.name() == null ? "" : createRequest.name().trim();

    Map<String, String> configs = createRequest.config();
    checkAndPutConnectorConfigName(name, configs);

    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
    herder.putConnectorConfig(name, configs, false, cb);
    Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
            new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);

    URI location = UriBuilder.fromUri("/connectors").path(name).build();
    return Response.created(location).entity(info.result()).build();
}
Notice the call to herder.putConnectorConfig(name, configs, false, cb), which in turn calls this code (for a distributed Connect deployment):
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L803-L858
@Override
public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
                               final Callback<Created<ConnectorInfo>> callback) {
    log.trace("Submitting connector config write request {}", connName);
    addRequest(
        new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                validateConnectorConfig(config, (error, configInfos) -> {
                    if (error != null) {
                        callback.onCompletion(error, null);
                        return;
                    }
[REDACTED]
This will end up calling this code: https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334-L431
and eventually this code: https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L499-L515
This is where things get interesting, because ConfigDef.validateAll will validate all of the received connector configuration, including the format.class value. The code is here:
https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L600-L625
private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
    if (!configKeys.containsKey(name)) {
        return;
    }
    ConfigKey key = configKeys.get(name);
    ConfigValue value = configs.get(name);

    if (key.recommender != null) {
        try {
            List<Object> recommendedValues = key.recommender.validValues(name, parsed);
            List<Object> originalRecommendedValues = value.recommendedValues();
            if (!originalRecommendedValues.isEmpty()) {
                Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
                recommendedValues.removeIf(o -> !originalRecommendedValueSet.contains(o));
            }
            value.recommendedValues(recommendedValues);
            value.visible(key.recommender.visible(name, parsed));
        } catch (ConfigException e) {
            value.addErrorMessage(e.getMessage());
        }
    }

    configs.put(name, value);
    for (String dependent : key.dependents) {
        validate(dependent, parsed, configs);
    }
}
So the algorithm seems to be:
- Get the recommender's valid values.
- Get the value's current recommended values.
- If the current recommended values are not empty, remove from the recommender's valid values every value that isn't part of the current ones (see the sketch below).
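To make that filtering step concrete, here is a minimal, self-contained sketch of just that step (string names stand in for the real Class objects, and the "original" list is a hypothetical where only the custom format is present):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecommenderFilterSketch {
    public static void main(String[] args) {
        // What the recommender's validValues() returns for format.class
        // (the four bundled formats, abbreviated to strings here).
        List<Object> recommendedValues = new ArrayList<>(Arrays.asList(
            "AvroFormat", "JsonFormat", "ByteArrayFormat", "ParquetFormat"));

        // Hypothetical "original" recommended values carrying only the custom class.
        List<Object> originalRecommendedValues = Arrays.<Object>asList("CsvFormat");

        // The filtering step from ConfigDef.validate: keep only the intersection.
        if (!originalRecommendedValues.isEmpty()) {
            Set<Object> originalSet = new HashSet<>(originalRecommendedValues);
            recommendedValues.removeIf(o -> !originalSet.contains(o));
        }

        System.out.println(recommendedValues); // prints [] -- nothing survives
    }
}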
Okay, so where are those recommender valid values coming from? This very same library:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java#L184-L191
FORMAT_CLASS_RECOMMENDER.addValidValues(
    Arrays.<Object>asList(
        AvroFormat.class,
        JsonFormat.class,
        ByteArrayFormat.class,
        ParquetFormat.class
    )
);
So it seems to me that it is not possible to choose any value that isn't one of those recommended valid values. I'm not sure whether an empty list of recommended values coming out of the validate method is what causes the connector instantiation to fail, because I haven't been able to collect any logs with extra information.
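One way to dig further without going through the REST API would be to run the connector's own validation directly and inspect the result for format.class. A rough sketch, assuming S3SinkConnectorConfig.getConfig() exposes the ConfigDef in the version at hand:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigValue;
import io.confluent.connect.s3.S3SinkConnectorConfig;

public class ValidateFormatClass {
    public static void main(String[] args) {
        // Minimal property set; a real connector needs more keys, but format.class
        // is the ConfigValue we want to inspect.
        Map<String, String> props = new HashMap<>();
        props.put("format.class", "com.example.CsvFormat"); // hypothetical custom class
        props.put("storage.class", "io.confluent.connect.s3.storage.S3Storage");
        props.put("s3.bucket.name", "my-bucket");
        props.put("flush.size", "1000");

        List<ConfigValue> results = S3SinkConnectorConfig.getConfig().validate(props);
        for (ConfigValue v : results) {
            if ("format.class".equals(v.name())) {
                System.out.println("errors: " + v.errorMessages());
                System.out.println("recommended: " + v.recommendedValues());
            }
        }
    }
}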
For reference, the code for the formatter looks like this:
public class CsvFormat implements Format<S3SinkConnectorConfig, String> {
    private final S3Storage storage;

    public CsvFormat(S3Storage storage) {
        this.storage = storage;
    }

    @Override
    public RecordWriterProvider<S3SinkConnectorConfig> getRecordWriterProvider() {
        return new CsvRecordWriterProvider(storage);
    }

    @Override
    public SchemaFileReader<S3SinkConnectorConfig, String> getSchemaFileReader() {
        throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported");
    }

    @Override
    @Deprecated
    public Object getHiveFactory() {
        throw new UnsupportedOperationException(
            "Hive integration is not currently supported in S3 Connector"
        );
    }
}
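The CsvRecordWriterProvider it returns isn't shown here; as a rough idea, a provider for this connector takes roughly the following shape (a hypothetical sketch patterned after the bundled providers; the exact S3Storage.create overload may differ between versions):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.format.RecordWriterProvider;

public class CsvRecordWriterProvider implements RecordWriterProvider<S3SinkConnectorConfig> {
    private final S3Storage storage;

    public CsvRecordWriterProvider(S3Storage storage) {
        this.storage = storage;
    }

    @Override
    public String getExtension() {
        return ".csv";
    }

    @Override
    public RecordWriter getRecordWriter(S3SinkConnectorConfig conf, String filename) {
        // Assumption: storage.create(path, overwrite) returns the output stream
        // the bundled JSON/bytes providers also write to.
        final OutputStream out = storage.create(filename, true);
        return new RecordWriter() {
            @Override
            public void write(SinkRecord record) {
                try {
                    // Naive CSV rendering of the record value; a real implementation
                    // would walk the record schema and escape fields properly.
                    out.write((String.valueOf(record.value()) + "\n")
                        .getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new ConnectException(e);
                }
            }

            @Override
            public void commit() {
                // Nothing to do in this sketch; the stream is finalized in close().
            }

            @Override
            public void close() {
                try {
                    out.close();
                } catch (IOException e) {
                    throw new ConnectException(e);
                }
            }
        };
    }
}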
Happy to make the whole project available if that would help.
Thanks!
Hello! Using version 5.5.7, I am able to supply my own custom format.class. I verified that it is loaded and that getRecordWriterProvider() is invoked as you would expect. :tada:
I think you or the Confluent folks could verify and close this issue.