
`group.completed_count` does not actually return the total number of jobs that have been completed

Open mistiru opened this issue 2 years ago • 1 comments


The `completed_count` property of the class `dramatiq.composition.group` returns prematurely: it stops counting at the first child whose result is missing.

Checklist

  • [x] Does your title concisely summarize the problem?
  • [x] Did you include a minimal, reproducible example?
  • [x] What OS are you using?
  • [x] What version of Dramatiq are you using?
  • [x] What did you do?
  • [x] What did you expect would happen?
  • [x] What happened?

What OS are you using?

Arch Linux (`uname -a` gives: Linux Pisces 5.15.2-arch1-1 #1 SMP PREEMPT Fri, 12 Nov 2021 19:22:10 +0000 x86_64 GNU/Linux)

What version of Dramatiq are you using?

dramatiq version 1.12.0 with django-dramatiq version 0.10.0

What did you do?

Create a group of several messages, run it, then read `completed_count` every 0.5 seconds.
(I configured dramatiq to use Memcached as the result backend and RabbitMQ as the broker in my Django project settings, but this is not relevant to the issue.)

import time

import dramatiq


@dramatiq.actor(store_results=True)
def wait_actor(seconds):
    time.sleep(seconds)
    return 'finished'


if __name__ == '__main__':
    group_messages = []
    for i in [1, 4, 1, 1]:
        group_messages.append(wait_actor.message(i))

    g = dramatiq.group(group_messages).run()

    while not g.completed:
        print(g.completed_count)  # will stick on `1` until the second task has finished
        time.sleep(0.5)
    print(g.completed_count)  # back to 4

What did you expect would happen?

The number of completed tasks should not stick at 1. In my example it should rise to 3 while the 4-second task is still running.

What happened?

Because the second task is the slowest and finishes after all the others, the number returned by `completed_count` sticks at 1 until that task completes.

The output is:

0
0
1
1
1
1
1
4

My guess is that the `completed_count` property returns at the first uncompleted child and never checks whether the subsequent children have completed.

    @property
    def completed_count(self):
        """Returns the total number of jobs that have been completed.
        Actors that don't store results are not counted, meaning this
        may be inaccurate if all or some of your actors don't store
        results.

        Raises:
          RuntimeError: If your broker doesn't have a result backend
            set up.

        Returns:
          int: The total number of results.
        """
        for count, child in enumerate(self.children, start=1):
            try:
                if isinstance(child, group):
                    child.get_results()
                else:
                    child.get_result()
            except ResultMissing:
                return count - 1  # <-- This returns without checking the completeness of subsequent children.

        return count
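
The early return can be reproduced without a broker or result backend. The sketch below is only an illustration: `FakeMessage` and the local `ResultMissing` class are hypothetical stand-ins for dramatiq's message and exception types, the nested-group branch is left out, and `count = 0` is added so the function also works on an empty list. It models the moment when tasks 1, 3 and 4 have finished and task 2 is still running.

```python
class ResultMissing(Exception):
    """Local stand-in for dramatiq's ResultMissing (assumption, not the real class)."""


class FakeMessage:
    """Hypothetical stand-in for a message whose result may or may not be stored yet."""

    def __init__(self, done):
        self.done = done

    def get_result(self):
        if not self.done:
            raise ResultMissing("result not available yet")
        return "finished"


def completed_count_early_return(children):
    """Same loop shape as the property quoted above: stop at the first missing result."""
    count = 0
    for count, child in enumerate(children, start=1):
        try:
            child.get_result()
        except ResultMissing:
            # Children after this one are never inspected.
            return count - 1
    return count


# Tasks 1, 3 and 4 are done; task 2 (the 4-second one) is still running.
children = [FakeMessage(True), FakeMessage(False), FakeMessage(True), FakeMessage(True)]
early = completed_count_early_return(children)
print(early)  # prints 1, although three results are available
```

With this loop shape the returned value is the length of the completed prefix of `children`, not the number of completed children.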

A possible solution would be:

    @property
    def completed_count(self):
        """Returns the total number of jobs that have been completed.
        Actors that don't store results are not counted, meaning this
        may be inaccurate if all or some of your actors don't store
        results.

        Raises:
          RuntimeError: If your broker doesn't have a result backend
            set up.

        Returns:
          int: The total number of results.
        """
        count = 0
        for child in self.children:
            try:
                if isinstance(child, group):
                    child.get_results()
                else:
                    child.get_result()
                count += 1
            except ResultMissing:
                pass

        return count
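
As a quick sanity check of this counting logic outside dramatiq, here is a minimal sketch using the same kind of hypothetical stand-ins (`FakeMessage` and a local `ResultMissing`, neither of which is dramatiq's real class; nested groups are omitted). It assumes the state in which only the second task is still running.

```python
class ResultMissing(Exception):
    """Local stand-in for dramatiq's ResultMissing (assumption, not the real class)."""


class FakeMessage:
    """Hypothetical stand-in for a message whose result may or may not be stored yet."""

    def __init__(self, done):
        self.done = done

    def get_result(self):
        if not self.done:
            raise ResultMissing("result not available yet")
        return "finished"


def count_completed(children):
    """Count every child with an available result, skipping the missing ones."""
    count = 0
    for child in children:
        try:
            child.get_result()
            count += 1
        except ResultMissing:
            # Keep going: later children may already be complete.
            pass
    return count


# Tasks 1, 3 and 4 are done; task 2 (the 4-second one) is still running.
children = [FakeMessage(True), FakeMessage(False), FakeMessage(True), FakeMessage(True)]
partial = count_completed(children)
print(partial)  # prints 3
```

The trade-off is that every child is queried on each call, so reading the count costs one result lookup per child even when an early child is still missing.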

mistiru, Nov 18 '21 09:11

Would you like to make a PR with your proposed fix?

Bogdanp, Nov 21 '21 07:11