storm-hdfs
Enable HDFS compression in HdfsBolt
My requirement is to compress the data loaded into HDFS. I tried to implement this by extending HdfsBolt; the data is compressed, but when I try to decompress it I get this error: java.lang.IllegalStateException: java.io.IOException: invalid block type. I think I am making a mistake while compressing the data. Could you please help me resolve this issue? My code is below for your review.
package com.eltyx.singlePart;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
public class HdfsCompressionBolt extends HdfsBolt {
    private static final long serialVersionUID = 7655170056529495962L;
    private static final Logger LOG = LoggerFactory.getLogger(HdfsCompressionBolt.class);

    private CompressionCodec codec;
    private String compression_class;
    private transient FSDataOutputStream __out;
    private transient CompressionOutputStream output;
    private long offset = 0;
    // Builder-style setter that records which CompressionCodec implementation to use.
    public HdfsCompressionBolt withCompressionClass(Class<? extends CompressionCodec> classz) {
        this.compression_class = classz.getName();
        return this;
    }
    @SuppressWarnings("rawtypes")
    @Override
    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
        super.doPrepare(conf, topologyContext, collector);
        // Look up the codec (e.g. GzipCodec) by the class name supplied through withCompressionClass().
        Configuration confs = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(confs);
        this.codec = factory.getCodecByClassName(this.compression_class);
    }
    @Override
    public void execute(Tuple tuple) {
        try {
            // "format" and "out" are private fields of HdfsBolt, so they are read via reflection.
            RecordFormat __format = (RecordFormat) FieldUtils.readField(this, "format", true);
            __out = (FSDataOutputStream) FieldUtils.readField(this, "out", true);
            byte[] bytes = __format.format(tuple);
            synchronized (this.writeLock) {
                // A new CompressionOutputStream is created here for every tuple,
                // all of them wrapping the same underlying FSDataOutputStream.
                output = codec.createOutputStream(__out, codec.createCompressor());
                output.write(bytes);
                this.offset += bytes.length;
                if (this.syncPolicy.mark(tuple, this.offset)) {
                    if (this.__out instanceof HdfsDataOutputStream) {
                        ((HdfsDataOutputStream) this.__out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
                        this.output.flush();
                    } else {
                        this.__out.hsync();
                    }
                    this.syncPolicy.reset();
                }
            }
            this.collector.ack(tuple);
            if (this.rotationPolicy.mark(tuple, this.offset)) {
                // this.output.close();
                rotateOutputFile(); // synchronized
                this.offset = 0;
                this.rotationPolicy.reset();
            }
        } catch (IOException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }
}
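
For reference, here is a minimal, self-contained sketch of the usual Hadoop CompressionOutputStream lifecycle: the stream is created once when the output file is opened, every record is written through it, and finish()/close() are called before the file is closed or rotated, so a reader sees one complete compressed stream. The CompressionStreamExample class name, the /tmp/compression-example.gz path, and the choice of GzipCodec are illustrative assumptions only, not part of the bolt above.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;

public class CompressionStreamExample {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Resolve the codec by class name, the same way doPrepare() does above.
        CompressionCodec codec = new CompressionCodecFactory(conf)
                .getCodecByClassName(GzipCodec.class.getName());

        FSDataOutputStream fsOut = fs.create(new Path("/tmp/compression-example.gz"));

        // One CompressionOutputStream for the whole file, created when the file is opened.
        try (CompressionOutputStream out = codec.createOutputStream(fsOut, codec.createCompressor())) {
            for (int i = 0; i < 10; i++) {
                out.write(("record " + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
            // finish() writes the codec's trailer; close() (from try-with-resources)
            // then releases the compressor and the underlying FSDataOutputStream.
            out.finish();
        }
    }
}

In the bolt above, by contrast, a fresh CompressionOutputStream is created inside execute() for every tuple and is never finished, which may be what the decompressor is objecting to with "invalid block type".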