Closing stream / rebuild stream handler

Open ook opened this issue 2 years ago • 4 comments

I'm trying to use the Ksql gem to stream changes into a Rails view. It works great so far.

I'm having difficulty closing the stream: the ksqlDB stream subscription occurs in one Rails action, and I'm looking for a way to close that stream from another Rails action.

It seems I can't "rebuild" a stream handler from stream = Ksql::Client.stream("SELECT * FROM my_stream EMIT CHANGES;") in order to call stream.close. (BTW, there's a typo at https://github.com/LapoElisacci/Ksql/blob/main/lib/ksql/stream.rb#L18: s/stared/started/.) The ksqlDB REST API seems to only offer closing the connection. How would you close a stream handler in such a situation? Thank you.

ook avatar May 04 '22 13:05 ook

Hi @ook, thanks for your feedback!

If you know the stream ID, you should be able to close the stream by calling the close_query endpoint through the client: https://github.com/LapoElisacci/Ksql/blob/9f2ba15f812c7890a0e88773ab36887327666bcf/lib/ksql/client.rb#L14

Please let me know if that works.
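
If the ID is not known up front, one option is to look it up in the output of show queries;. The helper below is only a sketch and is not part of the Ksql gem: transient_query_ids is a hypothetical name, and it assumes each query entry is a hash with "id", "queryString" and "state" keys. It matches on the statement text after normalizing whitespace and the trailing semicolon.

```ruby
# Hypothetical helper (not part of the Ksql gem): return the IDs of running
# queries whose statement text matches the given statement.
# `queries` is assumed to be an array of hashes with "id", "queryString"
# and "state" keys, as listed by `show queries;`.
def transient_query_ids(queries, statement)
  # Ignore differences in surrounding whitespace, repeated spaces and the
  # trailing semicolon when comparing statements.
  normalize = ->(sql) { sql.to_s.strip.chomp(';').gsub(/\s+/, ' ') }
  wanted = normalize.call(statement)

  queries
    .select { |q| q['state'] == 'RUNNING' && normalize.call(q['queryString']) == wanted }
    .map { |q| q['id'] }
end

# Assumed usage with the gem (untested; the `.queries` reader is an assumption):
#   queries = Ksql::Client.ksql('show queries;').queries
#   transient_query_ids(queries, 'SELECT * FROM my_stream EMIT CHANGES;')
#     .each { |id| Ksql::Client.close_query(id) }
```

Note that two clients running the same statement would both match, so this would close every matching query, not only your own.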

I'll fix the typo asap.

Thanks

LapoElisacci avatar May 04 '22 14:05 LapoElisacci

Thank you again for the quick response, @LapoElisacci! Hmm, I see.

It seems to close the query successfully, but it returns a generic error:

Loading development environment (Rails 6.1.4.7)
[1] pry(main)> Ksql::Client.ksql('show queries;')
=> Ksql::Queries {
  :queries       => [
    [0] {
      "id"              => "transient_S_GROW_EVENTS_2830845903553965693",
      "queryString"     => "SELECT * FROM s_grow_events EMIT CHANGES;",
      "queryType"       => "PUSH",
      "sinkKafkaTopics" => [],
      "sinks"           => [],
      "state"           => "RUNNING",
      "statusCount"     => {
        "RUNNING" => 1
      }
    }
  ],
  :statementText => "show queries;",
  :warnings      => []
}
[2] pry(main)> Ksql::Client.close_query("transient_S_GROW_EVENTS_2830845903553965693")
=> Ksql::Error {
  :@type      => "generic_error",
  :error_code => 50000,
  :message    => "On wrong context or worker"
}
[3] pry(main)> Ksql::Client.ksql('show queries;')
=> Ksql::Queries {
  :queries       => [],
  :statementText => "show queries;",
  :warnings      => []
}

I noticed that it doesn't trigger the stream.on_close block set up in the "initial" action. I'll look for a better way to achieve my goal.
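
One direction that might be worth trying is to keep the original stream handler reachable so that the second action can call stream.close on it directly. The sketch below is an assumption-laden illustration, not something the gem provides: StreamRegistry is a hypothetical module, and it only helps when both requests are served by the same Ruby process. Whether stream.close then runs the on_close block is not verified here.

```ruby
require 'securerandom'

# Hypothetical in-process registry (not part of the Ksql gem). It keeps stream
# handlers reachable by a token so that a later request can call #close on them.
# Assumption: both requests are handled by the same Ruby process.
module StreamRegistry
  @handlers = {}
  @lock = Mutex.new

  class << self
    # Store a handler (anything responding to #close) and return its token.
    def register(handler)
      token = SecureRandom.hex(8)
      @lock.synchronize { @handlers[token] = handler }
      token
    end

    # Remove the handler and close it; returns false for an unknown token.
    def close(token)
      handler = @lock.synchronize { @handlers.delete(token) }
      return false unless handler

      handler.close
      true
    end
  end
end

# Assumed usage in the two Rails actions (untested):
#   stream = Ksql::Client.stream('SELECT * FROM my_stream EMIT CHANGES;')
#   session[:stream_token] = StreamRegistry.register(stream)
#   ...
#   StreamRegistry.close(session[:stream_token])
```

With several server workers, the second request may land in a different process, where the token is unknown and close returns false.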

ook avatar May 04 '22 16:05 ook

Hey @ook, that's interesting, thanks for sharing!

If you find a better way to handle this scenario, please feel free to open a PR or suggest changes here.

Thanks!

LapoElisacci avatar May 05 '22 06:05 LapoElisacci

@ook Version 0.1.1 released, I hope you don't mind that I mentioned you as a contributor 😄

LapoElisacci avatar May 05 '22 06:05 LapoElisacci