
Use of GzipCodec gets a null Compressor in CompressedDataWriter


Versions used

Akka version: 2.6.14
Alpakka version: 3.0.4

Expected Behavior

One should be able to use any compression codec from org.apache.hadoop.io.compress in an HdfsFlow.

Actual Behavior

When a GzipCodec is provided to the flow, CompressedDataWriter gets null when retrieving the desired compressor.

Relevant logs

 - should use file size rotation and the gzip codec and produce six files *** FAILED *** (23 milliseconds)
[info]   java.lang.IllegalArgumentException: requirement failed: Compressor cannot be null
[info]   at scala.Predef$.require(Predef.scala:281)
[info]   at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter.<init>(CompressedDataWriter.scala:32)
[info]   at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter$.apply(CompressedDataWriter.scala:73)
[info]   at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressedWithPassThrough(HdfsFlow.scala:112)
[info]   at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressed(HdfsFlow.scala:82)
[info]   at docs.scaladsl.HdfsWriterSpec.$anonfun$new$46(HdfsWriterSpec.scala:351)

Reproducible Test Case

I am not sure how to proceed with a PR, so here is a failing test case that I wrote in a fresh checkout of the project:

    "use file size rotation and the gzip codec and produce six files" in {

      //#define-codec
      val codec = new GzipCodec()
      codec.setConf(fs.getConf)
      //#define-codec

      //#define-compress
      val flow = HdfsFlow.compressed(
        fs,
        SyncStrategy.count(1),
        RotationStrategy.size(0.1, FileUnit.MB),
        codec,
        settings
      )
      //#define-compress
      val content = generateFakeContentWithPartitions(1, FileUnit.MB.byteCount, 30)

      val resF = Source
        .fromIterator(() => content.toIterator)
        .map(HdfsWriteMessage(_))
        .via(flow)
        .runWith(Sink.seq)

      val logs = Await.result(resF, Duration.Inf)
      logs shouldBe Seq(
        RotationMessage(output("0.tar.gz"), 0),
        RotationMessage(output("1.tar.gz"), 1),
        RotationMessage(output("2.tar.gz"), 2),
        RotationMessage(output("3.tar.gz"), 3),
        RotationMessage(output("4.tar.gz"), 4),
        RotationMessage(output("5.tar.gz"), 5)
      )

      verifyOutputFileSize(fs, logs)
      verifyLogsWithCodec(fs, content, logs, codec)
    }

Thanks a lot for your time!

SnipyJulmy (Dec 08 '21 12:12)

Thank you for reporting this problem.

The compressor lookup in CompressedDataWriter uses CodecPool.getCompressor(compressionCodec, fs.getConf). I'm not on top of the HDFS APIs, but maybe you can investigate why it doesn't return the correct compressor instance.
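
For reference, the null appears to originate in Hadoop itself: GzipCodec.createCompressor() returns null when the native zlib bindings are not loaded, and CodecPool.getCompressor passes that null straight through to CompressedDataWriter. Here is a minimal sketch, using only plain Hadoop APIs (it assumes hadoop-common is on the classpath), to check this outside of Alpakka:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.compress.zlib.ZlibFactory
    import org.apache.hadoop.io.compress.{CodecPool, GzipCodec}

    val conf = new Configuration()

    // Reports whether the native zlib bindings were loaded;
    // without them GzipCodec cannot create a compressor.
    println(s"native zlib loaded: ${ZlibFactory.isNativeZlibLoaded(conf)}")

    val codec = new GzipCodec()
    codec.setConf(conf)

    // Returns null when native zlib is unavailable -- the value that
    // trips the `require` in CompressedDataWriter.
    val compressor = CodecPool.getCompressor(codec, conf)
    println(s"compressor: $compressor")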

ennru (Jan 14 '22 14:01)

You need to load the native Hadoop library for this to work. You can do so by starting the program with the JVM option -Djava.library.path=$HADOOP_HOME/lib/native
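
A quick way to verify at runtime that the library was actually picked up (a sketch using Hadoop's NativeCodeLoader):

    import org.apache.hadoop.util.NativeCodeLoader

    // True only when the JVM found libhadoop on java.library.path,
    // e.g. after starting with -Djava.library.path=$HADOOP_HOME/lib/native
    println(s"native hadoop loaded: ${NativeCodeLoader.isNativeCodeLoaded()}")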

Mind that your HdfsWritingSettings should not include .withNewLine(true) or .withLineSeparator(); otherwise the flow will produce invalid gzip files.
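
For example, a sketch of settings that are safe for compressed output (the default already appends no newline; the call is spelled out here only for emphasis):

    import akka.stream.alpakka.hdfs.HdfsWritingSettings

    // Keep newline handling off for compressed output; appending a
    // separator to the compressed bytes produces invalid gzip files.
    val settings = HdfsWritingSettings().withNewLine(false)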

koen-dejonghe (Jul 30 '22 10:07)