akka-persistence-cassandra

Tried to execute unprepared query

Open patriknw opened this issue 4 years ago • 16 comments

@mckeeh3 noticed this when running with AWS Keyspaces

[2020-09-15 12:53:40,107] [WARN] [akka.persistence.cassandra.query.EventsByTagStage] [] [woe-twin-akka.actor.default-dispatcher-17] - Backtrack failed, this will retried. java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it MDC: {akkaAddress=akka://[email protected]:25520, sourceThread=woe-twin-akka.actor.default-dispatcher-17, akkaSource=EventsByTagStage(akka://woe-twin), sourceActorSystem=woe-twin, akkaTimestamp=12:53:40.107UTC}
[2020-09-15 12:53:40,477] [ERROR] [com.datastax.oss.driver.internal.core.tracker.RequestLogger] [] [s0-io-15] - [s0|130042065][Node(endPoint=3.234.248.205/3.234.248.205:9142, hostId=6c945ed5-b04e-3fc4-8dc0-9aea1128bfa8, hashCode=1c981716)] Error (1 ms) [4 values]
    SELECT persistence_id, tag_pid_sequence_nr, timestamp
    FROM woe_twin.tag_views WHERE
    tag_name = ? AND
    timebucket = ? AND
    timestamp > ? AND
    timestamp <= ? [tag_name='0', timebucket=1600171200000, timestamp=9acfe930-f751-11ea-8080-808080808080, timestamp=9dcad9b4-f751-11ea-893d-4fd06c7f32e0] MDC: {}
java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:655)
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:629)
	at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Unknown Source)

Could we be using the prepared statements in the wrong way? PreparedStatements are cached by the driver, and we should be able to use them many times without looking up new ones. At least that is what I can see in the docs: https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/statements/prepared/

patriknw avatar Sep 15 '20 13:09 patriknw

Same with local (real) Cassandra:

[com.datastax.oss.driver.internal.core.tracker.RequestLogger] [] [s0-io-13] - [s0|2125764217][Node(endPoint=3.234.248.255/3.234.248.255:9142, hostId=52b446ab-b2ec-36dc-97b0-d1eb37ac10a5, hashCode=21a604fe)] Error (1 ms) [4 values] 
    SELECT persistence_id, tag_pid_sequence_nr, timestamp
    FROM woe_twin.tag_views WHERE
    tag_name = ? AND
    timebucket = ? AND
    timestamp > ? AND
    timestamp <= ? [tag_name='0', timebucket=1601316000000, timestamp=6a0e5f80-01b4-11eb-8080-808080808080, timestamp=6d095000-01b4-11eb-8080-808080808080] MDC: {}
java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:667)
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:641)
	at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:255)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)

patriknw avatar Sep 29 '20 12:09 patriknw

I am guessing this is a bug upstream, but I'll investigate and raise it there if need be.

chbatey avatar Dec 07 '20 11:12 chbatey

Is there any update regarding this issue?

nvollmar avatar Feb 12 '21 08:02 nvollmar

We need to look into this more, it is either a driver issue or we're mis-using sessions: https://datastax-oss.atlassian.net/jira/software/c/projects/JAVA/issues/?filter=reportedbyme

chbatey avatar Feb 16 '21 07:02 chbatey

@chbatey what's the link without the reportedbyme filter?

patriknw avatar Feb 16 '21 09:02 patriknw

https://datastax-oss.atlassian.net/browse/JAVA-2910

nvollmar avatar Feb 16 '21 09:02 nvollmar

Doh, thanks!

chbatey avatar Feb 17 '21 15:02 chbatey

We came across the same issue. This is quite serious, since it means that if a Cassandra node is added to the cluster or restarted, all akka-persistence-cassandra clients will fail.

In our case we use persistent-queries to update the view side. These streams fail:

ERROR	akka.actor.OneForOneStrategy	akka://main/system/sharding/itemViews/27/xxx	Tried to execute unprepared query 0x1a9a3b650a2b8fbf20474458137cee81 but we don't have the data to reprepare it

Do you have any updates on the issue or a workaround? (currently we need to restart the whole akka cluster if we restart a cassandra node :( )

danischroeter avatar Feb 22 '21 14:02 danischroeter

Same with my PoC version of a service using akka-persistence-cassandra and akka-projections with CosmosDB; the error appears after a few hours.

pasikon avatar Feb 24 '21 20:02 pasikon

@chbatey There was feedback on issue https://datastax-oss.atlassian.net/browse/JAVA-2910. It looks like the prepared statements get garbage collected: akka.persistence.cassandra.journal.CassandraJournal does not keep a reference to the prepared statements.

So either upgrade the Cassandra driver to 4.11.1 and set cache-prepare-payloads-forever: true, or keep a reference to the prepared statements. The statements are already initialized eagerly, but this needs proper management of blocking/async and error handling:

    case CassandraJournal.Init =>
      // try initialize early, to be prepared for first real request
      preparedWriteMessage
      preparedWriteMessageWithMeta
      preparedSelectMessages

Alex Dutra, March 3, 2021, 10:06 PM: Hi Dongsheng Song, the weak values were chosen on purpose. The rationale is that you should store your prepared statements somewhere where they live as long as the application lives, in which case they will never be garbage-collected. If they are not referenced anymore, they are not useful anymore, and they could (and should) be garbage-collected and removed from the cache.

danischroeter avatar Jul 06 '21 06:07 danischroeter
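
The weak-value behaviour described in the quoted comment can be illustrated with a toy cache. This is a minimal sketch, not the driver's implementation: `CachedStatement`, `WeakValueCache` and `WeakValueCacheDemo` are hypothetical names, and the real driver cache is more involved. The point it shows is that an entry stays usable only while the application itself holds a strong reference to the value.

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a prepared statement object.
final class CachedStatement(val cql: String)

// Toy cache that references its values only weakly (an assumption-based
// model of the behaviour discussed above, not the driver's code).
final class WeakValueCache {
  private val entries = TrieMap.empty[String, WeakReference[CachedStatement]]

  // Returns the cached statement if it is still alive, otherwise creates one.
  def prepare(cql: String): CachedStatement =
    lookup(cql).getOrElse {
      val fresh = new CachedStatement(cql)
      entries.put(cql, new WeakReference(fresh))
      fresh
    }

  // None if the entry was never added or its value has been collected.
  def lookup(cql: String): Option[CachedStatement] =
    entries.get(cql).flatMap(ref => Option(ref.get()))
}

object WeakValueCacheDemo {
  val cache = new WeakValueCache
  // The application keeps a strong reference, so this entry cannot be collected.
  val held: CachedStatement = cache.prepare("SELECT * FROM t WHERE id = ?")
}
```

If `held` were dropped, a later `lookup` could return `None` after a garbage collection, which corresponds to the situation where the data needed to re-prepare a statement is no longer available.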

The correct config to use with Cassandra java-driver-core:4.12.0 is:

datastax-java-driver.advanced.prepared-statements.prepared-cache.weak-values=false

danischroeter avatar Jul 09 '21 09:07 danischroeter

@chbatey @patriknw This issue is still open and IMHO really critical, yet it could be avoided since the workaround above works. Anybody doing a Cassandra cluster restart (e.g. for a version upgrade) will run into this if the Akka application has been up for a while, because GC will very likely have collected the prepared statements.

Workaround

Upgrade the Cassandra driver to a version > 4.11.1 and document the necessary config overrides. The config needs to be overridden by the user to have an effect.

https://github.com/akka/akka-persistence-cassandra/pull/943 would not yet pull in the needed Cassandra driver; a new release of Alpakka would be needed:

akka-stream-alpakka-cassandra:2.0.2
  java-driver-core:4.6.1
akka-stream-alpakka-cassandra:3.0.4
  java-driver-core:4.10.0
akka-stream-alpakka-cassandra:3.0.? (not yet released)
  java-driver-core:4.13.0

Hmm, the project's current Cassandra driver is quite far behind: https://docs.datastax.com/en/developer/java-driver/4.14/changelog/ We override the version and successfully use 4.14.1 in production, with both Cassandra 3 and 4 clusters.

Fix / keep a reference to prepared statements

I would consider creating a PR to address the issue at hand, but currently it's hard to grasp what a clean solution would look like. It would be nice if all prepared statements were treated alike; currently there are multiple concepts. So it would be nice to get some guidance before starting to work on that, so that it's not a waste of time.

Problematic prepared statements: CassandraJournal - Init, CassandraJournal - Statement

Tagged uses lazy vals: TaggedPreparedStatements

And the read journal uses yet another approach: CassandraReadJournal - init

danischroeter avatar Jun 17 '22 12:06 danischroeter

@octonato May I ping you for feedback?

nvollmar avatar Jul 07 '22 05:07 nvollmar

@danischroeter @nvollmar Thanks for the investigation and the ping. I agree that updating the versions is the best long-term solution. Since it's a big jump, there are risks and uncertainty involved. I don't know how compatible the Java driver is between those versions. We might not have the bandwidth for testing and releasing that, and I am unsure whether we could do such a jump in a patch release.

Do I understand correctly that a workaround would be to keep a hard reference to the PreparedStatements? If that is the case, we could create an Akka extension (bound to the ActorSystem). Whenever we prepare a statement, we place the instance in a ConcurrentHashMap in that Akka extension, keyed by the statement string. We don't use that map for lookups (since the driver already has a cache) but only to keep a hard reference to the prepared statement instances.

Does that sound like a feasible workaround? Would you be able to prepare a pull request for that?

patriknw avatar Jul 07 '22 06:07 patriknw
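
The hard-reference idea proposed above could look roughly like the following. This is a sketch under stated assumptions: `StatementRegistry` and `retain` are hypothetical names, the type parameter `S` stands in for the driver's prepared statement type, and the wiring as an Akka extension bound to the ActorSystem is left out to keep the example self-contained.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical holder whose only job is to keep strong references alive.
// S stands in for the driver's prepared statement type.
final class StatementRegistry[S <: AnyRef] {
  private val held = new ConcurrentHashMap[String, S]()

  // Remember the statement under its CQL string and hand it back unchanged.
  // The map is never used for lookups; a later statement prepared for the
  // same string simply replaces the earlier reference.
  def retain(cql: String, statement: S): S = {
    held.put(cql, statement)
    statement
  }

  // Number of distinct statement strings currently pinned.
  def size: Int = held.size()
}
```

A caller would wrap each prepare result, for example `registry.retain(cql, prepared)`, so that the instance stays reachable for as long as the registry itself is reachable.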

@patriknw Thx for the feedback!

I don't know how compatible the Java driver is between those versions.

We upgraded to, and have been using, several newer Cassandra Java driver versions in production for over a year without any issues, so I think this won't be a problem.

Do I understand correctly that a workaround would be to keep a hard reference to the PreparedStatements?

Yes. However, I would opt to change the problematic statements to be kept in lazy vals, like in TaggedPreparedStatements. I will create a pull request if that's fine.

danischroeter avatar Jul 07 '22 06:07 danischroeter

lazy val indeed sounds easier. My concern with that was that if the surrounding instance is an actor, it might (at least theoretically) be restarted; the surrounding instance is then not referenced anywhere, and the prepared statements are again free for GC (at least for a short while until they are used again). Please look into whether that could be a problem.

Thanks for creating the pull request.

patriknw avatar Jul 07 '22 06:07 patriknw

@patriknw #974 will fix the issue using lazy vals - this is the least invasive solution addressing the problem and already used in parts. I kept the change as minimal as possible to avoid any side effects...

lazy val indeed sounds easier. My concern with that was that if the surrounding instance is an actor, it might (at least theoretically) be restarted; the surrounding instance is then not referenced anywhere, and the prepared statements are again free for GC (at least for a short while until they are used again). Please look into whether that could be a problem.

The issue java.lang.IllegalStateException: Tried to execute unprepared query cannot realistically happen anymore. Even if it is theoretically possible that the actor is killed because of a failure and the prepared statement gets collected at exactly that moment (already almost impossible), the initialization of the new actor will create a fresh prepared statement before executing a new query.

danischroeter avatar Aug 22 '22 19:08 danischroeter
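
The lazy val argument above can be sketched as follows. The names `StubStatement`, `StubSession` and `JournalStatements` are hypothetical and do not match the real journal code, and preparing is modelled synchronously here for brevity, whereas the real code may hold the result of an asynchronous prepare. The sketch only shows the two properties the argument relies on: a lazy val pins the statement for the lifetime of its owning instance, and a replacement instance prepares a fresh statement before first use.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a prepared statement.
final class StubStatement(val cql: String)

// Hypothetical stand-in for a session; counts how often prepare is called.
final class StubSession {
  val prepareCalls = new AtomicInteger(0)
  def prepare(cql: String): StubStatement = {
    prepareCalls.incrementAndGet()
    new StubStatement(cql)
  }
}

// Hypothetical journal-like owner of the statement.
final class JournalStatements(session: StubSession) {
  // Prepared on first access, then strongly referenced by this instance.
  lazy val selectMessages: StubStatement =
    session.prepare("SELECT * FROM messages WHERE persistence_id = ?")
}
```

Creating a second `JournalStatements` models a restarted actor: its first access to `selectMessages` triggers a new prepare call, so it never executes with a statement left over from the previous instance.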