pyiron_base
Status and fetching of remote jobs
Jobs executed on a remote cluster cannot run collect, because they are not found in the file table. As a result, jobs keep the status "initialized", regardless of whether they ran successfully or not. For the same reason, update_from_remote() and wait_for_jobs() do not fetch the jobs; instead, they have to be downloaded manually. The manual download comes with its own issues because the job status does not work: it is hard to filter out the jobs that have already finished when everything has the status "initialized", and if a job that has not finished yet is fetched, the working directory of the job on the cluster is deleted, so the job fails completely.
IMO a more robust way to fix the fetching issue is to simply download everything that can no longer be found in the remote squeue job table, instead of looking at the job status, which e.g. also breaks when the job runs into a timeout. Would it be possible to create a database table "is_remote" that simply stores the ids of all jobs that have been transferred? When they are fetched back, they could be deleted from the table, so no complex status checking on the cluster would be required; a simple check whether the job is still listed by squeue would suffice. This could probably even be implemented as part of pysqa, so that it does not mess with anything pyiron-specific.
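A minimal sketch of that idea, assuming a configured pysqa QueueAdapter whose get_queue_status() returns a DataFrame with a "jobid" column; the table name is_remote and the helper functions are hypothetical:

```python
import sqlite3

from pysqa import QueueAdapter  # assumed to be configured for the remote cluster

qa = QueueAdapter(directory="~/.queues")
conn = sqlite3.connect("remote_jobs.db")
conn.execute(
    "CREATE TABLE IF NOT EXISTS is_remote (local_id INTEGER PRIMARY KEY, queue_id INTEGER)"
)


def jobs_ready_to_fetch():
    """Return the local ids of transferred jobs that squeue no longer lists."""
    # get_queue_status() wraps squeue; assumed to return a DataFrame with a "jobid" column
    df = qa.get_queue_status()
    still_queued = set(df["jobid"]) if len(df) > 0 else set()
    rows = conn.execute("SELECT local_id, queue_id FROM is_remote").fetchall()
    return [local_id for local_id, queue_id in rows if queue_id not in still_queued]


def mark_fetched(local_id):
    """Drop a job from the table once it has been copied back."""
    conn.execute("DELETE FROM is_remote WHERE local_id = ?", (local_id,))
    conn.commit()
```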
Error backtrace:
```
2022-07-12 21:19:06,128 - pyiron_log - INFO - run job: Cook1_1000 id: 749443, status: collect
Traceback (most recent call last):
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 735, in run
    self._run_if_collect()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1303, in _run_if_collect
    run_job_with_status_collect(job=self)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/runfunction.py", line 194, in run_job_with_status_collect
    job.project.db.item_update(job._runtime(), job.job_id)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1375, in _runtime
    start_time = self.project.db.get_item_by_id(self.job_id)["timestart"]
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 118, in get_item_by_id
    return {
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 119, in <dictcomp>
    k: list(v.values())[0]
IndexError: list index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__main__.py", line 3, in <module>
    main()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__init__.py", line 63, in main
    args.cli(args)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/wrapper.py", line 31, in main
    job_wrapper_function(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/wrapper.py", line 171, in job_wrapper_function
    job.run()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/wrapper.py", line 124, in run
    self.job.run_static()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 769, in run_static
    execute_job_with_external_executable(job=self)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/runfunction.py", line 508, in execute_job_with_external_executable
    job.run()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/generic/util.py", line 213, in decorated
    return function(*args, **kwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 749, in run
    self.drop_status_to_aborted()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1238, in drop_status_to_aborted
    self.refresh_job_status()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 455, in refresh_job_status
    initial_status=self.project.db.get_job_status(self.job_id),
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 317, in get_job_status
    return self._job_table[self._job_table.id == job_id].status.values[0]
IndexError: index 0 is out of bounds for axis 0 with size 0
```
Extending on the idea of pysqa-based fetching: a database with three columns, directory, remote_id and local_id, would allow implementing everything in a more or less pyiron-independent way in pysqa. directory would store the working directory, which is extended with the local and remote root paths stored in the config; local_id would in this case be the pyiron id and remote_id the queueing-system id. Then all the filtering that is currently done, and causing so many errors, would no longer be necessary. Instead, the project could dispatch fetching completely to pysqa, which could return a list of all fetched local ids that can then be used to update the status in the local database.
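A hedged sketch of what that table and the fetch dispatch could look like; copy_back and the DataFrame bookkeeping are assumptions, not existing pysqa API:

```python
import pandas as pd

# one row per submitted job; "directory" is relative to the local/remote
# root paths stored in the pysqa config
remote_jobs = pd.DataFrame(columns=["directory", "remote_id", "local_id"])


def fetch_finished_jobs(queue_adapter, copy_back):
    """Fetch every job that squeue no longer lists and return their local ids."""
    queued = set(queue_adapter.get_queue_status()["jobid"])
    done = remote_jobs[~remote_jobs["remote_id"].isin(queued)]
    for _, row in done.iterrows():
        copy_back(row["directory"])  # hypothetical rsync/scp transfer helper
    remote_jobs.drop(index=done.index, inplace=True)
    return done["local_id"].tolist()
```

The returned list of local ids is exactly what the project side would need to update the job status in its own database.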
I can imagine that this is a significant enough obstacle to the usability of pyiron in this configuration that users will notice it very quickly. I'll work on this soon, since I'll also be using it in the fairly near future.
I suppose the task list to completely untangle the job mess that the remote cluster configuration experiences would be something like this:
- [ ] Add an "if" branch to detect whether pyiron is running in a remote cluster / local database configuration.
- [ ] If the remote configuration is detected, create a local database file that is checked frequently (cron-like?). Enable a completely separate set of job-status code which:
  - [ ] a) Writes a file to the local machine containing a database of the jobs and their directory paths on the remote cluster for copying back.
  - [ ] b) Relies only on the `squeue -u username` command being called locally via some pyiron method (periodically? user-triggered? let the user decide which configuration they want) to detect whether a job is running, based on the queue job table of the remote cluster.
  - [ ] c) Writes the squeue query output to a local file and compares it against the previous squeue query (see the sketch after this list). This query will contain information on:
    - The number of jobs running on the system
    - The name/ID of jobs running/queued on the system
    - The time this file was created
- [ ] Obviously, this needs a failsafe for queueing limits (number of jobs). The simplest and best way to do this is to let users specify themselves how many jobs are allowed on the queue; pyiron then refuses to submit any jobs beyond this number. This can be achieved by pyiron calling squeue before a submission whenever a certain time has passed since the last squeue file update (also sketched below).
- [ ] A `pr.remote_continue_submission()` method that continues submission of jobs in a project if the number of jobs in the project exceeds the queueing limit.
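A rough sketch of items b), c) and the queueing-limit failsafe; the cache file, MAX_AGE and MAX_JOBS are hypothetical user settings, and get_queue_status() is again assumed to wrap squeue via pysqa:

```python
import json
import time
from pathlib import Path

CACHE = Path("squeue_snapshot.json")
MAX_AGE = 300    # seconds before a cached squeue query counts as stale
MAX_JOBS = 100   # user-specified queueing limit


def refresh_snapshot(queue_adapter):
    """Call squeue (via pysqa) and write the result to the local cache file."""
    snapshot = {
        "created": time.time(),
        "jobs": [int(j) for j in queue_adapter.get_queue_status()["jobid"]],
    }
    CACHE.write_text(json.dumps(snapshot))
    return snapshot


def current_snapshot(queue_adapter):
    """Reuse the cached query if it is fresh, otherwise refresh it."""
    if CACHE.exists():
        snapshot = json.loads(CACHE.read_text())
        if time.time() - snapshot["created"] < MAX_AGE:
            return snapshot
    return refresh_snapshot(queue_adapter)


def finished_since_last_check(queue_adapter):
    """Diff two consecutive snapshots: jobs that disappeared have finished."""
    old = current_snapshot(queue_adapter)
    return set(old["jobs"]) - set(refresh_snapshot(queue_adapter)["jobs"])


def can_submit(queue_adapter):
    """Refuse submission once the user-specified limit is reached."""
    return len(current_snapshot(queue_adapter)["jobs"]) < MAX_JOBS
```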
Things that would be nice to have:
- [ ] For jobs, check whether the HDF5 output exists. This should be an easy way to tell whether the job completed or not. If it did not, remove large user-specifiable files (WAVECAR/CHG/CHGCAR for VASP) and tar the remainder under a name like `INCOMPLETE-*` before fetching back to local (see the sketch below).
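A sketch of that clean-up step, to be run on the cluster before copying back; the HDF5 completion check and the default file list are assumptions:

```python
import tarfile
from pathlib import Path

LARGE_FILES = ["WAVECAR", "CHG", "CHGCAR"]  # user-specifiable; VASP defaults


def prepare_for_fetch(working_directory, job_name):
    wd = Path(working_directory)
    if (wd / (job_name + ".h5")).exists():  # assumed completion marker
        return  # job finished, fetch everything as-is
    for name in LARGE_FILES:
        (wd / name).unlink(missing_ok=True)  # drop large files of the failed job
    archive = wd / ("INCOMPLETE-" + job_name + ".tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        for f in wd.iterdir():
            if f != archive:  # do not pack the archive into itself
                tar.add(f, arcname=f.name)
```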
Let me know if I missed anything or if you see any huge barriers to implementation before I start in a few weeks.
@Leimeroth I made some progress in this direction with https://github.com/pyiron/pyiron_base/pull/878 and https://github.com/pyiron/pysqa/pull/143. Can you check whether this solves the issue?
The errors on the cluster are still the same:
`IndexError: list index out of range`
because get_item_by_id returns a (probably empty) dict, and
`IndexError: index 0 is out of bounds for axis 0 with size 0`
because the job is not found in the get_job_status function.
Per our meeting on Monday, @jan-janssen's explanation of why the implementation that @Leimeroth suggested is incompatible with pyiron is that certain types of pyiron jobs can spawn additional jobs.
However, I am not convinced that assigning pi_None as the job name is necessary; it is what forces the workaround of going through the Slurm squeue command and doing unique job matching via directory matching. Note that this will not work for all queueing systems anyway.
Why can't the job names have something appended for jobs which spawn further jobs, for example pi_JOBID_SUBJOBID in the case of remote jobs without a database? Is this not a reasonable solution to the problem of jobs which can spawn further jobs?
Then the only check that needs to occur is whether the job name contains multiple underscores, to reconcile it with the database.
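A minimal sketch of the parsing this scheme would require (hypothetical helper, not existing pyiron code):

```python
def parse_queue_job_name(job_name):
    """Split 'pi_<JOBID>' or 'pi_<JOBID>_<SUBJOBID>' into pyiron ids."""
    parts = job_name.split("_")
    if len(parts) == 2:
        return int(parts[1]), None  # plain job: "pi_749443" -> (749443, None)
    return int(parts[1]), int(parts[2])  # spawned job: "pi_749443_7" -> (749443, 7)
```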
https://github.com/pyiron/pyiron_base/pull/1067/files Could you try with the changes made in this PR?
Works at least for simple jobs :)
Advanced jobs still fail. When I try to run a Murnaghan job for example:
```python
ref = pr.create.job.Vasp("Ref")
ref.structure = pr.create.structure.bulk("Cu", cubic=True)
ref.server.cores = 12

murn = pr.create.job.Murnaghan("MurnTest", delete_aborted_job=True)
murn.ref_job = ref
murn.server.cores = 96
murn.server.queue = "short_l2"
murn.input["num_points"] = 8
murn.run()
```
the following error occurs:
```
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 68.28it/s]
/nfshome/leimeroth/git_projects/pysqa/pysqa/utils/remote.py:240: UserWarning: mkdir: missing operand
Try 'mkdir --help' for more information.
  warnings.warn(stderr.read().decode())
0it [00:00, ?it/s]
/nfshome/leimeroth/git_projects/pysqa/pysqa/utils/remote.py:240: UserWarning: Traceback (most recent call last):
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__main__.py", line 3, in <module>
    main()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/control.py", line 59, in main
    args.cli(args)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/wrapper.py", line 31, in main
    job_wrapper_function(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/wrapper.py", line 161, in job_wrapper_function
    job = JobWrapper(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/wrapper.py", line 69, in __init__
    self.job = pr.load_from_jobpath(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/project/generic.py", line 880, in load_from_jobpath
    job = job.to_object()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/core.py", line 500, in to_object
    return self.project_hdf5.to_object(object_type, **qwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/storage/hdfio.py", line 1393, in to_object
    obj = self.create_instance(class_object, **qwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/storage/hdfio.py", line 1352, in create_instance
    return cls(**init_args)
  File "/home/nl77pupy/git_projects/pyiron_atomistics/pyiron_atomistics/atomistics/master/murnaghan.py", line 657, in __init__
    super(Murnaghan, self).__init__(project, job_name)
  File "/home/nl77pupy/git_projects/pyiron_atomistics/pyiron_atomistics/atomistics/master/parallel.py", line 26, in __init__
    super(AtomisticParallelMaster, self).__init__(project, job_name=job_name)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py", line 74, in __init__
    self.refresh_submission_status()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py", line 218, in refresh_submission_status
    self.submission_status.refresh()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/submissionstatus.py", line 199, in refresh
    computer = self.database.get_item_by_id(self.job_id)["computer"]
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 190, in get_item_by_id
    return {
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 191, in <dictcomp>
    k: list(v.values())[0]
IndexError: list index out of range
  warnings.warn(stderr.read().decode())
```
```
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
Cell In[6], line 11
8 murn.server.queue = "short_l2"
9 murn.input["num_points"] = 8
---> 11 murn.run()
File ~/git_projects/pyiron_base/pyiron_base/utils/deprecate.py:171, in Deprecator.__deprecate_argument.<locals>.decorated(*args, **kwargs)
161 if kw in self.arguments:
162 warnings.warn(
163 message_format.format(
164 "{}.{}({}={})".format(
(...)
169 stacklevel=2,
170 )
--> 171 return function(*args, **kwargs)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:693, in GenericJob.run(self, delete_existing_job, repair, debug, run_mode, run_again)
691 self._run_if_repair()
692 elif status == "initialized":
--> 693 self._run_if_new(debug=debug)
694 elif status == "created":
695 self._run_if_created()
File ~/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py:362, in ParallelMaster._run_if_new(self, debug)
354 """
355 Internal helper function the run if new function is called when the job status is 'initialized'. It prepares
356 the hdf5 file and the corresponding directory structure.
(...)
359 debug (bool): Debug Mode
360 """
361 self.submission_status.submitted_jobs = 0
--> 362 super()._run_if_new(debug=debug)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:1217, in GenericJob._run_if_new(self, debug)
1209 def _run_if_new(self, debug=False):
1210 """
1211 Internal helper function the run if new function is called when the job status is 'initialized'. It prepares
1212 the hdf5 file and the corresponding directory structure.
(...)
1215 debug (bool): Debug Mode
1216 """
-> 1217 run_job_with_status_initialized(job=self, debug=debug)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:76, in run_job_with_status_initialized(job, debug)
74 else:
75 job.save()
---> 76 job.run()
File ~/git_projects/pyiron_base/pyiron_base/utils/deprecate.py:171, in Deprecator.__deprecate_argument.<locals>.decorated(*args, **kwargs)
161 if kw in self.arguments:
162 warnings.warn(
163 message_format.format(
164 "{}.{}({}={})".format(
(...)
169 stacklevel=2,
170 )
--> 171 return function(*args, **kwargs)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:695, in GenericJob.run(self, delete_existing_job, repair, debug, run_mode, run_again)
693 self._run_if_new(debug=debug)
694 elif status == "created":
--> 695 self._run_if_created()
696 elif status == "submitted":
697 run_job_with_status_submitted(job=self)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:1228, in GenericJob._run_if_created(self)
1219 def _run_if_created(self):
1220 """
1221 Internal helper function the run if created function is called when the job status is 'created'. It executes
1222 the simulation, either in modal mode, meaning waiting for the simulation to finish, manually, or submits the
(...)
1226 int: Queue ID - if the job was send to the queue
1227 """
-> 1228 return run_job_with_status_created(job=self)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:109, in run_job_with_status_created(job)
107 job.run_if_non_modal()
108 elif job.server.run_mode.queue:
--> 109 job.run_if_scheduler()
110 elif job.server.run_mode.interactive:
111 job.run_if_interactive()
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:864, in GenericJob.run_if_scheduler(self)
856 def run_if_scheduler(self):
857 """
858 The run if queue function is called by run if the user decides to submit the job to and queing system. The job
859 is submitted to the queuing system using subprocess.Popen()
(...)
862 int: Returns the queue ID for the job.
863 """
--> 864 return run_job_with_runmode_queue(job=self)
File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:386, in run_job_with_runmode_queue(job)
379 else:
380 command = (
381 "python -m pyiron_base.cli wrapper -p "
382 + job.working_directory
383 + " -j "
384 + str(job.job_id)
385 )
--> 386 que_id = state.queue_adapter.submit_job(
387 queue=job.server.queue,
388 job_name="pi_" + str(job.job_id),
389 working_directory=job.project_hdf5.working_directory,
390 cores=job.server.cores,
391 run_time_max=job.server.run_time,
392 memory_max=job.server.memory_limit,
393 command=command,
394 )
395 if que_id is not None:
396 job.server.queue_id = que_id
File ~/git_projects/pysqa/pysqa/queueadapter.py:158, in QueueAdapter.submit_job(self, queue, job_name, working_directory, cores, memory_max, run_time_max, dependency_list, command)
131 def submit_job(
132 self,
133 queue=None,
(...)
140 command=None,
141 ):
142 """
143 Submits command to the given queue.
144
(...)
156 int:
157 """
--> 158 return self._adapter.submit_job(
159 queue=queue,
160 job_name=job_name,
161 working_directory=working_directory,
162 cores=cores,
163 memory_max=memory_max,
164 run_time_max=run_time_max,
165 command=command,
166 )
File ~/git_projects/pysqa/pysqa/utils/remote.py:60, in RemoteQueueAdapter.submit_job(self, queue, job_name, working_directory, cores, memory_max, run_time_max, dependency_list, command)
58 self._transfer_data_to_remote(working_directory=working_directory)
59 output = self._execute_remote_command(command=command)
---> 60 return int(output.split()[-1])
IndexError: list index out of range
```
Hi Niklas, I'll try to look into this this week.
To my knowledge, the Murnaghan example already fails when running without a database, unless the individual jobs are executed in interactive mode.
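For reference, a workaround sketch along those lines, untested and assuming the usual pyiron interactive run mode:

```python
ref = pr.create.job.Vasp("Ref")
ref.structure = pr.create.structure.bulk("Cu", cubic=True)
ref.server.run_mode.interactive = True  # child jobs run in-process instead of via the queue

murn = pr.create.job.Murnaghan("MurnTest")
murn.ref_job = ref
murn.run()
```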