kafka-connect-hdfs
org.apache.avro.AvroRuntimeException: already open
Hi, same problem for me: it seems that the sink tries to open the file twice before writing, and so it fails:
[2018-05-18 14:13:18,031] INFO Finished recovery for topic partition mi.test-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2018-05-18 14:13:18,233] INFO Opening record writer for: hdfs://<url>/mitest//+tmp/mi.test/year=2018/month=05/day=18/hour=14/a9e62db7-46fb-4c73-9479-61f3648496f8_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-18 14:13:18,318] INFO Opening record writer for: hdfs://<url>/mitest//+tmp/mi.test/year=2018/month=05/day=18/hour=14/a9e62db7-46fb-4c73-9479-61f3648496f8_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-18 14:13:18,321] ERROR Task fxrfq-hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.setCodec(DataFileWriter.java:93)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:69)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-05-18 14:13:18,322] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
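For reference, the assertion in that stack trace comes straight from Avro's DataFileWriter, which refuses a second setCodec() once the file has been created. A minimal repro against the Avro API alone (the record schema and output stream here are made up purely for illustration):

// Minimal sketch reproducing the "already open" assertion seen above:
// DataFileWriter rejects setCodec() after create() has been called on it.
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AlreadyOpenRepro {
  public static void main(String[] args) throws Exception {
    Schema schema = SchemaBuilder.record("Test").fields().requiredString("f").endRecord();
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<>(schema));
    writer.setCodec(CodecFactory.nullCodec());          // fine: the writer is not open yet
    writer.create(schema, new ByteArrayOutputStream()); // opens the writer
    writer.setCodec(CodecFactory.nullCodec());          // throws AvroRuntimeException: already open
  }
}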
I'm trying a very basic configuration:
mitest.properties
name=fxrfq-hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=mi.test
hdfs.url=hdfs://
The test topic has only one partition.
Hi again,
I think I found the bug, or at least a workaround :-)
I started from a very simple configuration (Confluent 4.1.1), and it seems that the "already open" issue arises when flush.size is greater than 1, even with a single partition.
From what I can see, a tmp file is opened before every flush to write the Avro data and, if flush.size is > 1, the AvroRecordWriterProvider tries to open the same tmp file a second time with a writer that is already open, hence the exception.
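To illustrate what I mean, here is a simplified sketch of the lazy-open pattern I am describing. This is not the connector's actual code, just the shape of the guard I would expect around setCodec()/create():

// Simplified sketch (hypothetical class, not the connector's implementation):
// the writer for a tmp file is created on the first write() call, and a second
// write() must NOT call setCodec()/create() again on the same DataFileWriter.
import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

class LazyAvroWriter {
  private final DataFileWriter<GenericRecord> writer;
  private boolean opened = false; // guard that prevents the "already open" assertion

  LazyAvroWriter(Schema schema) {
    this.writer = new DataFileWriter<>(new GenericDatumWriter<>(schema));
  }

  void write(GenericRecord record, Schema schema, OutputStream out) throws IOException {
    if (!opened) {
      // setCodec() and create() may only run once per DataFileWriter; calling them
      // again after the file is open is exactly what raises AvroRuntimeException.
      writer.setCodec(CodecFactory.nullCodec());
      writer.create(schema, out);
      opened = true;
    }
    writer.append(record);
  }

  void close() throws IOException {
    writer.close();
  }
}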
Two brief extracts from the logs:
- flush.size = 1 ---> message is saved
[2018-05-21 15:37:48,089] INFO Finished recovery for topic partition mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2018-05-21 15:37:48,177] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/d741978a-c339-4378-8f84-f28d3384bb68_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:37:48,253] INFO Starting commit and rotation for topic partition mi.fx.oracle.FX-USER-0 with start offsets {partition=0=119} and end offsets {partition=0=119} (io.confluent.connect.hdfs.TopicPartitionWriter:368)
[2018-05-21 15:37:48,377] INFO Successfully acquired lease for hdfs://<...url...>:8020/logs/mi.fx.oracle.FX-USER/0/log (io.confluent.connect.hdfs.wal.FSWAL:75)
[2018-05-21 15:37:48,411] INFO Committed hdfs://<...url...>:8020/raw_data/mi.fx.oracle.FX-USER/partition=0/mi.fx.oracle.FX-USER+0+0000000119+0000000119.avro for mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:746)
[2018-05-21 15:37:48,411] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/d741978a-c339-4378-8f84-f28d3384bb68_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:37:48,414] INFO Starting commit and rotation for topic partition mi.fx.oracle.FX-USER-0 with start offsets {partition=0=120} and end offsets {partition=0=120} (io.confluent.connect.hdfs.TopicPartitionWriter:368)
- flush.size = 2 ---> exception
[2018-05-21 15:35:40,445] INFO Finished recovery for topic partition mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2018-05-21 15:35:40,536] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/a4584b96-d2be-47d0-ae34-a24835f1fe8b_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:35:40,599] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/a4584b96-d2be-47d0-ae34-a24835f1fe8b_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:35:40,607] ERROR WorkerSinkTask{id=fxuser-hdfs-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.setCodec(DataFileWriter.java:93)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:69)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
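For anyone who wants to reproduce the two runs above: the only setting I changed between them was flush.size. A standalone config along these lines should do it (the URL is a placeholder, and the topic name and directories are inferred from the log paths, so adjust to your environment):

name=fxuser-hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=mi.fx.oracle.FX-USER
hdfs.url=hdfs://<namenode>:8020
topics.dir=raw_data
logs.dir=logs
format.class=io.confluent.connect.hdfs.avro.AvroFormat
# flush.size=1 commits each record and works; flush.size=2 or more hits "already open"
flush.size=2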
I hope this can help.
Regards,
Paolo
Any update/pointer on this issue?
I'm having the same issue with the configuration below:
{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": 4,
"topics": "topicA,topicB",
"hdfs.url": "hdfs://some-master-url:8020",
"flush.size": 50000,
"topics.dir": "topics",
"logs.dir": "logs",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"schema.compatibility": "FULL",
"enhanced.avro.schema.support": true,
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"timezone": "UTC",
"locale": "en",
"timestamp.extractor": "Record"
}
Using the following versions:
- Connector 4.1.1 (also tried the older 4.0.x)
- Kafka 1.1.0
- HDFS 2.7.3
I tried both standalone and distributed modes.
I tried using confluent packages and building from sources.
Source topics have multiple partitions, therefore I tried different values for tasks.max, and also different partitioners like DefaultPartitioner.
I always get the same error: org.apache.avro.AvroRuntimeException: already open.
Any help would be greatly appreciated @cricket007 @ewencp
We are facing the same issue in production, even with the HDFS 3 connector. I opened a ticket with Confluent support. It is not a misconfiguration! Moreover, flush.size = 1 is not acceptable, since a huge number of tiny files on HDFS leads to severe performance degradation.
Any updates on this? I'm having the same issue