
Question: S3 file gets moved on failure too

ruchirvaninasdaq opened this issue on Jan 07 '21 · 10 comments

Hello,

I am using the aws2-s3 Kafka source connector (https://github.com/apache/camel-kafka-connector/tree/camel-kafka-connector-0.7.x/connectors/camel-aws2-s3-kafka-connector) with the following config.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: name-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  config:
    client.id: client
    topics: topic
    connector.class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
    camel.source.kafka.topic: topic
    camel.source.url: aws2-s3://source-bucket?useDefaultCredentialsProvider=true&moveAfterRead=true&destinationBucket=destination-bucket
    camel.source.maxPollDuration: 10
    camel.source.maxBatchPollSize: 1000
    camel.component.aws2-s3.includeBody: false
    camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.autocloseBody: true

I have customized the S3ObjectConverter with my own serializer. Code is as follows:

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class S3ObjectConverter implements Converter {

    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectConverter.class);
    //private final S3ObjectSerializer serializer = new S3ObjectSerializer();
    private final S3ObjectAvroSerializer serializer;

    public S3ObjectConverter() throws IOException {
        serializer = new S3ObjectAvroSerializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return serializer.serialize(topic, (ResponseInputStream<GetObjectResponse>)value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Source-only converter: converting back to Connect data is not supported.
        return null;
    }

}

This works as expected: the object gets serialized, added to the Kafka topic, and the file gets moved to the destination bucket.

My problem is with the failure case: when the object fails during serialization, it still gets moved to the destination bucket (I expect it to stay in the source bucket). Is there any config I am using wrong?

Thank you.

ruchirvaninasdaq commented on Jan 07 '21

Adding the serializer code as well, in case it helps:

import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class S3ObjectAvroSerializer implements Serializer<ResponseInputStream<GetObjectResponse>> {

    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectAvroSerializer.class);
    private Schema schema;
    private GenericRecordSerializer recordSerializer;
    private MessageFactory messageFactory;

    public S3ObjectAvroSerializer(Schema schema) {
        this.schema = schema;
        this.recordSerializer = new GenericRecordSerializer(this.schema);
        this.messageFactory = new MessageFactory();
    }

    public S3ObjectAvroSerializer() throws IOException {
        Schema.Parser parser = new Schema.Parser();
        this.schema = parser.parse(getClass().getResourceAsStream("/avro/schema.avsc"));
        this.recordSerializer = new GenericRecordSerializer(this.schema);
        this.messageFactory = new MessageFactory();
    }

    /**
     * Create a Kafka serializer for control schema messages.
     */
    public Serializer<GenericRecord> getSerializer() {
        return new GenericRecordSerializer(this.schema);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) {
        GenericRecord record = null;
        try {
            record = messageFactory.parseMessage(inputStream);
        } catch (Exception e) {
            // Note: the exception is swallowed here, so on a parse failure
            // doSerialize() is still invoked with a null record.
            LOG.error("Error in Serializer", e);
        }
        return recordSerializer.doSerialize(topic, record);
    }

    @Override
    public void close() {

    }
}

ruchirvaninasdaq commented on Jan 07 '21

I expected this behavior based on this: https://github.com/apache/camel-kafka-connector/blob/dd9ccfa5dc2a0740ef1ed636cf999da41e6bbbcb/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java#L270

ruchirvaninasdaq commented on Jan 07 '21

The failure in serialization happens at the Kafka level, so it is expected to find the file in the destination bucket; in terms of pure Camel, the exchange is complete. The error should be managed at the Kafka level. Also, you should use the individual options and not camel.source.url, just one or the other; camel.source.url is not the usually suggested approach.
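
For example, the same source expressed with individual options instead of camel.source.url would look something like this (an untested sketch; option names as in the 0.7.x generated docs):

  config:
    camel.source.kafka.topic: topic
    camel.source.path.bucketNameOrArn: source-bucket
    camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.source.endpoint.moveAfterRead: true
    camel.source.endpoint.destinationBucket: destination-bucket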


oscerd commented on Jan 07 '21

Thanks for the feedback. I will update my config.

I have seen that when the Kafka cluster went down, the exchange was still marked as completed. What is a better way to handle Kafka exceptions with the Kafka connector? (My cluster was up and processing, but when it went down, the first message was marked as exchange completed; later messages started failing, but that first message didn't fail and never made it to the Kafka topic.)

Thanks

ruchirvaninasdaq commented on Jan 07 '21

@oscerd: Any recommendations on how to handle Kafka exceptions with the Camel Kafka connector? Please give me some guidance; I can try implementing it on my own. Thanks

ruchirvaninasdaq commented on Jan 08 '21

If you want the file to still be in the source bucket, you should avoid deleteAfterRead and moveAfterRead and use the idempotency support introduced in 0.7. The error is related to the broker, so it must be managed at the Kafka level, but I don't think that is easy to do. If the cluster went down but Camel had already completed the exchange and moved the file to the destination bucket, there is no way to roll back the situation from a Camel perspective.
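
A rough sketch of what that could look like (untested, and assuming the idempotency option names introduced in 0.7):

  config:
    # keep the file in the source bucket: no move/delete after read
    camel.source.endpoint.moveAfterRead: false
    camel.source.endpoint.deleteAfterRead: false
    # skip already-consumed objects via the idempotent repository instead
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: memory
    camel.idempotency.expression.type: body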

oscerd commented on Jan 08 '21

> if the cluster went down, but camel already completed the exchange and moved to the destination bucket, there is no way of rollback the situation from a camel perspective

Thank you for the information. Is handling this kind of rollback in scope for the connector in the future? Or is it just me running into this problem?

ruchirvaninasdaq commented on Jan 08 '21

Well, if the error happens during Camel routing we need to manage it a bit better with a rollback, but for your case that won't change the final situation: if the cluster goes down after the message has been consumed but before it reaches the Kafka topic, you'll lose the message. This is true even if you handle the error through the Kafka Connect options. We may try to reproduce this scenario, but it's not easy.
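
For reference, the standard Kafka Connect error-handling options are along these lines (they cover converter and transform failures; the dead letter queue options only apply to sink connectors, so they wouldn't help this source):

  config:
    errors.tolerance: all
    errors.log.enable: true
    errors.log.include.messages: true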

oscerd commented on Jan 08 '21

Yeah, I have been running this connector for 7 months and never saw issues, but a few weeks ago something went wrong with our Kafka cluster and it failed, which is when I saw this issue.

To reproduce this issue, I added Thread.sleep(100000) in the serializer class, deleted the cluster, and was able to reproduce it. (I guess you must know a better way to do it ;) )

Please let me know if you have any suggestions/recommendations for handling such errors in the future. (It's very rare, but I would like to handle it if possible.)

Thank you!!

ruchirvaninasdaq commented on Jan 08 '21

@ruchirvaninasdaq this should have been addressed by https://github.com/apache/camel-kafka-connector/issues/202. Can you try with a recent version of the connector, like 0.9.0 or the upcoming 0.10.0?

valdar commented on May 25 '21