redux
redux copied to clipboard
Add support for Redis 5.0.x X* commands
Omitted from #41 as the basic interface generated did not seem usable. This needs input from someone who is using these.
Note that these can be manually used using $command()
con$command(list("XINFO", "STREAM", "mystream"))
Hey @richfitz, first of all thanks for the excellent package. It has been a huge leg up. Second, we find ourselves using the X* functions via con$command
as you suggest. I don't have the bandwidth to be very involved, but in case it is helpful and/or for anyone else coming across this issue here are some code snippets that implement rough versions of XADD, XACK, and XREADGROUP. They all take a redis connection as their first argument. Let me know if there is any other context or info that would be helpful.
XREADGROUP <- function(r, group, consumer, streams, count=NULL, block=NULL, noack=FALSE){
args <- list("XREADGROUP", "GROUP", group, consumer)
if (!is.null(count)){
args <- c(args, c("COUNT", count))
}
if (!is.null(block)){
args <- c(args, c("BLOCK", block))
}
if (noack){
args <- c(args, "NOACK")
}
args <- c(args, "STREAMS")
args <- c(args, names(streams))
args <- c(args, streams)
redis_response <- r$command(args)
if (is.null(redis_response)){
# No new messages
return(redis_response)
}
# redis_response is a list of streams:
# each stream is a list with two items:
# 1. A Stream Name
# 2. A list of messages where each messages is a list with two items:
# 1. Message id
# 2. A list of key-value pairs that alternates key, value, key, value...
out <- list()
for (stream_section_raw in redis_response){
stream_section_out <- list()
stream_name <- stream_section_raw[[1]]
stream_section_out[["stream"]] <- stream_name
messages <- list()
for (i in 1:length(stream_section_raw[[2]])){
message_raw <- stream_section_raw[[2]][[i]]
# The payload is a list that alternates key, value, key, value...
keys <- message_raw[[2]][c(TRUE, FALSE)]
values <- message_raw[[2]][c(FALSE, TRUE)]
names(values) <- keys
message_out <- list(
id=message_raw[[1]],
payload=values
)
messages[[i]] <- message_out
}
stream_section_out[['messages']] <- messages
out[[stream_name]] <- stream_section_out
}
return(out)
}
XADD <- function(r, stream, values, id="*", maxlength=NULL){
args = list("XADD", stream)
if (!is.null(maxlength)){
args <- c(args, "MAXLEN", "~", maxlength)
}
args <- c(args, id)
for (field in names(values)){
args <- c(args, field, values[[field]])
}
return(r$command(args))
}
XACK <- function(r, stream, group, ids){
args = list("XACK", stream, group)
args <- c(args, ids)
return(r$command(args))
}
### Usage
# Create the consumer group "r-worker" on the stream "r-test"
r$command(list("XGROUP", "CREATE", "r-test", "r-worker", "$", "MKSTREAM"))
# Add messages to the "r-test" stream
x1 <- XADD(r, "r-test", list("value1"="apples", "value2"="something"))
x2 <- XADD(r, "r-test", list("value1"="bananas", "value2"="something else"))
# Read messages from the "r-test" stream from consumer group "r-worker" as consumer "consumer1"
y <- XREADGROUP(r, "r-worker", "consumer1", list("r-test"=">", "r-test2"=">"))
print(x[['r-test']]$messages[[1]]$payload$value1) # Prints 'apples'
print(x[['r-test']]$messages[[2]]$payload$value1) # Prints 'bananas'
# Acknowledge the messages
z <- XACK(r, "r-test", "r-worker", c(x1, x2))
@MaxTaggart thank you for the head start and examples. @richfitz if you still need help here I would be interested in contributing as I need to create Redis Stream interface for communication between a web application and R models.