camel-kafka-connector icon indicating copy to clipboard operation
camel-kafka-connector copied to clipboard

How to configure `camel.sink.marshal` ?

Open tgib23 opened this issue 2 years ago • 0 comments

I'm testing camel-azure-storage-blob-sink-kafka-connector from k8s. So far, I've confirmed CamelHeader option works and the log is uploaded to arbitrary path. Now I'm testing camel.sink.marshal option to save some usage on Azure Blob Storage, but haven't been successful yet. I've followed some configuration like zipfile for camel.sink.marshal option, but faced the following errors.

2023-09-27 03:14:35,322 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-4 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-4)[From[kamelet://source?routeId=ckcMarshal... because of Cannot find data format in registry with ref: zipfile
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Cannot find data format in registry with ref: zipfile
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:142)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more
2023-09-27 03:14:35,323 INFO [blob-connector|task-0] Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-blob-connector-0]

I've also tested with org.apache.camel.model.dataformat.ZipFileDataFormat, then faced the following.

2023-09-27 03:13:40,704 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-1 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-1)[From[kamelet://source?routeId=ckcMarshal... because of Resolving datafor
mat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormatFromResource(DefaultDataFormatResolver.java:76)
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormat(DefaultDataFormatResolver.java:47)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$3(AbstractCamelContext.java:4473)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$4(AbstractCamelContext.java:4473)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
        at org.apache.camel.impl.engine.AbstractCamelContext.resolveDataFormat(AbstractCamelContext.java:4464)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:140)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more

Can I have a bit more detailed explanation on how to specify this option? My configuration is something like below so far.

spec:
  class: org.apache.camel.kafkaconnector.azurestorageblobsink.CamelAzurestorageblobsinkSinkConnector
  config:
    camel.kamelet.azure-storage-blob-sink.accountName: "my_account"
    camel.kamelet.azure-storage-blob-sink.accessKey: "xxx"
    camel.kamelet.azure-storage-blob-sink.containerName: "my_container"
    camel.beans.aggregate: "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 500000
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    topics: my-logs
    camel.sink.marshal: zipfile

tgib23 avatar Sep 27 '23 03:09 tgib23