
Allow Special Character Handling

Open e160842 opened this issue 1 year ago • 3 comments

Describe the issue
Using this connector to transfer data fails when the data includes special characters ($, %, ^, &, etc.), especially when those characters appear in the header (field names) of the data.

Describe the solution you'd like
Enable the connector to handle special characters.

Describe alternatives you've considered
We currently have to add an extra step to cleanse special characters before using the connector, which is not efficient.
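The cleansing step described above could look something like the following. This is a minimal sketch, not part of the reporter's actual pipeline; the replacement rules (map everything outside letters, digits, and underscore to "_", then collapse and trim) are assumptions for illustration.

```python
import re

def sanitize_field_name(name: str) -> str:
    """Make a field name safe by removing special characters.

    Keeps letters, digits, and underscores; every other character
    becomes "_", then runs of "_" are collapsed and the ends trimmed.
    """
    cleaned = re.sub(r"[^0-9A-Za-z_]", "_", name)
    return re.sub(r"_+", "_", cleaned).strip("_")

# One of the problematic header names reported below:
print(sanitize_field_name("AMS: % Electric/Hybrid Cars"))
# AMS_Electric_Hybrid_Cars
```

Running such a step over only the header row keeps the data values untouched while producing field names that downstream tooling can consume.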

Additional context
Please let me know if any clarifications are needed.

e160842 avatar Oct 05 '23 14:10 e160842

Hi @e160842, could you please provide the connector configuration you used to process the data? Thanks

fhussonnois avatar Oct 10 '23 09:10 fhussonnois

@fhussonnois - thank you for your support. Below is a sample connector config with proprietary data obscured. Please let me know if any parts are unclear.

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: XXXXXXXXXXXXXXXXXXXXX
  namespace: ${namespace}
spec:
  name: XXXXXXXXXXXXXXXXXXXXXXXXXX
  class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
  taskMax: 1
  restartPolicy:
    type: "OnFailure"
    maxRetry: 10
  connectRest:
    endpoint: https://connector.${nspace}.xxx.cluster.local:8XXX
    authentication:
      type: bearer
      bearer:
        secretRef: v2-connectors-apikeys
  configs:
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    connector.class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
    tasks.max: 1
    fs.listing.class: "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing"
    aws.credentials.provider.class: "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    aws.s3.region: "us-east-1"
    aws.s3.bucket.name: "XXXXXXXXXXXXXXXXXXXXXX"
    aws.s3.bucket.prefix: "XXXXXXXXXXXXXXXXXX"
    aws.s3.default.object.storage.class: "STANDARD"
    fs.listing.filters: "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter"
    fs.listing.interval.ms: "300000"
    tasks.reader.class: "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader"
    topic: "${env}.stage.XXXXXXXXXXXXXXXX"
    offset.policy.class: "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy"
    filters: "ParseCSVLine,SetKey,ProvideId,AnnotateWithType"
    filters.ParseCSVLine.type: "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter"
    filters.ParseCSVLine.trim.column: "true"
    filters.ParseCSVLine.separator: "|"
    filters.ParseCSVLine.columns: "AMS: # Alternative - Hybrid Tru|AMS: % Alternative - Hybrid Tru|AMS: $ Electric/Hybrid Cars|AMS: % Electric/Hybrid Cars|AMS: # Alternative - Natural_Gas|AMS: Alternative Power>Natural Ga|Property/Realty: Home Stories|Property/Realty: Home Bath|Property/Realty: Home Bedrooms|Property/Realty: Home Total Rooms|Property/Realty: Home Exterior Wall T|BehaviorBank: Hi-tech owner|BehaviorBank: Internet/online subscri|SRVY:HH Acty/Int:Socl Caus/Con:Enviro"
    filters.SetKey.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.SetKey.field: "$key"
    filters.SetKey.value: "$value.Number"
    filters.AnnotateWithType.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.AnnotateWithType.field: "$._t"
    filters.AnnotateWithType.value: "XXXXXXXXXX"
    filters.ProvideId.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.ProvideId.field: "$._id"
    filters.ProvideId.value: "$value.Number"
    fs.cleanup.policy.class: "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy"
    fs.cleanup.policy.triggered.on: "COMMITTED"
    fs.cleanup.policy.move.success.aws.bucket.name: "SUCCESS_XXXXXXXX"
    fs.cleanup.policy.move.success.aws.prefix.path: "SUCCESS/processed"
    fs.cleanup.policy.move.failure.aws.bucket.name: "FAILURE_XXXXXX"
    fs.cleanup.policy.move.failure.aws.prefix.path: "FAILURE/error"
    file.filter.regex.pattern: ".*\\.csv|.*\\.txt$"
    offset.attributes.string: "name"
    skip.headers: "1"
    tasks.file.status.storage.class: "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore"
    tasks.file.status.storage.bootstrap.servers: "kafka.${namespace}.svc.cluster.local:9071"
    tasks.file.status.storage.topic: "${env}.prep.XXXXXXXXData-Status"
    tasks.file.status.storage.topic.partitions: "1"
    tasks.file.status.storage.topic.replication.factor: "3"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter.schemas.enable: "false"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    tasks.file.status.storage.producer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.producer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.producer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secrets/v2-connectors-apikeys/plain.txt:username}\" password=\"$${file:/mnt/secrets/apikeys/plain.txt:password}\";"
    tasks.file.status.storage.producer.request.timeout.ms: "20000"
    tasks.file.status.storage.consumer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.consumer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.consumer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secretkeys/plain.txt:username}\" password=\"$${file:/mnt/secretkeys/plain.txt:password}\";"
    tasks.file.status.storage.consumer.request.timeout.ms: "20000"
    principal.service.name: "$${file:/mnt/secrets/connector-configs/s3-src:username}"
    principal.service.password: "$${file:/mnt/secrets/src:password}"
```

e160842 avatar Oct 31 '23 15:10 e160842

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 30 '24 01:01 github-actions[bot]