
Unable to set directory or filename dynamically based on the information in a tuple?

Open huxihx opened this issue 9 years ago • 3 comments

Hi there, thanks for this contribution. We are designing a log-aggregation system that collects logs from many sources and writes every log line into a single Kafka topic. With the help of storm-kafka we can consume each log line, but we hit a problem when writing each line out to an HDFS file.

It seems storm-hdfs only lets us specify the directory and file name up front, when the bolt is constructed. We cannot route log lines from different sources to different HDFS files.

By the way, we wrote our own framework, similar to what you offer, to work around this problem, but frequently appending to and closing HDFS files caused a performance issue that made us give up on it.

Is there any plan for storm-hdfs to support this scenario in the future? Thanks!

huxihx avatar Mar 19 '15 03:03 huxihx
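For readers hitting the same limitation: storm-hdfs's `FileNameFormat` is only given the rotation number and a timestamp, not the tuple itself, which is why the path cannot vary per record. The per-source routing the reporter wants could look like the sketch below. Note that `TuplePathRouter`, `pathFor`, and the `/logs/<source>/<day>` layout are hypothetical illustrations of the routing logic, not part of the storm-hdfs API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: derive an HDFS path from per-tuple metadata
// (the log source and event time) instead of fixing it at bolt
// construction time, as storm-hdfs does today.
public class TuplePathRouter {
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Route a log line to a directory keyed by its source and event day.
    public static String pathFor(String source, long eventTimeMillis) {
        String day = DAY.format(Instant.ofEpochMilli(eventTimeMillis));
        return "/logs/" + source + "/" + day + "/part";
    }

    public static void main(String[] args) {
        // Epoch millisecond 0 is 1970-01-01 in UTC.
        System.out.println(pathFor("nginx", 0L)); // /logs/nginx/1970-01-01/part
    }
}
```

A writer keyed on this path (one open file per distinct path, rotated on size or idle time) avoids the open/close-per-tuple pattern that caused the performance problem described above.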

This would also cover our use case: partitioning the data in HDFS by the time each event was generated. Events can enter the data-ingestion pipeline several hours, and in some cases a few days, after they are generated. Flume already has this feature; its absence is blocking us from using Storm for the time being.

rs-01 avatar Jun 03 '15 16:06 rs-01
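The event-time partitioning described above amounts to deriving the partition directory from a timestamp carried in the event rather than from the arrival time, so late-arriving data still lands in the correct partition. A minimal sketch, assuming a Hive-style `dt=.../hr=...` layout (the class name and layout are illustrative, not a storm-hdfs API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: compute a partition directory from the event's
// own timestamp, so events delayed by hours or days are still written
// to the partition in which they were generated.
public class EventTimePartitioner {
    // Hive-style layout, e.g. dt=2015-06-03/hr=16 (letters inside the
    // pattern must be quoted so they are treated as literals).
    private static final DateTimeFormatter HOUR_DIR =
            DateTimeFormatter.ofPattern("'dt='yyyy-MM-dd'/hr='HH")
                             .withZone(ZoneOffset.UTC);

    public static String partitionDir(long eventTimeMillis) {
        return HOUR_DIR.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // Epoch millisecond 0 is midnight UTC on 1970-01-01.
        System.out.println(partitionDir(0L)); // dt=1970-01-01/hr=00
    }
}
```

The key design point is that the partitioner never consults the wall clock; only the event's embedded timestamp decides the target directory.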

Hello, thanks for your contribution. I hit an error when running SequenceFileTopology; the console prints:

```
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
	at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151) ~[hadoop-hdfs-2.6.0.jar:na]
	at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038) ~[hadoop-hdfs-2.6.0.jar:na]
	at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1968) ~[hadoop-hdfs-2.6.0.jar:na]
	at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:1951) ~[hadoop-hdfs-2.6.0.jar:na]
	at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:139) ~[hadoop-common-2.6.0.jar:na]
	at org.apache.hadoop.io.SequenceFile$Writer.hsync(SequenceFile.java:1250) ~[hadoop-common-2.6.0.jar:na]
	at org.apache.storm.hdfs.bolt.SequenceFileBolt.execute(SequenceFileBolt.java:114) ~[classes/:na]
	at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) [na:0.9.1-incubating]
	at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) [na:0.9.1-incubating]
	at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) [na:0.9.1-incubating]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) [storm-core-0.9.1-incubating.jar:0.9.1-incubating]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) [na:0.9.1-incubating]
	at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) [na:0.9.1-incubating]
	at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) [na:0.9.1-incubating]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
```

Could you help me find the fault?

250690392 avatar May 10 '18 08:05 250690392
