spark icon indicating copy to clipboard operation
spark copied to clipboard

Added stageId <--> jobId mapping in DAGScheduler

Open markhamstra opened this issue 12 years ago • 33 comments

I'm not completely certain that this is ready to be merged yet, but I think it is ready to gather some comments.

Outside of DAGScheduler.scala, the changes are almost entirely just renaming -- I resolved the confusion of the notions of priority and job identity by consistently relabeling old, FIFO-only-based usages of "priority" to now be "jobId". The old usage of "priority" was misleading since we now have the fair scheduler infrastructure that doesn't use the old "priority" to order priority except to break what would otherwise be priority ties. Now we can speak consistently about jobs having jobIds that are unique and always increasing. Stages contain the jobId rof the job which was responsible for creating them. Again, all of that is really just renaming or thinking about things slightly differently without changing anything functionally.

There are also a few easy changes within DAGScheduler.scala that just suppress a bunch of code analyzer warnings in IntelliJ -- I got tired of looking at them.

The bigger changes in the DAGScheduler have to do with the new jobIdToStageIds and stageIdToJobIds maps and the associated methods to build them up and tear them down. (I also renamed idToStage to stageIdToStage to be more consistent and less ambiguous.) The stageIdToJobIds map gets built up as new stages are created, the jobIdToStageIds map gets sync'ed-with/built-up from stageIdToJobIds when jobs are submitted, and both maps are modified in concert when jobs are completed or aborted.

There's a fair amount of walking through entire data structures to create and maintain these maps, so I was concerned about the performance hit. I haven't rigorously tested just the relative performance of the DAGScheduler in isolation with and without these changes, but I have closely watched the total time reported by a complete run of Spark's unit tests, and there appears to be no significant difference -- a complete run takes almost 13.5 minutes when my machine is fully warmed up, and the difference with the new stageId <--> jobId mapping is only a couple of seconds. Some of that is probably because, while a lot more is done when a new stage is created, that actually results in the short-circuiting of later calls to, e.g., getShuffleMapStage.

The last bits outside the DAGScheduler have to do with SparkListenerJobStart, which now sends along an array containing all of the stageIds that the job depends on.

With all of this in place, it should now be simple for the WebUI to aggregate the appropriate stages to report progress and other information for an entire job, and it should also be easy for Ram's killJob stuff to kill all of the stages associated with a job while not killing any that are also associated with another job.

From looking at some instrumented output, I'm fairly certain that I've got things at least close to right. I don't think this is quite ready to merge yet, since the JobLogger still probably needs to be tweaked, and I need to come to an understanding of why stages seem to be left alive by the ThreadingSuite. Outside of that, this new code seems to do a nice job preventing the sizes of various DAGScheduler data structures from growing without bound, and does so while not breaking any of the unit tests.

markhamstra avatar Aug 16 '13 03:08 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/617/

AmplabJenkins avatar Aug 16 '13 06:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/619/

AmplabJenkins avatar Aug 16 '13 07:08 AmplabJenkins

Thanks for doing this, Mark.

I'd actually recommend you to separate this as two pull requests since they are fairly independent. The renaming pull request should be much easier to merge, while the other one would require a lot of testing. Note that unit test runtime by no means test the scheduler's throughput, since the runtime is dominated by failure recovery, shuffle, and other things.

I believe in the perf benchmark suite Patrick has been working on (going to open source soon or have already open sourced) includes a scheduler throughput test that just launches a large number of empty tasks. We should use that one to test the scheduler throughput here.

rxin avatar Aug 16 '13 07:08 rxin

No disagreement that just timing the unit tests isn't an adequate measure of the change in scheduler performance. I was just using what I had as a rough check that things hadn't gone horribly wrong. If Patrick's suite can be used to do some better testing, that's great.

markhamstra avatar Aug 16 '13 10:08 markhamstra

UPDATE: I broke out renaming and other minor changes into a separate pull request; so after https://github.com/mesos/spark/pull/844 is merged, this one reduces to just the stageId <--> jobId mapping stuff.

markhamstra avatar Aug 16 '13 20:08 markhamstra

So, a few things worth talking about:

  1. This PR essentially boils down to moving at least part of the logic from the JobLogger directly into the DAGScheduler.
  2. At the moment, that work is duplicated in both DAGScheduler and JobLogger, but JobLogger should be trimmed down once we decide exactly what gets done in DAGScheduler.
  3. JobLogger originally ran in its own thread; now runs via the new async SparkListener handling, so again in a separate thread. This PR brings some of that work directly into the scheduler, with obvious performance concerns. However, if testing bears out that the performance hit is not too severe, then I think the change is worth it to not just track the jobs <--> stages relationship, but also to control the growth of DAGScheduler data structures, and doing so while keeping the concurrency issues manageable.
  4. Enough of the jobs <--> stages information needs to be sent out from the DAGScheduler to SparkListeners so that they don't need to duplicate much of the work in order to get all the information they need. Right now, I'm just sending out an array of associated StageIds within a SparkListenerJobStart, but that may not be enough information exported or the best way of making the information available to SparkListeners.
  5. Right now, this PR is maximally aggressive about cleaning up when a job completes or aborts and a stage is not part of another job. That's probably not the right thing to do if another job will soon launch and want to make use of one of the earlier job's stages. Putting the no-longer-in-a-job stages onto a queue for a while (how long?) before cleaning them out of the DAGScheduler data structures may make sense as a way to give other jobs a chance to re-use old stages.

markhamstra avatar Aug 16 '13 20:08 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/630/

AmplabJenkins avatar Aug 16 '13 21:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/638/

AmplabJenkins avatar Aug 17 '13 01:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/674/

AmplabJenkins avatar Aug 20 '13 08:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/676/

AmplabJenkins avatar Aug 20 '13 18:08 AmplabJenkins

Hey Mark, I made a few comments on the PR. Is this still a work in progress on your side as you said in the original message, or is it mostly done?

mateiz avatar Aug 22 '13 17:08 mateiz

Still work in progress. I'm adding "data structures are now empty" assertions at the end of the tests in DAGSchedulerSuite and need to do some work still on shuffleToMapStage in general and the handling of clean up in the zero partition and local job cases.

After that, it should be down to addressing the five points in my "things worth talking about" list.

markhamstra avatar Aug 22 '13 18:08 markhamstra

Updated. Still WIP.

markhamstra avatar Aug 22 '13 20:08 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/697/

AmplabJenkins avatar Aug 22 '13 21:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/699/

AmplabJenkins avatar Aug 23 '13 00:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/700/

AmplabJenkins avatar Aug 23 '13 01:08 AmplabJenkins

UPDATED: The DAGSchedulerSuite now includes checks in all tests except the "zero split job", asserting that all DAGScheduler data structures are empty at the end of each test job. These were mostly fairly easy, but cleaning up shuffleToMapStage is considerably more involved, since it is not just internal to the DAGScheduler, but also involves external state in the MapOutputTracker.

In order to reuse map outputs that are still available for a stage that has previously been cleaned out of the DAGScheduler (and avoid re-submitting such stages), I've split DAGScheduler.newStage into two pieces: the first to initialize new stages, the second to re-use or re-constitute stages using output locations available in the mapOutputTracker.

Along the way, I also had to fix SPARK-864. That bug resulted from trying to submit stages and missing tasks for a job that is no longer active (thus failing in the idToActiveJob(stage.jobId) lookup when submitMissingTasks tries to get appropriate properties for the job.) With this PR, the properties for the earliest job associated with the stage are used, or the stage is aborted if there are no longer any active jobs associated with the stage.

With the exception of handling zero-partition RDDs and passing sufficient information to SparkListeners, I now consider this PR to be feature complete, and I'll move on to performance regression testing.

markhamstra avatar Aug 27 '13 00:08 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/732/

AmplabJenkins avatar Aug 27 '13 01:08 AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/733/

AmplabJenkins avatar Aug 27 '13 01:08 AmplabJenkins

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/747/

AmplabJenkins avatar Aug 28 '13 00:08 AmplabJenkins

I've got spark-perf numbers:

scheduling-tput --num-tasks=10000 --num-trials=10 --inter-trial-wait=30,

baseline: 4.955, 0.156, 4.539, 4.955, 4.766 PR842: 5.016, 0.106, 4.786, 5.071, 4.892

scala-agg-by-key --num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.047, 0.002, 0.045, 0.049, 0.047 PR842: 0.049, 0.003, 0.045, 0.050, 0.049

scala-sort-by-key --num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=20 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.045, 0.004, 0.041, 0.046, 0.047 PR842: 0.046, 0.003, 0.041, 0.048, 0.052

scala-count --num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.018, 0.002, 0.017, 0.022, 0.017 PR842: 0.018, 0.003, 0.017, 0.025, 0.018

scala-count-w-fltr --num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.022, 0.002, 0.02, 0.025, 0.020 PR842: 0.024, 0.003, 0.018, 0.025, 0.020

markhamstra avatar Aug 29 '13 21:08 markhamstra

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/788/

AmplabJenkins avatar Sep 01 '13 03:09 AmplabJenkins

Cool, that looks good! I'd like to wait until 0.8.0 is out to merge this but it's definitely appreciated.

mateiz avatar Sep 01 '13 04:09 mateiz

It's definitely getting there. I'm curious why DriverSuite is failing at least some of the time and why the Python tests are frequently complaining about tasks still pending for a stage even though there are no more jobs needing that stage. Not sure yet whether either of these are real bugs or just test artifacts. I'll look through some worker logs looking for more such logError messages, but I'll go out on a limb and claim that any such messages are as likely the result of weaknesses in existing code that the new checks are revealing as they are the result of mistakes in this PR.

Anyway, I still haven't resolved the handling of zero-partition RDDs by the DAGScheduler, so those weirdos will continue to cause data structure bloat -- but no worse than they were before, so still a large net reduction in bloat for all other circumstances. The other question is just what the JobLogger and WebUI need in order to avoid duplicating reference tracking work. Probably should be sending both jobIdToStageIds and stageIdToJobIds out to SparkListeners on job start. Each of these remaining issues can easily and without harm be resolved in follow-on pull requests, I think.

markhamstra avatar Sep 01 '13 04:09 markhamstra

Rebased and fixed some typos and grammatical niggles.

markhamstra avatar Sep 01 '13 05:09 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/792/

AmplabJenkins avatar Sep 01 '13 06:09 AmplabJenkins

Addressed the pendingTasks bloat -- it wasn't just for Python tests, but for every job. I think I got the life-cycle issues correct (i.e. will always be a set of task associated with stage when handling a successful task completion, and it's possible that the stage -> task set may already have been removed when a task is resubmitted), but take a close look @mateiz .

UPDATE: Changed my mind. Better and much simpler to leave handleTaskCompletion alone, to expect pendingTasks(stage).isEmpty when a stage completes, and to handle that expectation and final cleanup in removeStage.

markhamstra avatar Sep 02 '13 05:09 markhamstra

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/805/

AmplabJenkins avatar Sep 02 '13 05:09 AmplabJenkins

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/812/

AmplabJenkins avatar Sep 02 '13 22:09 AmplabJenkins

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/813/

AmplabJenkins avatar Sep 02 '13 22:09 AmplabJenkins