CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

[SS]《3.1 Structured Streaming 之状态存储解析》讨论区

Open lw-lin opened this issue 8 years ago • 8 comments
trafficstars

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

lw-lin avatar Jan 01 '17 07:01 lw-lin

@lw-lin 如果计算count distinct这种算uv的场景statestore方式不能做吧?

junhero avatar Feb 16 '17 07:02 junhero

@junhero

这个跟数据集大小有关。如果数据集非常小,如 user id 的空间很小,那么 statestore 是没有问题的。如果 user id 的空间很大,但每天的 distinct user id 很小,那么 statestore 也是没有问题的。但如果 user id 空间很大,每天的 distinct user id 又很多,那 statestore 就有问题了。可以考虑其它方法如 hyperloglog 等。

lw-lin avatar Feb 19 '17 06:02 lw-lin

谢谢

junhero avatar Feb 20 '17 10:02 junhero

您好,我想请教一下stateStore里具体存储的是什么内容?我看到在statefulOperators里的一些对state的put操作如下:

val thisIsScalaCode
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
...
while (iter.hasNext) {
                val row = iter.next().asInstanceOf[UnsafeRow]
                val key = getKey(row)
                store.put(key, row)
                numUpdatedStateRows += 1
              }

KevinZwx avatar Aug 30 '17 02:08 KevinZwx

@KevinZwx 是 UnsafeRow;key 和 value 都是 UnsafeRow。UnsafeRow 在 SparkSQL 模块里相当于 Object 在 Java 里的作用。UnsafeRow 里包含各种类型(数值、字符串等)的具体数据。

lw-lin avatar Aug 31 '17 03:08 lw-lin

好的谢谢

KevinZwx avatar Aug 31 '17 07:08 KevinZwx

您好,我想请教下,是不是每次批次的数据在做状态更新的时候都要去hdfs拉一遍对应的stateStore,然后更新完之后再放回hdfs。

LinMingQiang avatar Aug 12 '19 06:08 LinMingQiang

请问一个可能不算是state的问题。在structured streaming中,两个流之间Join, 但是两个流join的时间范围比较大,比如几个小时。那这部分缓存数据,如果内存存不下,会溢写到磁盘吗?

lecssmi avatar Mar 10 '20 07:03 lecssmi