
Use hash join when writing sparkey

Open · aslotnick opened this pull request

When writing to Sparkey, `allShards` contains every expected shard number, including shard numbers that have no corresponding data in `shards`.

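`shards.rightOuterJoin(allShards)` (added in https://github.com/spotify/scio/pull/5208) fails when a single shard contains a large amount of data, producing the error described in https://github.com/spotify/scio/issues/5300: `java.lang.OutOfMemoryError: Required array length 2147483639 + 15534 is too large`. In other words, an array that had already grown to 2147483639 elements (`Integer.MAX_VALUE - 8`) needed 15534 more, which exceeds the maximum length of a Java array.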

This PR replaces `rightOuterJoin` with `hashFullOuterJoin` (there is no `hashRightOuterJoin` implementation). Because `allShards` contains every expected shard number, every key in `shards` also appears in `allShards`, so the full outer join produces the same rows as the right outer join did. A hash join is a good fit here: the right-hand side is tiny (only the shard keys), and the join does not need an array to hold all of a large shard's left-hand values. As a result, some workflows that succeeded in Scio 0.13.* and have been failing since will run successfully again.
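To illustrate why the hash join avoids the problem, here is a minimal in-memory sketch of a hash full outer join in plain Scala. It is not Scio's `hashFullOuterJoin`, which operates on `SCollection`s and side inputs; the object name `HashJoinSketch` and the `emit` callback are hypothetical, and `shards`/`allShards` are modeled as simple key/value collections. The small right-hand side is hashed into a map (build phase), and the large left-hand side is streamed one element at a time (probe phase), so the values of one shard are never gathered into a single collection.

```scala
import scala.collection.mutable

// Minimal in-memory sketch of a hash full outer join. This is NOT Scio's
// hashFullOuterJoin (which works on SCollections via side inputs); the object
// name HashJoinSketch is hypothetical and exists only for illustration.
object HashJoinSketch {

  /** Joins a large, streamed left side with a small right side.
    *
    * @param left  large side, consumed one element at a time (e.g. `shards`)
    * @param right small side, fully hashed in memory (e.g. `allShards`)
    * @param emit  callback receiving each joined row as it is produced
    */
  def hashFullOuterJoin[K, V, W](left: Iterator[(K, V)], right: Seq[(K, W)])(
      emit: ((K, (Option[V], Option[W]))) => Unit
  ): Unit = {
    // Build phase: hash the small right-hand side by key.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

    // Only distinct left keys are remembered (a handful of shard numbers),
    // never the left values themselves.
    val seenLeftKeys = mutable.Set.empty[K]

    // Probe phase: each left element is joined and emitted immediately, so
    // the values of one key are never collected into a single buffer.
    left.foreach { case (k, v) =>
      seenLeftKeys += k
      rightByKey.get(k) match {
        case Some(ws) => ws.foreach(w => emit((k, (Some(v), Some(w)))))
        case None     => emit((k, (Some(v), None)))
      }
    }

    // Right-hand keys with no left-hand match (e.g. shards with no data).
    rightByKey.foreach { case (k, ws) =>
      if (!seenLeftKeys.contains(k)) ws.foreach(w => emit((k, (None, Some(w)))))
    }
  }
}
```

For example, joining left rows `(0, "a")`, `(0, "b")`, `(2, "c")` with right keys `0`, `1`, `2` emits the three matched rows as they stream past, then `(1, (None, Some(())))` for the empty shard. Since every left key also exists on the right, the right-hand `Option` is always defined, which is why a full outer join can stand in for a right outer join in this case.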

aslotnick, Jun 24 '24 21:06