camel-kafka-connector
Question: S3 file gets moved even on failure
Hello,
I am using the aws2-s3-kafka-source-connector (https://github.com/apache/camel-kafka-connector/tree/camel-kafka-connector-0.7.x/connectors/camel-aws2-s3-kafka-connector) with the following configuration.
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: name-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
  config:
    client.id: client
    topics: topic
    connector.class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
    camel.source.kafka.topic: topic
    camel.source.url: aws2-s3://source-bucket?useDefaultCredentialsProvider=true&moveAfterRead=true&destinationBucket=destination-bucket
    camel.source.maxPollDuration: 10
    camel.source.maxBatchPollSize: 1000
    camel.component.aws2-s3.includeBody: false
    camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.component.aws2-s3.autocloseBody: true
I have updated the S3ObjectConverter with my own serializer customization. The code is as follows:
import java.io.IOException;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class S3ObjectConverter implements Converter {

    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectConverter.class);

    //private final S3ObjectSerializer serializer = new S3ObjectSerializer();
    private final S3ObjectAvroSerializer serializer;

    public S3ObjectConverter() throws IOException {
        serializer = new S3ObjectAvroSerializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // The source connector hands us the S3 object stream as the record value
        return serializer.serialize(topic, (ResponseInputStream<GetObjectResponse>) value);
    }

    @Override
    public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
        return null;
    }
}
This works as expected: the object gets serialized, written to the Kafka topic, and the file is also moved to destination-bucket.
My problem is with the failure case: when an object fails during serialization, it still gets moved to destination-bucket (I expect it to stay in the source bucket). Is there any config I am using wrong?
Thank you.
Adding the serializer code as well, in case it helps:
import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class S3ObjectAvroSerializer implements Serializer<ResponseInputStream<GetObjectResponse>> {

    private static final Logger LOG = LoggerFactory.getLogger(S3ObjectAvroSerializer.class);

    private Schema schema;
    private GenericRecordSerializer recordSerializer;
    private MessageFactory messageFactory;

    public S3ObjectAvroSerializer(Schema schema) {
        this.schema = schema;
        this.recordSerializer = new GenericRecordSerializer(this.schema);
        this.messageFactory = new MessageFactory();
    }

    public S3ObjectAvroSerializer() throws IOException {
        Schema.Parser parser = new Schema.Parser();
        this.schema = parser.parse(getClass().getResourceAsStream("/avro/schema.avsc"));
        this.recordSerializer = new GenericRecordSerializer(this.schema);
        this.messageFactory = new MessageFactory();
    }

    /**
     * Create a Kafka serializer for control schema messages.
     */
    public Serializer<GenericRecord> getSerializer() {
        return new GenericRecordSerializer(this.schema);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) {
        GenericRecord record = null;
        try {
            record = messageFactory.parseMessage(inputStream);
        } catch (Exception e) {
            // A parse failure is only logged; a null record is passed on to the record serializer
            LOG.error("Error in Serializer: " + e);
            e.printStackTrace();
        }
        return recordSerializer.doSerialize(topic, record);
    }

    @Override
    public void close() {
    }
}
I expected this behavior based on this: https://github.com/apache/camel-kafka-connector/blob/dd9ccfa5dc2a0740ef1ed636cf999da41e6bbbcb/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java#L270
The failure in serialization happens at the Kafka level, so it is expected to find the file in the destination bucket; in terms of pure Camel, the exchange is complete. The error should be managed at the Kafka level. Also, you should use single options and not camel.source.url, just one or the other; camel.source.url is not the usually suggested approach.
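For reference, a minimal sketch of the same source expressed with single options instead of camel.source.url (the option names below are the generated ones from CamelAws2s3SourceConnectorConfig; verify them against the connector version you run):

    camel.source.path.bucketNameOrArn: source-bucket
    camel.source.endpoint.useDefaultCredentialsProvider: true
    camel.source.endpoint.moveAfterRead: true
    camel.source.endpoint.destinationBucket: destination-bucket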
Thanks for the feedback. I will update my config.
I have seen that when the Kafka cluster went down, the exchange was still marked as completed. What is a better way to handle Kafka exceptions with the Kafka connector? (My cluster was up and processing; when it went down, the first message was still marked as Exchange completed, and only later messages started failing. That first message never failed and never reached the Kafka topic.)
Thanks
@oscerd: Any recommendations on how to handle Kafka exceptions with the Camel Kafka connector? Please provide me guidance; I can try implementing it on my own. Thanks
If you still want to have the file in the source bucket, you should avoid delete-after-read and move-after-read and use the idempotency support introduced in 0.7. The error is related to the broker, so it must be managed at the Kafka level, but I don't think it is easy to do. If the cluster went down, but Camel already completed the exchange and moved the file to the destination bucket, there is no way to roll back the situation from a Camel perspective.
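A minimal sketch of that suggestion: leave the objects in the source bucket and let the idempotent repository skip keys that were already consumed. The idempotency option names and the CamelAwsS3Key header are taken from the 0.7+ documentation and are worth double-checking against your version:

    camel.source.endpoint.moveAfterRead: false
    camel.idempotency.enabled: true
    camel.idempotency.repository.type: kafka
    camel.idempotency.expression.type: header
    camel.idempotency.expression.header: CamelAwsS3Key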
if the cluster went down, but Camel already completed the exchange and moved the file to the destination bucket, there is no way to roll back the situation from a Camel perspective
Thank you for the information. Is it within the connector's scope to handle this kind of rollback in the future? Or is it just me who is running into this problem?
Well, if the error happens during Camel routing we need to manage it a bit better with rollback, but for your case this won't change the final situation: if the cluster goes down after the message has been consumed but before it reaches the Kafka topic, you'll lose the message. This is true even if you handle the error through Kafka Connect options. We may try to reproduce this scenario, but it's not easy.
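For completeness, the Kafka Connect options meant here are the framework-level error-handling settings (they cover converter and transform failures; the dead letter queue variant applies only to sink connectors), along these lines in the connector config; as noted above, they do not roll back the S3 move either:

    errors.tolerance: all
    errors.log.enable: true
    errors.log.include.messages: true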
Yeah, I have been running this connector for 7 months and never saw issues, but a few weeks ago something went wrong with our Kafka cluster and it failed, and that is when I saw this issue.
To reproduce the issue, I added Thread.sleep(100000) in the serializer class, roughly as sketched below, then deleted the cluster and was able to reproduce it. (I guess you must know a better way to do it ;) )
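A minimal sketch of that change, reusing the serialize() method from the serializer shown earlier with an artificial delay added (test only):

    @Override
    public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) {
        try {
            // Artificial delay (test only): leaves time to take the Kafka cluster down mid-flight
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        GenericRecord record = null;
        try {
            record = messageFactory.parseMessage(inputStream);
        } catch (Exception e) {
            LOG.error("Error in Serializer: " + e);
        }
        return recordSerializer.doSerialize(topic, record);
    }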
Please let me know if you have any suggestions/recommendations to handle such errors in the future. (It is very rare, but I would like to handle it if possible.)
Thank you!!
@ruchirvaninasdaq this should have been addressed by https://github.com/apache/camel-kafka-connector/issues/202. Can you try with a recent version of the connector, like 0.9.0 or the upcoming 0.10.0?