cloudflow
Using ParquetAvroWriters.forSpecificRecord from the flink-parquet package
Describe the bug
Using Flink, calling ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]) causes the streamlet to silently never consume from the topic. No consumer group appears for it.
To Reproduce
Create a project, separating out the libraries as described. Add the flink-parquet dependency to the Flink project:
lazy val flink = (project in file("./flink"))
  .enablePlugins(CloudflowFlinkPlugin)
  .settings(
    commonSettings,
    libraryDependencies += "org.apache.flink" %% "flink-parquet" % "1.10.0"
  )
  .dependsOn(datamodel)

class ParquetSink extends FlinkStreamlet {
  @transient val in = AvroInlet[MyGenericRecord]("in")
  @transient val shape = StreamletShape.withInlets(in)

  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {
      // val sink: StreamingFileSink[MyGenericRecord] = StreamingFileSink
      //   //.forRowFormat(new Path("/tmp/parquet"), new SimpleStringEncoder[MyGenericRecord]("UTF-8")) // this works fine
      //   .forBulkFormat( // this has issues
      //     new Path("/tmp/parquet"),
      //     ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord])
      //   )
      //   .build()
      ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]) // just running this is enough to reproduce
      val record: DataStream[MyGenericRecord] = readStream(in)
      // record.addSink(sink)
      record.print()
    }
  }
}
Expected behavior
Using addSink should write the records to the StreamingFileSink. Commenting out only the ParquetAvroWriters.forSpecificRecord line makes the records print again.
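Because the failure is silent, one thing that might help is making any error from that call visible. The following is only a minimal sketch, assuming the call throws and the error is swallowed somewhere upstream (which is not confirmed). The `Surface` object and its `loudly` method are hypothetical helper names, not part of Cloudflow or Flink.

```scala
object Surface {
  // Runs `body`; if it throws, prints the error with a label and rethrows it.
  // Throwable is caught on purpose: class-loading problems usually surface as
  // Errors (for example NoClassDefFoundError), which scala.util.Try lets through.
  def loudly[A](label: String)(body: => A): A =
    try body
    catch {
      case t: Throwable =>
        System.err.println(s"[$label] failed with: $t")
        t.printStackTrace()
        throw t
    }
}
```

Inside buildExecutionGraph, the suspect line could then be wrapped as `Surface.loudly("parquet-writer")(ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]))`. If nothing is printed, the call itself is not throwing and the cause lies elsewhere.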
Additional context
I think it might still have something to do with class loading. In the failing run I see no group.id, and the class name in the log is different.
This is the log when that method is not called (this works):
[INFO] [23:29:08.143] **or.ap.ka.cl.co.**ConsumerConfig:279 ConsumerConfig values
...
And this is the log entry from the run that silently fails:
[INFO] [00:02:04.809] **or.ap.ka.cl.pr.**ProducerConfig:279 ProducerConfig values:
Note the difference in class names: ConsumerConfig in the working run, ProducerConfig in the failing one.
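To test the class-loading theory, it may help to print which class loader and jar each relevant class resolves to. This is a minimal sketch using only the JDK reflection API; `ClassOrigin` and `describe` are hypothetical names introduced for illustration, and `Class.forName` here resolves through the class loader that loaded `ClassOrigin` itself, which may differ from the one Flink uses for user code.

```scala
object ClassOrigin {
  // Reports the class loader and code source (typically a jar) for a class,
  // or the error raised while trying to load it.
  def describe(className: String): String =
    try {
      val cls = Class.forName(className)
      // Both values can be null for classes loaded by the bootstrap loader.
      val loader = Option(cls.getClassLoader).map(_.toString).getOrElse("bootstrap")
      val source = Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("unknown")
      s"$className: loader=$loader, source=$source"
    } catch {
      // Covers ClassNotFoundException as well as linkage Errors.
      case t: Throwable => s"$className failed to load: $t"
    }
}
```

For example, logging `ClassOrigin.describe("org.apache.kafka.clients.consumer.ConsumerConfig")` and `ClassOrigin.describe("org.apache.flink.formats.parquet.avro.ParquetAvroWriters")` from buildExecutionGraph, with and without the forSpecificRecord line, would show whether the two runs pick up these classes from different jars.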
I am a new Scala / Java user. Not sure how to tackle this.
Thank you!