storm-hdfs

Enable the Hdfs Compression type in Hdfs Bolt

Open · SrinivasanBigData opened this issue 9 years ago • 0 comments

My requirement is to compress data loaded into HDFS. When I tried to implement this by extending HdfsBolt, the data was written compressed, but when I try to decompress it I get this error: java.lang.IllegalStateException: java.io.IOException: invalid block type. I think I am making a mistake while compressing the data. Could you please help me resolve this issue? My code is below for your review.

package com.eltyx.singlePart;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public class HdfsCompressionBolt extends HdfsBolt {

/**
 * 
 */
private static final long serialVersionUID = 7655170056529495962L;
private static final Logger LOG = LoggerFactory
        .getLogger(HdfsCompressionBolt.class);
private CompressionCodec codec;
private String compression_class;
private transient FSDataOutputStream __out;
private transient CompressionOutputStream output;
private long offset = 0;

// Fluent setter: only the codec class name is stored here; the codec
// itself is instantiated later in doPrepare() on the worker.
public HdfsCompressionBolt withCompressionClass(
        Class<? extends CompressionCodec> classz) {
    this.compression_class = classz.getName();
    return this;
}

@SuppressWarnings("rawtypes")
@Override
public void doPrepare(Map conf, TopologyContext topologyContext,
        OutputCollector collector) throws IOException {
    super.doPrepare(conf, topologyContext, collector);
    // Look up the codec implementation by class name using a fresh Hadoop Configuration.
    Configuration confs = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(confs);
    this.codec = factory.getCodecByClassName(this.compression_class);
}

@Override
public void execute(Tuple tuple) {
    try {
        // Read HdfsBolt's private "format" and "out" fields via reflection.
        RecordFormat __format = (RecordFormat) FieldUtils.readField(this,
                "format", true);
        __out = (FSDataOutputStream) FieldUtils
                .readField(this, "out", true);
        byte[] bytes = __format.format(tuple);
        synchronized (this.writeLock) {
            // A new CompressionOutputStream and Compressor are created for
            // every tuple on top of the same underlying FSDataOutputStream.
            output = codec.createOutputStream(__out,
                    codec.createCompressor());
            output.write(bytes);
            this.offset += bytes.length;

            if (this.syncPolicy.mark(tuple, this.offset)) {
                if (this.__out instanceof HdfsDataOutputStream) {
                    ((HdfsDataOutputStream) this.__out).hsync(EnumSet
                            .of(SyncFlag.UPDATE_LENGTH));
                    this.output.flush();
                } else {
                    this.__out.hsync();
                }
                this.syncPolicy.reset();
            }
        }

        this.collector.ack(tuple);

        if (this.rotationPolicy.mark(tuple, this.offset)) {
            // this.output.close();
            rotateOutputFile(); // synchronized
            this.offset = 0;
            this.rotationPolicy.reset();
        }
    } catch (IOException e) {
        LOG.warn("write/sync failed.", e);
        this.collector.fail(tuple);
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
}

}
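For reference, "invalid block type" usually indicates broken DEFLATE/gzip framing: opening a new CompressionOutputStream for every tuple over the same FSDataOutputStream, and never calling finish() on it, can produce exactly that. Below is a minimal sketch of the codec lifecycle outside of Storm, using only the Hadoop compression classes already imported above. The class name, output path, and record contents are made up for illustration; this is not the storm-hdfs API, just the open-once / write-many / finish-then-close pattern.

package com.eltyx.singlePart;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;

// Hypothetical standalone example; not part of the bolt above.
public class CompressedHdfsWriteSketch {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Illustrative output path.
        FSDataOutputStream out = fs.create(new Path("/tmp/compression-sketch.gz"));

        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec =
                factory.getCodecByClassName(GzipCodec.class.getName());
        Compressor compressor = CodecPool.getCompressor(codec);

        // One CompressionOutputStream for the lifetime of the file,
        // created when the file is opened, not once per record.
        CompressionOutputStream cout = codec.createOutputStream(out, compressor);
        try {
            for (int i = 0; i < 100; i++) {
                byte[] record = ("record-" + i + "\n").getBytes(StandardCharsets.UTF_8);
                cout.write(record);
            }
            // finish() writes the trailing DEFLATE block and gzip trailer;
            // skipping it leaves data that cannot be decompressed cleanly.
            cout.finish();
            out.hsync();
        } finally {
            cout.close(); // also closes the underlying FSDataOutputStream
            CodecPool.returnCompressor(compressor);
        }
    }
}

Applied to the bolt, this would presumably mean wrapping the output stream in a single CompressionOutputStream when each file is created (and again after rotation), calling finish() before the final sync, and closing the compression stream before rotateOutputFile(), rather than creating a new stream inside execute().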

SrinivasanBigData · Sep 24 '15 12:09