
Prohibit concurrent execution of tasks using a shared resource

Hinidu opened this issue 8 years ago • 14 comments

Hi, Eduardo! I want to use doit's concurrency capabilities in my project, but I have a problem: some of my tasks use Photoshop, and only one of them can run at a time :disappointed: Do you have any suggestions for how to enable concurrent builds while prohibiting more than one such task from running simultaneously? Thank you in advance!


Hinidu (Mar 22 '16 12:03)

After creating this issue I had an idea: I can set a task_dep on each Photoshop task pointing to the previous such task. It seems like it should work. I will try it and report back.

Hinidu (Mar 22 '16 12:03)

I've implemented the described approach and it works quite well. Perhaps we can discuss it and include something like this in doit itself, or describe it in the documentation? Are you interested? Here's my current implementation:

from collections import defaultdict

# Maps a group name to the name of the last task added to that group.
_group_last_task_names = defaultdict(lambda: None)

def put_in_serialized_group(group_name, task_name_base, task):
    # Chain the new task after the previous task in the group so that
    # at most one task of the group runs at a time.
    last_task_name = _group_last_task_names[group_name]
    if last_task_name is not None:
        if 'task_dep' in task:
            task['task_dep'].append(last_task_name)
        else:
            task['task_dep'] = [last_task_name]

    # Compute the full task name and remember it as the group's new tail.
    if task_name_base is None:
        task_name = task['name']
    else:
        task_name = '%s:%s' % (task_name_base, task['name'])
    _group_last_task_names[group_name] = task_name

    return task

I'm not very satisfied with this name but I couldn't come up with a better one (serialized as in serialized transactions in a database :fearful:).

Usage looks like:

yield put_in_serialized_group('photoshop', 'task_base_name', {
    # actual task properties
})

Hinidu (Mar 22 '16 17:03)

Yesterday I forgot that, besides name, tasks can also have a basename. I believe this version should work correctly for all kinds of tasks:

from collections import defaultdict

# Maps a group name to the name of the last task added to that group.
_group_last_task_names = defaultdict(lambda: None)

def put_in_serialized_group(group_name, task_basename, task):
    # Chain the new task after the previous task in the group.
    last_task_name = _group_last_task_names[group_name]
    if last_task_name is not None:
        if 'task_dep' in task:
            task['task_dep'].append(last_task_name)
        else:
            task['task_dep'] = [last_task_name]

    # Work out the full task name, taking basename into account.
    if task_basename is None:
        if 'basename' in task:
            task_name = task['basename']
        else:
            raise ValueError('No basename provided')
    elif 'name' in task:
        task_name = '%s:%s' % (task_basename, task['name'])
    else:
        task_name = task_basename
    _group_last_task_names[group_name] = task_name

    return task

Hinidu (Mar 23 '16 13:03)

@Hinidu thanks for the patch. The problem with your solution is that it only handles the case where each shared resource allows exactly one concurrent task.

Example: what if you want to have 4 tasks running in parallel overall, but limit a group of tasks to at most 2 instances executing at the same time? I know that's much more complex than your solution, so I would prefer not to merge your patch and to wait for a more generic solution to the problem.

schettino72 (Mar 24 '16 16:03)

I didn't want to merge exactly that code into doit. I wanted to discuss this problem and possible solutions with you, if you are interested.

what if you want to have 4 tasks running in parallel overall, but limit a group of tasks to at most 2 instances executing at the same time?

My method works like a mutex; your example needs something like a semaphore... Though I don't think that case is very practical: I can't imagine a resource that needs the described strategy. In any case, we can think about how this API could be designed. The most obvious approach I see is a new task property, something like 'shared_resources': ['photoshop']. The number of concurrent tasks using the same shared resource is a property of the resource itself, so it should be configured not in the tasks themselves but in some separate config like {'photoshop': 1, 'foo': 4} (I have no idea how and where it should be set). I haven't dug into the part of doit that schedules tasks, so I can't propose an implementation for this feature yet. But I don't think it should be hard to do. I hope to find some time in the near future.

Hinidu (Mar 27 '16 07:03)

Also, I don't know very much about plugin support; perhaps it could be implemented as a plugin?

Hinidu (Mar 27 '16 07:03)

Another problem with my current implementation is that it can cause accidental dependency cycles: if A depends on B, both use the same shared resource, and A was registered in the group before B, then B gets a task_dep on A and we have a cycle (see the sketch below). That's another sign that my solution is a hack, though it's working very well for me :smiling_imp: I'm halfway through reading control.py and I believe the implementation of this feature should go there.
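
A minimal illustration of the cycle (task names are hypothetical):

task_a = put_in_serialized_group('photoshop', None, {
    'basename': 'A',
    'task_dep': ['B'],  # the real dependency: A needs B
})
task_b = put_in_serialized_group('photoshop', None, {
    'basename': 'B',
})
# put_in_serialized_group added 'A' to B's task_dep, so now
# A -> B (real dependency) and B -> A (group chaining): a cycle.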

Hinidu (Mar 28 '16 08:03)

I've been watching this issue and at first thought about the approach @Hinidu described. Indeed, what you're discussing here is more like a semaphore, so it should be configured separately from the tasks. I suggest using DOIT_CONFIG and the doit configuration file:

DOIT_CONFIG = {
    'shared_resources': {
        'photoshop': {
            'max_concurrency':  1,
            # ... whatever else?
        }
    }
}

The most obvious approach that I see is a new property of the task. Something like 'shared_resources': ['photoshop'].

But I was only thinking about a single value for this property. I can't quite imagine how a list of shared resources would work, or how to schedule a task that uses multiple shared resources.

def task_use_photoshop():
    return {
        # ...
        'shared_resource': 'photoshop',
        # ...
    }

FrankStain (Mar 28 '16 09:03)

My method works like a mutex; your example needs something like a semaphore...

exactly

Though I don't think that case is very practical: I can't imagine a resource that needs the described strategy.

https://groups.google.com/d/msg/python-doit/Y81sqSMzT7s/qMoQaRrcDw8J

And there are much more common cases than that. Imagine you have some IO operations (downloading data from the internet) and then do some computation afterwards. You probably want to limit the number of compute tasks to the number of processors but have more IO tasks running in parallel. More importantly, you want to make sure compute tasks start as soon as possible (not waiting for all IO operations).
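
As a rough sketch (the shared_resources task property and config key are hypothetical here, following the idea above), that scenario might look like:

DOIT_CONFIG = {
    # hypothetical resource capacities: many parallel downloads, fewer computations
    'shared_resources': {'io': 8, 'compute': 4},
}

def task_download():
    return {
        'actions': ['wget https://example.com/data'],  # placeholder action
        'shared_resources': ['io'],                    # hypothetical task property
    }

def task_compute():
    return {
        'actions': ['python crunch.py'],               # placeholder action
        'task_dep': ['download'],
        'shared_resources': ['compute'],               # hypothetical task property
    }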

Good luck trying to understand control.py :stuck_out_tongue_closed_eyes:

schettino72 (Mar 28 '16 14:03)

I suggest using DOIT_CONFIG and the doit configuration file.

Yes, that could be the proper place.

But I was only thinking about a single value for this property. I can't quite imagine how a list of shared resources would work, or how to schedule a task that uses multiple shared resources.

When we want to run a task with shared resources, we just check whether all the required resources are available. If some are unavailable, the task waits until the next time we try to run it.
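
A minimal sketch of that all-or-nothing check, assuming resources are tracked as a plain name -> available-count dict (just an illustration, not doit's API):

def try_acquire(available, wanted):
    # Take one unit of every resource in `wanted`, or nothing at all.
    if all(available.get(name, 0) > 0 for name in wanted):
        for name in wanted:
            available[name] -= 1
        return True
    return False  # caller re-queues the task for a later attempt

def release(available, wanted):
    # Give back the units taken by try_acquire.
    for name in wanted:
        available[name] += 1

With available = {'photoshop': 1, 'foo': 4}, this would serialize Photoshop tasks while allowing up to four 'foo' tasks at once.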

https://groups.google.com/d/msg/python-doit/Y81sqSMzT7s/qMoQaRrcDw8J And there are much more common cases than that. Imagine you have some IO operations (downloading data from the internet) and then do some computation afterwards. You probably want to limit the number of compute tasks to the number of processors but have more IO tasks running in parallel. More importantly, you want to make sure compute tasks start as soon as possible (not waiting for all IO operations).

Good examples :+1:

Good luck trying to understand control.py :stuck_out_tongue_closed_eyes:

Thanks :smile: I hope I'll find some time tomorrow to finish my journey in control.py.

Hinidu (Mar 29 '16 12:03)

After reading control.py, my opinion is that this feature should be implemented inside runner.py, and specifically in the MRunner class, because the feature doesn't affect non-parallel execution. I assume it can be encapsulated inside get_next_job and run_tasks: get_next_job's result is used in all calls to job_q.put, and run_tasks is the only place where we call result_q.get. Before adding to job_q we would "consume" the required resources, and "release" them after extracting from result_q. Additionally, I think we'll need some data structure inside MRunner to store ready tasks that are waiting for resources. Perhaps I will have some time today (or later) to implement it...
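
A rough sketch of that bookkeeping (simplified, not actual doit code; the names follow the comment above):

class ResourceAwareRunner:
    # Hypothetical helper illustrating the consume/release idea.
    def __init__(self, resources):
        self.available = dict(resources)   # e.g. {'photoshop': 1}
        self.waiting = []                  # ready tasks blocked on resources

    def get_next_job(self, task):
        wanted = task.get('shared_resources', [])
        if all(self.available.get(r, 0) > 0 for r in wanted):
            for r in wanted:
                self.available[r] -= 1     # "consume" before job_q.put
            return task
        self.waiting.append(task)          # park until a resource is released
        return None

    def on_result(self, task):
        for r in task.get('shared_resources', []):
            self.available[r] += 1         # "release" after result_q.get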

Hinidu (Apr 03 '16 17:04)

control.py is the place where parallel execution is controlled. MRunner is only smart enough to know how many times it should call the task dispatcher in order to use all its available processes/threads. So I think you should not touch MRunner...

I just took a look at the code. I guess you mainly need to modify 2 places:

  • TaskDispatcher._add_task(): add a check for available resources; if they are not available, update something like node.wait_resource (similar to node.wait_run), then yield "wait"
  • TaskDispatcher._update_waiting(): free a resource and (maybe) transfer nodes from the wait queue to the ready queue

Of course you will need a Resource class that is able to track which nodes are waiting for it... Also you will need to make sure all resources are acquired at once or not acquired at all, both to avoid holding a resource without actually using it and to avoid deadlocks.
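
For concreteness, a hypothetical Resource class along those lines might look like (just a sketch of the suggestion, not existing doit code):

class Resource:
    # Hypothetical: a named resource with limited capacity that
    # tracks the nodes waiting for it.
    def __init__(self, name, capacity):
        self.name = name
        self.available = capacity
        self.waiting = []   # nodes blocked on this resource

def acquire_all(resources, node):
    # All-or-nothing: take one unit of every resource or none,
    # so a node never holds a resource it cannot use yet.
    if all(res.available > 0 for res in resources):
        for res in resources:
            res.available -= 1
        return True
    for res in resources:
        if node not in res.waiting:
            res.waiting.append(node)
    return False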

schettino72 (Apr 03 '16 18:04)

control.py is the place where parallel execution is controlled. MRunner is only smart enough to know how many times it should call the task dispatcher in order to use all its available processes/threads. So I think you should not touch MRunner...

Hmm, I think available processes/threads are conceptually the same thing as "shared resources", and it looks to me like MRunner is the logical place for this logic. Furthermore, "shared resources" don't affect non-parallel execution.

I just took a look at the code. I guess you mainly need to modify 2 places:

  • TaskDispatcher._add_task(): add a check for available resources; if they are not available, update something like node.wait_resource (similar to node.wait_run), then yield "wait"
  • TaskDispatcher._update_waiting(): free a resource and (maybe) transfer nodes from the wait queue to the ready queue

Another concern is that TaskDispatcher._add_task is more complex than MRunner.get_next_job, so the latter is less scary to modify :smile:

Of course you will need a Resource class that is able to track which nodes are waiting for it...

I'm not sure about a Resource class... I prefer not to couple data with logic. I would just create two dicts: resource_name -> available_count and resource_name -> waiting_tasks. Or a dict from resource_name to a namedtuple holding available_count and waiting_tasks.
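
For example, the namedtuple variant could be as simple as (a sketch; names are hypothetical):

from collections import namedtuple

ResourceState = namedtuple('ResourceState', ['available_count', 'waiting_tasks'])

resources = {
    'photoshop': ResourceState(available_count=1, waiting_tasks=[]),
    'foo': ResourceState(available_count=4, waiting_tasks=[]),
}

# namedtuples are immutable, so updating a count means replacing the entry:
resources['photoshop'] = resources['photoshop']._replace(available_count=0)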

Also you will need to make sure all resources are acquired at once or not acquired at all, both to avoid holding a resource without actually using it and to avoid deadlocks.

Yes, that is absolutely necessary.

In summary: of course, if you have a strong opinion/arguments on the above points, I will try to do it your way :ok_hand:

Hinidu (Apr 03 '16 19:04)

of course, if you have a strong opinion/arguments on the above points, I will try to do it your way

I am just trying to show you what I think is the right path... I doubt it is even possible to achieve this by modifying MRunner, but maybe you can prove me wrong :grimacing:

schettino72 (Apr 04 '16 03:04)