
Command for after srun executes

Open · jgbos opened this issue 4 years ago · 14 comments

I currently use the setup parameter to include commands in the sbatch script before srun runs, but is it possible to add commands to be executed after srun?
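For context, this is roughly how I pass the setup commands today (a minimal sketch; the module load line and partition name are just examples, and with the AutoExecutor the parameter is spelled slurm_setup):

import submitit

def my_function():
    return "done"  # placeholder for the actual work

executor = submitit.AutoExecutor(folder="log_submitit")
executor.update_parameters(
    timeout_min=60,
    slurm_partition="dev",             # example partition name
    slurm_setup=["module load cuda"],  # lines added to the sbatch script before srun
)
job = executor.submit(my_function)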

jgbos avatar Aug 12 '20 05:08 jgbos

It's not possible for now but should be easy enough to add if need be. What's your use case though, in case it can be dealt with in any other way?

jrapin avatar Aug 12 '20 08:08 jrapin

We are adding logging scripts to monitor load, memory, gpu stats, etc. via dstat and dcgmi. I need to start the logging before srun and then shut it down afterwards.

jgbos avatar Aug 12 '20 13:08 jgbos

FYI, I wrote a wrapper around my cmd to run my commands in a subprocess before and after the actual command on the rank zero node. This seems to be working.

jgbos avatar Aug 13 '20 14:08 jgbos

Ok, more questions: would you need it for all executors or for slurm only? (setup is slurm only) Could that be performed in Python or does this need to be in command lines? I am thinking of some kind of decorator/context manager.

jrapin avatar Aug 17 '20 11:08 jrapin

Typically, ContextDecorator would seem to be a good fit. What do your logs look like? If you have something robust, maybe we can include it as a helper in submitit eventually.
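Something along these lines (a rough sketch, the dstat command is just a placeholder):

import contextlib
import subprocess


class monitor(contextlib.ContextDecorator):
    """Start monitoring commands before the wrapped call and stop them afterwards."""

    def __enter__(self):
        # start a background monitor and keep a handle to it
        self._proc = subprocess.Popen(["dstat", "-cdlmnr", "--output", "monitor_dstat_logfile.csv"])
        return self

    def __exit__(self, *exc):
        # always stop the background monitor, even if the wrapped call raised
        self._proc.terminate()
        return False


@monitor()
def train():
    ...  # the actual job goes here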

jrapin avatar Aug 17 '20 11:08 jrapin

So these logging scripts are not slurm specific, but I only have access to slurm right now. I like the decorator idea. Here is the class I made to wrap my cmd function:

import os
import subprocess


class JobMonitor(object):
    """JobMonitor will monitor memory, cpu, and gpu states.

    The monitors must be started before the wrapped command runs and shut
    down once it has finished.
    """

    def __init__(self, fn):
        self.fn = fn
        self._dstat = None  # handle to the background dstat process

    def __call__(self, *args, **kwargs):
        # only the rank zero process starts and stops the monitors
        rank_zero = os.environ.get('SLURM_PROCID') == '0'

        if rank_zero:
            self.before_srun()

        output = self.fn(*args, **kwargs)

        if rank_zero:
            self.post_srun()

        return output

    def before_srun(self):
        # environment variables are not expanded in argument lists, so read the job id explicitly
        job_id = os.environ.get('SLURM_JOBID', '')
        # dstat keeps running in the background for the duration of the job
        self._dstat = subprocess.Popen(['dstat', '-cdlmnr', '--output', 'monitor_dstat_logfile.csv'])
        # create a dcgmi group for all gpus and start recording stats for this job
        # (the group id "2" is hardcoded from the output of the group creation)
        subprocess.run(['dcgmi', 'group', '-c', 'allgpus', '--default'])
        subprocess.run(['dcgmi', 'stats', '-g', '2', '-e'])
        subprocess.run(['dcgmi', 'stats', '-g', '2', '-s', job_id])

    def post_srun(self):
        job_id = os.environ.get('SLURM_JOBID', '')
        # stop recording and dump the gpu stats to a file
        subprocess.run(['dcgmi', 'stats', '-x', job_id, '-v'])
        with open('monitor_gpu_logfile', 'w') as outfile:
            subprocess.run(['dcgmi', 'stats', '-j', job_id, '-v'], stdout=outfile)
        # terminate the background dstat process started in before_srun
        if self._dstat is not None:
            self._dstat.terminate()

jgbos avatar Aug 19 '20 03:08 jgbos

This looks like it can most definitely be framed as a contextmanager decorator indeed. How standard are dstat and dcgmi? I am not familiar with them. I have heard about python plugins that may be able to do the job as well (although it may not necessarily be simpler since we need to spawn processes). Can you provide a sample of an output?

jrapin avatar Aug 20 '20 08:08 jrapin

I'm not too familiar with the commands as I was asked to include these logs for someone else interested in them. I think dstat can be installed on any linux machine, and dcgmi is an nvidia product. The dstat output is a text file with essentially a top-like view of CPU and memory usage over time; similarly for dcgmi, but for gpus. I'm swamped with some other stuff, but I'll try to get you a sample output soon.

jgbos avatar Aug 22 '20 03:08 jgbos

I've given it a try. If this works for you so far I think it's a good option for now. I have a few minor concerns for border cases if we want to make it work at a bigger scale:

  • dstat seems to be very verbose, and for days-long jobs its output may become huge
  • it would be cool to have the generated files in the job folder. I don't think it is feasible automatically for now (this information is not available in the job environment), but that can be improved easily. In the meantime it looks like all jobs log to the same files :s
  • a context manager could be better as it would handle errors (but the one in contextlib does not seem simpler than your code in the end, it's pretty much the same with a few minor differences, so it's simpler to build from your code, see below).
  • with or without a context manager, I'm a bit concerned by what happens in case of cancellation/timeout/preemption, since the "post" method may not be called. That may need some improvements on the submitit side.
  • the decorator needs to forward the checkpoint method of the callable if there is one, to handle preemption. Right now, that requires an ugly hack :s
  • I don't like Popen, then again, I don't know if there's anything better.
  • I am wondering if it could be worth having a submitit-specific context manager system that could handle this and more (I'm thinking of copying the work directory to run somewhere else; that's been a concern for many people to avoid bugs)

Below is the code I am using to test. It does not work perfectly and is a bit too verbose (probably because I'm trying to avoid Popen). I'll make more tests later if you are not in any hurry now.

  import subprocess
  import submitit

  class with_stats:

      def __init__(self, func):
          self._func = func

      def __call__(self, *args, **kwargs):
          with self:
              output = self._func(*args, **kwargs)
          return output

      def checkpoint(self, *args, **kwargs):
          if not hasattr(self._func, "checkpoint"):
              return None
          output = self._func.checkpoint(*args, **kwargs)
          output.function = self.__class__(output.function)  # ugly hack: re-wrap the checkpointed callable
          return output

      def __enter__(self):
          print('Starting')
          env = submitit.JobEnvironment()
          if not env.global_rank:
              self.process = subprocess.Popen(['dstat', '-cdlmnr', '--output', 'monitor_dstat_logfile.csv'])
              subprocess.run(['dcgmi', 'group', '-c', 'allgpus', '--default'])
              subprocess.run(['dcgmi', 'stats', '-g', '2', '-e'])
              subprocess.run(['dcgmi', 'stats', '-g', '2', '-s', str(env.job_id)])
          return self

      def __exit__(self, *exc):
          env = submitit.JobEnvironment()
          if not env.global_rank:
              subprocess.run(['dcgmi', 'stats', '-x', str(env.job_id), '-v'])
              with open('monitor_gpu_logfile', 'w') as outfile:
                  subprocess.run(['dcgmi', 'stats', '-j', str(env.job_id), '-v'], stdout=outfile)
              self.process.terminate()
          print('Finishing')
          return False
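
For reference, this is roughly how I submit it for testing (continuing from the snippet above; the partition name and the dummy function are placeholders):

  def train():
      return "done"  # placeholder for the actual work

  executor = submitit.AutoExecutor(folder="log_submitit")
  executor.update_parameters(timeout_min=60, slurm_partition="dev")
  job = executor.submit(with_stats(train))
  print(job.result())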

jrapin avatar Aug 24 '20 12:08 jrapin

is it possible to add commands to be executed after srun?

Given the rest of the discussion this is not needed anymore? I don't think I see a good use case for this. Any command can be run from python, so you should be able to run your commands inside the submitted function, e.g.:

def run():
    env = submitit.JobEnvironment()
    # dstat/dcgmi here stand for small helpers wrapping the corresponding commands
    with dstat('monitor_dstat_logfile.csv'):
        dcgmi(env.job_id)
        return train_model()

As outlined by Jérémy, the issue is that it is a bit verbose to wrap an existing Checkpointable. Maybe the Checkpointable should have more hooks (on_startup, on_restart, on_timeout, on_job_done, ...) but this begins to be very frameworkish.
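Something like the following, purely hypothetical sketch (none of these hooks exist in submitit, the names are just the ones mentioned above):

import submitit

class MonitoredTask(submitit.helpers.Checkpointable):
    """Hypothetical sketch of a Checkpointable with lifecycle hooks (not part of submitit)."""

    def on_startup(self):
        pass  # e.g. start dstat/dcgmi monitors

    def on_job_done(self):
        pass  # e.g. stop the monitors and flush log files

    def do_work(self, *args, **kwargs):
        raise NotImplementedError  # the actual computation goes here

    def __call__(self, *args, **kwargs):
        self.on_startup()
        try:
            return self.do_work(*args, **kwargs)
        finally:
            self.on_job_done()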

gwenzek avatar Aug 25 '20 13:08 gwenzek

with or without a context manager, I'm a bit concerned by what happens in case of cancellation/timeout/preemption, since the "post" method may not be called.

I think it works fine, at least with a LocalExecutor, haven't confirmed on SLURM yet.

import time
from pathlib import Path

from submitit.local import local  # LocalExecutor lives in submitit/local/local.py


def test_cleanup_is_done_on_kill(tmp_path: Path) -> None:
    def touch_then_cleanup(tmp_file: Path):
        tmp_file.touch()
        try:
            time.sleep(10)
        finally:
            tmp_file.unlink()

    executor = local.LocalExecutor(tmp_path)

    job = executor.submit(touch_then_cleanup, tmp_path / "cancel.txt")
    time.sleep(1)
    assert (tmp_path / "cancel.txt").exists()
    job.cancel()
    time.sleep(1)
    assert not (tmp_path / "cancel.txt").exists()

    job = executor.submit(touch_then_cleanup, tmp_path / "timeout.txt")
    time.sleep(1)
    assert (tmp_path / "timeout.txt").exists()
    job._interrupt(timeout=True)  # type: ignore
    time.sleep(1)
    assert not (tmp_path / "timeout.txt").exists()

    job = executor.submit(touch_then_cleanup, tmp_path / "interrupt.txt")
    time.sleep(1)
    assert (tmp_path / "interrupt.txt").exists()
    job._interrupt()  # type: ignore
    time.sleep(1)
    assert not (tmp_path / "interrupt.txt").exists()

gwenzek avatar Aug 25 '20 14:08 gwenzek

the decorator needs to forward the checkpoint method of the callable if there is one, to handle preemption. Right now, that requires an ugly hack :s

I don't think it's ugly, it looks like regular Monad stuff to me: checkpoint(with_stats(f)) := with_stats(checkpoint(f))

gwenzek avatar Aug 25 '20 14:08 gwenzek

I think it works fine, at least with a LocalExecutor, haven't confirmed on SLURM yet.

I've seen weird stuff on slurm; it seemed to work OKish for preemption/timeout, although I'm a bit concerned that the spawned process does not get killed directly and may delay the completion, and it probably does not work for cancellation (then again, handling cancellation may not be a good idea).

I don't think it's ugly

OK maybe it's alright (not familiar with monad stuff though), then again it may require some helpers to ease development.

Given the rest of the discussion this is not needed anymore?

In my opinion it's much more generic with decorators, so it's probably not needed anymore.

Maybe the Checkpointable should have more hooks

I'm wondering about it, or about having other objects for setup/teardown. That may (or may not) be useful for copying the directory when submitting a function... I'm wondering if we could make some decorator-like framework work for this as well, since it is a major concern of many users.

jrapin avatar Aug 26 '20 11:08 jrapin

Dear all, first let me thank you very much for your work. I am posting my issue here because it is related to the current one, hence this seems the most appropriate place.

I am using submitit through the hydra plugin to run deep learning experiments on a cluster. The nodes of the cluster do not have access to the internet for security reasons. I would like to upload logs of my experiments a posteriori to a platform like wandb, from other specific nodes that can actually connect to external servers. A possible solution relying only on slurm would be to execute a bash script containing multiple sbatch commands specifying the dependency between jobs via the --dependency=afterok option. How can I achieve the same using submitit and the associated hydra plugin?
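With plain submitit (outside of hydra) I imagine something along the following lines, but I am not sure whether this is the intended way; the partition names and the upload function are placeholders, and I am assuming slurm_additional_parameters forwards raw sbatch options:

import submitit

def train():
    return "trained"    # placeholder for the training job

def upload_logs():
    return "uploaded"   # placeholder for the upload job

train_executor = submitit.AutoExecutor(folder="log_submitit")
train_executor.update_parameters(timeout_min=60, slurm_partition="compute")
train_job = train_executor.submit(train)

# the second job only starts once the first one finished successfully,
# by forwarding a raw --dependency option to sbatch
upload_executor = submitit.AutoExecutor(folder="log_submitit")
upload_executor.update_parameters(
    timeout_min=10,
    slurm_partition="internet",  # placeholder for the nodes with external access
    slurm_additional_parameters={"dependency": f"afterok:{train_job.job_id}"},
)
upload_job = upload_executor.submit(upload_logs)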

LTMeyer avatar May 05 '21 09:05 LTMeyer