CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

[SS]《2.2 Structured Streaming 之 Sink 解析》讨论区

Open lw-lin opened this issue 7 years ago • 3 comments

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

lw-lin avatar Jan 01 '17 07:01 lw-lin

当使用structured streaming的foreach sink时,如果希望对partition内的value进行批量输出时,有什么好的办法么。

val query = words.writeStream.foreach(new ForeachWriter[String] {

      val arr = scala.collection.mutable.ArrayBuffer[String]()
      var conn: Connection = _
      override def open(partitionId: Long, version: Long): Boolean = {
        conn = DBUtil.getConn
        true
      }

      override def close(errorOrNull: Throwable) = {
        DBUtil.insert(conn,arr.toArray)
        conn.close()
      }

      override def process(value: String) = {
        arr += value
      }

    }).start()

目前采用的是在 process 中把数据添加到集合,再在close方法内进行批量写入。有其他更为优雅的写法么,谢谢

ShiyuChai avatar Sep 11 '17 06:09 ShiyuChai

@Dreamtecher

非常好的问题 —— 但目前的版本(as of 2017.09, Spark 2.2.0)我没有想到更好的写法。

lw-lin avatar Sep 12 '17 13:09 lw-lin

@lw-lin 好的,谢谢您的回复~

ShiyuChai avatar Sep 12 '17 14:09 ShiyuChai