scala-redis

Protocol error

Open lbsha opened this issue 11 years ago • 44 comments

19:03:34.068 [stats-system-akka.actor.default-dispatcher-9] ERROR akka.actor.OneForOneStrategy - Protocol error: Got (1,[B@52c7e2d4) as initial reply byte
java.lang.Exception: Protocol error: Got (1,[B@52c7e2d4) as initial reply byte
	at com.redis.Reply$$anonfun$6.applyOrElse(RedisProtocol.scala:94) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.Reply$$anonfun$6.applyOrElse(RedisProtocol.scala:92) ~[redisclient_2.10-2.13.jar:2.13]
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:na]
	at com.redis.Reply$$anonfun$5.applyOrElse(RedisProtocol.scala:74) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.Reply$$anonfun$5.applyOrElse(RedisProtocol.scala:74) ~[redisclient_2.10-2.13.jar:2.13]
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) ~[scala-library-2.10.4.jar:na]
	at com.redis.Reply$class.receive(RedisProtocol.scala:113) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.RedisClient.receive(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.R$class.asListPairs(RedisProtocol.scala:143) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.RedisClient.asListPairs(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.SortedSetOperations$$anonfun$zrangeWithScore$1.apply(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.SortedSetOperations$$anonfun$zrangeWithScore$1.apply(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.Redis$class.send(RedisClient.scala:21) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.RedisClient.send(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.SortedSetOperations$class.zrangeWithScore(SortedSetOperations.scala:41) ~[redisclient_2.10-2.13.jar:2.13]
	at com.redis.RedisClient.zrangeWithScore(RedisClient.scala:91) ~[redisclient_2.10-2.13.jar:2.13]

lbsha avatar Jul 01 '14 11:07 lbsha
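
For context on the message in the report above: in the Redis wire protocol (RESP2), every reply starts with one of five type bytes: `+`, `-`, `:`, `$` or `*`. A first byte of `1` is consistent with a parser that started reading in the middle of an earlier reply. The following is a minimal, self-contained sketch of that first-byte check. The names `RespFirstByte` and `classify` are made up for illustration; this is not the code in RedisProtocol.scala.

```scala
// Illustrative sketch only; not the scala-redis implementation.
object RespFirstByte {
  sealed trait ReplyKind
  case object SimpleString extends ReplyKind // '+'
  case object ErrorReply   extends ReplyKind // '-'
  case object IntegerReply extends ReplyKind // ':'
  case object BulkString   extends ReplyKind // '$'
  case object ArrayReply   extends ReplyKind // '*'

  /** Classify a reply by its first byte, as RESP2 defines the type markers. */
  def classify(reply: Array[Byte]): Either[String, ReplyKind] =
    reply.headOption.map(_.toChar) match {
      case Some('+') => Right(SimpleString)
      case Some('-') => Right(ErrorReply)
      case Some(':') => Right(IntegerReply)
      case Some('$') => Right(BulkString)
      case Some('*') => Right(ArrayReply)
      // Any other byte means the reader is not at a reply boundary.
      case Some(c)   => Left(s"Protocol error: Got $c as initial reply byte")
      case None      => Left("Protocol error: empty reply")
    }
}
```

Under these assumptions, `RespFirstByte.classify("+PONG\r\n".getBytes("UTF-8"))` yields a `Right`, while input starting with `1` yields a `Left` carrying a protocol error message.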

Please provide a failing test which I can use to replicate the error.

Thanks.

debasishg avatar Jul 01 '14 14:07 debasishg

I have the same error on my server. It recurs periodically, at random. I am using scala-redis with Scala 2.10 and Akka (the Redis client is invoked inside an actor).

dimparf avatar Jul 08 '14 06:07 dimparf

I had a similar problem, caused by caching a RedisClient in my singleton object and reusing it across methods. After I switched to creating a new RedisClient each time, it disappeared.

mavarazy avatar Jul 22 '14 09:07 mavarazy

As a matter of fact, I can confirm that @mavarazy's approach fixed my problem on a project that uses "net.debasishg" % "redisclient_2.11" % "2.13".

AL333Z avatar Oct 29 '14 18:10 AL333Z

@AL333Z I recently ran into some concurrency issues with the previous approach and switched to RedisClientPool with the latest release, 2.14. So my advice is to use a RedisClientPool instead of creating a RedisClient each time.

mavarazy avatar Oct 30 '14 05:10 mavarazy

Same problem here, but when I create a new client instance before each call, the problem is gone (as @mavarazy noticed).

tlcs avatar Mar 20 '15 16:03 tlcs

I recently ran into this issue and the cause was misusing a lower level api.

Original (not sure why we didn't use the built-in ping before; maybe we used an older version that did not have it):

rcp.withClient { client =>
  client.send[String]("PING")("PONG") match {
    case "PONG" => Result.healthy()
    case _ => Result.unhealthy("No connection to Redis")
  }
}

Fixed

rcp.withClient { client =>
  val result = client.ping
  result match {
    case Some("PONG") => Result.healthy()
    case _ => Result.unhealthy("No connection to Redis")
  }
}

I feel like the send API should be private and hidden from API consumers. It took quite a while to figure out that, with the signature def send[A](command: String)(result: => A): A = result, the result argument must consume the input stream from the response, or the connection will be returned to the pool in a bad state. The correct call would have been send("PING")(asString).

Alternatively you can change the code to make sure the input stream is flushed when returning a connection to the pool. This is the approach a lot of database connection pools use. They ensure connections returned to the pool are in some base state.

billoneil avatar Mar 08 '16 16:03 billoneil
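
The mechanism billoneil describes can be shown without a Redis server. The sketch below is a toy model under stated assumptions: `FakeConnection` is a hypothetical stand-in for a socket, and `send` only mirrors the shape of the signature quoted above. Neither is scala-redis code.

```scala
import scala.collection.mutable

// Toy model only; FakeConnection and this send are hypothetical.
object UnreadReplySketch {
  /** Stand-in for one socket: each written command queues one reply. */
  final class FakeConnection {
    private val pending = mutable.Queue.empty[String]
    def write(command: String): Unit = pending.enqueue(s"reply-to-$command")
    def read(): String = pending.dequeue()
  }

  /** `result` is by-name: the reply is read only if that expression reads it. */
  def send[A](conn: FakeConnection, command: String)(result: => A): A = {
    conn.write(command)
    result
  }

  /** Returns what the second command sees after the first reply was never read. */
  def demo(): String = {
    val conn = new FakeConnection
    send(conn, "PING")("PONG")      // constant result: the PING reply stays queued
    send(conn, "GET k")(conn.read()) // reads the stale PING reply, not the GET reply
  }
}
```

In this model `UnreadReplySketch.demo()` returns `"reply-to-PING"`: the second command consumes the reply that the first one left behind, which is the kind of desynchronization described in the comment above.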

+1. I ran into this reusing a connection as well. v3.1

seanmcl avatar Mar 14 '16 23:03 seanmcl

+1 on 3.0

DBassel avatar May 27 '16 16:05 DBassel

+1 on 3.0

cainj avatar Aug 08 '16 18:08 cainj

+1 on 3.0

yabooc avatar Oct 24 '16 02:10 yabooc

+1 on 3.3

charlesfaustin avatar Mar 15 '17 01:03 charlesfaustin

+1 on 3.4

mohittt8 avatar May 15 '17 07:05 mohittt8

+1 on 3.4. The error occurs randomly. With this piece of code:

private[this] val redisClient = new RedisClient()
override def findProfile[M[_] : Monad](uuid: String): M[Option[Profile]] = monadic {
  println(redisClient.ping)
  redisClient.get(uuidKey(uuid))
}

when the error occurs, println prints:

Some(G
+ONG)

and in the normal case it prints:

Some(PONG)

It looks like there is a race condition somewhere, and request/result strings get mixed up with each other. Unfortunately, the library is unusable with this error.

ezhulkov avatar Jul 12 '17 11:07 ezhulkov

+1 on 3.4

michalwarecki avatar Jul 12 '17 12:07 michalwarecki

+1 on 3.4. This occurs randomly and isn't reproducible

rmccullagh avatar Jul 20 '17 17:07 rmccullagh

Thanks for reporting. I will take a look, though I am struggling for free cycles with everything else going on. Just a few questions:

  1. Is there a piece of code that can possibly produce this error? I know it may be a race condition and not reproducible, but a piece of code that can possibly trigger it will help in testing.
  2. Has anyone found a workaround for this? Any suggestions or best practices that help avoid this error?

Any other suggestions?

debasishg avatar Jul 20 '17 17:07 debasishg

@debasishg You can take a look at my piece of code. With it, the error occurs often and is reproducible. I found a workaround, but it's not production ready: if you create a new client for every request, the error is gone. The artifact is "net.debasishg" %% "redisclient" % "3.4", the Scala version is 2.12.2, and the Redis server is 3.2.9.

UPD: in the above code you can skip the typeclasses and monadic. They are not relevant to the error.

ezhulkov avatar Jul 20 '17 17:07 ezhulkov

Thanks @ezhulkov - I will take a look.

debasishg avatar Jul 20 '17 18:07 debasishg

I haven't looked at this in a very long time, but from what I can remember the issue is related to returning a dirty buffer back to the pool. The provided example, println(redisClient.ping), calls ping, but I believe the bytes are never read before the connection gets returned to the pool. The next user of the connection then starts writing to it again, and you end up in a weird state.

billoneil avatar Jul 20 '17 21:07 billoneil

Sounds reasonable. In my case this error happens even without ping. It might be the result of the set command (OK) getting mixed with the reply to the get command.

ezhulkov avatar Jul 21 '17 04:07 ezhulkov

Right, that was just an example. I think anything that uses the send command can leave the connection in a dirty state if the reply is not consumed. Just my best guess.

billoneil avatar Jul 21 '17 15:07 billoneil

I tried with the following test snippet ..

package com.redis

import org.scalatest.FunSpec
import org.scalatest.BeforeAndAfterEach
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith


@RunWith(classOf[JUnitRunner])
class FooSpec extends FunSpec 
                     with Matchers
                     with BeforeAndAfterEach
                     with BeforeAndAfterAll {

  val r = new RedisClient("localhost", 6379)

  override def beforeEach = {
  }

  override def afterEach = {
    r.flushdb
  }

  override def afterAll = {
    r.disconnect
  }

  describe("keys") {
    it("should fetch keys") {
      r.set("key-1", "value-1")
      r.set("key-2", "value-2")
      (1 to 10000).foreach {_ =>
        println(r.ping)
        r.get("key-2")
        r.get("key-1")
      }
    }
  }
}

I also tried putting the set within the loop. I could not replicate the error even once. Am I missing something? Can you please confirm whether the error occurs when only Redis is used and no other library like Akka is in the mix? I tried with Redis 3.2.9 on my MacBook Air running Scala 2.12.1.

Need some more help trying to replicate the error.

debasishg avatar Jul 22 '17 19:07 debasishg

I believe it is related to the lazy evaluation of the send API. When you call toString on the result of ping, it will flush the buffer. If you just call r.ping and do nothing with it, it can leave the connection in a weird state. I also think it might be related to multithreading, so maybe try a client pool with a very small size and hit it with multiple threads, trying to force a connection into a bad state.

billoneil avatar Jul 22 '17 20:07 billoneil

I tried with just the r.ping without the println because I also was thinking of the flush situation. Could not replicate the problem.

For multithreading, the recommended way is to use the pool: you cannot use a single RedisClient in a multithreaded environment. I will test the pool (as you suggested), but I thought some people were getting the exception even with single RedisClients. Hence I was wondering whether it could be a side effect of some other library being used alongside.

debasishg avatar Jul 22 '17 20:07 debasishg

When using the pool, if a connection gets into a bad state (somehow, maybe because the thread in which the connection was being used dies) and is returned to the pool in that state, we can do a reconnect when we return the connection to the pool (https://github.com/debasishg/scala-redis/blob/master/src/main/scala/com/redis/Pool.scala#L47). The reconnect will reinitialize the connection. What do you think?

I have not yet been able to replicate the problem myself. These are just some thoughts based on the discussion above. But can I assume that the issue occurs only with the pool and NOT with a single client? And just to reiterate, a single RedisClient cannot be used in multithreaded mode. You need to use a pool for that.

debasishg avatar Jul 22 '17 21:07 debasishg
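
The reset-on-return idea raised above (flushing the input stream, or reconnecting, before a connection goes back into the pool) can be sketched generically. This is a minimal illustration under stated assumptions: `Conn`, its `unread` counter, and `Pool` are hypothetical, and `reset()` merely stands in for whatever drain or reconnect the real client would perform. It is not the code in Pool.scala.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Generic sketch; Conn and Pool are hypothetical, not scala-redis classes.
object ResettingPoolSketch {
  /** Hypothetical connection: `unread` counts replies nobody consumed. */
  final class Conn {
    var unread: Int = 0
    def reset(): Unit = { unread = 0 } // stands in for a drain or reconnect
  }

  final class Pool(size: Int) {
    private val idle = new ArrayBlockingQueue[Conn](size)
    (1 to size).foreach(_ => idle.put(new Conn))

    /** Lend one connection to one caller at a time; clean it before reuse. */
    def withConn[A](body: Conn => A): A = {
      val conn = idle.take() // blocks until a connection is free
      try body(conn)
      finally {
        conn.reset()         // runs even if body throws
        idle.put(conn)
      }
    }
  }
}
```

The trade-off is that every return pays the cost of the reset, in exchange for the guarantee that the next borrower never sees another caller's leftover state.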

@debasishg Yes, I am running into this issue while using a pool. Is there any established way to fix this?

masonkirchner avatar Mar 09 '18 18:03 masonkirchner

@debasishg It's really easy to reproduce with this simple test snippet:

import com.redis.RedisClient

object RedisClientTest extends App {

  val r = new RedisClient("localhost", 6379)

  println(r.get("vvl:qm"))

  r.subscribe("vvl.qm") { m =>
    println(m)
  }

  Thread.sleep(3000)

  r.unsubscribe("vvl.qm")

  Thread.sleep(3000)

  println(r.get("vvl:qm"))

  r.subscribe("vvl.qm") { m =>
    println(m)
  }

  Thread.sleep(3000)

  r.unsubscribe("vvl.qm")

  Thread.sleep(3000)

  r.get("vvl:qm")
}

Here is the console output:

Get 1 result Some(test)
Subscribe 1 result S(vvl.qm,1)
Unsubscribe 1 was finished
Subscribe 1 result U(vvl.qm,0)
Get 2 result Some(test)
Unsubscribe 2 was finished
Exception in thread "main" java.lang.Exception: Protocol error: Got (*,[B@255b53dc) as initial reply byte
	at com.redis.Reply$$anonfun$errReply$1.applyOrElse(RedisProtocol.scala:120)
	at com.redis.Reply$$anonfun$errReply$1.applyOrElse(RedisProtocol.scala:118)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:34)
	at com.redis.Reply$$anonfun$bulkReply$1.applyOrElse(RedisProtocol.scala:78)
	at com.redis.Reply$$anonfun$bulkReply$1.applyOrElse(RedisProtocol.scala:78)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:168)
	at com.redis.Reply.receive(RedisProtocol.scala:139)
	at com.redis.Reply.receive$(RedisProtocol.scala:135)
	at com.redis.RedisClient.receive(RedisClient.scala:103)
	at com.redis.R.asBulk(RedisProtocol.scala:242)
	at com.redis.R.asBulk$(RedisProtocol.scala:242)
	at com.redis.RedisClient.asBulk(RedisClient.scala:103)
	at com.redis.StringOperations.$anonfun$get$1(StringOperations.scala:36)
	at com.redis.Redis.send(RedisClient.scala:30)
	at com.redis.Redis.send$(RedisClient.scala:28)
	at com.redis.RedisClient.send(RedisClient.scala:103)
	at com.redis.StringOperations.get(StringOperations.scala:36)
	at com.redis.StringOperations.get$(StringOperations.scala:35)
	at com.redis.RedisClient.get(RedisClient.scala:103)
	at redis.test.backend.service.RedisClientTest$.delayedEndpoint$redis$test$backend$service$RedisClientTest$1(RedisTest.scala:36)
	at redis.test.backend.service.RedisClientTest$delayedInit$body.apply(RedisTest.scala:6)
	at scala.Function0.apply$mcV$sp(Function0.scala:34)
	at scala.Function0.apply$mcV$sp$(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App.$anonfun$main$1$adapted(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:378)
	at scala.App.main(App.scala:76)
	at scala.App.main$(App.scala:74)
	at redis.test.backend.service.RedisClientTest$.main(RedisTest.scala:6)
	at redis.test.backend.service.RedisClientTest.main(RedisTest.scala)

Process finished with exit code 1

After the second unsubscribe call, the RedisClient instance was left in a broken state and I couldn't do anything with it.

validol-ekb avatar May 03 '18 18:05 validol-ekb

I will take a look over this weekend.

debasishg avatar May 03 '18 19:05 debasishg

Maybe this will help. If we use this trick:

import com.redis.RedisClient

object RedisClientTest extends App {

  val r = new RedisClient("localhost", 6379)

  println(s"Get 1 result ${r.get("vvl:qm")}")

  r.subscribe("vvl.qm") { m =>
    println(s"Subscribe 1 result $m")
  }

  Thread.sleep(3000)

  r.unsubscribe("vvl.qm")
  println("Unsubscribe 1 was finished")

  Thread.sleep(3000)
  r.pubSub = false

  println(s"Get 2 result ${r.get("vvl:qm")}")

  r.subscribe("vvl.qm") { m =>
    println(s"Subscribe 2 result $m")
  }

  Thread.sleep(3000)

  r.unsubscribe("vvl.qm")
  println("Unsubscribe 2 was finished")

  Thread.sleep(3000)
  r.pubSub = false

  println(s"Get 3 result ${r.get("vvl:qm")}")
}

we don't get any exception. But I think it will introduce a new problem: a memory leak.

validol-ekb avatar May 03 '18 19:05 validol-ekb