ng2-stompjs icon indicating copy to clipboard operation
ng2-stompjs copied to clipboard

How to use multiple subscriptions and unsubscribes?

Open pantonis opened this issue 8 years ago • 25 comments

Hi,

Using one instance of a StompService can I subscribe to multiple queues?

pantonis avatar Apr 12 '17 14:04 pantonis

Yes, you can subscribe any number of queues. Just make multiple calls to subscribe.

Please check the samples as well. If you are new to dependency injection, please add the StompService to provide list in app.module.ts (as suggested in samples). That way same instance of the STOMP connection will be used across your application.

kum-deepak avatar Apr 12 '17 14:04 kum-deepak

And each subscription must be assigned to a different observable?

pantonis avatar Apr 12 '17 14:04 pantonis

When you make a call to subscribe, a new Observable is returned, which will provide messages for that queue.

kum-deepak avatar Apr 12 '17 14:04 kum-deepak

Forgive me I am not web dev expert.

 this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
 this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

or

 this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
 this.messages = this._stompService.subscribe('/topic/ng-demo-sub2');

pantonis avatar Apr 12 '17 14:04 pantonis

The first one:

this.messages = this._stompService.subscribe('/topic/ng-demo-sub'); this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

kum-deepak avatar Apr 12 '17 14:04 kum-deepak

and to unsubscribe

this.subscription.unsubscribe();

is this enough for both subscriptions?

pantonis avatar Apr 12 '17 14:04 pantonis

No, the code will look something like following (I haven't executed the code, just typed here):

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages = this._stompService
  .subscribe('/topic/ng-demo-sub')
  .subscribe((msg: Message) => {
    // use it
  });

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2 = this._stompService
  .subscribe('/topic/ng-demo-sub2')
  .subscribe((msg: Message) => {
    // use it
  });

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

The above has been updated.

kum-deepak avatar Apr 12 '17 14:04 kum-deepak

In summary both the subscriptions are independent of each other and do not affect each other.

kum-deepak avatar Apr 12 '17 15:04 kum-deepak

ok thanks a lot

pantonis avatar Apr 12 '17 16:04 pantonis

this.messages = this._stompService.subscribe('/topic/ng-demo-sub');
this.messages2 = this._stompService.subscribe('/topic/ng-demo-sub2');

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages.subscribe((msg: Message) => {
   // use it
});

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2.subscribe((msg: Message) => {
   // use it
});

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

I am using StompRService and when I call this.messages2.unsubscribe() the queueName in the unsubscribe function is '/topic/ng-demo-sub'.

marcelormourao avatar Apr 11 '18 19:04 marcelormourao

@marcelormourao, please create a demo project demonstrating the issue. You can fork one of the demo projects (links in the README). Thanks!

Meanwhile I am also taking a look.

kum-deepak avatar Apr 12 '18 01:04 kum-deepak

I has a similiar problem =(

fernando-nog avatar Apr 12 '18 12:04 fernando-nog

I have tested the following sequence (I have added these and few more cases in src/specs/app/services/stomp-queue.operations.spec.ts):

      queSubscription1 = stompService.subscribe(queueName1)
        .map((message) => message.body)
        .subscribe(spyHandler1);
      queSubscription2 = stompService.subscribe(queueName2)
        .map((message) => message.body)
        .subscribe(spyHandler2);

      stompService.publish(queueName1, 'Message 01-01');
      stompService.publish(queueName2, 'Message 02-01');
     
      queSubscription1.unsubscribe();

      stompService.publish(queueName1, 'Message 01-02');
      stompService.publish(queueName2, 'Message 02-02');

Please see the log output corresponding to the above call. Lines starting with **** are my annotations.

LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Opening Web Socket...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Connecting...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Web Socket Opened...'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> CONNECT
login:guest
passcode:guest
accept-version:1.2,1.1,1.0
heart-beat:0,0

'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< CONNECTED
server:RabbitMQ/3.7.3
session:session-E3Pxe34zOx2lwcASq8Zgxg
heart-beat:0,0
version:1.2


'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'connected to server RabbitMQ/3.7.3'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Connected'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will try sending queued messages '
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Request to subscribe /topic/ng-demo-sub01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will subscribe to /topic/ng-demo-sub01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SUBSCRIBE
ack:auto
id:sub-0
destination:/topic/ng-demo-sub01

'
***************************************************************************************************
**** id:sub-0 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Request to subscribe /topic/ng-demo-sub02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will subscribe to /topic/ng-demo-sub02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SUBSCRIBE
ack:auto
id:sub-1
destination:/topic/ng-demo-sub02

'
***************************************************************************************************
**** id:sub-1 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub01
content-length:13

Message 01-01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub02
content-length:13

Message 02-01'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-0
destination:/topic/ng-demo-sub01
message-id:T_sub-0@@session-E3Pxe34zOx2lwcASq8Zgxg@@1
redelivered:false
content-length:13

Message 01-01
'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-1
destination:/topic/ng-demo-sub02
message-id:T_sub-1@@session-E3Pxe34zOx2lwcASq8Zgxg@@2
redelivered:false
content-length:13

Message 02-01
'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Stop watching connection state (for /topic/ng-demo-sub01)'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), 'Will unsubscribe from /topic/ng-demo-sub01 at Stomp'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> UNSUBSCRIBE
id:sub-0

'
***************************************************************************************************
**** Unsubscribed from /topic/ng-demo-sub01
**** id:sub-0 is internally used, it does not correspond to the actual queue name
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub01
content-length:13

Message 01-02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> SEND
destination:/topic/ng-demo-sub02
content-length:13

Message 02-02'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< MESSAGE
subscription:sub-1
destination:/topic/ng-demo-sub02
message-id:T_sub-1@@session-E3Pxe34zOx2lwcASq8Zgxg@@3
redelivered:false
content-length:13

Message 02-02
'
***************************************************************************************************
**** Message is only received for /topic/ng-demo-sub02
***************************************************************************************************
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '>>> DISCONNECT
receipt:close-2

'
LOG: Fri Apr 13 2018 14:56:03 GMT+0530 (IST), '<<< RECEIPT
receipt-id:close-2


'

kum-deepak avatar Apr 13 '18 09:04 kum-deepak

Please note that I have updated my schematic code at https://github.com/stomp-js/ng2-stompjs/issues/2#issuecomment-293604982

kum-deepak avatar Apr 13 '18 10:04 kum-deepak

@kum-deepak I am having the similar issue. I reported it here https://github.com/stomp-js/ng2-stompjs/issues/73. Basically it does not work if I specify a subscription callback via an inline function then I am having that issue. I did not try anonymous function like you had in your example - that might work but I still would like to be able to set inline function as a callback to the topic Observable subscribe. Thanks

evgenyfedorenko avatar Jun 15 '18 20:06 evgenyfedorenko

If you pass stomp headers to subscribe method like stompService.subscribe(destination, headers), make sure you create new headers with each call to subscribe. Headers are modified inside and that's causing problems with subscriptions.

tomekstankowski avatar Jun 27 '18 10:06 tomekstankowski

@s1vert, seems bit peculiar behavior. Please fork any of the samples and modify it to show the issue. I will like to have a look at the issue.

kum-deepak avatar Jun 27 '18 10:06 kum-deepak

@s1vert I think I undertsand the issue now. It will need to be fixed in underlying library as well as in this one. It will be done in next major release.

kum-deepak avatar Jul 04 '18 03:07 kum-deepak

would this be part of 7.0.0?

sahyun1 avatar Nov 01 '18 20:11 sahyun1

@sahyun1 this thread has two different issues mentioned. The original question - multiple subscriptions - that has always worked.

There was one more issue that some functions were altering the header argument. It has been fixed in underlying library https://github.com/stomp-js/stompjs/issues/11. It will be part of v7.

If you mean any other issue, please create a new one 😄

kum-deepak avatar Nov 02 '18 07:11 kum-deepak

No, the code will look something like following (I haven't executed the code, just typed here):

// use messages from /topic/ng-demo-sub by subscribing to the Observable
this.messages = this._stompService
  .subscribe('/topic/ng-demo-sub')
  .subscribe((msg: Message) => {
    // use it
  });

// use messages from /topic/ng-demo-sub2 by subscribing to the Observable
this.messages2 = this._stompService
  .subscribe('/topic/ng-demo-sub2')
  .subscribe((msg: Message) => {
    // use it
  });

// Unsubscribe from /topic/ng-demo-sub
this.messages.unsubscribe();
this.messages = null;

// Unsubscribe from /topic/ng-demo-sub2
this.messages2.unsubscribe();
this.messages2 = null;

The above has been updated.

Cant we do it dynamically ?

ramakanthreddyk avatar Dec 20 '18 13:12 ramakanthreddyk

Could not understand your query. Please post some code sample.

kum-deepak avatar Dec 20 '18 14:12 kum-deepak

Could not understand your query. Please post some code sample.

get(demo) { this._stompService .subscribe('/topic/${demo}') .subscribe((msg: Message) => { // use this message });

}

this.get(ng-demo-sub); this.get(ng-demo-sub2)

// Unsubscribe from /topic/ng-demo-sub this.messages.unsubscribe(); this.messages = null;

cant we unsubscribe from one observable

ramakanthreddyk avatar Dec 20 '18 15:12 ramakanthreddyk

Probably you want the following (I haven't executed the code, just typed here):

  get(topic) {
    return this._stompService.subscribe(`/topic/${topic}`).subscribe((message) => {
      // Do something
    });
  }

    // Subscribe
    this.messages = this.get('demo1');
    this.messages2 = this.get('demo2');

    // Unsubscribe from /topic/ng-demo-sub
    this.messages.unsubscribe();
    this.messages = null;

    // Unsubscribe from /topic/ng-demo-sub2
    this.messages2.unsubscribe();
    this.messages2 = null;

kum-deepak avatar Dec 20 '18 16:12 kum-deepak

And each subscription must be assigned to a different observable?

If the number of new variables gets out of hand, you may use an array to collect all the subscriptions and use a loop to unsubscribe in the end. This also guarantees that you don't forget about any subscriptions.

This looks like a good place to use a pattern like subsink. You just throw a bunch of subscriptions at it, then call unsubscribe once. The code behind this library is dead simple, if it doesn't suit you, it's pretty easy to make your own implementation.

dJani97 avatar Jul 29 '22 17:07 dJani97