Potential blocking on unsubscribe with message in flight
Per our discussion in stompngo_examples issue #2, I'm opening an issue in the main project. The concern is what to do with in-flight messages received during or after an unsubscribe. The current use case I've been working with involves ActiveMQ with prefetch set, but it could apply to any similar interaction with a STOMP server that sends more than one MESSAGE without requiring an ACK, using either ackmode client or client-individual:
1. connect
2. subscribe to queue q.1 with ackmode client and prefetch X, where X is how many messages I need
3. process X messages
4. ACK the last message (which should be cumulative with ackmode client)
5. unsubscribe
6. repeat steps 2-5 with queues q.2, q.3, etc.
The problem I'm running into is that the UNSUBSCRIBE right after the ACK sometimes seems to cause messages not to get ACKed, and some of the later subscriptions receive nothing at all. While examining this problem (which may or may not be related to the issue I'm submitting), I ran across the following scenario, which I think needs at least to be mentioned in the documentation, if not fixed in the code somehow:
- Client subscribes to a destination, which creates a `chan MessageData` with a buffer size of `c.scc` (default: 1):

```go
// subscription.go
if hid { // Client specified id
    c.subs[id] = make(chan MessageData, c.scc) // Assign subscription
}
```
- Message(s) arrive with that destination specified in the `subscription` header and are picked up in the read loop here:

```go
// reader.go
if sid, ok := f.Headers.Contains("subscription"); ok {
    c.subsLock.Lock()
    c.subs[sid] <- d
    c.subsLock.Unlock()
} else {
    c.input <- d
}
```
- Client processes some messages or not, but there is still a message in the channel for this subscription.
- Client unsubscribes, which causes the following:

```go
// unsubscribe.go
c.subsLock.Lock()
defer c.subsLock.Unlock()
// ...
close(c.subs[sid])
delete(c.subs, sid)
```
- Before `c.subsLock.Lock()` is called in the unsubscribe, another message arrives and is picked up by the reader goroutine.
It seems that this would block. The unsubscribe wouldn't be able to get the subsLock, and the reader goroutine would be blocked trying to send to the subscription channel.
First, I have been unable to recreate the main scenario described, i.e. apparently corrupted queue state. Why? Because I consistently get the hang described above (or other hangs/panics; read on). Queue state examined from admin consoles seems OK here after killing a blocked client.
Second, I have recreated the hang described, as well as other hangs or pure panics. All of this work is published at the https://github.com/gmallard/stompngo_examples project, branch name is sngissue25.
Included there are:
- An example of how to create the problem(s)
- Sample stack traces using ActiveMQ, Apollo, and RabbitMQ. Note that different brokers can behave differently. Or not.
You are invited to review those stack traces in detail.
All of those examples were run with stompngo at: 048c068.
Seems like a fairly major difficulty with this package at present.
I may have a couple of ideas on how to address the issue, but it will take some thinking to do it without breaking the API. I will try to find some time later in the week.
Thanks. I will experiment here as well.
Given the different scenarios in the stack traces from above, at this time I feel there is no 'simple' fix.
A nasty little problem .......
For the record, I can also create hangs when unsubscribe is bypassed, and disconnect is invoked directly.
What I cannot recreate is the ...... queue corruption that you seem to describe (broker does not correctly handle the ACK, missed ACKS, apparently empty queues .....). Do you have an example that reliably recreates that situation?
I am working on a very experimental idea to eliminate the hangs (which includes a draining concept), but have not published any of it as yet. It is somewhat ugly, and I want to think about the approach some more, as well as perform additional validation / tests.
Can you post your activemq.xml configuration file? Are you sure "missing" messages are not on a DLQ?
Try branch blockwork at 0c372bb.
Let me know if that helps.
This is a lot to respond to. I'll test out the new branch and see if I can get hold of the activemq.xml. But no, they were not on dead-letter queues; they were just not getting ACKed. My test case is to put 10 messages in each of five queues and then take 1 from queue 1, 2 from queue 2, and so on. But I would end up getting only one or two from a queue I'd tried to get multiples from, IIRC. I will try the new branch and post what happens.
Do I read that last post correctly? As:
- from queue 1 receive 1 message only
- from queue 2 receive 2 messages only
- from queue 3 receive 3 messages only
- ..... and so on
???
Some day ..... you should explain to me what real world scenario would require something like that ..... seems somewhat strange to me.
Edited: to make it more clear what I am asking (I hope).
I really think that you should think about posting some code that shows that behavior.
I am changing my thoughts on some of this. I will detect the situation in Unsubscribe and handle it. And with 1.2, NACK any latent messages.
But there is something you need to be aware of:
- With ActiveMQ 5.11.1
- STOMP Protocol 1.2
- NACK'd messages are sent automatically to ActiveMQ.DLQ
That would seem to invalidate your approach.
A quick comment.
I will eventually add code which attempts to address this kind of client design. And will also add notes to the wiki describing this in some detail.
Given the totally asynchronous nature of STOMP and typical broker behavior, this application design is certainly less than ideal, and frankly I discourage it.
I have not just stopped thinking about this issue. Despite the lack of current progress.
I believe at least one, perhaps several partial solutions are possible. Any of these solutions would require this package to 'extend' the base STOMP protocol. The protocol extensions would allow package clients to optionally refine current package behavior.
I believe these solutions can be implemented by merely extending the current API, not modifying currently available functionality.
I also believe that any possible solutions would still leave a client with the ability to see odd / erroneous behavior depending on the client's design / implementation.
The STOMP level 1.2 specification was released on 10/22/2012. Prior to that time the specification committee had discussed a number of protocol additions that relate to the flow of messages, and possible control of that flow. These protocol additions were postponed to await an effort to develop a version 2.0 of the protocol. For a high level overview of these discussions see:
Concerned readers should peruse the link above.
Please upgrade your local environments and any clones to:
stompngo_examples: Version v1.0.5
This adds an 'extension' to the package that should allow you to implement the design you have described.
You will need changes to your current code.
The examples project has ..... well, examples of how to use this extension.
@flowchartsman Do you have any updates on this issue?
@gmallard
We have also encountered this case.
As described by @flowchartsman, while the reader tries to dispatch a message to a subscriber, the subscriber goroutine may have already given up reading from the subscription channel. This can make the reader hang on `c.subs[sid] <- d`.
We solved this problem in our app as shown below:
```go
func (b *WSBroker) unsubscribe(uuid string, r <-chan stompngo.MessageData, dest string) {
    done := make(chan struct{})
    go func() {
        sbh := stompngo.Headers{}
        switch b.connection.Protocol() {
        case stompngo.SPL_12, stompngo.SPL_11:
            sbh = sbh.Add("id", uuid)
        case stompngo.SPL_10:
            sbh = sbh.Add("destination", dest)
        default:
            b.logger.Printf("consume protocol error, should not happen")
        }
        if err := b.connection.Unsubscribe(sbh); err != nil {
            b.logger.Printf("unsubscribe error %v", err)
        }
        // XXX stompngo Close with a message in flight may hang: the reader is
        // sending a message to a subscriber that has already unsubscribed,
        // so Close() cannot acquire the subscribers' RWLock.
        close(done)
    }()
    for {
        select {
        // Drain all incoming messages, since we have unsubscribed.
        case _, ok := <-r:
            if !ok {
                return
            }
        case _, ok := <-b.connection.MessageData:
            if !ok {
                return
            }
        // Ensure the connection's Unsubscribe has completed.
        case <-done:
            return
        }
    }
}
```
`WSBroker` is our encapsulation of `stompngo.Connection`; its `connection` field is of type `*stompngo.Connection`.
After `WSBroker` unsubscribes from `b.connection`, we continue to drain messages from `r`, which was created by `Subscribe`.
This ensures the reader is not blocked and continues reading until it reads the response to the unsubscribe frame. Meanwhile, the in-flight messages are not ACKed and can be processed later.
A couple months after last working on this issue I actually ended up moving to another company, and most of the work we do here is with Kafka and NSQ, so my interaction with STOMP has been pretty much zero for over two years now, but I'm happy to help test if needed.
@flowchartsman - Thanks for the offer.
All - At this point, what is really needed is: a small self-contained example that reliably demonstrates the issue using current HEAD of the project.
I have added a wiki page that describes the workaround that was implemented for this issue. That page is here:
https://github.com/gmallard/stompngo/wiki/Issue-25
@itomsawyer - your solution to this issue is quite interesting.
I am considering how that solution might (optionally) be implemented as a second workaround in the current stompngo environment.
I have committed 512e8d8.
This implements a second extension that provides an alternative way to address blocks/hangs with this application design. This second extension is based very roughly on the idea @itomsawyer described above. Many thanks for the ideas.
Both stompngo extensions are described at:
https://github.com/gmallard/stompngo/wiki/Issue-25
The stompngo_examples project also has examples for both extensions that support message draining. Those examples are at these locations:
https://github.com/gmallard/stompngo_examples/tree/dev/adhoc/varmGetter
https://github.com/gmallard/stompngo_examples/tree/dev/adhoc/varmGetter2
I have hit this issue today. In my use case I have no idea how many messages we are going to consume (it's a long-running application), so I tried the "StompDrainNow" extension.
```go
forsel:
    for {
        ticker := time.NewTicker(ival)
        select {
        case mi, ok := <-usesp.md:
            if !ok {
                break forsel
            }
            dmc++
            c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
        case _ = <-ticker.C:
            c.log("sngdrnow extension BREAK")
            break forsel
        }
    }
```
Since a new ticker is created every time a message arrives, the loop just keeps on consuming messages and resetting the timeout. Shouldn't the Ticker be constructed above the loop and not inside it?
What is the ACK mode in use please?
> What is the ACK mode in use please?
client-individual in my use case. Right now, im draining the subscription by hand when unsubscribing. I have not hit the issue when draining manually.
Hmmm. Please describe in detail:
a) What you are seeing (sequence of events, ....) b) What you expect to see
Sorry, I am not really understanding the problem .... yet.
Whatever you are seeing (hang, loop, something else??), I cannot recreate it here.
Regardless, I recently became aware that I often use Ticker when the time package provides more appropriate tools.
That loop code has been changed to use time.After, and the code is on the dev branch: 8eccd03
Please give that a try and let me know the results.
> What ever you are seeing (hang, loop, something else??) I can not recreate it here.
> Regardless, I recently became aware that I often use Ticker when the time package provides more appropriate tools.
> That loop code has been changed to use time.After, and the code is on the dev branch: 8eccd03
> Please give that a try and let me know the results.
Sorry, I've been busy; I'll try to create a test case for what I'm seeing this week.
OK, I have tried to write a test using ginkgo/gomega.
The idea is that the server just keeps spamming messages after the unsubscribe call (I think I have seen it with the RabbitMQ STOMP plugin, but maybe I was just too impatient and it took too long).
```go
package test

import (
    "fmt"
    "io"
    "net"
    "strings"
    "time"

    "github.com/gmallard/stompngo"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    . "github.com/onsi/gomega/gbytes"
)

func stompFrame(frame string) string {
    return fmt.Sprintf("%s\x00", frame)
}

func write(writer io.Writer, cont string) error {
    _, err := io.Copy(writer, strings.NewReader(cont))
    return err
}

var _ = Describe("issue #25", func() {
    When("the server keeps sending messages during unsubscribe indefinitely", func() {
        It("should not hang", func() {
            done := make(chan struct{})
            server, client := net.Pipe()
            receiver := BufferReader(server)
            go func() {
                defer GinkgoRecover()
                defer close(done)
                By("connecting")
                connectHeaders := stompngo.Headers{}.
                    Add(stompngo.HK_ACCEPT_VERSION, stompngo.SPL_12).
                    Add(stompngo.HK_HOST, "/")
                stomper, err := stompngo.Connect(client, connectHeaders)
                Expect(err).ToNot(HaveOccurred())
                By("subscribing")
                subscribeHeaders := stompngo.Headers{}.
                    Add(stompngo.HK_DESTINATION, "testDestination").
                    Add(stompngo.HK_ID, "testId").
                    Add(stompngo.HK_ACK, stompngo.AckModeAuto)
                md, err := stomper.Subscribe(subscribeHeaders)
                Expect(err).ToNot(HaveOccurred())
                By("received first message")
                Eventually(md).Should(Receive())
                By("started unsubscribing")
                unsubscribeHeaders := stompngo.Headers{}.
                    Add(stompngo.HK_ID, "testId").
                    Add(stompngo.StompPlusDrainNow, "")
                Expect(stomper.Unsubscribe(unsubscribeHeaders)).To(Succeed())
            }()
            Eventually(receiver).Should(Say(stompFrame(`CONNECT
accept-version:1.2
host:/
content-type:text/plain; charset=UTF-8
content-length:0

`)))
            Expect(write(server, stompFrame(`CONNECTED
version:1.2

`))).To(Succeed())
            Eventually(receiver).Should(Say(stompFrame(`SUBSCRIBE
destination:testDestination
id:testId
ack:auto
content-type:text/plain; charset=UTF-8
content-length:0

`)))
            By("spamming messages")
            go func() {
                defer GinkgoRecover()
                msg := "test"
                msgId := 0
                for ; ; time.Sleep(10 * time.Millisecond) {
                    msgId++
                    By("writing a message")
                    Expect(write(server, stompFrame(fmt.Sprintf(`MESSAGE
subscription:testId
message-id:%d
ack:%d
destination:testDestination
content-type:text/plain

%s`, msgId, msgId, msg)))).To(Succeed())
                }
            }()
            Eventually(done).Should(Receive())
        })
    })
})
```
This test fails on my machine with:
STEP: connecting
STEP: subscribing
STEP: received first message
STEP: spamming messages
STEP: writing a message
STEP: started unsubscribing
STEP: writing a message
STEP: writing a message
... more
• Failure [1.012 seconds]
issue #25
when the server keeps sending messages during unsubscribe indefinitly
should not hang [It]
Timed out after 1.000s.
Expected
<chan struct {} | len:0, cap:0>: 0xc0003ae000
to receive something.
Possibly the server is to blame because it just ignores the unsubscribe.