
Consumer is created from a test console app, but when the same code is integrated into the actual product the consumer is not created and goes to the Faulted state

Open amareshmad opened this issue 3 years ago • 14 comments

**When I create a .NET Framework or .NET Core console app, I can create a client and a consumer and receive messages, but when I integrate the same code into my actual project the consumer is not created.**

The first time I ran the console app I got DotPulsar.Exceptions.ChannelNotReadyException: The channnel is not ready yet. On later runs the exception did not occur: the consumer was created and could consume messages.

When I integrated the same code into my actual project, which is supposed to create the consumer, the consumer is not created. I always get the exception DotPulsar.Exceptions.ChannelNotReadyException: The channnel is not ready yet

DotPulsar.Exceptions.ChannelNotReadyException: The channnel is not ready yet
    at DotPulsar.Internal.NotReadyChannel`1.Receive(CancellationToken cancellationToken)
    at DotPulsar.Internal.Consumer`1.d__24.MoveNext()
    --- End of stack trace from previous location where exception was thrown ---
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Threading.Tasks.ValueTask`1.get_Result()
    at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable`1.ConfiguredValueTaskAwaiter.GetResult()
    at DotPulsar.Internal.Executor.d__9`1.MoveNext()

More info: both the test app and the actual product code run on the same machine. .NET Framework: 4.8. Installed .NET Core SDK: 6.0.

async Task TestConsumer()
{
    const string myTopic = "global.qa.xxxxRequestV1.Queue";

    string SERVICE_URL = "pulsar+ssl://xxxxx.xxxx.com:6651";
    string username = "xxxxxxx";
    string password = "xxxxxxxx";
    Uri sURl = new(SERVICE_URL);

    Console.WriteLine($"Build consumer client.....");
    ExceptionHandler pException = new ExceptionHandler();

    await using var client = PulsarClient.Builder()
                                         .Authentication(new AuthenticationBasic(username, password))
                                         .ServiceUrl(sURl)
                                         .VerifyCertificateName(false)
                                         .VerifyCertificateAuthority(false)
                                         .ExceptionHandler(pException.OnException)
                                          .Build();
    
    Console.WriteLine($"creating Consumer.......");
    await using var consumer = client.NewConsumer()
                                     .Topic(myTopic)
                                     .SubscriptionType(SubscriptionType.Shared)
                                     .SubscriptionName("xxxxxxxxxxxxxxxx")
                                     .Create();

    Console.WriteLine($"Waiting for consumer message.......");
    await foreach (var message in consumer.Messages())
    {
        Console.WriteLine($"Received consumer: {Encoding.UTF8.GetString(message.Data.ToArray())}");

        await consumer.Acknowledge(message);
    }

}

public class AuthenticationBasic : IAuthentication
{
    private string username;
    private string password;

    public AuthenticationBasic(string uname, string passwd)
    {
        username = uname;
        password = passwd;
    }

    public string AuthenticationMethodName
    {
        get { return "oms3.0"; }
    }

    public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
    {
        return new ValueTask<byte[]>(Encoding.UTF8.GetBytes(username + ":" + password));
    }
}

public interface IHandleException
{
    /// <summary>
    /// Called after an action has thrown an Exception.
    /// </summary>
    ValueTask OnException(ExceptionContext exceptionContext);
}

public class ExceptionHandler : IHandleException
{
    //static private readonly ILog log = LogManager.GetLogger(typeof(DESPulsarQueue));

    public ValueTask OnException(ExceptionContext exceptionContext)
    {
        Console.WriteLine("An error occurred while creating pulsar client:{0}", exceptionContext.Exception.StackTrace.ToString());
        return new ValueTask();
    }
}

amareshmad avatar Oct 11 '22 13:10 amareshmad

Hi @amareshmad

Are the test project and the production project using the same version of .NET?

blankensteiner avatar Oct 11 '22 13:10 blankensteiner

The same code is used in the test project and in the product.

From the test app the consumer is active (ConsumerState = Active); from the actual project the consumer state changes to Faulted (ConsumerState = Faulted).

amareshmad avatar Oct 11 '22 13:10 amareshmad

I get that the code is the same, but is it the same version of .NET? You mention both .NET Framework and .NET 6.

blankensteiner avatar Oct 11 '22 13:10 blankensteiner

I tried the sample app on both .NET Framework and .NET Core, and both sample apps work.

The same code was tested in the console apps and integrated into the product. The product code uses .NET Framework 4.8, the same as the console app.

using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

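namespace ConsoleApp1
{
internal class Program
{
    // Main is async so that the consumer task is awaited instead of being dropped.
    static async Task Main(string[] args)
    {
        await TestMethod("west");
    }

    public static async Task TestMethod(string zone)
    {
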
        try
        {
            string myTopic = string.Empty;
            if (zone == "east")
                myTopic = "persistent://east::xxxx.RequestV1.Queue";

            else
                myTopic = "persistent://west::xxxxx.RequestV1.Queue";

            string SERVICE_URL = "pulsar+ssl://xxxxxxxx.com:6651";
            string username = "xxxxxxx";
            string password = "xxxxxxxx";
            Uri sURl = new(SERVICE_URL);

            Console.WriteLine($"building consumer client");
            ExceptionHandler pException = new ExceptionHandler();

            await using var client = PulsarClient.Builder()
                                                 .Authentication(new AuthenticationBasic1(username, password))
                                                 .ServiceUrl(sURl)
                                                 .VerifyCertificateName(false)
                                                 .VerifyCertificateAuthority(false)
                                                 .KeepAliveInterval(TimeSpan.FromSeconds(10))
                                                 .RetryInterval(TimeSpan.FromSeconds(2))
                                                 .ExceptionHandler(pException.OnException)
                                                  .Build();

            Console.WriteLine($"creating Consumer.......");
            await using var consumer = client.NewConsumer()
                                              .SubscriptionName("xxxx.fdp.extraction.xxxxx-default")
                                             .Topic(myTopic)
                                             .SubscriptionType(SubscriptionType.Shared)
                                             .StateChangedHandler(AuthenticationBasic1.Monitor)
                                             .PriorityLevel(1)
                                             .Create();

            Console.WriteLine($"Waiting for consumer message.......");
            await foreach (var message in consumer.Messages())
            {
                Console.WriteLine($"Received consumer: {Encoding.UTF8.GetString(message.Data.ToArray())}");

                await consumer.Acknowledge(message);
            }
        }
        catch (PulsarClientClosedException ex)
        {
            Console.WriteLine("PulsarClientClosedException message:{0}, stack trace:{1}", ex.Message.ToString(), ex.StackTrace.ToString());
        }
        catch (PulsarClientDisposedException ex)
        {
            Console.WriteLine("Exception PulsarClientDisposedException message:{0}, stack trace:{1}", ex.Message.ToString(), ex.StackTrace.ToString());
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception message:{0}, stack trace:{1}", ex.Message.ToString(), ex.StackTrace.ToString());
        }
    }


}


public class AuthenticationBasic1 : IAuthentication
{
    private string username;
    private string password;

   // static private readonly ILog Console.WriteLine = LogManager.GetLogger(typeof(AuthenticationBasic1));
    public AuthenticationBasic1(string uname, string passwd)
    {
        username = uname;
        password = passwd;
    }

    public string AuthenticationMethodName
    {
        get { return "oms3.0"; }
    }

    public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
    {
        return new ValueTask<byte[]>(Encoding.UTF8.GetBytes(username + ":" + password));
    }

    public static void Monitor(ConsumerStateChanged stateChanged)
    {
        var stateMessage = stateChanged.ConsumerState switch
        {
            ConsumerState.Active => "is active",
            ConsumerState.Inactive => "is inactive",
            ConsumerState.Disconnected => "is disconnected",
            ConsumerState.Closed => "has closed",
            ConsumerState.ReachedEndOfTopic => "has reached end of topic",
            ConsumerState.Faulted => "has faulted",
            _ => $"has an unknown state '{stateChanged.ConsumerState}'"
        };

        var topic = stateChanged.Consumer.Topic;
        // Use the description computed above instead of discarding it.
        Console.WriteLine($"The consumer for topic '{topic}' {stateMessage}");
    }
}

public interface IHandleException
{
    /// <summary>
    /// Called after an action has thrown an Exception.
    /// </summary>
    ValueTask OnException(ExceptionContext exceptionContext);
}

public class ExceptionHandler : IHandleException
{
    //static private readonly ILog Console.WriteLine = LogManager.GetLogger(typeof(ExceptionHandler));

    public ValueTask OnException(ExceptionContext exceptionContext)
    {
        // The {0} placeholder is required; without it the exception is never printed.
        Console.WriteLine("An error occurred while creating pulsar client: {0}", exceptionContext.Exception);
        return new ValueTask();
    }
}

}

amareshmad avatar Oct 11 '22 13:10 amareshmad

Any idea what could cause the consumer state to go to Faulted?

amareshmad avatar Oct 11 '22 14:10 amareshmad

I suspect that DotPulsar.Exceptions.ChannelNotReadyException is not the only exception you are seeing. That exception will never fault a Producer/Consumer/Reader. I have seen some .NET Framework 4.8 errors caused by the TLS configuration, meaning you have to tell .NET Framework, via the Windows Registry, to use newer versions of TLS. You could also try creating a producer and sending something, to see if that gives you an exception.
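
As a hedged aside on the TLS point: besides the registry, .NET Framework also reads two documented AppContext switches, `Switch.System.Net.DontEnableSystemDefaultTlsVersions` and `Switch.System.Net.DontEnableSchUseStrongCrypto`, whose defaults depend on the framework version the host executable targets. The sketch below only prints whether they have been overridden in the current process and shows how they could be forced off at startup. The class and method names are made up for illustration, and nothing in this thread confirms that these switches are the cause here.

```csharp
using System;

public static class TlsSwitchCheck
{
    // Switch names documented by Microsoft for .NET Framework TLS behavior.
    public static readonly string[] SwitchNames =
    {
        "Switch.System.Net.DontEnableSystemDefaultTlsVersions",
        "Switch.System.Net.DontEnableSchUseStrongCrypto",
    };

    // Reports whether a switch was set explicitly (app.config or SetSwitch).
    public static string Describe(string name)
    {
        return AppContext.TryGetSwitch(name, out bool isEnabled)
            ? $"{name} = {isEnabled}"
            : $"{name} is not overridden (framework default applies)";
    }

    // Hypothetical helper: opt in to OS-default TLS versions and strong crypto.
    public static void UseOsDefaultTls()
    {
        foreach (var name in SwitchNames)
            AppContext.SetSwitch(name, false);
    }

    public static void Main()
    {
        foreach (var name in SwitchNames)
            Console.WriteLine(Describe(name));

        UseOsDefaultTls();

        foreach (var name in SwitchNames)
            Console.WriteLine(Describe(name));
    }
}
```

If overrides like these are used, they would most likely need to run before the first TLS connection is opened, since the framework is generally understood to cache switch values after the first read.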

blankensteiner avatar Oct 11 '22 15:10 blankensteiner

@blankensteiner thanks for the reply. I will create a producer as well and verify it from the product code.

If the framework version were the issue, it would also have blocked the .NET Framework 4.8 console app, right? The 4.8 console app works fine on the same machine.

@blankensteiner if you have a couple of minutes to troubleshoot, we can join over Zoom/Teams/Skype or some other collaboration tool.

amareshmad avatar Oct 11 '22 16:10 amareshmad

Yes, the console app should also fail in that case. Did you try with a producer? Also, are you running on Windows or Linux, and what dependencies does your production app have?

blankensteiner avatar Oct 12 '22 10:10 blankensteiner

It is a Windows app running on Windows Server 2016.

I tried a producer from the test console app, and it goes to the Connected state. That is, from the test console app the producer creates the connection and I can see the connection established. I will check the same from the product app.

amareshmad avatar Oct 13 '22 09:10 amareshmad

It is a Windows app running on Windows Server 2016. The test apps run on the same Windows Server 2016 machine.

Producer code from the .NET Framework test console app:

class ProductMainClass
{
    public async System.Threading.Tasks.Task CreateProducer(string queueName)
    {

        var cts = new CancellationTokenSource();

        Console.CancelKeyPress += (sender, args) =>
        {
            cts.Cancel();
            args.Cancel = true;
        };

        const string myTopic = "persistent://xxxxxxxxxxxxx";
  

        string SERVICE_URL = "pulsar+ssl://xxxxxx:6651";
        string username = "xxxxxx";
        string password = "xxxxxxxxxxxxxxxxxxxxxx";
        Uri sURl = new(SERVICE_URL);

        log.Info($"build producer client");
        ExceptionHandler pException = new ExceptionHandler();

        await using var client = PulsarClient.Builder()
                                             .Authentication(new AuthenticationBasic(username, password))
                                             .ServiceUrl(sURl)
                                             .ExceptionHandler(pException.OnException)
                                             .Build();

       log.Info($"creating producer.......");
        await using var producer = client.NewProducer(Schema.String)
                                         .Topic(myTopic)
                                         .StateChangedHandler(Monitor)
                                         .Create();

        log.Info($"waiting for producer .......");
        //await ProduceMessages((IProducer<string>)producer, cts.Token); without this line status is going to closed & if i enable status is going to Faulted state.
    }
	
	    private static void Monitor(ProducerStateChanged stateChanged)
    {
        var topic = stateChanged.Producer.Topic;
        var state = stateChanged.ProducerState;
        log.Info($"The producer for topic '{topic}' changed state to '{state}'");
    }
	}
	public class AuthenticationBasic : IAuthentication
{
    private string username;
    private string password;
    public AuthenticationBasic(string uname, string passwd)
    {
        username = uname;
        password = passwd;
    }

    public string AuthenticationMethodName
    {
        get { return "oms3.0"; }
    }

    public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
    {
        return new ValueTask<byte[]>(Encoding.UTF8.GetBytes(username + ":" + password));
    }
}

public interface IHandleException
{
    /// <summary>
    /// Called after an action has thrown an Exception.
    /// </summary>
    ValueTask OnException(ExceptionContext exceptionContext);
}

public class ExceptionHandler : IHandleException
{
    static private readonly ILog log = LogManager.GetLogger(typeof(DESPulsarQueue));

    public ValueTask OnException(ExceptionContext exceptionContext)
    {
        log.Error("An error occurred while creating pulsar client", exceptionContext.Exception);
        return new ValueTask();
    }
}

The test app is able to create the producer, and its state is Connected.

When I integrate the same code into my actual product, I see the error below:

    at System.Net.Security.SslState.ValidateCreateContext(Boolean isServer, String targetHost, SslProtocols enabledSslProtocols, X509Certificate serverCertificate, X509CertificateCollection clientCertificates, Boolean remoteCertRequired, Boolean checkCertRevocationStatus, Boolean checkCertName)
    at System.Net.Security.SslStream.BeginAuthenticateAsClient(String targetHost, X509CertificateCollection clientCertificates, SslProtocols enabledSslProtocols, Boolean checkCertificateRevocation, AsyncCallback asyncCallback, Object asyncState)
    at System.Net.Security.SslStream.<>c__DisplayClass32_0.<AuthenticateAsClientAsync>b__0(AsyncCallback callback, Object state)
    at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl(Func`3 beginMethod, Func`2 endFunction, Action`1 endAction, Object state, TaskCreationOptions creationOptions)
    at System.Net.Security.SslStream.AuthenticateAsClientAsync(String targetHost, X509CertificateCollection clientCertificates, SslProtocols enabledSslProtocols, Boolean checkCertificateRevocation)
    at DotPulsar.Internal.Connector.<EncryptStream>d__7.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at DotPulsar.Internal.Connector.<Connect>d__5.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at DotPulsar.Internal.ConnectionPool.<EstablishNewConnection>d__17.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at DotPulsar.Internal.ConnectionPool.<GetConnection>d__16.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Threading.Tasks.ValueTask`1.get_Result()
    at DotPulsar.Internal.ConnectionPool.<GetConnection>d__15.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Threading.Tasks.ValueTask`1.get_Result()
    at DotPulsar.Internal.ConnectionPool.<FindConnectionForTopic>d__13.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Threading.Tasks.ValueTask`1.get_Result()
    at DotPulsar.Internal.Producer`1.<GetNumberOfPartitions>d__27.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
    at DotPulsar.Internal.Producer`1.<Monitor>d__25.MoveNext()
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
    at DotPulsar.Internal.Executor.<Execute>d__5.MoveNext()

amareshmad avatar Oct 13 '22 11:10 amareshmad

My main observation: after creating the producer/consumer, as soon as I start sending/receiving by awaiting on the producer/consumer object, its state goes to Faulted.

When I comment out the send/receive call, the producer/consumer object goes to the Closed state instead.
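
One likely explanation for the Closed state (an assumption, not something confirmed in this thread): the samples declare the client and producer with `await using`, so they are disposed as soon as the method returns, and with the send/receive loop commented out the method returns immediately. The sketch below illustrates that language behavior with a stand-in `FakeProducer` class (hypothetical, not a DotPulsar type). `IAsyncDisposable` is built into .NET Core 3.0 and later; on .NET Framework it comes from the Microsoft.Bcl.AsyncInterfaces package.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for a producer; only models the state change on disposal.
public sealed class FakeProducer : IAsyncDisposable
{
    public string State { get; private set; } = "Connected";

    public ValueTask DisposeAsync()
    {
        State = "Closed";
        return default;
    }
}

public static class DisposalDemo
{
    // Nothing is awaited after creation, so the method ends right away
    // and 'await using' disposes the producer on the way out.
    public static async Task<FakeProducer> CreateAndReturnImmediately()
    {
        await using var producer = new FakeProducer();
        return producer;
    }

    public static async Task Main()
    {
        var producer = await CreateAndReturnImmediately();
        Console.WriteLine(producer.State); // prints "Closed"
    }
}
```

Under that reading, Closed is expected when nothing keeps the method alive, and only the Faulted state points at the actual connection problem.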

amareshmad avatar Oct 13 '22 11:10 amareshmad

Have a look at TrustedCertificateAuthority, VerifyCertificateAuthority and VerifyCertificateName. Maybe start by setting VerifyCertificateAuthority to false.

blankensteiner avatar Oct 13 '22 21:10 blankensteiner

I tried setting VerifyCertificateAuthority and VerifyCertificateName; it didn't help.

I also tried setting the registry values; that didn't help either:

    [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\.NETFramework\v2.0.50727]
    "SystemDefaultTlsVersions" = dword:00000001
    "SchUseStrongCrypto" = dword:00000001

    [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\.NETFramework\v4.0.30319]
    "SystemDefaultTlsVersions" = dword:00000001
    "SchUseStrongCrypto" = dword:00000001
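
A 32-bit process reads these values from the `WOW6432Node` view of the registry rather than the 64-bit view, so it may be worth checking both. A minimal sketch, assuming a Windows machine; the class name is hypothetical, and the key path and value names are the ones from the registry settings above:

```csharp
using System;
using System.Runtime.InteropServices;
using Microsoft.Win32;

public static class TlsRegistryCheck
{
    public const string KeyPath = @"SOFTWARE\Microsoft\.NETFramework\v4.0.30319";

    public static readonly string[] ValueNames =
    {
        "SystemDefaultTlsVersions",
        "SchUseStrongCrypto",
    };

    // Formats one registry value for display; a null value means "not set".
    public static string Format(string view, string name, object value)
    {
        return $"[{view}] {name} = {(value ?? "<not set>")}";
    }

    public static void Main()
    {
        if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            Console.WriteLine("The registry is only available on Windows.");
            return;
        }

        // Read both the 64-bit and the 32-bit (WOW6432Node) view.
        foreach (var view in new[] { RegistryView.Registry64, RegistryView.Registry32 })
        {
            using (var hklm = RegistryKey.OpenBaseKey(RegistryHive.LocalMachine, view))
            using (var key = hklm.OpenSubKey(KeyPath))
            {
                foreach (var name in ValueNames)
                    Console.WriteLine(Format(view.ToString(), name, key?.GetValue(name)));
            }
        }
    }
}
```

If the two views differ, that could explain why a console app and a product running with a different bitness behave differently on the same machine, although the thread does not say which bitness either one uses.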

The product code was on .NET Framework 4.5. I migrated it to 4.8 and verified that the existing messaging system (ActiveMQ) still worked, and only then started the migration to Pulsar.

I captured Wireshark logs for both apps. From the product, the connection is terminated right after it is opened.

Test .NET Framework console app connection:

    DNS 93 Standard query 0x9e7c A xxxxxx.com
    DNS 141 Standard query response 0x9e7c A xxxxxx.com A xx.xxx.xxx.xxx A xx.xxx.xxx.xxx
    TCP 66 60427 → 6651 [SYN, ECE, CWR] Seq=0 Win=8192 Len=0 MSS=1460 WS=256 SACK_PERM
    TCP 66 58908 → 799 [ACK] Seq=13933 Ack=793324 Win=6523 Len=0 TSval=2310068914 TSecr=341545951
    TCP 66 6651 → 60427 [SYN, ACK] Seq=0 Ack=1 Win=26883 Len=0 MSS=1460 SACK_PERM WS=128
    TCP 54 60427 → 6651 [ACK] Seq=1 Ack=1 Win=262656 Len=0
    TCP 56 6651 → 60427 [ACK] Seq=1 Ack=202 Win=28032 Len=0
    TLSv1.2 1514 Server Hello
    TLSv1.2 1160 Certificate, Server Key Exchange, Certificate Request, Server Hello Done
    TCP 54 60427 → 6651 [ACK] Seq=202 Ack=2567 Win=262656 Len=0
    TLSv1.2 187 Certificate, Client Key Exchange, Change Cipher Spec, Encrypted Handshake Message
    TLSv1.2 255 Client Hello

Log from the actual product code:

    DNS 93 Standard query 0xb556 A
    DNS 141 Standard query response 0xb556 A xxxxxx.com A xx.xxx.xxx.xxx A xx.xxx.xxx.xxx
    TCP 66 57823 → 6651 [SYN, ECE, CWR] Seq=0 Win=8192 Len=0 MSS=1460 WS=256 SACK_PERM
    TCP 66 6651 → 57823 [SYN, ACK] Seq=0 Ack=1 Win=26883 Len=0 MSS=1460 SACK_PERM WS=128
    TCP 54 57823 → 6651 [ACK] Seq=1 Ack=1 Win=262656 Len=0
    TCP 54 57823 → 6651 [FIN, ACK] Seq=1 Ack=1 Win=262656 Len=0
    TCP 56 6651 → 57823 [FIN, ACK] Seq=1 Ack=2 Win=27008 Len=0
    TCP 54 57823 → 6651 [ACK] Seq=2 Ack=2 Win=262656 Len=0

I don't know how to proceed from here (that is, how to validate TrustedCertificateAuthority, VerifyCertificateAuthority and VerifyCertificateName).
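
One way to narrow this down outside DotPulsar is a bare `SslStream` handshake run from inside the product process, since the capture shows the product closing the connection before any Client Hello is sent. The following is a minimal sketch, not DotPulsar's implementation: the `TlsProbe` class is hypothetical, certificate validation is disabled on purpose, and passing `SslProtocols.None` (let the OS choose the version) is an assumption about what should be negotiated.

```csharp
using System;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Threading.Tasks;

public static class TlsProbe
{
    // Splits a service URL such as "pulsar+ssl://host:6651" into host and port.
    public static (string Host, int Port) ParseEndpoint(string serviceUrl)
    {
        var uri = new Uri(serviceUrl);
        return (uri.Host, uri.Port);
    }

    // Opens a TCP connection and attempts a TLS handshake; throws on failure.
    public static async Task ProbeAsync(string serviceUrl)
    {
        var (host, port) = ParseEndpoint(serviceUrl);

        using (var tcp = new TcpClient())
        {
            await tcp.ConnectAsync(host, port);

            // Accept any certificate: the probe only checks that the handshake starts.
            using (var ssl = new SslStream(tcp.GetStream(), false,
                       (sender, certificate, chain, errors) => true))
            {
                await ssl.AuthenticateAsClientAsync(host, null, SslProtocols.None, false);
                Console.WriteLine($"Handshake OK, negotiated {ssl.SslProtocol}");
            }
        }
    }

    public static async Task Main(string[] args)
    {
        if (args.Length == 0)
        {
            Console.WriteLine("usage: TlsProbe <service-url>");
            return;
        }

        try
        {
            await ProbeAsync(args[0]);
        }
        catch (Exception ex)
        {
            // Print the full exception, including its message and inner exceptions.
            Console.WriteLine(ex);
        }
    }
}
```

If this probe throws the same exception inside the product but succeeds in the console app, the difference most likely lies in the process-level TLS configuration rather than in DotPulsar or the broker.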

amareshmad avatar Oct 14 '22 15:10 amareshmad

It sounds like something is broken in the production app. Maybe some dependencies or settings are lingering from when it was .NET Framework 4.5, possibly related to SSL/TLS; I can't really say. If possible, you could create the solution from scratch (using .NET Framework 4.8) and then copy/paste the code from the old solution into the new one, using the new .csproj format (with NuGet dependencies in the project file) and appsettings.json (or a clean configuration file), just to get a relatively clean slate to work with.

blankensteiner avatar Oct 14 '22 20:10 blankensteiner

The connection is now established from the product code.

Consumer code:

            await using var client = PulsarClient.Builder()
                                                 .Authentication(new AuthenticationBasic(username, password))
                                                 .ServiceUrl(sURl)
                                                 .ExceptionHandler(pException.OnException)
                                                 .VerifyCertificateAuthority(false)
                                                 .TrustedCertificateAuthority(new System.Security.Cryptography.X509Certificates.X509Certificate2())
                                                 .Build();

            log.Info($"creating Consumer.......");
            await using var consumer = client.NewConsumer(Schema.ByteArray)
                                             .Topic(myTopic)                                            
                                             .SubscriptionName(subscriptionName)
                                             .SubscriptionType(SubscriptionType.Shared)
                                             .StateChangedHandler(DESPulsarQueue.Monitor)
                                             .Create();

Producer code:

            log.Info($"build producer client");
            ExceptionHandler pException = new ExceptionHandler();

            await using var client = PulsarClient.Builder()
                                                 .Authentication(new AuthenticationBasic(username, password))
                                                 .ServiceUrl(sURl)
                                                 .ExceptionHandler(pException.OnException)
                                                 .VerifyCertificateAuthority(false)
                                                 .TrustedCertificateAuthority(new System.Security.Cryptography.X509Certificates.X509Certificate2())
                                                 .Build();

            log.Info($"creating producer.......");
            await using var producer = client.NewProducer(Schema.String)
                                             .Topic(myTopic)
                                             .StateChangedHandler(Monitor)
                                             .Create();

This article helped with setting up the TLS layers: https://support.avigilon.com/s/article/How-to-Check-if-TLS-1-2-is-Enabled?language=en_US

amareshmad avatar Oct 17 '22 11:10 amareshmad

Glad to hear it. TLS can be tricky :-)

blankensteiner avatar Oct 17 '22 12:10 blankensteiner