riak-scala-client

Initial Play Iteratees API for secondary indexes lookup

Open eamelink opened this issue 12 years ago • 15 comments

Here's an initial implementation at a Play Iteratees API for the secondary indexes lookup. It doesn't yet resolve the loading of the list of keys into memory, but it does make the fetching of the actual values work with an iteratee.

There's a basic test that confirms that the happy flow works, but more extensive tests are needed to confirm that the potential failure modes are handled correctly.

A flaw of this implementation is that if one of the key lookups returns a None, the enumerator is closed instead of continuing with the next key (which would be more in line with how the regular fetch methods work).

eamelink avatar May 23 '13 08:05 eamelink

Thanks, that looks a lot simpler than I thought, even though the enumerator code is still pretty counter-intuitive to me. I have some different ideas about how to expose this API (not as an alternative but integrated into the normal fetch) but this looks great as a starting point. Thanks again, I'll merge this soon!

agemooij avatar May 23 '13 10:05 agemooij

This looks great!

Another approach you may want to consider is returning an Enumerator instead of taking an iteratee, and then implement the apply() method of the enumerator to do the lookup, create a new Enumerator with the results as you've currently got, and return the result of that. Each time the enumerator is applied to an iteratee, the lookup is done again. This makes it a little more intuitive, since the stream is a producer, and an enumerator is a producer.
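As a plain-Scala illustration of that idea (the Play Enumerator and Iteratee types are elided, and `LookupProducer` is a hypothetical name, not part of any library), a producer that re-runs its lookup on every application might look like:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch: the producer captures the lookup, and every application to a
// consumer re-runs it, so a second application sees fresh data.
// `lookup` stands in for the actual 2i keys call.
class LookupProducer[A](lookup: () => Future[List[A]]) {
  // Applying the producer performs the lookup again and feeds the consumer.
  def apply[B](consume: List[A] => B): Future[B] =
    lookup().map(consume)
}

var calls = 0
val producer = new LookupProducer(() => { calls += 1; Future.successful(List(1, 2, 3)) })

Await.result(producer(_.sum), 1.second) // lookup runs once
Await.result(producer(_.sum), 1.second) // lookup runs again (calls == 2)
```

This mirrors the trade-off discussed below: re-applying the producer re-executes the request, which is only safe for idempotent lookups.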

Fixing the flaw you pointed out shouldn't be too hard, just do it in a recursive flatMap call on the future, something like:

def fetchNext(server: RiakServerInfo, bucket: String, keys: List[RiakIndex], resolver: RiakConflictsResolver): Future[Option[(List[RiakIndex], RiakValue)]] = keys match {
  case Nil     => successful(None)
  case x :: xs => {
    val riakValueFuture = fetch(server, bucket, x, resolver)
    riakValueFuture.flatMap {
      case None        => fetchNext(server, bucket, xs, resolver)
      case Some(value) => successful(Some((xs, value)))
    }
  }
}
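A minimal runnable analogue of that skip-on-None recursion, using plain Scala futures with strings standing in for the Riak types (`RiakIndex`, `RiakValue`, and the real fetch are replaced by stand-ins here):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Recursively try keys until one resolves to a value; a None result skips
// to the next key instead of ending the stream.
def fetchNext(keys: List[String],
              fetch: String => Future[Option[String]]): Future[Option[(List[String], String)]] =
  keys match {
    case Nil => Future.successful(None)
    case k :: rest =>
      fetch(k).flatMap {
        case None        => fetchNext(rest, fetch) // skip missing keys
        case Some(value) => Future.successful(Some((rest, value)))
      }
  }

// Example: key "b" is missing, so it is skipped rather than ending the stream.
val store = Map("a" -> "1", "c" -> "3")
val fetch = (k: String) => Future.successful(store.get(k))

val result = Await.result(fetchNext(List("b", "c"), fetch), 1.second)
// result == Some((Nil, "3"))
```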

jroper avatar May 23 '13 13:05 jroper

Thanks James :)

I actually chose taking an Iteratee instead of returning an Enumerator because that's what Play's WS library does. I was already wondering why that is exactly. Maybe you know?

eamelink avatar May 23 '13 13:05 eamelink

The hard question is what we do if a person applies the enumerator to an iteratee twice? There are two options: execute the request again (correct, but dangerous for non-idempotent requests), or throw an IllegalStateException. And if we execute the request again, what happens to the response headers? There's no way to pass them to the user again. But we will eventually have an option for providing the response as an enumerator.

jroper avatar May 23 '13 23:05 jroper

Is this ever getting merged? We could really use the 2i streaming feature in some form, and this looks great.

drapp avatar Aug 22 '13 20:08 drapp

Hi, this was a bit of an example of how something like this could be implemented rather than an easy-to-merge finished product. I have to admit that I forgot all about it :(

How soon do you need this? Are you referring to the streaming 2i features in 1.4? Those will very probably need some more work at the http level.

I'll try to add this to my schedule ASAP, either this weekend or the next. All help in updating this PR will be much appreciated of course.

agemooij avatar Aug 22 '13 20:08 agemooij

@agemooij Let me know if you'd like a PoC with the structure that @jroper suggested, returning an Enumerator :)

eamelink avatar Aug 22 '13 21:08 eamelink

1.4 2i streaming looks like what we're actually looking for. At first glance that's what I thought this was, but it looks like all the keys get loaded into memory.

drapp avatar Aug 22 '13 22:08 drapp

Thanks Erik, I like the idea of returning an Enumerator. This is also what ReactiveMongo does AFAIK. An updated PR would be much appreciated.

1.4 only came out recently and I just haven't had any time to look at the new API yet. It looks like the new streaming 2i uses http chunking, which would require a dedicated actor instance to handle incoming chunks and I wonder how that could be wrapped in an Enumerator.

Erik, are there any examples of bridging a (child) actor receiving messages (i.e. chunks) over time with an Enumerator? Sounds pretty tricky...

This will obviously require some research, but please feel free to point me in the right direction :)

agemooij avatar Aug 24 '13 21:08 agemooij

I can't find any examples, but it seems to me the basic notion would be

  1. The streaming actor has a queue of RiakValues and a Promise[Option[RiakValue]] var which should conceptually always be fulfilled with, or waiting for, the first value in the queue.
  2. When the streaming actor receives a chunk, it adds it to a queue.
  3. If the queue was previously empty, fulfill the promise with the head of the queue
  4. The onComplete method of the Enumerator should look something like
Enumerator.fromCallback { () =>
  val promiseToReturn = this.promise
  this.promise = Promise[Option[RiakValue]]()
  queue.dequeue()
  if (queue.nonEmpty) this.promise.success(Some(queue.head))
  promiseToReturn.future
}

I'd have to think about that a bit harder to make sure there are no concurrency issues, and there might be a more actor-based way to abstract it, but Enumerator.fromCallback and promises look like the way to wrap chunks showing up asynchronously into an enumerator.

drapp avatar Aug 26 '13 18:08 drapp

Hi @drapp,

The code sample you gave there wouldn't be safe to run in an actor, since the callback gets executed in a different thread from the actor but modifies the actor's state (i.e., updates the promise).

The problem with actors is that they don't give you back pressure, so if riak is producing many values very fast, and the client is consuming them slowly, then your buffers will fill up, and you'll OOME. But sometimes this just isn't an issue, the amount of data expected is not enough to do this. If it is an issue, then you'll need to implement some way to ACK each message, or NACK messages when your buffers are full. The first is simplest, and maps well to futures because an ACK is equivalent to a future being redeemed, and hence maps well to iteratees. The NACK approach is much harder to implement.
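A tiny sketch of the ACK style described above, using plain futures (the `produce`/`consume` names are illustrative, not from the library): the producer only sends the next value once the consumer's future, the ACK, has been redeemed, so at most one value is ever in flight and buffers cannot grow.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Send each value only after the previous one has been ACKed
// (its future redeemed), giving back pressure for free.
def produce[A](values: List[A])(consume: A => Future[Unit]): Future[Unit] =
  values match {
    case Nil     => Future.successful(())
    case v :: vs => consume(v).flatMap(_ => produce(vs)(consume))
  }

// Example consumer: each ACK is a completed future.
val seen = scala.collection.mutable.ListBuffer.empty[Int]
val done = produce(List(1, 2, 3)) { v =>
  seen += v
  Future.successful(()) // ACK
}
Await.result(done, 1.second)
// seen contains 1, 2, 3 in order
```

A slow consumer simply completes its future later, and the producer waits, which is why this maps so naturally onto iteratees.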

But that aside, let's assume back pressure is not necessary, then I would do something like this (coding in github, writing actor code which I definitely don't know off the top of my head how to do exactly, probably won't compile):

val actor = // actor ref to actor below
val enumerator = Concurrent.unicast[RiakValue](channel => actor ! channel)

class StreamingActor extends Actor {
  var buffer: List[RiakValue] = Nil
  var done = false
  var channel: Concurrent.Channel[RiakValue] = _
  // initial state - buffering
  def receive: Receive = {
    case value: RiakValue => buffer = value :: buffer
    case Done => done = true
    case ch: Concurrent.Channel[RiakValue] =>
      channel = ch
      buffer.reverse.foreach(channel.push)
      if (done) {
        channel.end()
      } else {
        // become streaming state
        context.become(streaming)
      }
  }

  def streaming: Receive = {
    case value: RiakValue => channel.push(value)
    case Done => channel.end()
  }
}

Note the key thing here - none of the actor's mutable state is ever exposed to any asynchronous callbacks. And the use of become makes it very easy to know what's happening when: the initial buffering state is after the unicast enumerator has been created but before it has been applied to an iteratee, and the streaming state is once we have a channel and it has been applied to an iteratee.

jroper avatar Aug 27 '13 00:08 jroper

Thanks, I figured something like that ought to exist but couldn't find it.

drapp avatar Aug 27 '13 00:08 drapp

If channel.push is blocking then this needs a redesign; if not, then there is some ignorance that needs fixing on my part ;-)

rkuhn avatar Aug 27 '13 07:08 rkuhn

Hi Roland,

channel.push is non blocking - the "Concurrent.*" iteratees in Play are the shifty neighbourhood of the Play iteratees API, they offer all sorts of things that aren't supposed to be done in purely functional iteratees :)

Cheers,

James

jroper avatar Aug 27 '13 07:08 jroper

Thanks guys! I haven't had any time yet to look at Roland's typed channel stuff and my Iteratee skills are still very basic so all of this looks like pure magic to me and I'll need to study up on this stuff a bit.

Riak indexing now also has paging support, and retrieving indexed values always requires one http call per key, so there is plenty of room to protect against the data overflowing.

The basic flow will be something like this:

  • perform call to retrieve the list of indexed keys using streaming (i.e. http chunking AFAIK) and potentially paging
    • in spray this will need to be done in an actor that will handle ChunkedRequestStart, MessageChunk, and ChunkedMessageEnd messages
    • each chunk will contain N keys
    • provide a function to turn each key into a Future[RiakValue] (i.e. fetch)
    • buffer X (hundreds) keys and Y (10-20) values (by starting the futures)
  • provide an Enumerator that will push out the values when the futures complete while keeping the buffers full
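The buffering step in that flow can be sketched with plain futures (the window-based `fetchWindowed` below is a hypothetical simplification, not the planned API): keys are taken in windows of `parallelism`, each window's fetches are started together, and the next window only starts once the previous one has completed. The real version would push results into an Enumerator instead of collecting a list.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Fetch values for keys with at most `parallelism` fetches in flight:
// each window runs concurrently, windows run sequentially, and missing
// keys (None) are dropped from the output.
def fetchWindowed[K, V](keys: List[K], parallelism: Int)
                       (fetch: K => Future[Option[V]]): Future[List[V]] =
  keys.grouped(parallelism).foldLeft(Future.successful(List.empty[V])) {
    (accF, window) =>
      accF.flatMap { acc =>
        Future.traverse(window)(fetch).map(vs => acc ++ vs.flatten)
      }
  }

// Example with an in-memory "store"; at most 2 fetches run concurrently.
val store = Map(1 -> "a", 2 -> "b", 4 -> "d")
val values = Await.result(
  fetchWindowed(List(1, 2, 3, 4), parallelism = 2)(k => Future.successful(store.get(k))),
  1.second)
// values == List("a", "b", "d")
```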

My calendar is a bit overstuffed at the moment (I'm also helping out getting Spray to a final release and working on a mini-startup in my spare time) but this is definitely interesting stuff so I hope to be making time for this soon. As always, all help is very much appreciated :)

agemooij avatar Aug 27 '13 09:08 agemooij