Redis streams

Open samwhitecoull opened this issue 5 years ago • 6 comments

I am using Redigo in a project that relies on reading messages from streams. I know this is a newer feature of Redis, but is this something that Redigo plans to support natively in the future? I have cobbled together a stream reader by traversing the nested interfaces returned from the Do() call and using type assertions (based on trial and improvement and knowledge of what's being sent) to pull useful results out. However, this is extremely brittle and only works because I can guarantee what's being appended to the streams. If a code update is not required, it would be great to see some documentation showing an idiomatic way of reading streams.

samwhitecoull • Nov 20 '18 09:11

// Read both streams from the beginning. The reply is a nested array:
// [[key, [[id, [field, value, ...]], ...]], ...]
r, err := redis.Values(conn.Do("XREAD", "STREAMS", "stream1", "stream2", "0-0", "0-0"))
if err != nil {
	log.Fatal(err)
}

for _, k := range r {
	keyInfo := k.([]interface{})

	key := string(keyInfo[0].([]byte))
	idList := keyInfo[1].([]interface{})

	for _, e := range idList {
		idInfo := e.([]interface{})

		id := string(idInfo[0].([]byte))

		// Fields and values alternate in a flat list; print every pair.
		// The unchecked type assertions panic on an unexpected reply shape.
		fieldList := idInfo[1].([]interface{})
		for i := 0; i+1 < len(fieldList); i += 2 {
			field := string(fieldList[i].([]byte))
			value := string(fieldList[i+1].([]byte))

			fmt.Println(key, id, field, value)
		}
	}
}

smartwalle • Dec 17 '18 08:12

Apologies for the radio silence, life got busy! I'm happy to tackle this feature if the main man (@garyburd) wants it implemented as part of the API; otherwise I'll close this and carry on using the wrapper I mentioned.

samisagit • Feb 22 '19 08:02

I would like to be able to deserialize an XREAD response into a struct. Currently I don't know how to do that.
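As a rough illustration of what such deserialization could look like, here is a minimal stdlib-only sketch. It assumes the field list of one stream entry has already been pulled out of the reply as a flat list of bulk strings; the `Reading` struct, its field names, and `decodeReading` are hypothetical and not part of Redigo. Redigo's `redis.ScanStruct` takes alternating names and values (the HGETALL reply shape), which matches a stream entry's field list, so it may also be worth trying, though nothing in this thread confirms that use.

```go
package main

import (
	"errors"
	"fmt"
)

// Reading is a hypothetical target struct for one stream entry.
type Reading struct {
	Sensor string
	Value  string
}

// decodeReading fills a Reading from the flat field/value list of a single
// stream entry. Unknown field names are ignored.
func decodeReading(fields []any) (Reading, error) {
	var r Reading
	if len(fields)%2 != 0 {
		return r, errors.New("odd number of elements in field list")
	}
	for i := 0; i < len(fields); i += 2 {
		name, okName := fields[i].([]byte)
		val, okVal := fields[i+1].([]byte)
		if !okName || !okVal {
			return r, fmt.Errorf("pair at index %d is not made of bulk strings", i)
		}
		switch string(name) {
		case "sensor":
			r.Sensor = string(val)
		case "value":
			r.Value = string(val)
		}
	}
	return r, nil
}

func main() {
	// Simulated field list, shaped like the innermost array of an XREAD reply.
	fields := []any{[]byte("sensor"), []byte("kitchen"), []byte("value"), []byte("21.5")}
	r, err := decodeReading(fields)
	fmt.Println(r, err)
}
```

The explicit switch keeps the mapping visible and avoids reflection; a tag-driven approach would scale better to larger structs.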

nskforward • Nov 23 '19 12:11

Can anyone merge this? It would be helpful :)

Explosivv • Jun 16 '21 02:06

This is an issue, not a PR. Are you referring to #557?

stevenh • Jun 21 '21 10:06

I hope a solution for this problem will be found soon.

But the solution has to take into account that a stream entry is not a map: it can contain the same field multiple times. For example:

127.0.0.1:6379> xadd test * field value1 field value2
"1667730787485-0"
127.0.0.1:6379> xread streams test 0
1) 1) "test"
   2) 1) 1) "1667730787485-0"
         2) 1) "field"
            2) "value1"
            3) "field"
            4) "value2"

PR #557 parses the fields into a map, so the duplicate field collapses into a single entry and it would return

map[string]string{"field":"value"}
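To make the loss concrete, here is a minimal stdlib-only sketch comparing the two representations. The names `pair`, `toMap`, and `toPairs` are made up for illustration, and `toMap` is a naive conversion written for this example, not the code from PR #557.

```go
package main

import "fmt"

// pair is one field/value occurrence, kept in stream order.
type pair struct{ Field, Value string }

// toMap collapses a flat field/value list into a map.
// A repeated field overwrites the earlier value.
func toMap(flat []string) map[string]string {
	m := make(map[string]string, len(flat)/2)
	for i := 0; i+1 < len(flat); i += 2 {
		m[flat[i]] = flat[i+1]
	}
	return m
}

// toPairs keeps every occurrence, including repeated fields.
func toPairs(flat []string) []pair {
	ps := make([]pair, 0, len(flat)/2)
	for i := 0; i+1 < len(flat); i += 2 {
		ps = append(ps, pair{Field: flat[i], Value: flat[i+1]})
	}
	return ps
}

func main() {
	// Same entry as in the redis-cli session above.
	flat := []string{"field", "value1", "field", "value2"}
	fmt.Println(len(toMap(flat)), len(toPairs(flat)))
}
```

The map ends up with one entry while the slice keeps both, which is why a map alone cannot represent every stream entry.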

I think it would be nice if the user were able to parse the fields however they want.

For example:

func parseStream(reply any, f func(k, v []byte)) (string, error) {
	valueList, err := redis.Values(reply, nil)
	if err != nil {
		return "", err
	}

	var lastID string
	for i, value := range valueList {
		idFields, ok := value.([]any)
		if !ok || len(idFields) != 2 {
			return "", fmt.Errorf("invalid stream value %d, got %v", i, value)
		}

		id, err := redis.String(idFields[0], nil)
		if err != nil {
			return "", fmt.Errorf("parsing id from entry %d: %w", i, err)
		}

		lastID = id

		fieldList, ok := idFields[1].([]any)
		if !ok || len(fieldList)%2 != 0 {
			return "", fmt.Errorf("invalid field list value %d, got %v", i, idFields[1])
		}

		for fi := 0; fi < len(fieldList); fi += 2 {
			key, ok := toByte(fieldList[fi])
			if !ok {
				return "", fmt.Errorf("field %d in entry %d is not a bulk string value, got %T", fi, i, fieldList[fi])
			}

			value, ok := toByte(fieldList[fi+1])
			if !ok {
				return "", fmt.Errorf("value %d in entry %d is not a bulk string value, got %T", fi+1, i, fieldList[fi+1])
			}

			f(key, value)
		}
	}
	return lastID, nil
}

// user function 1: builds a map
data := make(map[string]string)
lastID, err := parseStream(reply, func(k, v []byte) { data[string(k)] = string(v) })

// user function 2 (an alternative to the above): only wants the values
var data []string
lastID, err := parseStream(reply, func(_, v []byte) { data = append(data, string(v)) })
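parseStream above calls toByte, which is not defined in the snippet. A minimal sketch of what such a helper might look like follows. This is an assumption about its intent, not code from Redigo or from PR #557: it accepts a bulk string delivered as []byte and, for convenience, a plain string.

```go
package main

import "fmt"

// toByte is a hypothetical helper matching the calls in parseStream.
// It reports whether v can be treated as a bulk string value.
func toByte(v any) ([]byte, bool) {
	switch t := v.(type) {
	case []byte:
		return t, true
	case string:
		return []byte(t), true
	default:
		return nil, false
	}
}

func main() {
	b, ok := toByte([]byte("field"))
	fmt.Println(string(b), ok)

	// An integer reply is rejected rather than converted.
	_, ok = toByte(int64(1))
	fmt.Println(ok)
}
```

Returning a boolean instead of an error lets the caller build its own message with the entry and field index, as parseStream does.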

ostcar • Nov 06 '22 10:11