
S3 connector `format.class` cannot be configured with custom formatter

javierholguera opened this issue on Jul 01 '20 · 1 comment

Confluent documentation states that it is possible to implement your own Format class and configure it in the format.class property for the S3 connector:

You can also choose to use a custom formatter by implementing the io.confluent.connect.storage.format.Format interface.
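
For context, that interface has the following shape (reconstructed here from the implementation quoted at the end of this issue; exact generic bounds omitted):

    public interface Format<C, T> {
        RecordWriterProvider<C> getRecordWriterProvider();
        SchemaFileReader<C, T> getSchemaFileReader();
        @Deprecated
        Object getHiveFactory();
    }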

However, I have tried to do this and it hasn't worked for me. I got as far as receiving 500 errors from the /connectors endpoint when posting the connector configuration. Note that the same configuration, changing only the formatter to one of the bundled ones (JSON in this case), works perfectly fine.
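
For reference, a minimal sketch of the kind of payload involved (topic, bucket, region, and the custom class name are hypothetical):

    {
      "name": "s3-sink-csv",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "example-topic",
        "s3.bucket.name": "example-bucket",
        "s3.region": "eu-west-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "com.example.CsvFormat",
        "flush.size": "1000"
      }
    }

Swapping format.class to the bundled io.confluent.connect.s3.format.json.JsonFormat is the variant that succeeds.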

Looking into the code, I think I can understand why it doesn't work (though not how the docs can claim that it should).

This is the Connect REST API controller for /connectors calls:

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L151-L169

    public Response createConnector(final @QueryParam("forward") Boolean forward,
                                    final @Context HttpHeaders headers,
                                    final CreateConnectorRequest createRequest) throws Throwable {
        // Trim leading and trailing whitespaces from the connector name, replace null with empty string
        // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
        // allows null values)
        String name = createRequest.name() == null ? "" : createRequest.name().trim();

        Map<String, String> configs = createRequest.config();
        checkAndPutConnectorConfigName(name, configs);

        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        herder.putConnectorConfig(name, configs, false, cb);
        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);

        URI location = UriBuilder.fromUri("/connectors").path(name).build();
        return Response.created(location).entity(info.result()).build();
    }

Notice the call to herder.putConnectorConfig(name, configs, false, cb), which in turn calls this code (for a distributed Connect deployment):

https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L803-L858

    @Override
    public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
                                   final Callback<Created<ConnectorInfo>> callback) {
        log.trace("Submitting connector config write request {}", connName);
        addRequest(
                new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        validateConnectorConfig(config, (error, configInfos) -> {
                            if (error != null) {
                                callback.onCompletion(error, null);
                                return;
                            }

[REDACTED]

Which will end up calling this code: https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334-L431

And eventually this code: https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L499-L515

This is where things get interesting, because ConfigDef.validateAll validates the entire received connector configuration, including the format.class value.

The code is here: https://github.com/apache/kafka/blob/16eb9c0bfe9d27b4de127576084154a9ab663e1b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L600-L625

    private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
        if (!configKeys.containsKey(name)) {
            return;
        }
        ConfigKey key = configKeys.get(name);
        ConfigValue value = configs.get(name);
        if (key.recommender != null) {
            try {
                List<Object> recommendedValues = key.recommender.validValues(name, parsed);
                List<Object> originalRecommendedValues = value.recommendedValues();
                if (!originalRecommendedValues.isEmpty()) {
                    Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
                    recommendedValues.removeIf(o -> !originalRecommendedValueSet.contains(o));
                }
                value.recommendedValues(recommendedValues);
                value.visible(key.recommender.visible(name, parsed));
            } catch (ConfigException e) {
                value.addErrorMessage(e.getMessage());
            }
        }

        configs.put(name, value);
        for (String dependent: key.dependents) {
            validate(dependent, parsed, configs);
        }
    }

So the algorithm seems to be:

  1. Ask the recommender for its valid values.
  2. Get the recommended values already recorded on the ConfigValue.
  3. If that recorded list is not empty, remove from the recommender's valid values anything that isn't in it, i.e. keep only the intersection (see the standalone sketch below).
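
A standalone toy sketch of that intersection step, with plain strings standing in for the real Class<?> values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RecommenderIntersectionDemo {
        public static void main(String[] args) {
            // What the recommender reports as valid (cf. FORMAT_CLASS_RECOMMENDER below)
            List<Object> recommendedValues = new ArrayList<>(Arrays.asList(
                    "AvroFormat", "JsonFormat", "ByteArrayFormat", "ParquetFormat"));

            // Recommended values previously recorded on the ConfigValue
            List<Object> originalRecommendedValues = Arrays.asList("JsonFormat", "CsvFormat");

            // Step 3: keep only the values present in both lists
            if (!originalRecommendedValues.isEmpty()) {
                Set<Object> originalSet = new HashSet<>(originalRecommendedValues);
                recommendedValues.removeIf(o -> !originalSet.contains(o));
            }

            System.out.println(recommendedValues); // prints [JsonFormat]
        }
    }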

Okay, so where are those recommender valid values coming from? This very same library:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java#L184-L191

    FORMAT_CLASS_RECOMMENDER.addValidValues(
        Arrays.<Object>asList(
            AvroFormat.class,
            JsonFormat.class,
            ByteArrayFormat.class,
            ParquetFormat.class
        )
    );
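
That recommender is then attached to the format.class key when the ConfigDef is built, along these lines (paraphrased, not the exact source; default, group, order, and width are illustrative):

    configDef.define(
        "format.class",                      // FORMAT_CLASS_CONFIG
        ConfigDef.Type.CLASS,
        AvroFormat.class,                    // illustrative default
        ConfigDef.Importance.HIGH,
        "The format class to use when writing data to the store.",
        "Connector",                         // group (illustrative)
        1,                                   // order in group (illustrative)
        ConfigDef.Width.NONE,
        "Format class",                      // display name
        FORMAT_CLASS_RECOMMENDER
    );

This is why key.recommender != null in the validate method above.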

So it seems to me that it is not possible to choose a value that isn't one of those recommended valid values. I'm not sure whether the connector fails to instantiate because the validate method produces an empty list of recommended values, since I haven't been able to collect any logs with more information.

For reference, the code for the formatter looks like this:

    import io.confluent.connect.s3.S3SinkConnectorConfig;
    import io.confluent.connect.s3.storage.S3Storage;
    import io.confluent.connect.storage.format.Format;
    import io.confluent.connect.storage.format.RecordWriterProvider;
    import io.confluent.connect.storage.format.SchemaFileReader;

    public class CsvFormat implements Format<S3SinkConnectorConfig, String> {
        private final S3Storage storage;

        public CsvFormat(S3Storage storage) {
            this.storage = storage;
        }

        @Override
        public RecordWriterProvider<S3SinkConnectorConfig> getRecordWriterProvider() {
            return new CsvRecordWriterProvider(storage);
        }

        @Override
        public SchemaFileReader<S3SinkConnectorConfig, String> getSchemaFileReader() {
            throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported");
        }

        @Override
        @Deprecated
        public Object getHiveFactory() {
            throw new UnsupportedOperationException(
                    "Hive integration is not currently supported in S3 Connector"
            );
        }
    }
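
The CsvRecordWriterProvider it references isn't shown above. A minimal sketch of what such a provider could look like (the class body and the toy one-value-per-line serialization are my own illustration, not the author's actual code; RecordWriter and RecordWriterProvider are the interfaces from kafka-connect-storage-common):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.SinkRecord;

    import io.confluent.connect.s3.S3SinkConnectorConfig;
    import io.confluent.connect.s3.storage.S3Storage;
    import io.confluent.connect.storage.format.RecordWriter;
    import io.confluent.connect.storage.format.RecordWriterProvider;

    public class CsvRecordWriterProvider implements RecordWriterProvider<S3SinkConnectorConfig> {
        private static final String EXTENSION = ".csv";
        private final S3Storage storage;

        public CsvRecordWriterProvider(S3Storage storage) {
            this.storage = storage;
        }

        @Override
        public String getExtension() {
            return EXTENSION;
        }

        @Override
        public RecordWriter getRecordWriter(S3SinkConnectorConfig conf, String filename) {
            // Assumed: S3Storage.create(path, conf, overwrite) hands back an
            // OutputStream backed by a multipart S3 upload.
            final OutputStream out = storage.create(filename, conf, true);
            return new RecordWriter() {
                @Override
                public void write(SinkRecord record) {
                    try {
                        // Toy serialization: one record value per line; a real CSV
                        // writer would flatten the record's fields with escaping.
                        out.write(String.valueOf(record.value()).getBytes(StandardCharsets.UTF_8));
                        out.write('\n');
                    } catch (IOException e) {
                        throw new ConnectException(e);
                    }
                }

                @Override
                public void commit() {
                    // The bundled formats complete/commit the multipart upload here.
                }

                @Override
                public void close() {
                    try {
                        out.close();
                    } catch (IOException e) {
                        throw new ConnectException(e);
                    }
                }
            };
        }
    }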

Happy to make the whole project available if that would help.

Thanks!

javierholguera · Jul 01 '20

Hello! Using version 5.5.7 I am able to supply my own custom format.class. I verified that it is loaded and getRecordWriterProvider() is invoked as you would expect. 🎉

I think you or the Confluent folks could verify and close this issue.

dylanmei · Feb 19 '22