alpakka

Provide Cassandra execute output when available (insertion especially)

Open JustinPihony opened this issue 2 years ago • 1 comments

Short description

If you perform an INSERT ... IF NOT EXISTS, Cassandra returns the status of the insert along with clarifying details. This should be passed back to the user.

Details

When you perform something like "INSERT INTO {FOO} ({COLUMNS}) VALUES (?,?) IF NOT EXISTS", it returns a data row with the column [applied] (a boolean indicating whether the insert happened) and, if the insert was not applied, the columns and values of the row that already exists.

Here is the documentation on CQL inserts.

JustinPihony, Apr 14 '22 01:04

The AsyncResultSet which the Java client returns has a wasApplied method which is the value of the [applied] column: it's always true for unconditional queries. So there are three options as I see them:

  • add counterpart Flow[T, AsyncResultSet, NotUsed]s. The downside AIUI is that downstream won't know what was upserted in the success case (just that something was). For some use-cases, this is what one would want, but it's also (especially in Scala) really easy to just have the flow be:
import akka.stream.alpakka.cassandra.CassandraWriteSettings
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.stream.scaladsl.Flow
import com.datastax.oss.driver.api.core.cql.{BoundStatement, PreparedStatement}
import scala.compat.java8.FutureConverters._  // provides .toScala on CompletionStage
import scala.concurrent.Future

val session: CassandraSession = ???
val writeSettings: CassandraWriteSettings = ???
val cqlStatement: String = ???
val statementBinder: (Any, PreparedStatement) => BoundStatement = (element, stmt) => ???  // TODO: specialize element type
val preparedStatement: Future[PreparedStatement] = session.prepare(cqlStatement)

Flow[T].mapAsync(writeSettings.parallelism) { // NB: parallelism greater than one implies reordering of writes is possible...
  element =>
    implicit val ec = session.executionContext
    preparedStatement.map(ps => statementBinder(element, ps))
      .flatMap { boundStatement =>
        val cqlSess = session.underlying().value.get.get  // safe since preparedStatement only completes successfully after session.underlying() has completed successfully
        cqlSess.executeAsync(boundStatement).toScala  // emits the AsyncResultSet downstream
      }
}

The Java would be similar, but I'm not familiar enough with the Java future APIs to write it here.

  • add counterpart Flow[T, T, NotUsed]s which would be created with a resultSetExtractor (the dual of the statementBinder in the existing flows) which would convert the results of the unapplied writes to Ts: a drop-in replacement for the existing flows. This would allow downstream to know that at some point the values corresponding to T were in Cassandra.

  • add counterpart Flow[T, Either[T, AsyncResultSet], NotUsed]s. Downstream can fold on the Either; if the fold happens inside a mapAsync, it can even perform recovery side effects cleanly. The Flow[T, T, NotUsed] case can be implemented in terms of this.
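
To make the relationship between the second and third options concrete, here is a minimal sketch in plain Scala. It assumes Left carries the element of an applied write and Right carries the result set of an unapplied one (the description above does not say which side is which). FakeResultSet, classify, toElement and User are hypothetical names introduced only for illustration; FakeResultSet stands in for the driver's AsyncResultSet so the sketch runs without Cassandra.

```scala
object ConditionalWriteSketch {
  // Hypothetical stand-in for AsyncResultSet: `wasApplied` mirrors the real
  // method, `existingRow` simplifies the row returned for a rejected insert.
  final case class FakeResultSet(wasApplied: Boolean, existingRow: Map[String, String])

  // Hypothetical element type written to the table.
  final case class User(id: String, name: String)

  // Third option: keep the element when the write applied,
  // otherwise surface the result set describing the existing row.
  def classify[T](element: T, rs: FakeResultSet): Either[T, FakeResultSet] =
    if (rs.wasApplied) Left(element) else Right(rs)

  // Second option expressed in terms of the third: the resultSetExtractor
  // converts the result of an unapplied write back into a T.
  def toElement[T](outcome: Either[T, FakeResultSet])(resultSetExtractor: FakeResultSet => T): T =
    outcome match {
      case Left(element) => element
      case Right(rs)     => resultSetExtractor(rs)
    }

  // Example extractor: rebuild the User that was already stored.
  val extractor: FakeResultSet => User =
    rs => User(rs.existingRow("id"), rs.existingRow("name"))

  // An insert that was applied, and one rejected because id "1" already exists.
  val applied: Either[User, FakeResultSet] =
    classify(User("1", "ann"), FakeResultSet(wasApplied = true, Map.empty))
  val rejected: Either[User, FakeResultSet] =
    classify(User("1", "bob"), FakeResultSet(wasApplied = false, Map("id" -> "1", "name" -> "ann")))
}
```

In both cases toElement yields the User that is actually in the table, which is the guarantee the Flow[T, T, NotUsed] variant gives downstream. In a real flow, classify would run on the AsyncResultSet returned by executeAsync.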

leviramsey, Apr 14 '22 14:04