Using ParquetAvroWriters.forSpecificRecord from the flink-parquet package

Open kozy opened this issue 4 years ago • 0 comments

Describe the bug

With Flink, merely calling ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]) causes the streamlet to silently never consume from the topic. No consumer group appears for it.

To Reproduce

Create a project, separating out the libraries as described. Add dependency for Flink:

lazy val flink = (project in file("./flink"))
  .enablePlugins(CloudflowFlinkPlugin)
  .settings(
    commonSettings,
    libraryDependencies += "org.apache.flink" %% "flink-parquet" % "1.10.0"
  )
  .dependsOn(datamodel)

class ParquetSink extends FlinkStreamlet {
  @transient val in    = AvroInlet[MyGenericRecord]("in")
  @transient val shape = StreamletShape.withInlets(in)

  override def createLogic() = new FlinkStreamletLogic {
    override def buildExecutionGraph = {

//      val sink: StreamingFileSink[MyGenericRecord] = StreamingFileSink
//        //.forRowFormat(new Path("/tmp/parquet"), new SimpleStringEncoder[MyGenericRecord]("UTF-8")) // this works fine
//        .forBulkFormat( // this has issues
//          new Path("/tmp/parquet"),
//          ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord])
//        )
//        .build()
      ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]) // just running this is enough to reproduce

      val record: DataStream[MyGenericRecord] = readStream(in)
//        record.addSink(sink)

      record.print()
    }

  }
}

Expected behavior

Using addSink should write the stream to the StreamingFileSink. Commenting out the ParquetAvroWriters.forSpecificRecord line makes the records print again.
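
Because the failure is silent, one thing that might help is wrapping the suspect call so that any Throwable, including linkage errors such as NoSuchMethodError, is printed before it propagates. This is only a minimal sketch: Surface and logged are hypothetical names, and whether the call actually throws anything here is an assumption I have not confirmed.

```scala
// Hypothetical helper (not part of Flink or Cloudflow): runs a block and
// prints any Throwable to stderr before rethrowing it unchanged.
object Surface {
  def logged[A](label: String)(body: => A): A =
    try body
    catch {
      // Catching Throwable on purpose: linkage problems surface as Errors,
      // which a catch on Exception or NonFatal would miss.
      case t: Throwable =>
        System.err.println(s"[$label] failed with ${t.getClass.getName}: ${t.getMessage}")
        t.printStackTrace()
        throw t
    }
}
```

In the streamlet above it could be used as Surface.logged("parquet-writer") { ParquetAvroWriters.forSpecificRecord(classOf[MyGenericRecord]) }, so that anything thrown by the call would show up in the logs.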

Additional context

I think it might still have something to do with class loading. I see no group.id, and the class name in the log is different.
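
Since class loading is the suspect, a small check like the following could show which jar each relevant class is actually loaded from. This is a sketch under assumptions: ClassOrigin and locationOf are hypothetical names, and the listed class names are only my guess at what might conflict once flink-parquet is on the classpath.

```scala
// Hypothetical diagnostic: reports where a class was loaded from.
object ClassOrigin {
  /** Location (jar or directory) of the named class, or None if the class
    * cannot be loaded or has no code source (e.g. JDK bootstrap classes). */
  def locationOf(className: String): Option[String] =
    try {
      // initialize = false: only locate the class, do not run static initializers
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }

  def main(args: Array[String]): Unit =
    Seq(
      "org.apache.avro.specific.SpecificRecord",
      "org.apache.parquet.avro.AvroParquetWriter",
      "org.apache.flink.formats.parquet.avro.ParquetAvroWriters",
      "org.apache.kafka.clients.consumer.ConsumerConfig"
    ).foreach { name =>
      println(s"$name -> ${locationOf(name).getOrElse("not found or no code source")}")
    }
}
```

Comparing the output with and without the flink-parquet dependency might reveal whether a different Avro or Kafka jar gets picked up.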

Log from a run without that method call (this works):

[INFO] [23:29:08.143] **or.ap.ka.cl.co.**ConsumerConfig:279 ConsumerConfig values

...

And these are log entries from the one that silently fails:

[INFO] [00:02:04.809] **or.ap.ka.cl.pr.**ProducerConfig:279 ProducerConfig values:

Note the difference in class names: ConsumerConfig in the working run, ProducerConfig in the failing one.


I am a new Scala / Java user. Not sure how to tackle this.

Thank you!

kozy, Aug 01 '20 22:08