kafka-connect-hdfs

org.apache.avro.AvroRuntimeException: already open

Open · rubybj opened this issue on Mar 19, 2018 · 6 comments

(screenshot attached)

rubybj avatar Mar 19 '18 08:03 rubybj

Hi, same problem for me. It seems that the sink tries to open the same file twice before writing, and so it fails:

[2018-05-18 14:13:18,031] INFO Finished recovery for topic partition mi.test-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)

[2018-05-18 14:13:18,233] INFO Opening record writer for: hdfs://<url>/mitest//+tmp/mi.test/year=2018/month=05/day=18/hour=14/a9e62db7-46fb-4c73-9479-61f3648496f8_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)

[2018-05-18 14:13:18,318] INFO Opening record writer for: hdfs://<url>/mitest//+tmp/mi.test/year=2018/month=05/day=18/hour=14/a9e62db7-46fb-4c73-9479-61f3648496f8_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)

[2018-05-18 14:13:18,321] ERROR Task fxrfq-hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.setCodec(DataFileWriter.java:93)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:69)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[2018-05-18 14:13:18,322] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)

I'm trying a very basic configuration:

mitest.properties

name=fxrfq-hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=mi.test
hdfs.url=hdfs://
flush.size=3
topics.dir=mitest
format.class=io.confluent.connect.hdfs.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

The test topic has only one partition.

pmid avatar May 18 '18 12:05 pmid

Hi again,

I think I found the bug, or at least a workaround :-)


I started from a very simple configuration (confluent 4.1.1), and it seems that the "Already open" issue arises when the flush.size is greater than 1, with a single partition.

From what I can see, a tmp file is opened before every flush to write the Avro data and, if flush.size is > 1, the AvroRecordWriterProvider ends up opening the same tmp file a second time on a writer that is already open, hence the exception.
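To see the failure mode in isolation, here is a minimal sketch using plain Avro (no connector code; the class name and schema are made up for illustration). It reuses a single DataFileWriter and calls setCodec() again after the file has been created, which hits assertNotOpen() and throws the same "already open" error on the same call path (setCodec -> assertNotOpen) as the stack traces above:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AlreadyOpenRepro {
    public static void main(String[] args) throws IOException {
        // Hypothetical one-field schema, only for illustration.
        Schema schema = SchemaBuilder.record("Test").fields().requiredString("f").endRecord();

        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<>(schema));

        // First record: configure the codec and create the container file.
        writer.setCodec(CodecFactory.nullCodec());
        writer.create(schema, new ByteArrayOutputStream());
        GenericRecord record = new GenericData.Record(schema);
        record.put("f", "first");
        writer.append(record);

        // Second "open" on the same, still-open writer: calling setCodec() again
        // trips assertNotOpen() and throws AvroRuntimeException: already open.
        writer.setCodec(CodecFactory.nullCodec());
    }
}

In the logs below, the second "Opening record writer" line for the same tmp file appears to lead to exactly this situation: a second open attempt on a writer that was never closed.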

Two brief extracts from the logs:

  1. flush.size = 1 ---> message is saved

[2018-05-21 15:37:48,089] INFO Finished recovery for topic partition mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2018-05-21 15:37:48,177] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/d741978a-c339-4378-8f84-f28d3384bb68_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:37:48,253] INFO Starting commit and rotation for topic partition mi.fx.oracle.FX-USER-0 with start offsets {partition=0=119} and end offsets {partition=0=119} (io.confluent.connect.hdfs.TopicPartitionWriter:368)
[2018-05-21 15:37:48,377] INFO Successfully acquired lease for hdfs://<...url...>:8020/logs/mi.fx.oracle.FX-USER/0/log (io.confluent.connect.hdfs.wal.FSWAL:75)
[2018-05-21 15:37:48,411] INFO Committed hdfs://<...url...>:8020/raw_data/mi.fx.oracle.FX-USER/partition=0/mi.fx.oracle.FX-USER+0+0000000119+0000000119.avro for mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:746)
[2018-05-21 15:37:48,411] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/d741978a-c339-4378-8f84-f28d3384bb68_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:37:48,414] INFO Starting commit and rotation for topic partition mi.fx.oracle.FX-USER-0 with start offsets {partition=0=120} and end offsets {partition=0=120} (io.confluent.connect.hdfs.TopicPartitionWriter:368)

  2. flush.size = 2 ---> exception

[2018-05-21 15:35:40,445] INFO Finished recovery for topic partition mi.fx.oracle.FX-USER-0 (io.confluent.connect.hdfs.TopicPartitionWriter:267)
[2018-05-21 15:35:40,536] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/a4584b96-d2be-47d0-ae34-a24835f1fe8b_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:35:40,599] INFO Opening record writer for: hdfs://<...url...>:8020/raw_data//+tmp/mi.fx.oracle.FX-USER/partition=0/a4584b96-d2be-47d0-ae34-a24835f1fe8b_tmp.avro (io.confluent.connect.hdfs.avro.AvroRecordWriterProvider:65)
[2018-05-21 15:35:40,607] ERROR WorkerSinkTask{id=fxuser-hdfs-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.setCodec(DataFileWriter.java:93)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:69)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I hope this can help.

Regards,

Paolo

pmid avatar May 21 '18 14:05 pmid

Any update/pointer on this issue?

I'm having the same issue with the configuration below:

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "tasks.max": 4,
  "topics": "topicA,topicB",
  "hdfs.url": "hdfs://some-master-url:8020",
  "flush.size": 50000,
  "topics.dir": "topics",
  "logs.dir": "logs",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
  "schema.compatibility": "FULL",
  "enhanced.avro.schema.support": true,
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "timezone": "UTC",
  "locale": "en",
  "timestamp.extractor": "Record"
}

Using the following versions:

  • Connector 4.1.1 (also tried older 4.0.X)
  • Kafka 1.1.0
  • HDFS 2.7.3

I tried both standalone and distributed modes, using the Confluent packages as well as building from source. The source topics have multiple partitions, so I tried different values for tasks.max and different partitioners such as DefaultPartitioner. I always get the same error: org.apache.avro.AvroRuntimeException: already open.

Any help would be greatly appreciated @cricket007 @ewencp

lerouxrgd avatar Jun 13 '18 18:06 lerouxrgd

We are facing the same issue in production, even with the hdfs3 connector, and have opened a ticket with Confluent support. This is not a misconfiguration! Moreover, flush.size = 1 is not an acceptable workaround, since a huge number of tiny files on HDFS leads to severe performance degradation.
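To put the small-files point in perspective: with flush.size = 1 every Kafka record becomes its own HDFS file (and its own NameNode object), whereas a production sink is normally sized the other way around, with a large flush.size bounded by time-based rotation. A rough sketch of that kind of sizing (the values are placeholders, and rotate.interval.ms is the connector's time-based rotation setting as I understand the docs) — which is exactly the kind of configuration this bug currently prevents:

# Illustrative sizing only, not a fix for this bug.
# Commit a file after this many records...
flush.size=100000
# ...or after roughly 10 minutes of record time (per the timestamp extractor), whichever comes first.
rotate.interval.ms=600000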

ghost avatar Apr 23 '20 09:04 ghost

Any updates on this? I'm having the same issue

bradurani avatar Nov 05 '20 09:11 bradurani

Any updates on this? I'm having the same issue

diegoot-dev avatar Aug 22 '22 21:08 diegoot-dev