Prohibit concurrent execution of tasks using shared resource
Hi, Eduardo! I want to use doit's concurrency capabilities in my project, but I have a problem: some of my tasks use Photoshop, and only one of them can run at a time :disappointed: Do you have any suggestions for how I can enable concurrent builds but prevent many such tasks from running simultaneously? Thank you in advance!
After creating this issue I've got an idea: I can set a `task_dep` for each task that uses Photoshop pointing to the previous such task. It seems like it should work. I will try it and tell you how it goes.
I've implemented the described approach and it works quite well. Perhaps we can discuss it and include something like this in doit itself? Or describe it in the documentation? Are you interested? Here is my current implementation:
```python
from collections import defaultdict

_group_last_task_names = defaultdict(lambda: None)

def put_in_serialized_group(group_name, task_name_base, task):
    last_task_name = _group_last_task_names[group_name]
    if last_task_name is not None:
        if 'task_dep' in task:
            task['task_dep'].append(last_task_name)
        else:
            task['task_dep'] = [last_task_name]
    if task_name_base is None:
        task_name = task['name']
    else:
        task_name = '%s:%s' % (task_name_base, task['name'])
    _group_last_task_names[group_name] = task_name
    return task
```
I'm not very satisfied with this name, but I couldn't figure out a better one (serialized as in serialized transactions in a database :fearful:).
Usage looks like:
```python
yield put_in_serialized_group('photoshop', 'task_base_name', {
    # actual task properties
})
```
Yesterday I forgot that, besides `name`, tasks can also have a `basename`. This version, I believe, should work correctly for all kinds of tasks:
```python
from collections import defaultdict

_group_last_task_names = defaultdict(lambda: None)

def put_in_serialized_group(group_name, task_basename, task):
    last_task_name = _group_last_task_names[group_name]
    if last_task_name is not None:
        if 'task_dep' in task:
            task['task_dep'].append(last_task_name)
        else:
            task['task_dep'] = [last_task_name]
    if task_basename is None:
        if 'basename' in task:
            task_name = task['basename']
        else:
            raise ValueError('No basename provided')
    elif 'name' in task:
        task_name = '%s:%s' % (task_basename, task['name'])
    else:
        task_name = task_basename
    _group_last_task_names[group_name] = task_name
    return task
```
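To make the chaining behavior concrete, here is a usage sketch with a hypothetical task generator (the task names, group name, and echo actions are made up; the helper is copied inline so the sketch runs standalone):

```python
from collections import defaultdict

# Inline copy of the helper above (slightly condensed) so this sketch
# is self-contained.
_group_last_task_names = defaultdict(lambda: None)

def put_in_serialized_group(group_name, task_basename, task):
    last_task_name = _group_last_task_names[group_name]
    if last_task_name is not None:
        task.setdefault('task_dep', []).append(last_task_name)
    if task_basename is None:
        task_name = task['basename']
    elif 'name' in task:
        task_name = '%s:%s' % (task_basename, task['name'])
    else:
        task_name = task_basename
    _group_last_task_names[group_name] = task_name
    return task

# Hypothetical dodo.py task generator: each yielded subtask gets a
# task_dep on the previous one in the 'photoshop' group, so doit runs
# them one at a time even under `doit -n 4`.
def task_render():
    for image in ('logo', 'banner', 'icon'):
        yield put_in_serialized_group('photoshop', 'render', {
            'name': image,
            'actions': ['echo rendering %s' % image],  # stand-in for Photoshop
        })
```

The first subtask has no extra dependency; each later one depends on its predecessor, which is what serializes the group.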
@Hinidu thanks for the patch. The problem with your solution is that it only works when there is exactly one instance of the shared resource.
Example: what if you want to have 4 tasks running in parallel but limit a group of tasks to at most 2 instances executing at the same time? I know that's much more complex than your solution, so I would prefer not to merge your patch and to wait for a more generic solution to the problem.
I didn't want to merge exactly that code into doit. I wanted to discuss this problem and possible solutions with you, if you are interested.
what if you want to have 4 tasks running in parallel but limit a group of tasks to have at most 2 instances executing at the same time.
My method works like a mutex; your example needs something like a semaphore... Though I don't think that's very practical - I can't imagine any resource that needs the described strategy.
In any case we can think about how this API could be designed. The most obvious approach that I see is a new property of the task, something like `'shared_resources': ['photoshop']`.
The number of concurrent tasks using the same shared resource is a property of the resource itself, so it should be configured not in the tasks themselves but in some special config like `{'photoshop': 1, 'foo': 4}` (I have no idea how and where it should be set).
I didn't dig into the part of doit that does the scheduling of tasks, so I can't propose an implementation method for this feature. But I think it shouldn't be hard to do. I hope I will find some time in the near future.
And I don't know very much about plugin support, so perhaps it could be implemented as a plugin?
Another problem with my current implementation is that it can cause accidental dependency cycles - if A depends on B, both of them use the same shared resource, and A was put in the group before B, then B gets a serialization dependency back on A. That's further proof that my solution is a hack, though it's working very well for me :smiling_imp: I'm halfway through reading control.py and I believe the implementation of this feature should go there.
I watch this issue and at first thought about the approach @Hinidu described. Indeed, what you are discussing here is more like a semaphore. So it must be configured separately from the tasks.
I suggest using `DOIT_CONFIG` and the doit configuration file.
```python
DOIT_CONFIG = {
    'shared_resources': {
        'photoshop': {
            'max_concurrency': 1,
            # ... whatever else?
        }
    }
}
```
The most obvious approach that I see is a new property of the task. Something like 'shared_resources': ['photoshop'].
But I only thought about a single value for the property. I just can't imagine how a list of such shared resources would work, or how to resolve a task with multiple shared resources.
```python
def task_use_photoshop():
    return {
        # ...
        'shared_resource': 'photoshop',
        # ...
    }
```
My method works like mutex, your example needs something like semaphore...
exactly
Though I don't think that's very practical - I can't imagine any resource that needs the described strategy.
https://groups.google.com/d/msg/python-doit/Y81sqSMzT7s/qMoQaRrcDw8J
And much more common than that: imagine you have some IO operations (downloading data from the internet) and then do some computation afterwards. You probably want to limit the number of compute tasks to the number of processors but allow more IO tasks in parallel. And, more importantly, make sure compute tasks start as soon as possible (not waiting for all IO operations).
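Extending the `DOIT_CONFIG` suggestion above to this IO-vs-compute scenario might look like the following sketch (the resource names `net` and `cpu` and their limits are illustrative, not an existing doit option):

```python
import multiprocessing

# Hypothetical configuration: many downloads may run in parallel, while
# compute tasks are capped at the number of available processors.
DOIT_CONFIG = {
    'shared_resources': {
        'net': {'max_concurrency': 8},
        'cpu': {'max_concurrency': multiprocessing.cpu_count()},
    }
}
```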
Good luck trying to understand control.py :stuck_out_tongue_closed_eyes:
I suggest using `DOIT_CONFIG` and the doit configuration file.
Yes, it can be the proper place.
But I only thought about a single value for the property. I just can't imagine how a list of such shared resources would work, or how to resolve a task with multiple shared resources.
When we want to run a task with shared resources, we just check whether all required resources are available. If some of them are unavailable, the task waits until the next time we try to run it.
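The all-or-nothing check described above could be sketched like this (the dict-of-counters layout is an assumption for illustration, not doit's actual API):

```python
def try_acquire(available, needed):
    """Atomically take one slot from every resource in `needed`,
    or take nothing and return False."""
    if all(available.get(name, 0) > 0 for name in needed):
        for name in needed:
            available[name] -= 1
        return True
    return False  # caller parks the task and retries after a release

def release(available, needed):
    """Give back one slot for every resource the task held."""
    for name in needed:
        available[name] += 1
```

Because a blocked task acquires nothing at all, it never holds a slot of one resource while waiting for another.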
https://groups.google.com/d/msg/python-doit/Y81sqSMzT7s/qMoQaRrcDw8J And much more common than that. imagine you have some IO operations (download some data from internet) and then do some computation afterwards. You probably want to limit the number of compute tasks by number of process but have more IO tasks in parallel. And more important make sure compute tasks start as soon as possible (not wait for all IO operations)
Good examples :+1:
Good luck trying to understand control.py :stuck_out_tongue_closed_eyes:
Thanks :smile: I hope I'll find some time tomorrow to finish my journey in `control.py`.
After reading `control.py` I've got the opinion that this feature should be implemented inside `runner.py`, and specifically in the `MRunner` class, because this feature doesn't affect non-parallel execution.
I assume it can be encapsulated inside `get_next_job` and `run_tasks`: the `get_next_job` result is used in all calls to `job_q.put`, and `run_tasks` is the only place where we call `result_q.get`. Before adding to `job_q` we will "consume" the required number of available resources, and "release" them after extracting from `result_q`.
Additionally, I think we'll need to store ready tasks that are waiting for available resources in some data structure inside `MRunner`.
Perhaps I will have some time today (or later) to implement it...
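The consume-before-dispatch / release-after-result idea sketched above could be modeled roughly like this toy runner (class and field names are invented; this is not doit's real `MRunner`):

```python
from collections import deque

class ToyRunner:
    """Toy model: consume resource slots before a job would go on job_q,
    release them when its result comes back from result_q, and park
    ready-but-blocked tasks in a waiting queue inside the runner."""

    def __init__(self, limits):
        self.available = dict(limits)  # resource name -> free slots
        self.waiting = deque()         # tasks blocked on resources

    def can_run(self, task):
        return all(self.available[r] > 0 for r in task.get('resources', ()))

    def dispatch(self, task):
        """Return True if the task was handed to the (simulated) job_q."""
        if not self.can_run(task):
            self.waiting.append(task)  # retried after some release
            return False
        for r in task.get('resources', ()):
            self.available[r] -= 1
        return True

    def on_result(self, task):
        """Called when the (simulated) result_q yields a finished task."""
        for r in task.get('resources', ()):
            self.available[r] += 1
```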
`control.py` is the place where parallel execution is controlled. `MRunner` is only smart enough to know how many times it should call the task dispatcher in order to use all its available processes/threads.
So I think you should not touch `MRunner`...
I just took a look into the code. I guess you need to modify mainly 2 places:
- `TaskDispatcher._add_task()`: add a check for available resources; if they are not available, update something like `node.wait_resource` (similar to `node.wait_run`), then `yield "wait"`
- `TaskDispatcher._update_waiting()`: free a resource and (maybe) transfer nodes from the `wait` queue to the `ready` queue.
Of course you will need a `Resource` class that is able to track which nodes are waiting for it...
Also you will need to make sure all resources are acquired at once or not acquired at all to avoid "blocking" the usage of a resource without actually using it, and also to avoid deadlocks.
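A minimal sketch of such a `Resource` class, along the lines suggested above (the class shape and `acquire_all` helper are hypothetical, not existing doit code):

```python
class Resource:
    """Tracks free slots for one shared resource and the nodes
    currently blocked waiting for it."""

    def __init__(self, name, slots):
        self.name = name
        self.slots = slots
        self.waiting = []  # nodes blocked on this resource

def acquire_all(resources, node):
    """All-or-nothing acquisition: take one slot from every resource,
    or register `node` as waiting on the first unavailable one and
    take nothing. Taking nothing on failure is what avoids holding a
    slot without using it, and avoids deadlocks between tasks needing
    overlapping resource sets."""
    for res in resources:
        if res.slots == 0:
            res.waiting.append(node)
            return False
    for res in resources:
        res.slots -= 1
    return True
```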
control.py is the place where parallel execution is controlled. MRunner is only smart enough to know how many times it should call the task dispatcher in order to use all its available processes/threads. So I think you should not touch MRunner...
Hmm, I think that available processes/threads are conceptually the same thing as "shared resources", and it looks to me that `MRunner` is the logical place to hold this logic. Furthermore, "shared resources" don't affect non-parallel execution.
I just took a look into the code. I guess you need to modify mainly 2 places:
- TaskDispatcher._add_task() you need to add a check for available resources, if not: update something like node.wait_resource (similar to node.wait_run), then yield "wait"
- TaskDispatcher._update_waiting() free a resource and (maybe) transfer nodes from the wait queue to the ready queue.
Another concern is that `TaskDispatcher._add_task` is more complex than `MRunner.get_next_job`, so the latter is not so scary to modify :smile:
Of course you will need a Resource class that is able to track which nodes are waiting for it...
I'm not sure about a `Resource` class... I prefer not to couple data with logic. I would just create two `dict`s: resource_name -> available_count and resource_name -> waiting_tasks. Or a `dict` from resource_name to a namedtuple with available_count and waiting_tasks inside.
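The data-only layout described above might look like this (resource names and limits are illustrative):

```python
from collections import namedtuple

# One entry per resource, with no behavior attached.
ResourceState = namedtuple('ResourceState', ['available_count', 'waiting_tasks'])

resources = {
    'photoshop': ResourceState(available_count=1, waiting_tasks=[]),
    'download': ResourceState(available_count=4, waiting_tasks=[]),
}

# Note that namedtuples are immutable, so consuming a slot means
# replacing the entry rather than mutating it in place:
resources['photoshop'] = resources['photoshop']._replace(
    available_count=resources['photoshop'].available_count - 1)
```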
Also you will need to make sure all resources are acquired at once or not acquired at all to avoid "blocking" the usage of a resource without actually using it, and also to avoid deadlocks.
Yes, that is absolutely necessary.
In summary: of course, if you have strong opinions/arguments on the above points, I will try to do it your way :ok_hand:
of course, if you have strong opinions/arguments on the above points, I will try to do it your way
I am just trying to show you what I think is the right path... I doubt it is even possible to achieve this by modifying `MRunner`, but maybe you can prove me wrong :grimacing: