dramatiq
`group.completed_count` does not actually return the total number of jobs that have been completed
The algorithm for the `completed_count` property of the class `dramatiq.composition.group` returns prematurely.
Checklist
- [x] Does your title concisely summarize the problem?
- [x] Did you include a minimal, reproducible example?
- [x] What OS are you using?
- [x] What version of Dramatiq are you using?
- [x] What did you do?
- [x] What did you expect would happen?
- [x] What happened?
What OS are you using?
Arch Linux (`uname -a` gives: `Linux Pisces 5.15.2-arch1-1 #1 SMP PREEMPT Fri, 12 Nov 2021 19:22:10 +0000 x86_64 GNU/Linux`)
What version of Dramatiq are you using?
dramatiq version 1.12.0 with django-dramatiq version 0.10.0
What did you do?
Create a group of multiple messages, run it, then read `completed_count` every 0.5 seconds.
(I configured dramatiq to use Memcached as a result backend and RabbitMQ as a broker in my Django project settings, but this is not relevant to the issue.)
    import time

    import dramatiq


    @dramatiq.actor(store_results=True)
    def wait_actor(seconds):
        time.sleep(seconds)
        return 'finished'


    if __name__ == '__main__':
        group_messages = []
        for i in [1, 4, 1, 1]:
            group_messages.append(wait_actor.message(i))
        g = dramatiq.group(group_messages).run()
        while not g.completed:
            print(g.completed_count)  # will stick on `1` until the second task has finished
            time.sleep(0.5)
        print(g.completed_count)  # back to 4
What did you expect would happen?
The number of completed tasks shouldn't stick on 1 (but on 3, according to my example).
What happened?
As the second task is slower and finishes after all the others, the number returned by `completed_count` sticks on 1.
The output is:
0
0
1
1
1
1
1
4
My guess is that the `completed_count` property actually returns at the first uncompleted child and does not check whether the following children have completed.
    @property
    def completed_count(self):
        """Returns the total number of jobs that have been completed.
        Actors that don't store results are not counted, meaning this
        may be inaccurate if all or some of your actors don't store
        results.

        Raises:
          RuntimeError: If your broker doesn't have a result backend
            set up.

        Returns:
          int: The total number of results.
        """
        for count, child in enumerate(self.children, start=1):
            try:
                if isinstance(child, group):
                    child.get_results()
                else:
                    child.get_result()
            except ResultMissing:
                return count - 1  # <-- This returns without checking the completeness of subsequent children.

        return count
A possible solution would be:
    @property
    def completed_count(self):
        """Returns the total number of jobs that have been completed.
        Actors that don't store results are not counted, meaning this
        may be inaccurate if all or some of your actors don't store
        results.

        Raises:
          RuntimeError: If your broker doesn't have a result backend
            set up.

        Returns:
          int: The total number of results.
        """
        count = 0
        for child in self.children:
            try:
                if isinstance(child, group):
                    child.get_results()
                else:
                    child.get_result()
                count += 1
            except ResultMissing:
                pass

        return count
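
To compare the two counting strategies without a broker or result backend, here is a minimal standalone sketch. `FakeMessage`, the local `ResultMissing` class and both helper functions are hypothetical stand-ins written for this illustration; they are not part of dramatiq and only mimic the "raise when the result is not stored yet" behaviour described above.

```python
class ResultMissing(Exception):
    """Local stand-in for the exception raised when a result is not stored yet."""


class FakeMessage:
    """Hypothetical message whose result is either available or still missing."""

    def __init__(self, done):
        self.done = done

    def get_result(self):
        if not self.done:
            raise ResultMissing("result not available yet")
        return "finished"


def count_until_first_missing(children):
    """Mimics the reported behaviour: stop counting at the first missing result."""
    count = 0
    for child in children:
        try:
            child.get_result()
        except ResultMissing:
            return count
        count += 1
    return count


def count_all_completed(children):
    """Mimics the proposed fix: skip missing results and keep scanning."""
    count = 0
    for child in children:
        try:
            child.get_result()
        except ResultMissing:
            continue
        count += 1
    return count


# Same situation as in the report: only the second (slow) task is still running.
children = [FakeMessage(True), FakeMessage(False), FakeMessage(True), FakeMessage(True)]
early = count_until_first_missing(children)
full = count_all_completed(children)
print(early, full)  # 1 3
```

With the second child still pending, the early-return strategy reports 1 while the full scan reports 3, which matches the values seen and expected in the example above.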
Would you like to make a PR with your proposed fix?