scio
Use hash join when writing sparkey
When writing to Sparkey, `allShards` represents every expected shard, even if `shards` contains no data for that shard number. `shards.rightOuterJoin(allShards)` (added in https://github.com/spotify/scio/pull/5208) fails when a shard contains a large amount of data, leading to the error described in https://github.com/spotify/scio/issues/5300: `java.lang.OutOfMemoryError: Required array length 2147483639 + 15534 is too large`.
This PR replaces `rightOuterJoin` with `hashFullOuterJoin` (there is no `hashRightOuterJoin` implementation). Since `allShards` already covers every shard number, every key in `shards` has a match on the right-hand side, so the full outer join yields the same records the right outer join did. A hash join is a good fit because the right-hand side is very small (it holds only the shard keys) and it doesn't need an array to hold the large left-hand side's values. As a result, some currently failing workflows that succeeded in Scio 0.13.* will run successfully again.
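To illustrate the idea, here is a simplified, in-memory Scala model of a hash full outer join. It is not Scio's implementation: `HashJoinSketch.hashFullOuterJoin` is a hypothetical helper that works on plain collections rather than `SCollection`s. The small right-hand side (standing in for `allShards`) is loaded into a `Map`, and the large left-hand side (standing in for `shards`) is streamed through once. Because of this, the left-hand values for a single key are never gathered into one collection.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a hash full outer join; not Scio's actual code.
object HashJoinSketch {
  def hashFullOuterJoin[K, L, R](
      left: Iterator[(K, L)], // large side: streamed once, never grouped by key
      right: Seq[(K, R)]      // small side: held entirely in memory
  ): Vector[(K, (Option[L], Option[R]))] = {
    // Build phase: index the small right-hand side by key.
    val rightByKey: Map[K, Seq[R]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val matched = mutable.Set.empty[K]
    // Collected into a Vector only to keep the sketch easy to inspect;
    // a real pipeline would emit each record as it is produced.
    val out = Vector.newBuilder[(K, (Option[L], Option[R]))]

    // Probe phase: look up each left-hand element individually.
    left.foreach { case (k, l) =>
      rightByKey.get(k) match {
        case Some(rs) =>
          matched += k
          rs.foreach(r => out += ((k, (Some(l), Some(r)))))
        case None =>
          out += ((k, (Some(l), None)))
      }
    }
    // Right-hand keys that never matched (e.g. shards with no data).
    rightByKey.foreach { case (k, rs) =>
      if (!matched(k)) rs.foreach(r => out += ((k, (None, Some(r)))))
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val numShards = 4
    val shards = Iterator(0 -> "a", 0 -> "b", 2 -> "c") // shards 1 and 3 are empty
    val allShards = (0 until numShards).map(s => (s, ()))
    hashFullOuterJoin(shards, allShards).foreach(println)
  }
}
```

When `allShards` lists every shard number, every output record has a defined right-hand value, so the full outer join behaves like the right outer join it replaces. Shards 1 and 3 still appear, with `None` as their left-hand value.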