Use of GzipCodec gets a null Compressor in CompressedDataWriter
Versions used
Akka version: 2.6.14
Alpakka version: 3.0.4
Expected Behavior
One should be able to use any compression codec from org.apache.hadoop.io.compress in an HdfsFlow.
Actual Behavior
When providing a GzipCodec to the flow, the CompressedDataWriter gets a null when retrieving the desired compressor.
Relevant logs
- should use file size rotation and the gzip codec and produce six files *** FAILED *** (23 milliseconds)
[info] java.lang.IllegalArgumentException: requirement failed: Compressor cannot be null
[info] at scala.Predef$.require(Predef.scala:281)
[info] at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter.<init>(CompressedDataWriter.scala:32)
[info] at akka.stream.alpakka.hdfs.impl.writer.CompressedDataWriter$.apply(CompressedDataWriter.scala:73)
[info] at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressedWithPassThrough(HdfsFlow.scala:112)
[info] at akka.stream.alpakka.hdfs.scaladsl.HdfsFlow$.compressed(HdfsFlow.scala:82)
[info] at docs.scaladsl.HdfsWriterSpec.$anonfun$new$46(HdfsWriterSpec.scala:351)
Reproducible Test Case
I am not sure how to proceed with a PR, so here is a failing test case that I wrote in a fresh checkout of the project:
"use file size rotation and the gzip codec and produce six files" in {
//#define-codec
val codec = new GzipCodec()
codec.setConf(fs.getConf)
//#define-codec
//#define-compress
val flow = HdfsFlow.compressed(
fs,
SyncStrategy.count(1),
RotationStrategy.size(0.1, FileUnit.MB),
codec,
settings
)
//#define-compress
val content = generateFakeContentWithPartitions(1, FileUnit.MB.byteCount, 30)
val resF = Source
.fromIterator(() => content.toIterator)
.map(HdfsWriteMessage(_))
.via(flow)
.runWith(Sink.seq)
val logs = Await.result(resF, Duration.Inf)
logs shouldBe Seq(
RotationMessage(output("0.tar.gz"), 0),
RotationMessage(output("1.tar.gz"), 1),
RotationMessage(output("2.tar.gz"), 2),
RotationMessage(output("3.tar.gz"), 3),
RotationMessage(output("4.tar.gz"), 4),
RotationMessage(output("5.tar.gz"), 5)
)
verifyOutputFileSize(fs, logs)
verifyLogsWithCodec(fs, content, logs, codec)
}
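For what it's worth, the null compressor can also be reproduced without Alpakka at all, going straight through Hadoop's CodecPool. A minimal sketch (the object name is just for illustration; run it without the native Hadoop library on java.library.path, and note the outcome may differ on Hadoop releases that ship a pure-Java gzip compressor):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.{CodecPool, GzipCodec}

object GzipCompressorRepro extends App {
  val conf = new Configuration()
  val codec = new GzipCodec()
  codec.setConf(conf)

  // Without the native Hadoop library, GzipCodec.createCompressor()
  // yields null and CodecPool passes that null through -- the same
  // null that trips the require in CompressedDataWriter.
  val compressor = CodecPool.getCompressor(codec, conf)
  println(s"compressor = $compressor") // expected to print: compressor = null
}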
Thanks a lot for your time!
Thank you for reporting this problem.
The compressor lookup in CompressedDataWriter uses CodecPool.getCompressor(compressionCodec, fs.getConf). I'm not on top of the HDFS APIs, but maybe you can investigate there why it doesn't get the correct compressor instance.
You need to load the native Hadoop library for this to work. You can do so by starting the program with the JVM option:
-Djava.library.path=$HADOOP_HOME/lib/native
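To confirm the library was actually picked up, Hadoop exposes two public helpers you can call from a test. A quick sketch (the object name is just for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.zlib.ZlibFactory
import org.apache.hadoop.util.NativeCodeLoader

object NativeLibCheck extends App {
  val conf = new Configuration()

  // True only when libhadoop was found on java.library.path.
  println(s"native hadoop loaded: ${NativeCodeLoader.isNativeCodeLoaded}")

  // GzipCodec only hands out a compressor when native zlib is usable;
  // if this prints false, CodecPool.getCompressor returns null.
  println(s"native zlib loaded:   ${ZlibFactory.isNativeZlibLoaded(conf)}")
}

On a machine with a full Hadoop installation, hadoop checknative -a reports the same information from the command line.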
Mind that your HdfsWritingSettings should not include .withNewLine(true) and/or .withLineSeparator(). Otherwise this will produce invalid gzip files.
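A settings value along these lines should be safe for compressed output. A sketch, assuming the defaults of HdfsWritingSettings (newline insertion stays disabled unless you turn it on); the path generator is just an example value:

import akka.stream.alpakka.hdfs.{FilePathGenerator, HdfsWritingSettings}

// No .withNewLine(true) and no .withLineSeparator(...), so the bytes
// reaching the codec are exactly the message payloads.
val pathGenerator = FilePathGenerator(
  (rotationCount: Long, timestamp: Long) => s"/tmp/alpakka/$rotationCount-$timestamp"
)

val settings = HdfsWritingSettings()
  .withOverwrite(true)
  .withPathGenerator(pathGenerator)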