spark-redis

[WIP] Prepare for Spark 4

Open newfront opened this issue 1 year ago • 5 comments

This PR provides support for Spark 4.0.0 (currently preview2), along with a major Jedis upgrade (3.9 -> 5.x) and a migration to Scala 2.13 and Java 17.

Open Questions

The logic in getClusterNodes changed to use the new Jedis conn.clusterShards() command. While the unit tests all pass, I am unsure of the correct indexing for idx. The original comment says idx is meant to distinguish non-master nodes, but node.getRole() returns master for every node, which makes sense given that everything is running on 127.0.0.1.

With that said, I could add an AtomicInteger to ensure the idx field has monotonically increasing values, but I wanted to understand more about how these values are used before moving forward (hence the WIP status).

  private def getClusterNodes(initialHost: RedisEndpoint): Array[RedisNode] = {
    val conn = initialHost.connect()

    val res = conn.clusterShards().asScala.flatMap {
      shardInfoObj: ClusterShardInfo => {
        val slotInfo = shardInfoObj.getSlots

        // todo: Can we have more than 1 node per ClusterShard?
        val nodeInfo = shardInfoObj.getNodes.get(0)

        /*
         * We get all the nodes serving the slot range [sPos, ePos] and
         * create a RedisNode for each of them. The total field of every
         * RedisNode is the number of nodes serving that slot range, and
         * the idx field is an index for each node, which will be used
         * to add support for slaves and so on.
         * The idx of a master is always 0; we rely on this fact to
         * filter masters.
         */
        (0 until (slotInfo.size)).map(i => {
          val host = SafeEncoder.encode(nodeInfo.getIp.getBytes(Charset.forName("UTF8")))
          val port = nodeInfo.getPort.toInt
          val slotStart = slotInfo.get(i).get(0).toInt
          val slotEnd = slotInfo.get(i).get(1).toInt
          val endpoint = RedisEndpoint(
            host = host,
            port = port,
            user = initialHost.user,
            auth = initialHost.auth,
            dbNum = initialHost.dbNum,
            timeout = initialHost.timeout,
            ssl = initialHost.ssl)
          val role = nodeInfo.getRole
          val idx = if (role == "master") 0 else i
          RedisNode(endpoint, slotStart, slotEnd, idx, slotInfo.size)
        })
      }
    }.toArray
    conn.close()
    res
  }
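
On the idx question, here is a minimal sketch of one possible indexing, assuming the semantics described in the comment above: idx numbers the nodes that serve the same slot range (master first, so it gets 0) and total counts those nodes. It also assumes a shard can list replicas alongside its master, which is how the Redis CLUSTER SHARDS reply is documented. Shard, ShardNode, Node and ClusterNodeIndexing are hypothetical stand-ins, not Jedis or spark-redis types, so the sketch runs without either library.

```scala
// Hypothetical stand-ins for the Jedis shard types, so the sketch runs without Jedis.
final case class ShardNode(ip: String, port: Int, role: String)
final case class Shard(slots: Seq[(Int, Int)], nodes: Seq[ShardNode])

// Simplified stand-in for RedisNode (the endpoint is reduced to host and port).
final case class Node(host: String, port: Int, startSlot: Int, endSlot: Int, idx: Int, total: Int)

object ClusterNodeIndexing {
  def toNodes(shards: Seq[Shard]): Seq[Node] =
    shards.flatMap { shard =>
      // Put the master first so it always receives idx 0; replicas follow as 1, 2, ...
      val (masters, replicas) = shard.nodes.partition(_.role == "master")
      val ordered = masters ++ replicas
      // Emit one entry per (slot range, node) pair; total is the node count of the shard.
      for {
        (start, end) <- shard.slots
        (node, idx)  <- ordered.zipWithIndex
      } yield Node(node.ip, node.port, start, end, idx, ordered.size)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: one shard with a replica listed before its master.
    val shards = Seq(
      Shard(
        slots = Seq((0, 5460)),
        nodes = Seq(
          ShardNode("127.0.0.1", 7003, "replica"),
          ShardNode("127.0.0.1", 7000, "master"))))
    toNodes(shards).foreach(println)
  }
}
```

With this shape, filtering on idx == 0 still selects masters, and no shared counter such as an AtomicInteger is needed because the index is derived per shard. Whether this matches how idx and total are consumed elsewhere in the project is an open question, not something the sketch settles.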

newfront avatar Oct 11 '24 21:10 newfront

Thanks in advance.

newfront avatar Oct 11 '24 21:10 newfront

The other thing to mention is that in Spark 4 the older DStream receivers are deprecated, so I removed all traces of non-Structured Streaming code.

newfront avatar Oct 11 '24 21:10 newfront

The biggest change here is the RedisNode index (idx) for the cluster getNodes function. Here is the object for clarity.

[Image: redis_cluster-shard-info]

This is one item in the List[ClusterShardInfo] that is read in the flatMap operation provided above.

zen-data avatar Oct 14 '24 18:10 zen-data

Let's go! 👏

ff137 avatar Oct 23 '24 20:10 ff137

Not sure if anyone is looking at these but I'm happy to make additional modifications :)

newfront avatar Jul 23 '25 22:07 newfront