[WIP] Prepare for Spark 4
This PR adds support for Spark 4.0.0 (currently preview2), along with a major Jedis upgrade (from 3.9 to 5.x) and a migration to Scala 2.13 and Java 17.
## Open Questions
The logic in `getClusterNodes` changed to reflect the new Jedis `conn.clusterShards()` command. While the unit tests all pass, I am unsure of the correct indexing for `idx`. The comment specifies that it is for non-master nodes, but `node.getRole()` returns `master` for all nodes, which makes sense given that everything is running on 127.0.0.1.
With that said, I could add an AtomicInteger to ensure the `idx` field has monotonically increasing values, but I wanted to understand more about how they are used before moving forward (hence the WIP status).
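For reference, a minimal pure-Scala sketch of the AtomicInteger idea (the `roles` list is hypothetical; in the real code each role would come from `nodeInfo.getRole`):

```scala
import java.util.concurrent.atomic.AtomicInteger

object IdxSketch extends App {
  // Hypothetical roles for the nodes of one shard; stand-in for
  // the values nodeInfo.getRole would return.
  val roles = Seq("master", "replica", "replica")

  // 0 is reserved for the master, so non-masters start at 1.
  val counter = new AtomicInteger(1)
  val idxs = roles.map { role =>
    if (role == "master") 0 else counter.getAndIncrement()
  }

  println(idxs) // List(0, 1, 2)
}
```

This keeps the "idx of a master is always 0" invariant while guaranteeing the non-master indices are unique and monotonically increasing.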
```scala
private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
  val conn = initialHost.connect()
  val res = conn.clusterShards().asScala.flatMap { shardInfoObj: ClusterShardInfo =>
    val slotInfo = shardInfoObj.getSlots
    // TODO: Can we have more than 1 node per ClusterShard?
    val nodeInfo = shardInfoObj.getNodes.get(0)
    /*
     * We get all the nodes with the slots range [sPos, ePos] and
     * create a RedisNode for each one. The total field of each
     * RedisNode is the number of nodes with that slots range, and
     * the idx field is just an index for each node, which will be
     * used for adding support for slaves and so on. The idx of a
     * master is always 0; we rely on this fact to filter masters.
     */
    (0 until slotInfo.size).map { i =>
      val host = SafeEncoder.encode(nodeInfo.getIp.getBytes(Charset.forName("UTF-8")))
      val port = nodeInfo.getPort.toInt
      val slotStart = slotInfo.get(i).get(0).toInt
      val slotEnd = slotInfo.get(i).get(1).toInt
      val endpoint = RedisEndpoint(
        host = host,
        port = port,
        user = initialHost.user,
        auth = initialHost.auth,
        dbNum = initialHost.dbNum,
        timeout = initialHost.timeout,
        ssl = initialHost.ssl)
      val role = nodeInfo.getRole
      val idx = if (role == "master") 0 else i
      RedisNode(endpoint, slotStart, slotEnd, idx, slotInfo.size)
    }
  }.toArray
  conn.close()
  res
}
```
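To make the "idx of a master is always 0" invariant concrete, here is a small pure-Scala illustration of how downstream code can filter masters (using a simplified stand-in for `RedisNode`, since the real case class also carries the endpoint and slot range):

```scala
object MasterFilterSketch extends App {
  // Simplified stand-in for RedisNode: just the fields relevant here.
  case class Node(host: String, idx: Int, total: Int)

  val nodes = Array(
    Node("10.0.0.1", 0, 2), // master of shard 1 (idx == 0)
    Node("10.0.0.1", 1, 2), // replica of shard 1
    Node("10.0.0.2", 0, 1)  // master of shard 2
  )

  // Filtering on idx == 0 relies on masters always being assigned 0.
  val masters = nodes.filter(_.idx == 0)
  println(masters.map(_.host).mkString(",")) // 10.0.0.1,10.0.0.2
}
```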
Thanks in advance.
The other thing to mention is that in Spark 4 the older DStream receivers are deprecated, so I removed all traces of non-Structured-Streaming code.
The biggest change here is the `RedisNode` index (`idx`) in the cluster `getClusterNodes` function. Here is the object for clarity.
This is one item in the `List[ClusterShardInfo]` that is read in the `flatMap` operation provided above.
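Regarding the `TODO` in the snippet: `CLUSTER SHARDS` can report more than one node per shard (the master plus its replicas), which only `getNodes.get(0)` currently sees. A pure-Scala sketch of walking every node of every shard, using hypothetical stand-in types for the `ClusterShardInfo`/`ClusterShardNodeInfo` shapes:

```scala
object ShardWalkSketch extends App {
  // Hypothetical stand-ins for the Jedis shard/node shapes.
  case class ShardNode(ip: String, port: Int, role: String)
  case class Shard(slots: Seq[(Int, Int)], nodes: Seq[ShardNode])

  val shards = Seq(
    Shard(Seq((0, 8191)),
      Seq(ShardNode("10.0.0.1", 7000, "master"),
          ShardNode("10.0.0.2", 7000, "replica"))),
    Shard(Seq((8192, 16383)),
      Seq(ShardNode("10.0.0.3", 7000, "master")))
  )

  // Flatten shards x nodes, giving the master idx 0 and replicas 1..n,
  // with total = number of nodes in the shard.
  val flattened = shards.flatMap { shard =>
    shard.nodes.zipWithIndex.map { case (node, i) =>
      val idx = if (node.role == "master") 0 else i
      (node.ip, idx, shard.nodes.size)
    }
  }
  println(flattened)
  // List((10.0.0.1,0,2), (10.0.0.2,1,2), (10.0.0.3,0,1))
}
```

This is only a sketch of the shape, not a proposed implementation; whether replicas should be emitted at all depends on the answer to the `idx` question above.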
Let's go! 👏
Not sure if anyone is looking at these, but I'm happy to make additional modifications :)