go-libp2p-pubsub
Running Next On A Subscription Hangs Indefinitely
I'm attempting to create a distributed task processing system using libp2p that will send a message when work has been done. Other workers are supposed to pick up on this message, and finish processing.
Currently I'm running into an issue. I have a loop like this:
for {
    msg, err := subscription.Next(context.Background())
    if err != nil {
        return err
    }
    .........
}
This hangs: even when one worker sends a message to the topic, none of the other workers listening on the topic receive it. However, if I do something like this:
for {
    go func() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
        defer cancel()
        msg, err := finishedSub.Next(ctx)
        if err != nil {
            return
        }
        msg, err := subscription.Next(context.Background())
        if err != nil {
            return err
        }
        .........
    }()
}
Then this works, and other workers are able to pick up the messages. However, the one flaw I can detect is that at some point goroutine panics start occurring. Something seems to be off here: either Next isn't being handled properly, or my understanding of the expected behavior of Next is incorrect.
Any details from the panics?
@vyzo I've uploaded output from the logs.
Oooh, it's an out-of-memory panic.
Interesting, I suppose that makes sense given that I'm spawning many goroutines that may never complete because Next hangs. Any ideas why Next seems to hang the way it does? I suspect it might have something to do with the way I'm spinning up my pubsub nodes, since I've never personally experienced any kind of hanging when interacting with pubsub through go-ipfs-api.