kafka-connect-fs

Examples for EBCDIC files

Open • audirline opened this issue 2 years ago • 1 comment

Hello @mmolimar,

Do you have an example configuration for reading EBCDIC files and writing their content to a Kafka topic? As I understand it, this connector can do that.

I tried with the following:

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=/tutorial/source/demo-1
topic=ebcdic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.recursive=true
policy.regexp=^.*\..bin$
policy.batch_size=0
policy.cleanup=none
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CobolFileReader
file_reader.batch_size=0
policy.sleepy.sleep=50000
file_reader.cobol.copybook.content=test-eb/EBCDIC_BOOK.cob
file_reader.cobol.reader.is_ebcdic=true

I found some example EBCDIC files and the corresponding copybooks here: https://github.com/jrperin/cobol-copybook.jsonifier/tree/master/examples

But I get this error on my Kafka cluster:

[2022-10-30 02:12:47,283] WARN FsSourceTask Error reading file [file:/tutorial/source/demo-1/ebcdic/example.bin]: An error has occurred when creating reader for file: file:/tutorial/source/demo-1/ebcdic/example.bin. Keep going... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:103)
org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: file:/tutorial/source/demo-1/ebcdic/example.bin
	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:250)
	at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$1(FsSourceTask.java:92)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:109)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: za.co.absa.cobrix.cobol.parser.exceptions.SyntaxErrorException: Syntax error in the copybook at line 1: Invalid input 'b' at position 1:6
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:36)
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:20)
	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.lambda$offer$8(AbstractPolicy.java:228)
	at java.base/java.util.Optional.orElseGet(Optional.java:369)
	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:248)
	... 19 more
Caused by: za.co.absa.cobrix.cobol.parser.exceptions.SyntaxErrorException: Syntax error in the copybook at line 1: Invalid input 'b' at position 1:6
	at za.co.absa.cobrix.cobol.parser.antlr.ThrowErrorStrategy.recover(ANTLRParser.scala:37)
	at za.co.absa.cobrix.cobol.parser.antlr.copybookParser.item(copybookParser.java:3202)
	at za.co.absa.cobrix.cobol.parser.antlr.copybookParser.main(copybookParser.java:217)
	at za.co.absa.cobrix.cobol.parser.antlr.ANTLRParser$.parse(ANTLRParser.scala:73)
	at za.co.absa.cobrix.cobol.parser.CopybookParser$.parseTree(CopybookParser.scala:216)
	at za.co.absa.cobrix.cobol.reader.VarLenNestedReader.loadCopyBook(VarLenNestedReader.scala:207)
	at za.co.absa.cobrix.cobol.reader.VarLenNestedReader.<init>(VarLenNestedReader.scala:54)
	at com.github.mmolimar.kafka.connect.fs.file.reader.CobrixReader$.varLenReader(CobrixReader.scala:12)
	at com.github.mmolimar.kafka.connect.fs.file.reader.CobrixReader.varLenReader(CobrixReader.scala)

Do I have the right configuration? Do you have examples that work?

Thanks for your feedback.

audirline, Oct 30 '22 02:10

Have you taken a look at the tests?

mmolimar, Nov 06 '22 02:11
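
One possible reading of the stack trace above, offered as a guess rather than a confirmed diagnosis: the copybook parser fails on line 1 with invalid input 'b' at position 1:6, and 'b' is the seventh character of the string test-eb/EBCDIC_BOOK.cob. That suggests the value of file_reader.cobol.copybook.content is being parsed as the copybook text itself rather than opened as a file. The sketch below assumes the connector's file_reader.cobol.copybook.path property (listed in the connector documentation as the alternative to copybook.content) is the right place for a copybook location. The path shown is simply the value from the original configuration and may need to be absolute or a full URI in a given setup.

```properties
# Sketch only: change the copybook setting and keep the rest of the configuration as posted.
# Assumption: copybook.path takes a file location, while copybook.content takes the copybook text inline.
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CobolFileReader
file_reader.cobol.copybook.path=test-eb/EBCDIC_BOOK.cob
file_reader.cobol.reader.is_ebcdic=true
```

Alternatively, the copybook text itself could be supplied inline as the value of file_reader.cobol.copybook.content.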