alpakka
alpakka copied to clipboard
Provide Cassandra execute output when available (insertion especially)
Short description
If you perform an INSERT...IF NOT EXISTS
then C* will return the status of the insert with clarifying details. This should be passed back to the user.
Details
When you perform something like "INSERT INTO {FOO} ({COLUMNS}) VALUES (?,?) IF NOT EXISTS";
then it will return a datarow with the column [applied]
(boolean indicating insertion) and if not inserted the columns and values for the uninserted.
The AsyncResultSet
which the Java client returns has a wasApplied
method which is the value of the [applied]
column: it's always true
for unconditional queries. So there are three options as I see them:
- add counterpart
Flow[T, AsyncResultSet, NotUsed]
s. The downside AIUI is that downstream won't know what was upserted in the success case (just that something was). For some use-cases, this is what one would want, but it's also (especially in Scala) really easy to just have the flow be:
val session: CassandraSession = ???
val writeSettings: CassandraWriteSettings = ???
val cqlStatement: String = ???
val statementBinder: (Any, PreparedStatement) => BoundStatement = (element, stmt) => ??? // TODO: specialize element type
val preparedStatement: Future[PreparedStatement] = session.prepare(cqlStatement)
Flow[T].mapAsync(writeSettings.parallelism) { // NB: parallelism greater than one implies reordering of writes is possible...
element =>
implicit val ec = session.executionContext
preparedStatement.map(ps => statementBinder(element, ps))
.flatMap { boundStatement =>
val cqlSess = session.underlying().value.get.get // safe since preparedStatement only completes successfully after session.underlying() has completed successfully
cqlSess.executeAsync(boundStatement).toScala
}
The Java would be similar, but I'm not familiar enough with the Java future APIs to write it here.
-
add counterpart
Flow[T, T, NotUsed]
s which would be created with aresultSetExtractor
(the dual of thestatementBinder
in the existing flows) which would convert the results of the unapplied writes toT
s: a drop-in replacement for the existing flows. This would allow downstream to know that at some point the values corresponding toT
were in Cassandra. -
add counterpart
Flow[T, Either[T, AsyncResultSet], NotUsed]
s. Downstream canfold
on theEither
; iffold
ing in amapAsync
can even cleanly do recovery side effects etc. TheFlow[T, T, NotUsed]
case can be implemented in terms of this.