spark-MDLP-discretization

few values threshold method does not seem to be deterministic

Open barrybecker4 opened this issue 8 years ago • 46 comments

This is a strange one, and not that easy to reproduce. If I run the tests a bunch of times, I have noticed that on occasion, I get an extra split for the fare attribute in the "Run MDLPD on all columns in titanic data (label = embarked)" test.

Sometimes the splits are

-Infinity, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

but usually they are

-Infinity, 6.6229, 7.1833496, 7.2396, 7.6875, 7.7625, 13.20835, 15.3729, 15.74585, 56.7125, Infinity

I can't think why this might be unless something is non-deterministic or maintaining state between runs. I just ran a bunch of times and always get the second result, but I know I have sometimes seen the first.

barrybecker4 avatar Aug 25 '16 14:08 barrybecker4

I've seen this a few times now. It's frustrating because it's hard to reproduce and makes that one unit test fail occasionally. I wonder if this is a concurrency issue, where the results could differ based on the order in which some of the data is processed.

barrybecker4 avatar Aug 26 '16 18:08 barrybecker4

Is it now fixed in the referenced PR or not?

sramirez avatar Aug 26 '16 18:08 sramirez

Unfortunately, the issue is still there (I think). I see it only occasionally on one or two of the unit tests. If you have any thoughts about how it might occur, please let me know.

This may not be the right forum, but I have a couple of other general questions:

  • I see the warning about "The input data is not directly cached, which may hurt performance..." all the time. I'm not sure how I should be caching it in my client code. Can it be cached here, if not cached elsewhere, so we can avoid the warning? What causes the performance degradation?
  • I haven't had a chance to do much testing with big datasets, but I tried one with a million rows yesterday and it took longer than expected (maybe because of the above issue). I also noticed that it generated a lot of splits. It may be that relaxing the stopping criterion (to something like -1e-2) starts yielding a lot more splits when there are many instances, or maybe you always get more splits when there are lots of instances. Do you have any intuition about this?
  • The broadcast feature of Spark is used a lot in the code. I don't know much about broadcast other than it sends the data to other nodes so it can be shared. If you could add comments in the code about why it's needed in the places it is used, that would help contributors. Thanks for adding this library on github. I hope it will get lots of use and eventually be incorporated into Spark 2.x.

barrybecker4 avatar Aug 27 '16 13:08 barrybecker4

Hi again,

It's certainly weird, it's supposed to be fully deterministic. I'm going to check the code to look for a possible problem with concurrency. About the other topics:

  1. The client is supposed to persist the data however he/she wants to. Most of the methods I have reviewed in MLlib use this message to warn about data persistence, so I think it's good to follow the same convention. It would also be nice to include a .cache() call in the unit tests, because the algorithm is iterative in some parts.
  2. Yes, it's clearly influenced by the fact that the data is not cached, so Spark re-loads it every time an RDD is used more than once.
  3. I normally use broadcast when the variable to be shared is more complex than a simple type like an integer or a string; mainly data structures like arrays or vectors. Without broadcasting, Spark would replicate the variable in every function instance (task closure), which implies a lot of data replication, whereas a broadcast variable is sent only once to each machine. I think this "philosophy" is assumed by all Spark developers, so maybe it is redundant to document it in the code (see the sketch below for points 1 and 3).
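
A rough sketch of what I mean for points 1 and 3, assuming a SparkContext named sc; the names prepare and continuousFlags are only illustrative, not part of this library's API:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def prepare(sc: SparkContext, data: RDD[LabeledPoint]): Unit = {
  // 1) the caller persists the input once, so the iterative passes inside the
  //    discretizer reuse it instead of recomputing it (and the warning goes away)
  val cached = data.cache()

  // 3) small but structured values (e.g. a per-feature flag array) are broadcast
  //    so each executor gets one copy instead of one copy per task closure
  val continuousFlags: Array[Boolean] = Array.fill(10)(true)
  val bcFlags = sc.broadcast(continuousFlags)

  cached.foreach { lp =>
    // closures read bcFlags.value rather than capturing the raw array
    require(lp.features.size <= bcFlags.value.length)
  }
}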

Thanks for your help again!

sramirez avatar Aug 29 '16 12:08 sramirez

About the determinism:

I got the following results from one Eclipse execution of the unit tests:

Tests ok: 1-5, 9.

Failing tests:

Test 6: org.scalatest.exceptions.TestFailedException: Expected "...Infinity;-Infinity, [6.6229, 6.9624996, ]7.1833496, 7.2396, 7...", but got "...Infinity;-Infinity, []7.1833496, 7.2396, 7..."

the rest:

org.scalatest.exceptions.TestFailedException: Expected "...ty;-Infinity, 1.4435[9817]E12, Infinity", but got "...ty;-Infinity, 1.4435[658]E12, Infinity"

The last one seems to be a precision problem. Test 6 always outputs the same result in my environment, so maybe there is a problem with yours. Have you checked the correct solution for test 6 using a sequential implementation of MDLP?

sramirez avatar Aug 29 '16 12:08 sramirez

I will look into test 6 some more. I think that is the one I have seen fail intermittently as well. Sometimes I see it give the same result many times in a row - more often than if it were just random - then for no apparent reason it will start giving a result with an extra split.

barrybecker4 avatar Aug 29 '16 13:08 barrybecker4

I just ran the tests 7 times in a row. They all passed except for the second time when I got

[info] - Run MDLPD on all columns in titanic2 data (label = embarked) *** FAILED *** [info] Expected "...Infinity;-Infinity, []7.175, 7.2396, 7.687...", but got "...Infinity;-Infinity, [6.6229, ]7.175, 7.2396, 7.687..." (MDLPDiscretizerSuite.scala:205)

Note that it gave an extra split on that particular run. I am running on Windows, but it should not matter.

barrybecker4 avatar Aug 29 '16 15:08 barrybecker4

After running all the tests 10 times each, I get the following result in test 6:

"...Infinity;-Infinity, []7.1833496, 7.2396, 7..."

So, maybe there is a problem related to your environment. Could you check it on another OS and/or machine, please?

I'm using Debian 8.4 and Scala IDE 4.1.0 (I directly run the JUnit tests from here).

sramirez avatar Aug 31 '16 14:08 sramirez

I just ran the tests 10 times in a row after pulling and building in my Ubuntu linux VM and there were no failures at all. I have also had streaks like this on Windows, but then occasionally it will start streaking the other way (with failures). It may also have to do with what else is running on the machine at the time that I run the tests. It's definitely a strange issue.

I run the tests from the command line (using sbt assembly) and from within Intellij (using scala plugin) on windows and linux (4 different configurations total).

barrybecker4 avatar Aug 31 '16 14:08 barrybecker4

I think this issue still persists - though it is hard to reproduce. I don't know where the source of the non-determinism is, but I think it is premature to close this issue without more analysis. I've also noticed that there can be different splits produced when an attribute is binned by itself as opposed to when it is processed with a collection of numeric features.

barrybecker4 avatar Sep 08 '16 13:09 barrybecker4

I really think that this issue should be reopened. Sometimes inserting print statements in the code can cause it to run at different rates and give different splits.

barrybecker4 avatar Sep 14 '16 16:09 barrybecker4

Ok, we need a third person to give more insights on this topic, because I haven't been able to reproduce this problem yet in my experiments.

sramirez avatar Sep 19 '16 16:09 sramirez

I will need to take a closer look at this soon. I just got a new machine and tried running the MDLP unit tests and found that one of them was failing because of this. Just yesterday, one of our automated tests started failing (I think because of this). It seems that there are some sources of non-determinism in Spark for some typical operations. See http://stackoverflow.com/questions/34188834/sources-of-non-determinism-of-apache-spark

barrybecker4 avatar Oct 08 '16 10:10 barrybecker4

I haven't solved this yet, but I think I have narrowed it down. I made some local changes so that findSmallThresholds was called for all features. I noticed that I often got different splits when running the tests. In particular, I seemed to get different splits when running the tests through Intellij versus running with sbt on the command line. Then I made local changes so that findBigThresholds was called for all features, and the unit tests always produced consistent results. My conclusion is that the source of the non-determinism is most likely in the FewValuesThresholdFinder. My next step will be to run unit tests on just the FewValuesThresholdFinder in isolation to see if I can reproduce the non-determinism. This should be easy now that that logic is in a separate class.

barrybecker4 avatar Oct 08 '16 14:10 barrybecker4

I added some unit tests for the Few and ManyValuesThresholdFinders in my fork. Though I am seeing intermittent failures in the other tests, the tests I added for the Threshold finders are producing consistent results.

barrybecker4 avatar Oct 08 '16 17:10 barrybecker4

I was reading about an interesting case in Scala where parallel collections can lead to non-deterministic results if a variable declared outside a closure is accumulated inside the closure. Roughly like this, where list is some parallel collection (e.g. val list = (1 to 1000).toList.par):

scala> var sum = 0
sum: Int = 0
scala> list.foreach(sum += _); sum
res01: Int = 467766
scala> sum = 0
scala> list.foreach(sum += _); sum
res02: Int = 457073

Could it be that something like this is happening in the evalThresholds method? I'm not sure. I didn't think a parallel collection was being used, but I don't think I fully understand what calculations are happening in different threads yet. I'll keep looking.

barrybecker4 avatar Oct 09 '16 13:10 barrybecker4

I haven't used parallel collections in this project, so I think we can discard this hypothesis, unless Scala and/or Spark uses parallel collections internally.

About Spark's non-determinism: if the problem comes from FewValuesThresholdFinder, it has to be caused by Scala, because there is no Spark code in that class.

sramirez avatar Oct 10 '16 15:10 sramirez

Another possibility might be in the initialThresholds method, but I currently don't have any theories as to the source. I see the intermittent failures fairly often, but unfortunately it is not reliably reproducible. I tried things like adding some sleep calls to change the timing, but that has not helped to make it more reproducible.

barrybecker4 avatar Oct 10 '16 16:10 barrybecker4

I found one problem. Unfortunately, it does not seem to be the only one. I think there may be two issues with the initialThresholds method. The one I identified today again has to do with the presence of NaN values. I started printing out the thresholds found and saw that in some cases there were sequences like this

... (0,72.5), (0,NaN), (0,NaN), (1,2.00625), (1,4.50625), ....

There should not be duplicate thresholds. If there are duplicates, the sort order will not necessarily be unique. The reason there are duplicates is that whenever we try to find the midpoint between NaN and some other value, the result is NaN. The solution I found was to always take the non-NaN value as the midpoint when one of the two is NaN. Things seemed to work better after this change, but I still noticed some occasional inconsistency. This change required some unit test updates.
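
Roughly, the midpoint fix looks like this (midpoint here is a stand-in name, not necessarily what the code calls it):

def midpoint(a: Float, b: Float): Float =
  if (a.isNaN) b                 // fall back to the defined neighbor
  else if (b.isNaN) a            // so a NaN neighbor never produces a NaN threshold
  else (a + b) / 2               // normal case

// e.g. midpoint(72.5f, Float.NaN) == 72.5f instead of NaN,
// so the candidate list no longer contains duplicate (0, NaN) entries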

Next, I printed out the initial candidate thresholds found for each attribute. They were definitely different between runs. Sometimes there are extra thresholds in the list. I have a couple of thoughts.

  1. The code block in mapPartitionsWithIndex works concurrently on the partitions, so we cannot rely on the partitions being executed in order (see the sketch after this list). I thought maybe we were relying on the order, but it does not seem to be the case. I get 4 partitions when running, but it depends on the Spark configuration.
  2. Each time you run, the partitioning can be different for the same dataset. Could it be that the firstElements found outside of initialThresholds are determined using a different partitioning scheme than within? This doesn't seem to be the case, but I'm not sure yet. Any other thoughts?
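
For 1), the pattern I am talking about is roughly this (points is just a placeholder RDD of (featureIndex, value) pairs, not the actual variable name):

import org.apache.spark.rdd.RDD

def tagWithPartition(points: RDD[(Int, Float)]): RDD[(Int, Int, Float)] =
  points.mapPartitionsWithIndex { (partIndex, iter) =>
    // each partition is processed independently and possibly concurrently,
    // so nothing in here may assume that partition partIndex - 1 ran first
    iter.map { case (featureIdx, value) => (partIndex, featureIdx, value) }
  }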

Separate question: why do we need to broadcast variables that are small, like "arr", which is just an array of booleans whose size is the number of features? I thought broadcasting would only be needed for variables that take a lot of memory.

barrybecker4 avatar Oct 10 '16 21:10 barrybecker4

I think I know what is happening, and it's not 1) or 2) above. For a given run, the partitions stay constant, but since the partitions may be different between runs, there can sometimes be extra candidate splits that should not be there. The problem is that the splits do not seem to be determined correctly across a partition boundary. Within a partition, if the label frequencies do not vary, unique points are skipped until they do, and only then is a split created. However, at a partition boundary, a candidate is always added, whether it should be or not. In the for loop of initialThresholds, there is a call to isBoundary used to tell whether a candidate should be added. This call is not made for the lastPoint, but I think it should be. Unfortunately, that may be difficult because we do not know the freqs for bcFirsts.value(index + 1). Probably the label frequency information needs to be part of bcFirsts. Sound right?
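
To make the problem concrete, here is a rough, simplified sketch of that loop as I understand it (isBoundary below is the classic MDLP boundary test; the real initialThresholds code carries more state than this):

object BoundarySketch {
  // boundary test: more than one class present across the pair of adjacent values
  def isBoundary(f1: Array[Long], f2: Array[Long]): Boolean =
    f1.zip(f2).map { case (a, b) => a + b }.count(_ != 0) > 1

  def candidates(
      partition: Seq[(Float, Array[Long])],   // sorted (value, label freqs) within this partition
      nextFirstValue: Option[Float]           // first value of the next partition; freqs not in bcFirsts
  ): Seq[Float] = {
    // within the partition, a candidate is only emitted when the check passes
    val within = partition.sliding(2).collect {
      case Seq((v1, f1), (v2, f2)) if isBoundary(f1, f2) => (v1 + v2) / 2
    }.toSeq
    // at the last point the neighbor's frequencies are unavailable, so a candidate
    // is always added -- this is where the partitioning-dependent extra splits come from
    within ++ nextFirstValue.filter(_ => partition.nonEmpty)
      .map(v => (partition.last._1 + v) / 2)
  }
}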

barrybecker4 avatar Oct 11 '16 00:10 barrybecker4

Sergio, can you reproduce the problem by using setMaster("local[1]") in TestHelper instead of setMaster("local[4]")? When I switch to local[1], it splits the RDD into 2 partitions instead of 4, and I get different results when running the tests. In theory, the results should not change when the number of threads is changed.
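
In case it helps reproduce, this is the kind of check I mean (the resource path below is just an example, not the actual test file path):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[1]")                                    // compare with local[4]
  .setAppName("mdlp-partition-check")
val sc = new SparkContext(conf)
val rows = sc.textFile("src/test/resources/titanic.csv")    // example path
println(s"partitions: ${rows.getNumPartitions}")            // differs with the master setting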

barrybecker4 avatar Oct 11 '16 02:10 barrybecker4

I included the frequencies with the bcFirsts, and added the isBoundary check for the lastPoint as I described above, but it's not sufficient. I still get inconsistencies because the frequencies need to be accumulated across the partition boundary in the case when it's not a new feature and isBoundary is false. I don't know how to pass accumFreqs to the next partition since all the partitions are being processed concurrently - so I am stuck again. Any thoughts?

barrybecker4 avatar Oct 11 '16 02:10 barrybecker4

There is one potential issue with my pull request that I just thought of. The partitioner I use when finding the initialThresholds creates partitions for each feature. However, if the set of initial thresholds for a single feature is very large, it may not fit into memory on a node. It would likely be better to create a more sophisticated partitioner that will at least create partitions for each feature, but will further split large feature partitions as needed. There would be two difficulties in implementing this correctly: 1) it needs to split those feature partitions consistently, and 2) it should preserve the accumFreqs across splits. Probably 1) is good enough, and 2) (which may not be possible with RDDs) would be nice to have. One possible way to implement this custom partitioner would be to determine the max and min values for a feature, then partition the sorted values into ranges to create additional partitions.
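
A very rough sketch of the kind of partitioner I have in mind, assuming keys of the form (featureIndex, value) and per-feature ranges computed in a preliminary pass (this is only an illustration, not the code in the pull request):

import org.apache.spark.Partitioner

// one partition per feature, further sliced by value range when a feature has
// too many distinct values to fit comfortably in a single partition
class FeatureRangePartitioner(
    featureRanges: Map[Int, (Float, Float)],   // per-feature (min, max)
    subParts: Map[Int, Int]                    // slices needed per feature (default 1)
) extends Partitioner {

  private val offsets: Map[Int, Int] = {
    var acc = 0
    featureRanges.keys.toSeq.sorted.map { f =>
      val pair = f -> acc
      acc += subParts.getOrElse(f, 1)
      pair
    }.toMap
  }

  override def numPartitions: Int =
    featureRanges.keys.map(f => subParts.getOrElse(f, 1)).sum.max(1)

  override def getPartition(key: Any): Int = key match {
    case (feature: Int, value: Float) =>
      val (lo, hi) = featureRanges(feature)
      val n = subParts.getOrElse(feature, 1)
      val slice =
        if (n <= 1 || hi <= lo) 0
        else math.min(n - 1, (((value - lo) / (hi - lo)) * n).toInt)
      offsets(feature) + slice
    case _ => 0
  }
}

The per-feature min/max and slice counts would have to be computed in a preliminary pass over the data.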

barrybecker4 avatar Oct 12 '16 00:10 barrybecker4

I know that last condition is not completely faithful to the idea that initialThresholds should only generate boundary points, but it is known (I'm going to look for the paper) that most discretizers are based on measures that mostly select boundary points even when other, non-boundary points are among the candidates being evaluated. That is why the last point (or the midpoint between the last point and the next one) is always added: to avoid more communication between partitions. I don't think the final result is affected enough by this for it to be considered a big problem. Maybe we should check whether classification results change much or not; in my experiments, I didn't notice any change. Actually, I got the same cut points as those generated by the original algorithm (developed by Fayyad).

sramirez avatar Oct 13 '16 17:10 sramirez

I don't mind if the initialThresholds include a few extra thresholds that don't need to be there, but we do need to figure out a way to make the results deterministic. My current pull request is deterministic, but it may not work well for attributes that have so many distinct values that they do not fit on a node. Were you able to reproduce the non-determinism by changing N in setMaster("local[N]") in TestHelper?

barrybecker4 avatar Oct 13 '16 21:10 barrybecker4

I still think that this kind of non-determinism is not too bad, and it avoids all the complications we are discussing here. But maybe we can find a proper solution that improves on the original design of this algorithm.

I don't know how to pass accumFreqs to the next partition since all the partitions are being processed concurrently

It can be done as a previous step, but that implies launching another Spark operation over all the data. Communication between partitions implies more than one stage.

It would likely be better to create a more sophisticated partitioner that will at least create partitions for each feature, but will further split large feature partitions as needed.

SortPartitioner was the best choice when I designed this algorithm, as it generates (more or less) equal-sized partitions. The problem is that you have to deal with points from different attributes. The solution I took was to allow these non-boundary points between partitions.

I've also performed some experiments, which you can find below:

  • 1 core, 3 runs -> 4/23 errors.
  • 2 cores, 3 runs -> 5/23 errors.
  • 4 cores, 3 runs -> 5/23 errors.

sramirez avatar Oct 14 '16 14:10 sramirez

Maybe this link can be useful:

http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where#23228151

sramirez avatar Oct 14 '16 14:10 sramirez

Sergio, I fixed the scalability issue that my previous changes (to fix the non-determinism problem) had introduced. I now use a more sophisticated partitioner that will subdivide the initial thresholds into smaller partitions if their number exceeds maxByPart. Now, no partition will ever exceed maxByPart. The unit tests still run approximately as fast as they did before this change. I added unit tests for the logic that finds the initial thresholds. Please review and merge the pull request when you can. If all looks good, I think you could then create the release.
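
For review purposes, the subdivision rule amounts to something like this (a sketch; maxByPart is the existing parameter, the helper name is mine):

// number of slices a feature needs so that no partition holds more than maxByPart thresholds
def subPartitionsFor(thresholdCount: Long, maxByPart: Int): Int =
  math.ceil(thresholdCount.toDouble / maxByPart).toInt.max(1)

// e.g. 100000 candidate thresholds with maxByPart = 10000 -> 10 slices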

barrybecker4 avatar Oct 22 '16 17:10 barrybecker4

I was running some performance tests with larger datasets. Things seemed to scale OK when using the FewValuesFinder, but not the ManyValuesFinder. I upgraded to Spark 2.1.0 and tried restoring the coalesce statement in the ManyValuesFinder, but now the unit tests are failing again with non-deterministic results. It seems there is still a problem with that coalesce statement when using Spark 2.1.0.
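
For context, the change is roughly of this shape (a sketch with placeholder names, not the exact line in the ManyValuesFinder):

import org.apache.spark.rdd.RDD

// with the coalesce restored: shrink to nPartitions before evaluating candidates;
// how rows get regrouped can differ between runs, and with Spark 2.1.0 this seems
// to reintroduce the non-deterministic splits
def evaluateWithCoalesce(candidates: RDD[(Int, Float)], nPartitions: Int): RDD[(Int, Float)] =
  candidates.coalesce(nPartitions).mapPartitions(iter => iter)   // evaluation would happen here

// without it: keep the existing partitioning -- slower on large inputs, but deterministic
def evaluateWithoutCoalesce(candidates: RDD[(Int, Float)]): RDD[(Int, Float)] =
  candidates.mapPartitions(iter => iter)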

barrybecker4 avatar Feb 01 '17 18:02 barrybecker4

When I added back the coalesce, it took 440 seconds to induce the NB model for my test dataset. It was only taking 164 seconds before I added the coalesce back. In addition, the unit tests fail intermittently with the coalesce in. For now, I am leaving it out.

barrybecker4 avatar Feb 01 '17 20:02 barrybecker4