
"got an item for subscription, but I don't have any subscriber for that stream. Dropping on the floor." while pushing messages from an external service

Open gsaxena0105 opened this issue 6 years ago • 5 comments


S127.0.0.1:11111:288913378*grn/5088C1F4/7fba54a3@7fd1c6a0 got an item for subscription 5088c1f4-7217-cdd2-0933-1b6b1772d2cd, but I don't have any subscriber for that stream. Dropping on the floor.

Orleans.Streams.*stg/13212857803762018443/f4fba88b.Default[103301] Created PersistentStreamPullingAgent *stg/13212857803762018443/4190ec0fe805f991b75d802af4fba88b01000000000000ff-0x813E0AE5 for Stream Provider Default on silo S127.0.0.1:11111:288913378 for Queue netoxmessegequeue-0-0x00000000.

S127.0.0.1:11111:288913378*grn/5088C1F4/4499929b@fcf4278f got an item for subscription 5088c1f4-7217-cdd2-c66f-bed61772d2cd, but I don't have any subscriber for that stream. Dropping on the floor.

  Created PersistentStreamPullingAgent *stg/13212857803762018443/4190ec0fe805f991b75d802af4fba88b01000000000000ff-0x813E0AE5 for Stream Provider Default on silo S127.0.0.1:11111:288913378 for Queue netoxmessegequeue-0-0x00000000.

gsaxena0105 avatar Feb 27 '19 00:02 gsaxena0105

The pubsub system must think you have a subscription to that stream.

Suggest calling GetAllSubscriptionHandles() on the stream during OnActivateAsync and calling ResumeAsync or UnsubscribeAsync on any existing stream handles, prior to creating any new subscriptions.
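
For illustration, a minimal sketch of that pattern against the Orleans 3.x-era stream APIs (the "Default" provider name, the MyEvent type, the stream namespace, and the grain class are placeholders, not from this issue):

using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public class ConsumerGrain : Grain, IConsumerGrain
{
    private IAsyncStream<MyEvent> _stream;

    public override async Task OnActivateAsync()
    {
        var provider = GetStreamProvider("Default");
        _stream = provider.GetStream<MyEvent>(this.GetPrimaryKey(), "MyNamespace");

        // Resume handles left over from a previous activation instead of
        // creating duplicate subscriptions that pubsub still remembers.
        var handles = await _stream.GetAllSubscriptionHandles();
        if (handles.Count > 0)
        {
            foreach (var handle in handles)
                await handle.ResumeAsync(OnNextAsync);
        }
        else
        {
            await _stream.SubscribeAsync(OnNextAsync);
        }

        await base.OnActivateAsync();
    }

    private Task OnNextAsync(MyEvent item, StreamSequenceToken token)
    {
        // Handle the event here.
        return Task.CompletedTask;
    }
}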

jason-bragg avatar Mar 04 '19 17:03 jason-bragg

The pubsub system must think you have a subscription to that stream.

Suggest calling GetAllSubscriptionHandles() on the stream during OnActivateAsync and calling ResumeAsync or UnsubscribeAsync on any existing stream handles, prior to creating any new subscriptions.

I have the same issue: GetAllSubscriptionHandles() returns 0 handles (because the subscription is unsubscribed at silo shutdown).

My scenario is: run the silo, run the external service <- works okay.

Shut down the silo (this may be triggered by a service upgrade, for example) but keep the external service running. After the silo starts again, the external service successfully reconnects, but these messages appear.

If I restart the silo, it works okay.

I am using the SMS provider.
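
With the SMS provider, explicit subscriptions are persisted through the grain storage provider registered under the name "PubSubStore", so they survive a silo restart only if that storage is durable. A minimal sketch of a 3.x-era silo configuration with a durable PubSubStore (the clustering setup and connection string are placeholders):

var silo = new SiloHostBuilder()
    .UseLocalhostClustering()
    .AddSimpleMessageStreamProvider("SMS")
    // SMS subscription state lives in the "PubSubStore" grain storage;
    // an in-memory store here is wiped on every silo shutdown.
    .AddAzureTableGrainStorage("PubSubStore", options =>
        options.ConnectionString = "UseDevelopmentStorage=true")
    .Build();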

luckyycode avatar Sep 01 '20 23:09 luckyycode

I'm encountering the same issue. Any workaround?

tomerpeled avatar Feb 06 '22 13:02 tomerpeled

We've moved this issue to the Backlog. This means that it is not going to be worked on for the coming release. We review items in the backlog at the end of each milestone/release and depending on the team's priority we may reconsider this issue for the following milestone.

ghost avatar Jul 28 '22 23:07 ghost

This is a very serious bug. There's no issue if I restart the silo, but if I restart the Orleans client (a hosted Blazor WASM app), then I cannot recover the subscription and I start losing messages. I cannot recover from this except by deleting the deployments and the OrleansGrainState table. Obviously this is not an acceptable workaround.

I'm using the following code for OnActivateAsync():

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider("StreamProvider");
    _events = streamProvider.GetStream<object>("Application", "OlympiaVotes");

    // Resume handles from a previous activation if any exist;
    // otherwise create a new subscription.
    var handles = await _events.GetAllSubscriptionHandles();
    if (handles.Count > 0)
    {
        foreach (var handle in handles)
            await handle.ResumeAsync(async events =>
            {
                foreach (var @event in events)
                    RaiseEvent(@event.Item);
                await ConfirmEvents();
            });
    }
    else
    {
        await _events.SubscribeAsync(async events =>
        {
            foreach (var @event in events)
                RaiseEvent(@event.Item);
            await ConfirmEvents();
        });
    }

    await base.OnActivateAsync(cancellationToken);
}

EDIT

I cleaned up the code somewhat by replacing the async lambda with a named method, and then replaced the ResumeAsync with an UnsubscribeAsync. This appears to have improved matters somewhat: if I restart the MVC Orleans client once, it continues to work. If I then restart the silo, it continues to work. But if I restart both in quick succession, I end up back where I was, needing to delete the deployments and the grain state.
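
For reference, a minimal sketch of that unsubscribe-then-resubscribe variant, assuming the same JournaledGrain and stream as the code above (OnEventsAsync is a name chosen here, not from the original):

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider("StreamProvider");
    _events = streamProvider.GetStream<object>("Application", "OlympiaVotes");

    // Drop any handles left over from a previous activation,
    // then create a single fresh subscription.
    foreach (var handle in await _events.GetAllSubscriptionHandles())
        await handle.UnsubscribeAsync();

    await _events.SubscribeAsync(OnEventsAsync);
    await base.OnActivateAsync(cancellationToken);
}

private async Task OnEventsAsync(IList<SequentialItem<object>> events)
{
    foreach (var @event in events)
        RaiseEvent(@event.Item);
    await ConfirmEvents();
}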

ofgirichardsonb avatar Mar 19 '23 18:03 ofgirichardsonb

The following configuration seems to allow me to restart both components without losing the subscription:

var ehConnection = new EventHubConnection(
    "Endpoint=sb://xxx-dev.servicebus.windows.net/;SharedAccessKeyName=Manage;SharedAccessKey=xxx;EntityPath=default",
    "default");

silo.UseKubernetesHosting()
    .UseKubeMembership()
    .AddStreaming()
    .AddEventHubStreams("StreamProvider", configurator =>
    {
        configurator.UseAzureTableCheckpointer(options => options.Configure(cp =>
        {
            cp.ConfigureTableServiceClient(
                new Uri("https://xxx.table.core.windows.net"),
                new DefaultAzureCredential());
        }));
        configurator.ConfigureEventHub(ob =>
            ob.Configure(eh => eh.ConfigureEventHubConnection(ehConnection, "$Default")));
    })
    .AddAzureTableGrainStorageAsDefault(tables => tables.Configure(options =>
    {
        options.ConfigureTableServiceClient(
            new Uri("https://xxx.table.core.windows.net"),
            new DefaultAzureCredential());
    }))
    .AddRedisGrainStorage("PubSubStore", options => options.Configure(redis =>
    {
        redis.ConnectionString = "xxx.redis.cache.windows.net:6380,password=xxx,ssl=True,abortConnect=False";
    }))
    .ConfigureEndpoints(11111, 30000, listenOnAnyHostAddress: true)
    .AddLogStorageBasedLogConsistencyProviderAsDefault()
    .ConfigureLogging(logging => logging.AddConsole());

silo.Services.AddSerializer(serializer =>
    serializer.AddNewtonsoftJsonSerializer(
        isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));

I will note, however, that this is the only production configuration I have been able to make work. The combination of EventHub streaming and Azure Table checkpointing appears to keep the subscriptions stable between activations; I cannot say the same for Azure Queue streaming with Table storage for the PubSubStore. I still believe the OP has reported a bug, but for now this seems to be a valid solution.

ofgirichardsonb avatar Mar 20 '23 00:03 ofgirichardsonb

Hello, we are facing this issue too, currently using only SMS streams and Redis grain storage. Most of the time, when doing a Kubernetes rollout of a new version, "Dropping on the floor" error messages pollute the logs. The only solution is to undeploy and redeploy, which causes downtime, and we need to avoid any downtime...

spingee avatar Jul 21 '23 08:07 spingee

OK, we were using the memory PubSubStore; after changing to the Redis store, it now works.
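
In other words, the fix is to register a durable storage provider under the name "PubSubStore". A hedged before/after sketch, mirroring the AddRedisGrainStorage call shown earlier in the thread (connection string is a placeholder):

// Before: subscription state is held in silo memory and lost on every rollout.
silo.AddMemoryGrainStorage("PubSubStore");

// After: subscription state survives silo restarts and rollouts.
silo.AddRedisGrainStorage("PubSubStore", options => options.Configure(redis =>
{
    redis.ConnectionString = "xxx.redis.cache.windows.net:6380,password=xxx,ssl=True";
}));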

spingee avatar Aug 04 '23 06:08 spingee