PullFailedException in large dataset
glint.exceptions.PullFailedException: Failed 10 out of 10 attempts to push data
    at glint.models.client.async.PullFSM.glint$models$client$async$PullFSM$$execute(PullFSM.scala:67)
    at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:79)
    at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:76)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Glint works well on my toy dataset, but fails with this exception on a big dataset.
There are multiple reasons a pull or push can fail. I think it's a good idea for me to add documentation on this, as this behavior is not intuitive. The two most common reasons are:
1. The pull request may be too large. The underlying networking library Akka has a configurable limit on message sizes to retain responsiveness. Messages that exceed this limit will be dropped. You could try increasing the size using the configuration, but this is not recommended. It is better to split your pull/push requests into smaller chunks. If you are using a matrix, you can try using a GranularBigMatrix wrapper, which puts an upper limit on message size and splits big pulls/pushes into smaller pulls/pushes:
   val matrix = client.matrix[Double](10000000, 10000) // large matrix
   val granularMatrix = new GranularBigMatrix[Double](matrix, 10000) // maximum message size = 10000 keys
   granularMatrix.pull(someLargeArrayOfIndices)
2. There are too many simultaneous pull requests. In this case the parameter servers are getting flooded by requests and cannot respond to all of them. Akka by default queues up requests, and if the queue becomes too large, requests will be dropped. Due to the asynchronous nature of Glint, it is very easy to send a lot of requests in parallel. The drawback is that there is no inherent blocking or back-pressure. You can try to simulate back-pressure by waiting for some requests to finish. For example, a Semaphore can be used to put an upper bound on the number of open requests:
   val lock = new java.util.concurrent.Semaphore(16) // maximum of 16 open requests
   dataset.foreach { case sample =>
     lock.acquire() // Block execution here (wait for 1 of the 16 slots to open and reserve it)
     val request = vector.pull(...) // Perform pull request
     request.onComplete { case _ => lock.release() } // Release 1 of the 16 slots when the request has finished
   }
When doing something like this, it is very important not to put a blocking operation such as lock.acquire() inside anything that may be handled by the execution context (i.e., don't put it inside any future or callback). It is possible to deadlock your system that way!
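To make the deadlock risk concrete, here is a minimal sketch of the anti-pattern versus the safe pattern. This is illustrative only: pullSomething is a placeholder for any Glint call that returns a Future (such as vector.pull above), and the semaphore plays the same role as lock in the earlier example.

import java.util.concurrent.Semaphore
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Anti-pattern: acquire() runs inside a callback, i.e. on an execution-context thread.
// If every pool thread ends up parked in acquire(), the callbacks that would
// eventually call release() can never run, and the program deadlocks.
def dangerous(lock: Semaphore, pullSomething: () => Future[Array[Double]]): Unit =
  pullSomething().foreach { values =>
    lock.acquire() // DON'T: blocking inside a future/callback
    // ... use values ...
    lock.release()
  }

// Safe pattern: block only on the thread that issues the requests,
// and keep the callbacks themselves non-blocking.
def safe(lock: Semaphore, pullSomething: () => Future[Array[Double]]): Unit = {
  lock.acquire() // outside any future or callback
  pullSomething().onComplete { _ => lock.release() }
}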
Let me know if this helps with your problem. If not, we'll try to resolve it some other way. I'd be very interested in seeing your code to see if I can help out there.
I have increased the message size limit in Akka. If the problem is too many simultaneous pull requests, I am a bit confused: I use the same number of partitions, and the number of pull requests per second should not change, because every pull request within a partition is synchronous.
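For reference, the Akka limit being referred to is the frame size of classic Akka remoting over Netty TCP. Below is a hedged sketch of such an override (the 10 MiB value is only an example, and how this Config is wired into the Glint client and servers is not shown here):

import com.typesafe.config.ConfigFactory

// Raise the classic Akka remoting frame-size limit so that larger pull/push
// messages are not silently dropped. The buffer sizes should be at least as
// large as the frame size.
val akkaOverrides = ConfigFactory.parseString(
  """
    |akka.remote.netty.tcp.maximum-frame-size = 10 MiB
    |akka.remote.netty.tcp.send-buffer-size = 10 MiB
    |akka.remote.netty.tcp.receive-buffer-size = 10 MiB
  """.stripMargin)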
I tried adding more servers, but that did not help.
After going through the source code, I found that the reason is the second one. In the big dataset the number of features is much larger than in the toy dataset. After I decreased the number of features, it works.
I think the hot features are the bottleneck, for example the bias term. But adding more servers or decreasing the number of features cannot share that pressure. I cannot understand why the number of features affects this.
I finally reduced the number of features, and it works as expected.
Just a note on https://github.com/rjagerman/glint/issues/48#issuecomment-237158188 (2) - the lock (semaphore) will be shipped to each partition and deserialized. So it seems to me that solution won't limit the overall number of concurrent requests - only per partition. Is that the intention?
Otherwise, the only way to limit things is via thread pool size?
So, the code example is pretty unclear, my apologies: the dataset is not an RDD in Spark, but a single partition of the data. A better example would probably be:
rdd.foreachPartition { case partition =>
  val lock = new java.util.concurrent.Semaphore(16) // maximum of 16 open requests
  partition.foreach { case sample =>
    lock.acquire()
    val request = vector.push(...)
    request.onComplete { case _ => lock.release() }
  }
  lock.acquire(16) // wait for all of them to finish
  lock.release(16)
}
A major challenge in asynchronous computing is the lack of backpressure. Unless you implement some sort of signal, it is difficult to know when you are sending too many requests. It's a careful balancing act between optimizing CPU utilization and network communication. I haven't found a great solution for this and consider it an open research question. Existing parameter server literature does not really address this.
Most solutions in Akka that deal with backpressure are complicated and have the consumer signaling the producer to slow down. This is difficult to do in Glint, because the library can't necessarily stop someone from queueing up more pull or push requests without making these functions blocking and thus synchronous.
The Semaphore solution works because it limits the number of open requests, forcing the parameter server to finish processing some of them before we queue up more of them. However, it is very difficult to guess the optimal size of the semaphore as this heavily depends on your specific setup (I use 16 here because it works for me)... I hope this helps somewhat.
Note that even if you limit the size of a fixed thread pool, your code can still queue up more requests and they will be placed in an unbounded queue (depending on your specific ExecutionContext). This will cause eventual out-of-memory problems. More info about this in the Java 8 docs. You could force your ExecutionContext to use a blocking bounded queue (such as done here). I would however highly recommend against this, because you tread into the realm of deadlocks which are very difficult to find and debug...
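For completeness, this is roughly what such a bounded, blocking ExecutionContext could look like. It is a sketch using only plain java.util.concurrent; the names and pool/queue sizes are illustrative, and the deadlock caveat above applies in full:

import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

// A fixed-size pool in front of a bounded queue. When the queue is full, the
// rejection handler blocks the submitting thread until space frees up, instead
// of growing an unbounded queue or throwing.
val boundedQueue = new ArrayBlockingQueue[Runnable](128)
val blockOnFull = new RejectedExecutionHandler {
  override def rejectedExecution(task: Runnable, pool: ThreadPoolExecutor): Unit =
    pool.getQueue.put(task) // block the caller; this is where deadlocks can creep in
}
val pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, boundedQueue, blockOnFull)
implicit val boundedEc: ExecutionContext = ExecutionContext.fromExecutor(pool)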
Thanks, that is helpful. Yes, good point on the thread pools. I also saw that playing with thread pool size didn't help and in fact hurt performance.
What I've found most important is trying to achieve a good spread of features (really the "hot features") across model partitions.
The above approach will work well with "mini-batch" training within each partition. I won't have time to implement it, but I think it could work well. It could be completely async per mini-batch with a limit on overall open requests. Or perhaps a "sliding window" over pull futures, such that while the current batch is being computed, the next pull request is being completed in the background.
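As an illustration only (this is not implemented in Glint), the sliding-window idea could look roughly like the sketch below. keysOf, pull and train are placeholder functions, with pull standing in for something like vector.pull from the examples above:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hedged sketch of the "sliding window": while the current mini-batch is being
// trained on, the pull for the next mini-batch is already in flight.
def slidingWindow[Batch](batches: Iterator[Batch],
                         keysOf: Batch => Array[Long],
                         pull: Array[Long] => Future[Array[Double]],
                         train: (Batch, Array[Double]) => Unit): Unit = {
  if (batches.hasNext) {
    var batch = batches.next()
    var inFlight = pull(keysOf(batch))               // prefetch weights for the first batch
    while (batches.hasNext) {
      val nextBatch = batches.next()
      val nextPull = pull(keysOf(nextBatch))         // start the next pull in the background
      train(batch, Await.result(inFlight, 1.minute)) // compute on the current batch
      batch = nextBatch
      inFlight = nextPull
    }
    train(batch, Await.result(inFlight, 1.minute))   // final batch
  }
}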