
Support for array of objects

ArturKT opened this issue 2 years ago • 4 comments

Any plan to support ELEMENT encryption for an array of objects? Example data:

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "currency":"ABC",
  "fields":[
    {
      "fieldOne":"someValueOne",
      "fieldTwo":"someValueTwo"
    },
    {
      "fieldOne":"anotherValueOne",
      "fieldTwo":"anotherValueTwo"
    }
  ],
  "nested":{
    "value": 123,
    "valueString":"abc"
  }
}

I would like to encrypt all `fields[*].fieldOne`. I have tried that by setting:

        "transforms.cipher.field_config": "[{\"name\":\"fields\"},{\"name\":\"fields.fieldOne\"}]",
        "transforms.cipher.field_mode": "ELEMENT"

But that fails with an exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:395)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)
Caused by: java.util.NoSuchElementException: no default type mapping found for type STRUCT (optional true) and cipher mode ENCRYPT
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.lambda$getSchemaForPrimitiveType$0(TypeSchemaMapper.java:63)
    at java.util.Optional.orElseThrow(Optional.java:408)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.getSchemaForPrimitiveType(TypeSchemaMapper.java:62)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptArraySchema(SchemaRewriter.java:110)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptField(SchemaRewriter.java:87)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more
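For context: the exception above is thrown while mapping array element types, and the call goes through TypeSchemaMapper.getSchemaForPrimitiveType, which suggests ELEMENT mode currently only handles arrays of primitive element types. A hypothetical config of the shape that should be handled (the `tags` field is made up for illustration):

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "tags":["tagOne","tagTwo"]
}

        "transforms.cipher.field_config": "[{\"name\":\"tags\"}]",
        "transforms.cipher.field_mode": "ELEMENT"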

ArturKT · Jun 13 '23 11:06

I have a similar use case for decryption. Since there is no way to encrypt a single field of an object inside an array, I used field_mode OBJECT on the source side and want to decrypt the value in the sink connector.

Payload to encrypt:

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "currency":"ABC",
  "fields":[
    {
      "fieldOne":"someValueOne",
      "fieldTwo":"someValueTwo"
    },
    {
      "fieldOne":"anotherValueOne",
      "fieldTwo":"anotherValueTwo"
    }
  ],
  "nested":{
    "value": 123,
    "valueString":"abc"
  }
}

Config:

(...)
        "transforms.cipher.field_config": "[{\"name\":\"fields\"}",
        "transforms.cipher.field_mode": "OBJECT"

That works just fine; the output is something like:

"fields":"5QMBG3qEIVeGriTdTLKO6QWECoOjAi1qcJX(...)"

But then I want to decrypt the value in the sink connector, so I use:

        "transforms.cipher.field_config": "[{\"name\":\"fields\", \"schema\": {\"type\":\"ARRAY\",\"valueSchema\":{\"type\":\"STRUCT\",\"fields\":[{\"type\":\"STRING\",\"optional\":true,\"field\":\"fieldOne\"},{\"type\":\"STRING\",\"optional\":true,\"field\":\"fieldTwo\"}],\"optional\":true,\"name\":\"fields\"},\"optional\":true,\"name\":\"fields\",\"field\":\"fields\"}}]",
        "transforms.cipher.field_mode": "OBJECT"

That fails with an error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:499)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: expected primitive value type for array elements but found STRUCT
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.extractAndAdaptArraySchemaFromConfig(SchemaRewriter.java:189)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptArraySchema(SchemaRewriter.java:118)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptField(SchemaRewriter.java:87)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
        ... 15 more

I have debugged the source code and found a TODO here: https://github.com/hpgrahsl/kryptonite-for-kafka/blob/master/connect-transform-kryptonite/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/SchemaRewriter.java#L187

ArturKT · Jun 13 '23 14:06

Hi there. Thanks for reaching out and opening this issue. The answer to this one needs some context and explanation:

  1. If your records have a schema, the processing of objects within arrays is currently not supported.

  2. What could work for you is to switch to schemaless records; then it should be possible to encrypt a map within an array (see the sketch below). The only limitation for now is that by default either all fields of a map are encrypted or none, meaning you cannot select only a single field of a map within an array for encryption.

Hope this clarifies why you see the behaviour posted above.
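For illustration, a minimal sketch of what such a schemaless setup could look like on the source side. The converter settings are standard Kafka Connect JSON converter options; the transform class and cipher_mode property follow the project's documented naming, and the key material configuration is omitted here, so treat the exact values as assumptions rather than a verified config:

        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "transforms": "cipher",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "transforms.cipher.field_config": "[{\"name\":\"fields\"}]",
        "transforms.cipher.field_mode": "ELEMENT"

With schemaless records and ELEMENT mode, each map element of the `fields` array would be encrypted as a whole, matching the limitation described in point 2.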

hpgrahsl · Jun 13 '23 20:06

Thanks @hpgrahsl, I will try the schemaless setup.

ArturKT · Jun 14 '23 08:06

@ArturKT did the schemaless approach work for your enc/dec scenario?

hpgrahsl · Jun 20 '23 20:06