camel-kafka-connector
File name format and File Format in Azure Data Lake Sink Connector
In the Azure Data Lake Sink connector, when we move events from Kafka to a Gen2 Data Lake, is it possible to configure the file name format and the file type format (such as Avro, JSON, or Parquet) in the connector configurations?
@oscerd Kindly requesting guidance on this.
For Avro you can transform the POJO into a generic Avro record through this SMT: https://github.com/apache/camel-kafka-connector/blob/main/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
For JSON you need to convert the record's JSON in an SMT, and you need to write that SMT yourself. The same goes for Parquet.
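For illustration, a custom SMT is a class implementing Kafka Connect's Transformation interface, roughly like the following skeleton (the package, class name, and conversion logic are hypothetical placeholders; only the interface methods and the newRecord call are standard Connect API):

```java
package com.example.transforms; // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Skeleton of a custom SMT: fill in apply() with the JSON -> Avro/Parquet
// conversion you need before the record reaches the sink connector.
public class JsonConvertTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // Read any custom options of the transform here.
    }

    @Override
    public R apply(R record) {
        Object value = record.value();
        // TODO: convert the JSON value into the desired representation,
        // e.g. a Connect Struct with a Schema, or serialized Avro bytes.
        Object converted = value; // pass-through placeholder
        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                null /* new value schema, if any */, converted,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // Release any resources acquired in configure().
    }
}
```

It would then be wired into the connector like any other SMT, e.g. `transforms: tojson` and `transforms.tojson.type: com.example.transforms.JsonConvertTransform`.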
@oscerd What about the fileName? I tried to pass the exchangeId as a Kafka header, but the connector does not set the file name in the Data Lake to what I give.
Kafka header:
```json
{ "exchangeId": "adlstest_1000" }
```
Connector configs:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: adls-sink
  labels:
    strimzi.io/cluster: cdt-connect-shared-emp
spec:
  class: org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector
  tasksMax: 2
  config:
    topics: msk.emp.cdtadlsconnectorsink.topic.internal.any.v1,msk.emp.cdtadlsconnectorsink.topic.internal.any.v2
    camel.kamelet.azure-storage-datalake-sink.accountName: retinaconnectorpoc
    camel.kamelet.azure-storage-datalake-sink.clientId: 328ea5df-d095-4c9f-ba69-79c468792db9
    camel.kamelet.azure-storage-datalake-sink.clientSecret: wvvvvvvvvrlanA
    camel.kamelet.azure-storage-datalake-sink.tenantId: 05d75c05-fa1a-42e7-9cf1-eb416c396f2d
    camel.kamelet.azure-storage-datalake-sink.fileSystemName: test
    value.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    value.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    value.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.url: https://cdt-west-vvvvvvvvk-digital.net
    value.converter.schemas.enable: false
    key.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    key.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    key.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.url: https://cdt-west-vvvvvvvvvdigital.net
    key.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    camel.aggregation.size: 5
    camel.aggregation.timeout: 50000
    camelHeader.camelFileName: "adlstestfileanem_1000"
    camel.sink.path.directoryName: "/user/app/folder"
    camel.sink.endpoint.charset: "UTF-8"
    camel.sink.endpoint.fileName: "messages-${date:now:yyyyMMdd}.txt"
```
As answered on the other issue:
You need to set a header on the record: CamelAzureStorageDataLakeFileName. In the Kafka record the header name should be "CamelHeader.CamelAzureStorageDataLakeFileName"; it will be transformed while running the sink.
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: adls-sink
  labels:
    strimzi.io/cluster: cdt-connect-shared-emp
spec:
  class: org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector
  tasksMax: 2
  config:
    topics: msk.emp.cdtadlsconnectorsink.topic.internal.any.v1,msk.emp.cdtadlsconnectorsink.topic.internal.any.v2
    camel.kamelet.azure-storage-datalake-sink.accountName: retinaconnectorpoc
    camel.kamelet.azure-storage-datalake-sink.clientId: 328dddddddddb9
    camel.kamelet.azure-storage-datalake-sink.clientSecret: wro8ddddddddddddodwP.~rlanA
    camel.kamelet.azure-storage-datalake-sink.tenantId: 05d75c05-fa1a-42e7-9cf1-eb416c396f2d
    camel.kamelet.azure-storage-datalake-sink.fileSystemName: test
    value.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    value.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    value.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.url: https://cdt-west-2-shared-emp-kafka-sr.maersk-digital.net
    value.converter.schemas.enable: false
    key.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    key.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    key.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.url: https://cdt-west-2-shared-emp-kafka-sr.maersk-digital.net
    key.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    camel.aggregation.size: 5
    camel.aggregation.timeout: 50000
    camelHeader.camelFileName: "adlstestfileanem_1000"
    camel.sink.path.directoryName: "/user/app/folder"
    camel.sink.endpoint.charset: "UTF-8"
    camel.sink.endpoint.fileName: "${exchangeId}"
    camel.sink.endpoint.keyName: "${exchangeId}"
    camelHeader.camelAzureStorageDataLakeFileName: "${exchangeId}_adlstest"
```
Is it like this? I added it as the last line of the connector configs.
Or is it like this in the Kafka header:
Is it possible to give an example of a Kafka record, its headers, and the connector configs? I am not able to understand the configuration for a custom file name. We would like to add a date and timestamp to the fileName whenever a file is added to the data lake via this connector. @oscerd
Yes, it should be a header in the Kafka record.
I don't know if I'll have time to write an example.
@oscerd Even after adding this config in the Kafka header, the file name still comes out with a default name like FD0288760138873-0000000000000001. Can you please guide us on this? We need it with high priority and would appreciate your time. Could you also review the Kafka connector configurations to check whether they are right?
Try adding a header of this form:
"CamelHeader.file"
I'd have to create a reproducer and work on reproducing your use case; it's too much time and I don't have it. It also requires an Azure Storage account and setting up the infra. It's too much stuff to do.
You don't need camelHeader in the config; you just need the header named "CamelHeader.file" on the record. Also, you don't need charset, fileName, and keyName: they do nothing.
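For example, a plain Java producer could attach that header like this (a minimal sketch; the bootstrap server, value, and file name are placeholders based loosely on the configs above):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendWithFileNameHeader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "msk.emp.cdtadlsconnectorsink.topic.internal.any.v1", // topic from the configs above
                    "key-1", "{\"hello\":\"world\"}");
            // The connector strips the "CamelHeader." prefix and passes the
            // remaining "file" header to the Camel exchange, where the
            // Kamelet uses it as the target file name.
            record.headers().add("CamelHeader.file",
                    "adlstest-20240101.json".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
```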
"CamelHeader.file" works :) , Thank you
This is because we are now basing the connectors on the Kamelets catalog.
How can we move the files into a particular directory inside the container? Right now the files land directly in the container and do not go into a directory.
@oscerd Can you give sample connector configs using SinkPojoToSchemaAndStructTransform.java to convert the file to Avro?
On this, @valdar is the expert.
@valdar Can you please guide us on the connector configs for using this transform? I always get "Unable to Initialise the pojo class".
@oscerd I just have a small doubt; can you please guide me on this?
```yaml
key.converter: io.confluent.connect.avro.AvroConverter
value.converter: io.confluent.connect.avro.AvroConverter
transforms: toavro
transforms.toavro.converter.type: value
transforms.toavro.type: org.apache.camel.kafkaconnector.transforms.SinkPojoToSchemaAndStructTransform
transforms.toavro.camel.transformer.sink.pojo.class:
value.converter.enhanced.avro.schema.support: true
connect.meta.data: false
```
Above are the configs for converting to Avro; here I am not sure what we need to pass as the value for the POJO class.
@valdar @oscerd Kindly requesting your response here. What is sink.pojo.class, and what value do we need to pass to the connector for it?
We work on this project in our spare time; we don't have an ETA for answering questions, and you're pushing a bit too much. This is not a commercial support service. By the way, this is the definition of the sink POJO class:
Full qualified class name of the pojo you want your record value converted to
@oscerd I understand. We have been trying to explore this a lot, but it has not been working for the last week; that's why. Sorry about the bugging.
If you have a sample configuration, that would be really great. These configs and the connector run inside the Connect cluster, and the POJO class will not be present inside the Connect cluster. Is there any other way to convert these files to Avro in the storage account?
As far as I remember, this is the only way of transforming directly; other than that you'll need to write your own SMT: https://camel.apache.org/camel-kafka-connector/4.0.x/reference/transformers/index.html
@valdar @oscerd We still don't have a proper understanding of "Full qualified class name of the pojo you want your record value converted to". Please give an example when you are free.
An example is a fully qualified Java class name, for example "org.apache.camel.Message" or "java.lang.String".
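For illustration, assuming the JSON record value looked like `{"id": "1", "name": "test"}`, a matching POJO might be (package, class, and field names are hypothetical):

```java
package com.example.model; // hypothetical package

// POJO whose fields mirror the JSON record value; its fully qualified
// name is the value for transforms.toavro.camel.transformer.sink.pojo.class.
public class Employee {
    private String id;
    private String name;

    public Employee() { } // no-arg constructor so the transform can instantiate it

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

With that class packaged and available on the Connect worker's plugin path, the config line would presumably read `transforms.toavro.camel.transformer.sink.pojo.class: com.example.model.Employee`.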
@oscerd It is not working at all, and we are almost two developers trying to figure this out for a week. Kindly support us with more details please. We want to convert JSON events to Avro files in the Data Lake.
Is the below right?
```yaml
transforms: toavro
transforms.toavro.type: org.apache.camel.kafkaconnector.transforms.SinkPojoToSchemaAndStructTransform$value
```
Write an SMT for converting the JSON to Avro and use that in your configuration. I don't have time this week to support you more than this, and I think I've done a lot. As I said, that part was written by @valdar, so my suggestion is to either wait for him or write your own SMT for the transformation. Please stop pinging maintainers.
Being able to save in Parquet format would be very nice, as it is one of the most used file formats in data lakes.
You can do that by writing a YAML file using Kamelets or plain Camel, with the parquet-avro dataformat: https://camel.apache.org/components/4.4.x/dataformats/parquetAvro-dataformat.html
Doing this in camel-kafka-connector requires a specialized version of the Azure Data Lake Sink Kamelet.
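For illustration, a plain-Camel route in YAML could look roughly like this (all URIs, options, and the POJO class are placeholders; check the dataformat page linked above for the actual option set):

```yaml
# Hypothetical route: consume JSON from Kafka, bind it to a POJO,
# marshal to Parquet, and upload to ADLS. Every value here is illustrative.
- route:
    from:
      uri: "kafka:my-topic?brokers=localhost:9092"
      steps:
        - unmarshal:
            json:
              unmarshalType: com.example.model.Employee
        - marshal:
            parquetAvro:
              unmarshalType: com.example.model.Employee
        - to: "azure-storage-datalake:myaccount/myfilesystem?operation=upload"
```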