camel-kafka-connector
camel-kafka-connector copied to clipboard
How to configure `camel.sink.marshal` ?
I'm testing camel-azure-storage-blob-sink-kafka-connector from k8s.
So far, I've confirmed CamelHeader option works and the log is uploaded to arbitrary path.
Now I'm testing camel.sink.marshal option to save some usage on Azure Blob Storage, but haven't been successful yet.
I've followed some configuration like zipfile for camel.sink.marshal option, but faced the following errors.
2023-09-27 03:14:35,322 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-4 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-4)[From[kamelet://source?routeId=ckcMarshal... because of Cannot find data format in registry with ref: zipfile
at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
... 19 more
Caused by: java.lang.IllegalArgumentException: Cannot find data format in registry with ref: zipfile
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:142)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
... 23 more
2023-09-27 03:14:35,323 INFO [blob-connector|task-0] Stopping CamelSinkTask connector task (org.apache.camel.kafkaconnector.CamelSinkTask) [task-thread-blob-connector-0]
I've also tested with org.apache.camel.model.dataformat.ZipFileDataFormat, then faced the following.
2023-09-27 03:13:40,704 ERROR [blob-connector|task-0] WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure creating route from template: ckcMarshal
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException: Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route ckcMarshal-1 at: >>> Marshal[CustomDataFormat[{{marshal}}]] <<< in route: Route(ckcMarshal-1)[From[kamelet://source?routeId=ckcMarshal... because of Resolving datafor
mat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
... 19 more
Caused by: java.lang.IllegalArgumentException: Resolving dataformat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type conflict: Not a DataFormat implementation. Found: org.apache.camel.model.dataformat.ZipFileDataFormat
at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormatFromResource(DefaultDataFormatResolver.java:76)
at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormat(DefaultDataFormatResolver.java:47)
at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$3(AbstractCamelContext.java:4473)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$4(AbstractCamelContext.java:4473)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
at org.apache.camel.impl.engine.AbstractCamelContext.resolveDataFormat(AbstractCamelContext.java:4464)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:140)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
... 23 more
Can I have a bit more detailed explanation on how to specify this option? My configuration is something like below so far.
spec:
class: org.apache.camel.kafkaconnector.azurestorageblobsink.CamelAzurestorageblobsinkSinkConnector
config:
camel.kamelet.azure-storage-blob-sink.accountName: "my_account"
camel.kamelet.azure-storage-blob-sink.accessKey: "xxx"
camel.kamelet.azure-storage-blob-sink.containerName: "my_container"
camel.beans.aggregate: "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
camel.aggregation.size: 10
camel.aggregation.timeout: 500000
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
topics: my-logs
camel.sink.marshal: zipfile