akka-persistence-cassandra
Tried to execute unprepared query
@mckeeh3 noticed this when running with AWS Keyspaces
[2020-09-15 12:53:40,107] [WARN] [akka.persistence.cassandra.query.EventsByTagStage] [] [woe-twin-akka.actor.default-dispatcher-17] - Backtrack failed, this will retried. java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it MDC: {akkaAddress=akka://[email protected]:25520, sourceThread=woe-twin-akka.actor.default-dispatcher-17, akkaSource=EventsByTagStage(akka://woe-twin), sourceActorSystem=woe-twin, akkaTimestamp=12:53:40.107UTC}
[2020-09-15 12:53:40,477] [ERROR] [com.datastax.oss.driver.internal.core.tracker.RequestLogger] [] [s0-io-15] - [s0|130042065][Node(endPoint=3.234.248.205/3.234.248.205:9142, hostId=6c945ed5-b04e-3fc4-8dc0-9aea1128bfa8, hashCode=1c981716)] Error (1 ms) [4 values]
SELECT persistence_id, tag_pid_sequence_nr, timestamp
FROM woe_twin.tag_views WHERE
tag_name = ? AND
timebucket = ? AND
timestamp > ? AND
timestamp <= ? [tag_name='0', timebucket=1600171200000, timestamp=9acfe930-f751-11ea-8080-808080808080, timestamp=9dcad9b4-f751-11ea-893d-4fd06c7f32e0] MDC: {}
java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:655)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:629)
at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
Could we be using prepared statements the wrong way? PreparedStatements are cached by the driver, and we should be able to reuse them many times without preparing them again. At least, that is what I can see in the docs: https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/statements/prepared/
Same with local (real) Cassandra:
[com.datastax.oss.driver.internal.core.tracker.RequestLogger] [] [s0-io-13] - [s0|2125764217][Node(endPoint=3.234.248.255/3.234.248.255:9142, hostId=52b446ab-b2ec-36dc-97b0-d1eb37ac10a5, hashCode=21a604fe)] Error (1 ms) [4 values]
SELECT persistence_id, tag_pid_sequence_nr, timestamp
FROM woe_twin.tag_views WHERE
tag_name = ? AND
timebucket = ? AND
timestamp > ? AND
timestamp <= ? [tag_name='0', timebucket=1601316000000, timestamp=6a0e5f80-01b4-11eb-8080-808080808080, timestamp=6d095000-01b4-11eb-8080-808080808080] MDC: {}
java.lang.IllegalStateException: Tried to execute unprepared query 0x3c27fe920291dfe7c3a1f0e3524e5dee but we don't have the data to reprepare it
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:667)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:641)
at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:255)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
I'm guessing this is an upstream bug, but I'll investigate and raise it there if need be.
Is there any update regarding this issue?
We need to look into this more; it is either a driver issue or we're misusing sessions: https://datastax-oss.atlassian.net/jira/software/c/projects/JAVA/issues/?filter=reportedbyme
@chbatey what's the link without the reportedbyme filter?
https://datastax-oss.atlassian.net/browse/JAVA-2910
Doh, thanks!
We came across the same issue. This is quite serious, since it means that if a Cassandra node is added to the cluster or restarted, all akka-persistence-cassandra clients will fail.
In our case we use persistence queries to update the view side, and these streams fail:
ERROR akka.actor.OneForOneStrategy akka://main/system/sharding/itemViews/27/xxx Tried to execute unprepared query 0x1a9a3b650a2b8fbf20474458137cee81 but we don't have the data to reprepare it
Do you have any updates on the issue, or a workaround? (Currently we need to restart the whole Akka cluster whenever we restart a Cassandra node :( )
Same with my PoC service using akka-persistence-cassandra and akka-projections with CosmosDB; it appears after a few hours.
@chbatey There was feedback on issue https://datastax-oss.atlassian.net/browse/JAVA-2910
It looks like the prepared statements get garbage-collected: akka.persistence.cassandra.journal.CassandraJournal does not keep a reference to the prepared statements...
So either upgrade the Cassandra driver to 4.11.1 and set cache-prepare-payloads-forever: true, or keep a reference to the prepared statements...
The statements are already initialized eagerly, but holding on to them would need proper management of blocking/async and error handling...
case CassandraJournal.Init =>
  // try to initialize early, to be prepared for the first real request
  preparedWriteMessage
  preparedWriteMessageWithMeta
  preparedSelectMessages
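Why touching the statements on Init is not enough by itself can be sketched without the driver. This is a minimal sketch under stated assumptions: FakeSession, Stmt and JournalStatements are hypothetical stand-ins (not the DataStax or akka-persistence-cassandra API), and it assumes, as the comments above suggest, that the journal does not hold on to the result of prepare. A def calls prepare on every access and keeps nothing; a lazy val keeps the Future, and therefore the statement inside it, strongly reachable for as long as the owning instance lives.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

// Hypothetical stand-in for a prepared statement (NOT the driver's PreparedStatement).
final class Stmt(val cql: String)

// Hypothetical session: every prepare call creates a new Stmt and bumps a counter.
final class FakeSession {
  val prepareCalls = new AtomicInteger(0)

  def prepare(cql: String): Future[Stmt] = {
    prepareCalls.incrementAndGet()
    Future.successful(new Stmt(cql))
  }
}

final class JournalStatements(session: FakeSession) {
  private val cql = "SELECT persistence_id FROM tag_views WHERE tag_name = ?"

  // def: each access calls prepare again and this class keeps no reference,
  // so once callers drop the result nothing holds the statement strongly.
  def preparedWithDef: Future[Stmt] = session.prepare(cql)

  // lazy val: prepared on first access; the Future (and the Stmt inside it)
  // then stays strongly reachable for as long as this instance is.
  lazy val preparedWithLazyVal: Future[Stmt] = session.prepare(cql)
}
```

In the real driver, repeated prepare calls would normally be served from its cache, but per the driver discussion that cache is what holds the statements only weakly.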
Alex Dutra (March 3, 2021, 10:06 PM): Hi Dongsheng Song, the weak values were chosen on purpose. The rationale is that you should store your prepared statements somewhere where they live as long as the application lives, in which case they will never be garbage-collected. If they are not referenced anymore, they are not useful anymore, and they could (and should) be garbage-collected and removed from the cache.
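The behaviour described in that reply can be illustrated with a tiny weak-valued map. This is a minimal sketch using only the JDK's WeakReference and ConcurrentHashMap; the driver's real cache is a different implementation, and the class name WeakValueCache is hypothetical. A lookup succeeds only while something outside the cache still holds the value.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap

// Hypothetical weak-valued cache: the map itself never keeps a value alive;
// only strong references held elsewhere (e.g. by the application) do.
final class WeakValueCache[K, V <: AnyRef] {
  private val entries = new ConcurrentHashMap[K, WeakReference[V]]()

  def put(key: K, value: V): Unit = {
    entries.put(key, new WeakReference[V](value))
    ()
  }

  // Returns the value only if it is present and has not been garbage-collected yet.
  def get(key: K): Option[V] =
    Option(entries.get(key)).flatMap(ref => Option(ref.get()))
}
```

Whether an unreferenced entry actually disappears depends on when the GC runs, which fits the reports above that the failure only shows up after the application has been running for a while.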
The correct config to use with Cassandra java-driver-core:4.12.0 is:
datastax-java-driver.advanced.prepared-statements.prepared-cache.weak-values=false
@chbatey @patriknw This issue is still open and, IMHO, really critical, yet it could be avoided since the workaround above works. Anybody doing a Cassandra cluster restart (e.g. for a version upgrade) will run into this if the Akka application has been up for a while, because the GC will very likely have collected the prepared statements by then.
Workaround
Upgrade the Cassandra driver to a version > 4.11.1 and document the necessary config overrides. The config needs to be overridden by the user to take effect.
https://github.com/akka/akka-persistence-cassandra/pull/943 would not yet pull in the needed Cassandra driver; a new release of Alpakka would be needed:
- akka-stream-alpakka-cassandra:2.0.2 uses java-driver-core:4.6.1
- akka-stream-alpakka-cassandra:3.0.4 uses java-driver-core:4.10.0
- akka-stream-alpakka-cassandra:3.0.? (not yet released) uses java-driver-core:4.13.0
Hmm, the current project's Cassandra driver is quite far behind (see https://docs.datastax.com/en/developer/java-driver/4.14/changelog/). We override the version and successfully use 4.14.1 in production, with both Cassandra 3 and Cassandra 4 clusters.
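For an sbt build, such a version override can be sketched as follows. This assumes an sbt project and uses sbt's standard dependencyOverrides setting; the commenter did not say how they overrode the version. The driver's prepared-cache setting still has to be changed by the user for the upgrade to help.

```scala
// build.sbt (sketch): force the DataStax driver version reported above,
// overriding the older one pulled in transitively by Alpakka Cassandra.
dependencyOverrides += "com.datastax.oss" % "java-driver-core" % "4.14.1"
```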
Fix / keep a reference to prepared statements
I would consider creating a PR to address the issue at hand, but currently it's hard to grasp what a clean solution would look like. It would be nice if all prepared statements were treated alike; currently there are multiple concepts... so I'd appreciate some guidance before starting to work on it, so that it's not a waste of time.
Problematic prepared statements: CassandraJournal (Init) and CassandraJournal (Statement).
Tagged uses lazy vals: TaggedPreparedStatements.
And the read journal uses yet another approach: CassandraReadJournal (init).
@octonato May I ping you for feedback?
@danischroeter @nvollmar Thanks for the investigation and the ping. I agree that updating the versions is the best long-term solution. Since it's a big jump, there are risks and uncertainty involved. I don't know how compatible the Java driver is between those versions. We might not have the bandwidth for testing and releasing that, and I'm unsure whether we could do such a jump in a patch release.
Do I understand correctly that a workaround would be to keep a hard reference to the PreparedStatements? If so, we could create an Akka extension (bound to the ActorSystem). Whenever we prepare a statement, we place the instance in a ConcurrentHashMap in that Akka extension, keyed by the statement string. We don't use that map for lookups (since the driver already has a cache), only to keep a hard reference to the prepared statement instances.
Does that sound like a feasible workaround? Would you be able to prepare a pull request for that?
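The extension proposed above could look roughly like the following. It is a sketch only: the Akka Extension/ExtensionId wiring is omitted, and the class name PreparedStatementKeepAlive, its type parameter S (standing in for the driver's PreparedStatement) and the method names are all hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical keep-alive registry, intended to live once per ActorSystem.
// The map is never used for lookups; it only keeps prepared statements strongly reachable.
final class PreparedStatementKeepAlive[S <: AnyRef] {
  private val keepAlive = new ConcurrentHashMap[String, S]()

  // Record a freshly prepared statement, keyed by its CQL string, and return it unchanged
  // so it can be used inline. A later registration for the same CQL replaces the earlier one.
  def retain(cql: String, statement: S): S = {
    keepAlive.put(cql, statement)
    statement
  }

  def retainedCount: Int = keepAlive.size()
}
```

Each place that prepares a statement would pass the result through retain, so nothing changes on the lookup path, which still goes through the driver's own cache.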
@patriknw Thx for the feedback!
I don't know how compatible the Java driver is between those versions.
We have upgraded to, and used, several newer Cassandra Java drivers in production for over a year without any issues, so I don't think this will be a problem.
Do I understand correctly that a workaround would be to keep a hard reference to the PreparedStatements?
Yes. However, I would opt to keep the problematic statements in lazy vals, as TaggedPreparedStatements already does. I will create a pull request if that's fine.
lazy val indeed sounds easier. My concern was that if the surrounding instance is an actor, it might (at least theoretically) be restarted, leaving the old instance unreferenced, and the prepared statements would then again be eligible for GC (at least for a short while, until they are used again). Please check whether that could be a problem.
Thanks for creating the pull request.
@patriknw #974 will fix the issue using lazy vals. This is the least invasive solution to the problem, and the approach is already used in parts of the code. I kept the change as minimal as possible to avoid side effects.
lazy val indeed sounds easier. My concern was that if the surrounding instance is an actor, it might (at least theoretically) be restarted, leaving the old instance unreferenced, and the prepared statements would then again be eligible for GC (at least for a short while, until they are used again). Please check whether that could be a problem.
The issue "java.lang.IllegalStateException: Tried to execute unprepared query" cannot realistically happen anymore. Even if it is theoretically possible for the actor to be killed because of a failure at exactly the moment the prepared statement gets collected (already almost impossible), the initialization of the new actor will create a fresh prepared statement before executing a new query.
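The restart argument can be illustrated with a small model. This is a sketch assuming hypothetical names (PrepareCounter, JournalActorInstance) that stand in for the driver's prepare call and for an actor holding its statement in a lazy val; it does not model Akka supervision or the driver cache. It only shows that a replacement instance prepares its own statement on init, before its first query, instead of relying on anything from the discarded instance.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the driver's prepare call; counts how often it runs.
object PrepareCounter {
  val calls = new AtomicInteger(0)
  def prepare(cql: String): String = s"prepared#${calls.incrementAndGet()}: $cql"
}

// Hypothetical stand-in for an actor instance that holds its statement in a lazy val.
final class JournalActorInstance {
  lazy val selectMessages: String =
    PrepareCounter.prepare("SELECT persistence_id FROM tag_views WHERE tag_name = ?")

  // Mirrors the "touch it on Init" pattern: force preparation before the first real query.
  def init(): String = selectMessages

  // Every query uses the statement this instance prepared and still references.
  def runQuery(): String = selectMessages
}
```

A "restart" here is simply dropping the first instance and constructing a second one, whose init triggers a new prepare.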