
Scheduling issues with SharedJobCluster


Hi,

(Apologies in advance for the poor formatting - I am unable to control why some text appears in bold and why it sometimes doesn't.)

I am facing a problem with two streams of map-reduce jobs. I will try to explain it as clearly as possible.

I have two map-reduce streams:

  • Stream 1 functions: map1, reduce1
  • Stream 2 functions: map2, reduce2

The streams themselves need to be scheduled serially - Stream 1, Stream 2, Stream 1, Stream 2, … and so on until all streams have terminated.

In both cases map will have many parallel jobs, but there is only one instance of a reduce job. I expect many servers to be available for running the map jobs in parallel.
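
To give a feel for the shape of the work, here are placeholder versions of the Stream 1 functions (the real implementations are in the attached sched.py; the bodies below are only illustrative):

```python
# Placeholder versions of the stream functions; the real ones are in sched.py.
# Many map1 jobs run in parallel; reduce1 runs exactly once per stream instance.

def map1(chunk):
    # process one chunk of Stream 1's input and return a partial result
    return sum(chunk)

def reduce1(partials):
    # combine the partial results of all map1 jobs into the stream's result
    return sum(partials)

# map2/reduce2 have the same structure for Stream 2
```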

Now here's the catch: Stream 2 has to start right on the heels of Stream 1. To maximize the server resources, I would like to start running map2 instances on nodes (or CPUs within nodes) that have no map1 instances left to run, since only one node is going to run reduce1.

Another requirement is that reduce1 has to run immediately after the last map1 job has finished. It should not be queued behind the map2 jobs (from Stream 2) if those have already been submitted.

I realized from the documentation that JobCluster will not suit my purpose as the nodes will execute jobs in order of submission. They would not give priority to reduce1 if map2 jobs have been submitted prior to it.

So I decided to try my luck with SharedJobCluster, with dispyscheduler invoked with the options "--cooperative --early_cluster_scheduler".

A new cluster is created for each instance of a stream.

I used the dispy API to start clusters for the streams as follows:

For the sequence Stream 1, Stream 2, Stream 1, Stream 2, Stream 1, Stream 2, …

the corresponding clusters are cluster 1, cluster 2, cluster 3, cluster 4, cluster 5, cluster 6, …

Each cluster is closed after the last job in the stream (the reduce job) completes.
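
Roughly, each stream instance is driven like the sketch below. This is only a condensed sketch of what sched.py does; apart from the dispy calls and the map1/reduce1 names, everything (the phase dispatcher, the chunking, the scheduler address) is illustrative:

```python
import dispy

# Dispatch between the map and reduce phases of Stream 1; Stream 2 uses an
# analogous compute function built around map2/reduce2.
def stream1_compute(phase, data):
    if phase == 'map':
        return map1(data)    # map1/reduce1 are shipped to the nodes via 'depends'
    return reduce1(data)

def run_stream1_instance(chunks):
    # one SharedJobCluster per stream instance (cluster 1, cluster 3, cluster 5, ...)
    cluster = dispy.SharedJobCluster(stream1_compute, depends=[map1, reduce1],
                                     scheduler_node='127.0.0.1')
    map_jobs = [cluster.submit('map', chunk) for chunk in chunks]
    partials = [job() for job in map_jobs]            # wait for every map1 job
    reduce_job = cluster.submit('reduce', partials)   # the single reduce1 job
    result = reduce_job()                             # last job of this instance
    cluster.close()                                   # close once reduce1 completes
    return map_jobs + [reduce_job], result
```

The Stream 2 instances are driven the same way, so the main loop simply alternates between the two.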

All of this is coded in the attached file sched.py (sched.zip).

I run my setup in 3 terminal windows as follows - everything on my Ubuntu 18.04 virtual machine.

Environment -

(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ pip show dispy
Name: dispy
Version: 4.11.0
Summary: Distributed and Parallel Computing with/for Python.
Home-page: http://dispy.sourceforge.net
Author: Giridhar Pemmasani
Author-email: [email protected]
License: Apache 2.0
Location: /home/manoj/anaconda3/lib/python3.7/site-packages
Requires: pycos
Required-by:
(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ uname -a
Linux Ubuntu-MSHV-Virtual-Machine 4.15.0-62-generic #69-Ubuntu SMP Wed Sep 4 20:55:53 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ python -V
Python 3.7.3

Commands I run -

Terminal 1: dispynode.py -c 2 -d -i 127.0.0.1 --clean --debug --force_cleanup
Terminal 2: dispyscheduler.py -i 127.0.0.1 --cooperative --early_cluster_scheduler --clean --debug --cleanup_nodes
Terminal 3: python sched.py

Observations -

If my application works as intended, all the streams run in sequence and the program terminates.

But this happens roughly only 50% of the time.

Many a time I see an entire cluster failing on dispyscheduler like this:

.
.
2019-09-16 10:12:51 dispyscheduler - New computation 139651049625024: relegate, /tmp/dispy/scheduler/127.0.0.1/relegate_r55abhr2
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487120 / 1568608971.6798518
2019-09-16 10:12:51 dispyscheduler - Running job 139651060019872 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Running job 139651049487720 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651060019632 / 1568608971.6837986
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049488080 / 1568608971.6853924
2019-09-16 10:12:51 dispyscheduler - Transfer of computation "relegate" to 127.0.0.1 failed
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487480 / 1568608971.687945
2019-09-16 10:12:51 dispyscheduler - Failed to setup 127.0.0.1 for computation "relegate": -1
.
.

Sometimes the cluster fails without a single job having run. The lines between "New computation …" and "Failed to setup …" belong to the previous cluster.

When I look at the dispynode output, I correlate it with the following lines:

.
.
.
2019-09-16 10:12:51 dispynode - New computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1
2019-09-16 10:12:51 dispynode - Sending result for job 139651049487720 (11)
2019-09-16 10:12:51 dispynode - Sending result for job 139651060019872 (11)
2019-09-16 10:12:51 dispynode - Computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1 done
.
.
.

The "…done" message for the cluster at dispynode with not a single job from it executed. In this case also the lines between "New computation …" and "…done" belong to the previous cluster

Apart from this, I sometimes see dispynode run into trouble with the following error message:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/home/manoj/anaconda3/bin/dispynode.py", line 2327, in __reply_Q
    proc.join(2)
  File "/home/manoj/anaconda3/lib/python3.7/multiprocessing/process.py", line 139, in join
    assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process


Some inferences -

  • The cluster seems to be failing because dispynode is refusing the job. Again this does not happen all the time.
  • Here it looks like the entire cluster is failing. I would like to catch this failure and resubmit the cluster, but dispy does not seem to provide a facility for catching it. Even the cluster_status callback function does not report a cluster status per se, only a job status and a node status, as the sketch below illustrates. There is no notification even to the job-level callback function.
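
For illustration, my cluster_status callback looks roughly like this (assuming the (status, node, job) signature from the dispy documentation); every event it receives is about a single job or a single node, and nothing corresponds to the "Failed to setup … for computation" condition:

```python
import dispy

# Sketch of a cluster_status callback: it only ever sees job-level and
# node-level events; there is no event meaning "this cluster's setup failed".
def cluster_status_cb(status, node, job):
    if status == dispy.DispyJob.Finished:
        print('job %s finished on %s' % (job.id, job.ip_addr))
    elif status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                    dispy.DispyJob.Abandoned):
        print('job %s failed: %s' % (job.id, job.exception))
    elif status == dispy.DispyNode.Initialized:
        print('node %s initialized with %s cpus' % (node.ip_addr, node.cpus))
    elif status == dispy.DispyNode.Closed:
        print('node %s closed' % node.ip_addr)

# passed at cluster creation, e.g.:
# cluster = dispy.SharedJobCluster(stream1_compute, cluster_status=cluster_status_cb, ...)
```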

Requests -

I need help with the following:

  • Get an idea of why dispynode is failing occasionally - then I could find a way to prevent it.
  • Get an exception in the cluster_status callback function for the cluster itself - so I can close the cluster and restart it. (A rough retry workaround I am considering in the meantime is sketched below.)
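
Until something like that exists, the only workaround I can think of is a crude retry loop around each stream instance. This is just a sketch; it assumes that jobs belonging to a failed cluster still come back, with a status other than DispyJob.Finished, rather than the submissions failing outright, which I have not verified:

```python
import dispy

# Crude retry: rerun the whole stream instance (a brand-new cluster) whenever
# any of its jobs did not reach Finished, e.g. after the scheduler logs
# "Failed to setup 127.0.0.1 for computation ...: -1".
def run_stream1_with_retry(chunks, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        jobs, result = run_stream1_instance(chunks)   # sketched earlier
        if all(job.status == dispy.DispyJob.Finished for job in jobs):
            return result
        print('stream 1 cluster attempt %d failed, retrying' % attempt)
    raise RuntimeError('stream 1 failed %d times in a row' % max_attempts)
```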

Manoj
