kafka-connect-transform-grok

Connector using Grok filter failed

Open bertrandcedric opened this issue 2 years ago • 0 comments

When I create different connectors with a Grok filter, I consistently get the following error on some connectors (not all):

```
[2022-06-14 00:12:16,165] ERROR WorkerSourceTask{id=sample-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Failed to create filter with alias 'ParseGrok'
    at io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig.filterByAlias(SourceTaskConfig.java:213)
    at io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig.filters(SourceTaskConfig.java:199)
    at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.newFileRecordsPollingConsumer(FilePulseSourceTask.java:136)
    at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.start(FilePulseSourceTask.java:120)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.file.ClosedFileSystemException
    at jdk.zipfs/jdk.nio.zipfs.ZipFileSystem.ensureOpen(ZipFileSystem.java:1105)
    at jdk.zipfs/jdk.nio.zipfs.ZipFileSystem.newInputStream(ZipFileSystem.java:551)
    at jdk.zipfs/jdk.nio.zipfs.ZipPath.newInputStream(ZipPath.java:708)
    at jdk.zipfs/jdk.nio.zipfs.ZipFileSystemProvider.newInputStream(ZipFileSystemProvider.java:276)
    at java.base/java.nio.file.Files.newInputStream(Files.java:155)
    at io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver.readPatternDefinitionsFrom(GrokPatternResolver.java:188)
    at io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver.loadPatternDefinitions(GrokPatternResolver.java:146)
    at io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver.loadPredefinedPatterns(GrokPatternResolver.java:120)
    at io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver.<init>(GrokPatternResolver.java:78)
    at io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter.configure(GrokFilter.java:59)
    at io.streamthoughts.kafka.connect.filepulse.filter.AbstractRecordFilter.configure(AbstractRecordFilter.java:71)
    at io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig.filterByAlias(SourceTaskConfig.java:210)
    ... 11 more
```
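For context on the `Caused by`: it shows `Files.newInputStream` failing because the path being read (a predefined pattern file inside a jar/zip) belongs to a `java.nio.file.FileSystem` that has already been closed. A minimal, self-contained sketch of that JDK behavior, unrelated to FilePulse internals (the file names here are illustrative):

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.ClosedFileSystemException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class ClosedZipFsDemo {
    public static void main(String[] args) throws IOException {
        // Build a throwaway zip with one entry, keeping a Path into it.
        Path zip = Files.createTempFile("demo", ".zip");
        Files.delete(zip); // zipfs "create" mode requires the file not to exist yet
        URI uri = URI.create("jar:" + zip.toUri());
        Path entry;
        try (FileSystem fs = FileSystems.newFileSystem(uri, Map.of("create", "true"))) {
            entry = fs.getPath("/patterns");
            Files.writeString(entry, "NUMBER \\d+");
        } // try-with-resources closes the zip FileSystem here

        // The Path still references the now-closed FileSystem, so any I/O
        // on it throws ClosedFileSystemException from ZipFileSystem.ensureOpen.
        try {
            Files.newInputStream(entry);
            System.out.println("no exception");
        } catch (ClosedFileSystemException e) {
            System.out.println("ClosedFileSystemException");
        }
        Files.deleteIfExists(zip);
    }
}
```

This matches the symptom pattern: something closed the zip filesystem backing the plugin's pattern resources before `GrokPatternResolver` tried to read from it, which would also explain why a pause/resume (re-creating the task) makes the error go away.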

My connector config is:

```json
"filters.ParseGrok.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
"filters.ParseGrok.pattern": ".{734}(?<code>.{4}).*",
"filters.ParseGrok.overwrite": "message",
"filters.ParseGrok.source": "message",
```
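For readers unfamiliar with the pattern syntax: `.{734}(?<code>.{4}).*` is plain regex with a named capture group, i.e. skip 734 characters, capture the next 4 into a field named `code`, and ignore the rest. A small standalone sketch of that capture semantics (this exercises `java.util.regex` directly, not the GrokFilter itself; the sample input is made up):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrokPatternDemo {
    public static void main(String[] args) {
        // Same shape as the connector's pattern: fixed-width skip, then a
        // 4-character named group called "code".
        Pattern p = Pattern.compile(".{734}(?<code>.{4}).*");

        // Illustrative input: 734 filler characters, the 4-char code, then a tail.
        String line = "x".repeat(734) + "AB12" + "trailing payload";

        Matcher m = p.matcher(line);
        if (m.matches()) {
            System.out.println(m.group("code")); // prints AB12
        }
    }
}
```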

I use version 7.1.1 of Kafka Connect.

I get the same issue in:

  • a local environment (Confluent Docker image 7.1.1 with openjdk version "11.0.14.1" 2022-02-08 LTS)
  • an on-premise environment with openjdk version "1.8.0_332"

If I pause/resume the connector, it returns to the RUNNING status.

bertrandcedric · Jun 16 '22 13:06