ksqlDB.RestApi.Client-DotNet

Multiple push queries

Open jbkuczma opened this issue 1 year ago • 0 comments

I am writing a GraphQL app in which a subscription feeds events from multiple push queries into a single stream for a web app to consume. Below is an example of the code:


       // dependency injection
       services.AddDbContext<IMyKSqlDbContext, MyKSqlDbContext>(options =>
       {
           var builder = options
               .UseKSqlDb("endpoint")
               .SetBasicAuthCredentials("apiKey", "apiSecret")
               .SetupQueryStream(opt =>
               {
                   opt.AutoOffsetReset = AutoOffsetReset.Latest;
                   opt.Properties[KSqlDbConfigs.KsqlQueryPushV2Enabled] = "true";
                   opt.Properties[KSqlDbConfigs.KsqlQueryPushV2ContinuationTokensEnabled] = "true";
               });
           
           builder.Options.ShouldPluralizeFromItemName = false;
           builder.Options.DisposeHttpClient = true;
           builder.Options.IdentifierEscaping = IdentifierEscaping.Always;
       }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);

       // example usage
       var input = ctx.ArgumentValue<MyGraphQLInput>("input");
       var ksqlClient = ctx.Service<IMyKSqlDbContext>();
       var date = new DateOnly(input.Date.Year, input.Date.Month, input.Date.Day).ToString("O");

       var pushQueryOne = ksqlClient.CreateQueryStream<ModelOne>("MATERIALIZED_VIEW_ONE").Where(x => x.Date == date).ToAsyncEnumerable();
       var pushQueryTwo = ksqlClient.CreateQueryStream<ModelTwo>("MATERIALIZED_VIEW_TWO").Where(x => x.Date == date).ToAsyncEnumerable();

       return AsyncEnumerableEx.Merge<KSqlDbModel>(pushQueryOne, pushQueryTwo);

Looking at the console, I see two requests executed by the KSqlDb client, but both use the latest query, which eventually leads to a deserialization error because the models for the two views differ.

info: ksqlDb.RestApi.Client[0]
      Executing query Sql: SELECT * FROM `MATERIALIZED_VIEW_ONE` WHERE (`Date` = '2024-04-08') EMIT CHANGES;
      Parameters:
      auto.offset.reset = latest
      ksql.query.push.v2.enabled = true
      ksql.query.push.v2.continuation.tokens.enabled = true
      
info: ksqlDb.RestApi.Client[0]
      Executing query Sql:SELECT * FROM `MATERIALIZED_VIEW_ONE` WHERE (`Date` = '2024-04-08') EMIT CHANGES;
      Parameters:
      auto.offset.reset = latest
      ksql.query.push.v2.enabled = true
      ksql.query.push.v2.continuation.tokens.enabled = true

dbug: ksqlDb.RestApi.Client[0]
      Raw data received: {"queryId":"transient_MATERIALIZED_VIEW_ONE_8554443953915907599","columnNames":["Id", "Date","EventType","Timestamp"],"columnTypes":["STRING","STRING","STRING","STRING"]}

dbug: ksqlDb.RestApi.Client[0]
      Raw data received: {"queryId":"transient_MATERIALIZED_VIEW_ONE_7088848852469522776","columnNames":["Id", "Date","EventType","Timestamp"],"columnTypes":["STRING","STRING","STRING","STRING"]}


I would expect to see two requests, each sending its intended query.
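
One way to narrow this down, as a rough sketch only: print the KSQL that each query object generates before subscribing to either of them. This assumes the library's `ToQueryString()` extension, which is meant to return the generated statement without executing it. `queryOne` and `queryTwo` are illustrative names; `ksqlClient`, `date`, `ModelOne`, and `ModelTwo` come from the example usage above.

```csharp
// Diagnostic sketch, not a fix: build both queries without subscribing.
// queryOne and queryTwo are hypothetical names used only for this check.
var queryOne = ksqlClient.CreateQueryStream<ModelOne>("MATERIALIZED_VIEW_ONE").Where(x => x.Date == date);
var queryTwo = ksqlClient.CreateQueryStream<ModelTwo>("MATERIALIZED_VIEW_TWO").Where(x => x.Date == date);

// Assumption: ToQueryString() returns the generated KSQL and sends no request.
Console.WriteLine(queryOne.ToQueryString());
Console.WriteLine(queryTwo.ToQueryString());
```

If the two printed statements already match, the mix-up probably happens while the queries are built on the same context instance. If they differ, it more likely happens later, when the requests are sent.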

jbkuczma, Apr 08 '24 17:04