scala-redis
scala-redis copied to clipboard
Protocol error
19:03:34.068 [stats-system-akka.actor.default-dispatcher-9] ERROR akka.actor.OneForOneStrategy - Protocol error: Got (1,[B@52c7e2d4) as initial reply byte java.lang.Exception: Protocol error: Got (1,[B@52c7e2d4) as initial reply byte at com.redis.Reply$$anonfun$6.applyOrElse(RedisProtocol.scala:94) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.Reply$$anonfun$6.applyOrElse(RedisProtocol.scala:92) ~[redisclient_2.10-2.13.jar:2.13] at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na] at com.redis.Reply$$anonfun$5.applyOrElse(RedisProtocol.scala:74) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.Reply$$anonfun$5.applyOrElse(RedisProtocol.scala:74) ~[redisclient_2.10-2.13.jar:2.13] at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) ~[scala-library-2.10.4.jar:na] at com.redis.Reply$class.receive(RedisProtocol.scala:113) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.RedisClient.receive(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.R$class.asListPairs(RedisProtocol.scala:143) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.RedisClient.asListPairs(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.SortedSetOperations$$anonfun$zrangeWithScore$1.apply(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.SortedSetOperations$$anonfun$zrangeWithScore$1.apply(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.Redis$class.send(RedisClient.scala:21) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.RedisClient.send(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.SortedSetOperations$class.zrangeWithScore(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13] at com.redis.RedisClient.zrangeWithScore(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13]
Please provide a failing test which I can use to replicate the error.
Thanks.
I have the same error on my server. Repeated periodically, randomly. I am use scala-redis with Scala 2.10 and akka (redis-client invoke in actor)
I had similar problem, caused by caching RedisClient in my singleton object, and reusing in methods, after I switched to creating new RedisClient each time, it disappeared.
As a matter of facts, I can confirm that using @mavarazy 's approach fixed my problem on a project that uses "net.debasishg" % "redisclient_2.11" % "2.13".
@AL333Z I faced some concurrency issues with previous approach recently, and switched to RedisConnectionPool with the latest release 2.14. So my advice is to use a RedisConnectionPool instead of creating a RedisClient each time.
The same problem here but when I create a new instance of client each time before call the problem is gone (as @mavarazy noticed)
I recently ran into this issue and the cause was misusing a lower level api.
Original (Not sure why we didn't use the built in ping before maybe we used an older version that did not have it?)
rcp.withClient { client =>
client.send[String]("PING")("PONG") match {
case "PONG" => Result.healthy()
case _ => Result.unhealthy("No connection to Redis")
}
}
Fixed
rcp.withClient { client =>
val result = client.ping
result match {
case Some("PONG") => Result.healthy()
case _ => Result.unhealthy("No connection to Redis")
}
}
I feel like the send API should be private and hidden from the API consumers. It took quite a while to figure out that the signature def send[A](command: String)(result: => A): A = result must consume the input stream from the response or the connection will be returned to the pool in a bad state. The correct send signature would have been send("PING")(asString)
Alternatively you can change the code to make sure the input stream is flushed when returning a connection to the pool. This is the approach a lot of database connection pools use. They ensure connections returned to the pool are in some base state.
+1. I ran into this reusing a connection as well. v3.1
+1 on 3.0
+1 on 3.0
+1 on 3.0
+1 on 3.3
+1 on 3.4
+1 on 3.4 The error occurs randomly. Having this piece of code
private[this] val redisClient = new RedisClient()
override def findProfile[M[_] : Monad](uuid: String): M[Option[Profile]] = monadic {
println(redisClient.ping)
redisClient.get(uuidKey(uuid))
}
in case of error println prints:
Some(G
+ONG)
and in normal case it prints:
Some(PONG)
Looks like there is race condition somewhere and request/result strings are mixed with each other. Unfortunately, lib is useless with this error.
+1 on 3.4
+1 on 3.4. This occurs randomly and isn't reproducible
Thanks for reporting. I will take a look though I am struggling for free cycles with all the other stuff going around. Just a few questions ..
- Is there a piece of code that can possibly give this error ? I know it may be a race condition and is not reproducible. But a piece that can possibly give this error will help in testing.
- Has anyone found a workaround for this ? Any suggestions, any best practice that help avoid this error ?
Any other suggestions ?
@debasishg You can take a look at my piece of code - with this code the error occurs often and reproducible. I found workaround but it's not production ready. If you create new client for every request the error is gone. Artifact is "net.debasishg" %% "redisclient" % "3.4" Scala version is 2.12.2 Redis server is 3.2.9
UPD - in the above code you can skip typeclasses and monadic. It's not relevant to error
Thanks @ezhulkov - I will take a look.
I haven't looked at this in a very long time but from what I can remember the issue is related to returning a dirty buffer back to the pool. In the provided example println(redisClient.ping) it calls ping but I believe the bytes are never read when the connection gets returned to the pool. The next connection then starts writing to it again and you end up in a weird state.
Sounds reasonable. In my case this error happens even without ping. Might be result of the set command (ok) mixed with get command
Right that was just an example I think anything that uses the send command can leave the connection in a dirty state if it's not consumed. Just my best guess.
I tried with the following test snippet ..
package com.redis
import org.scalatest.FunSpec
import org.scalatest.BeforeAndAfterEach
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@RunWith(classOf[JUnitRunner])
class FooSpec extends FunSpec
with Matchers
with BeforeAndAfterEach
with BeforeAndAfterAll {
val r = new RedisClient("localhost", 6379)
override def beforeEach = {
}
override def afterEach = {
r.flushdb
}
override def afterAll = {
r.disconnect
}
describe("keys") {
it("should fetch keys") {
r.set("key-1", "value-1")
r.set("key-2", "value-2")
(1 to 10000).foreach {_ =>
println(r.ping)
r.get("key-2")
r.get("key-1")
}
}
}
}
Also tried putting the set within the loop. Could not replicate the error once. Am I missing something ? Can you please confirm if the error occurs if only redis is used and no other library like Akka is present in the mix ? I tried with Redis 3.2.9 on my Mac Air running Scala 2.12.1.
Need some more help trying to replicate the error.
I believe it is related to the lazy evaluation of the send api. When you call toString of ping it will flush the buffer. If you just call r.ping and do nothing with it it can leave the connection in a weird state. I also think it might be related to multi threading so maybe try a client with a very small pool size and hit it with multiple threads trying to force a connection to a bad state.
I tried with just the r.ping without the println because I also was thinking of the flush situation. Could not replicate the problem.
For multithreading the recommended way is to use the pool - you cannot use a single RedisClient in a multithreaded environment. I will test the pool (as u suggested), but I thought some people are getting the exception even on single RedisClients. Hence I was thinking if it could be a side-effect of some other library being used together.
When using the pool, if a connection gets into a bad state (somehow, maybe the thread in which the connection was being used dies) and gets returned to the pool in the bad state, we can do a reconnect when we return the connection to the pool (https://github.com/debasishg/scala-redis/blob/master/src/main/scala/com/redis/Pool.scala#L47). The reconnect will initialize the connection. WDYAT ?
I have not yet been able to replicate the problem myself. Just some thoughts based on the discussion above. But can I assume that the issue occurs only with pool and NOT with using a single client ? And just to reiterate a single RedisClient cannot be used in a multithreaded mode. You need to use a pool for that.
@debasishg Yes, I am running into this issue while using a pool. Is there any established way to fix this?
@debasishg it's really easy to reproduce with such simple test snippet
import com.redis.RedisClient
object RedisClientTest extends App {
val r = new RedisClient("localhost", 6379)
println(r.get("vvl:qm"))
r.subscribe("vvl.qm") { m =>
println(m)
}
Thread.sleep(3000)
r.unsubscribe("vvl.qm")
Thread.sleep(3000)
println(r.get("vvl:qm"))
r.subscribe("vvl.qm") { m =>
println(m)
}
Thread.sleep(3000)
r.unsubscribe("vvl.qm")
Thread.sleep(3000)
r.get("vvl:qm")
}
Here is the console output:
Get 1 result Some(test)
Subscribe 1 result S(vvl.qm,1)
Unsubscribe 1 was finished
Subscribe 1 result U(vvl.qm,0)
Get 2 result Some(test)
Unsubscribe 2 was finished
Exception in thread "main" java.lang.Exception: Protocol error: Got (*,[B@255b53dc) as initial reply byte
at com.redis.Reply$$anonfun$errReply$1.applyOrElse(RedisProtocol.scala:120)
at com.redis.Reply$$anonfun$errReply$1.applyOrElse(RedisProtocol.scala:118)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:34)
at com.redis.Reply$$anonfun$bulkReply$1.applyOrElse(RedisProtocol.scala:78)
at com.redis.Reply$$anonfun$bulkReply$1.applyOrElse(RedisProtocol.scala:78)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:168)
at com.redis.Reply.receive(RedisProtocol.scala:139)
at com.redis.Reply.receive$(RedisProtocol.scala:135)
at com.redis.RedisClient.receive(RedisClient.scala:103)
at com.redis.R.asBulk(RedisProtocol.scala:242)
at com.redis.R.asBulk$(RedisProtocol.scala:242)
at com.redis.RedisClient.asBulk(RedisClient.scala:103)
at com.redis.StringOperations.$anonfun$get$1(StringOperations.scala:36)
at com.redis.Redis.send(RedisClient.scala:30)
at com.redis.Redis.send$(RedisClient.scala:28)
at com.redis.RedisClient.send(RedisClient.scala:103)
at com.redis.StringOperations.get(StringOperations.scala:36)
at com.redis.StringOperations.get$(StringOperations.scala:35)
at com.redis.RedisClient.get(RedisClient.scala:103)
at redis.test.backend.service.RedisClientTest$.delayedEndpoint$redis$test$backend$service$RedisClientTest$1(RedisTest.scala:36)
at redis.test.backend.service.RedisClientTest$delayedInit$body.apply(RedisTest.scala:6)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:378)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at redis.test.backend.service.RedisClientTest$.main(RedisTest.scala:6)
at redis.test.backend.service.RedisClientTest.main(RedisTest.scala)
Process finished with exit code 1
After second unsubscribe call RedisClient instance was spoiled and I couldn't do anything with it.
I will take a look over this weekend.
Maybe, it'll help you. If we do such trick
object RedisClientTest extends App {
val r = new RedisClient("localhost", 6379)
println(s"Get 1 result ${r.get("vvl:qm")}")
r.subscribe("vvl.qm") { m =>
println(s"Subscribe 1 result $m")
}
Thread.sleep(3000)
r.unsubscribe("vvl.qm")
println("Unsubscribe 1 was finished")
Thread.sleep(3000)
r.pubSub = false
println(s"Get 2 result ${r.get("vvl:qm")}")
r.subscribe("vvl.qm") { m =>
println(s"Subscribe 2 result $m")
}
Thread.sleep(3000)
r.unsubscribe("vvl.qm")
println("Unsubscribe 2 was finished")
Thread.sleep(3000)
r.pubSub = false
println(s"Get 3 result ${r.get("vvl:qm")}")
}
we won't receive any exception. But I think it'll bring us new problem with memory leaking.