python-pqueue

Final Done Task Repeating

Open graingerkid opened this issue 7 years ago • 7 comments

With the following example, if I run it once, then comment out the put loop and run it again, the second run prints 99.

def worker():
    while True:
        try:
            item = q.get()
            print item
        except:
            pass
        finally:
            q.task_done()

for i in range(3):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in range(100):
    q.put(item)

graingerkid avatar Apr 25 '17 13:04 graingerkid

I added a time.sleep(1) before the q.join() and that seems to have fixed the issue.

I guess the program was terminating before the run had completed.

graingerkid avatar Apr 25 '17 13:04 graingerkid

Hello @graingerkid, I will try to reproduce your case.

But a possible explanation is that the last entry is repeated because it was not completely processed (it didn't get a corresponding task_done()), so indeed it restarts from where it stopped.

Your initial code does not call join(), so the main thread may exit before its worker threads finish, and in that case processing can be interrupted midway. You can force this case by adding a time.sleep(1) in the worker() function to emulate some intensive processing.
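To illustrate the join()/task_done() pairing described above, here is a minimal sketch. It uses the standard library's in-memory queue.Queue as a stand-in, on the assumption that the persistent queue exposes the same put/get/task_done/join calls seen in this thread. It is written for Python 3, and run_workers and its parameters are hypothetical names chosen for the example.

```python
import queue
import threading
import time


def run_workers(num_items, num_workers=3, work_seconds=0.01):
    """Process num_items with daemon workers and return what was processed."""
    q = queue.Queue()          # stand-in for the persistent queue (assumption)
    processed = []
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()     # blocks until an item is available
            try:
                time.sleep(work_seconds)   # emulate some work
                with lock:
                    processed.append(item)
            finally:
                # Acknowledge exactly once per item actually fetched.
                q.task_done()

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    for item in range(num_items):
        q.put(item)

    # Without this join(), the main thread could exit while daemon
    # workers are still between get() and task_done().
    q.join()
    return sorted(processed)
```

Because join() only returns once every item has received its task_done(), the returned list should contain every item that was put. Dropping the join() call is what lets the process end with an item still unacknowledged.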

balena avatar Apr 25 '17 13:04 balena

Yeah, this was my full code (sorry)...

def worker():
    while True:
        try:
            item = q.get()
            print item
        except:
            pass
        finally:
            q.task_done()

for i in range(3):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in range(100):
    q.put(item)
q.join()

I'd put time.sleep(1) in as follows:

def worker():
    while True:
        try:
            item = q.get()
            print item
        except:
            pass
        finally:
            q.task_done()

for i in range(3):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in range(100):
    q.put(item)

time.sleep(1)
q.join()

But you suggest putting it inside the worker() function? I'm just concerned about speed, though time.sleep(0.1) might be fine.

graingerkid avatar Apr 25 '17 15:04 graingerkid

Try the following:

def worker():
    while True:
        try:
            item = q.get()
            print item
            time.sleep(1)   # emulate intensive work here
        except:
            pass
        finally:
            q.task_done()

Then, remove your last time.sleep(1) before q.join(). With three workers you would have to wait about 34 seconds if you keep range(100), so you can reduce it to just 10 iterations by using range(10), and you will wait about 4 seconds each time.

When you repeat the execution, you should not see item 99 repeated in the second run. If you do, we may have a bug in the code.
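For a rough idea of how long each run blocks in q.join() in this experiment, here is a back-of-the-envelope sketch. It assumes every item takes the same time and that the three workers from the example run fully in parallel. The helper name estimated_join_wait is hypothetical. Rounding up counts the last partial batch as a full one, so the results come out slightly above a plain division.

```python
import math


def estimated_join_wait(num_items, num_workers=3, seconds_per_item=1.0):
    """Estimate the seconds spent waiting in join() under the assumptions above."""
    # Workers pull items in parallel, so items are handled in batches
    # of num_workers; a partial final batch still costs a full interval.
    batches = math.ceil(num_items / num_workers)
    return batches * seconds_per_item


print(estimated_join_wait(100))  # 34.0
print(estimated_join_wait(10))   # 4.0
```

Real runs will take a little longer because of thread startup and the queue's own overhead.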

balena avatar Apr 25 '17 15:04 balena

It seems to work sometimes but not others. Any ideas how we could narrow down the issue?

graingerkid avatar Apr 25 '17 18:04 graingerkid

FYI, I've got the following set...

Queue("/micro_service/queue_data", tempdir="/micro_service/queue_data")

graingerkid avatar Apr 25 '17 18:04 graingerkid

@graingerkid, I think you have already narrowed down the problem. I just don't have time to work on this issue right now, but I will certainly write a test case for it.

balena avatar Apr 28 '17 13:04 balena