channels
channels copied to clipboard
Pipe with SharedBuffer channels will be dead lock
Fllow is the example code, the example code is from eapache/channels readme.
// never more than 3 elements in the pipeline at once
buf := NewSharedBuffer(3)
ch1 := buf.NewChannel()
ch2 := buf.NewChannel()
// or, instead of a straight pipe, implement your pipeline step
Pipe(ch1, ch2)
// inputs
go func() {
for i := 0; i < 20; i++ {
ch1.In() <- i
}
ch1.Close()
}()
time.Sleep(10 * time.Second)
for _ = range ch2.Out() {
// outputs
}
buf.Close()
the flow graph is blow data => ch1-in => ch1-out => ch2-in => ch2-out => data
Fllow scenario will deadlock
- ch1's length is 3, ch2's length is 0, all recv blocked
- ch1-out read one data, before send to ch2-in
- ch1's length is 2, ch2's length is 0, all recv unblocked
- ch1-in have another data in
- ch1's length is 3, ch2's length is 0, all recv blocked
- than the second data can not insert into ch2-in, event ch2-out is always ready
Yeah, this is a good discovery. I don’t know how to fix it off the top of my head, in hindsight the structure of a lot of the code in this package isn’t very reliable.
Perhaps we could reserve one buffer slot for each created channel (leaving the rest of the buffer slots truly shared)… this would be less efficient, and not really what you would expect from a “shared buffer”, but would avoid the deadlock. Although if somebody created a “shared buffer” of 3 and then created 10 channels off of it, I don’t know what we’d do.