Pub/sub with Polyphony instead of threads

Open gottlike opened this issue 1 year ago • 11 comments

I've looked at all the examples and a bunch of docs, but I can't wrap my head around how to do something like this the right way:

# frozen_string_literal: true

require 'bundler/setup'
require 'redis'
require 'polyphony/adapters/redis'

10_000.times do |i|
  spin do
    redis = Redis.new
    msg = redis.blpop("redis-channel-#{i}")
    puts msg
  end

  spin do
    redis = Redis.new

    # Do some IO work (few milliseconds or many seconds)

    redis.lpush("redis-channel-#{i}", Time.now.to_i)
  end
end

suspend

My use case is reading from a Redis stream with XREADGROUP and then spinning up workers that process the retrieved stream entries as fast as possible. The "Do some IO work" part could mean another blocking blpop/lpush pair, a kind of REST call over Redis. Can someone help? I'm also open to booking consulting hours for this. I just can't get this to work 🙈
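The blpop/lpush pairing above is essentially request/reply with a per-request reply key, fanned out to a pool of workers. Below is a minimal sketch of that shape. It assumes plain Ruby threads and Thread::Queue as stand-ins for Polyphony fibers and Redis lists (no Redis server is involved), and `handle`, `start_workers` and `request` are hypothetical names, with `handle` a placeholder for the real IO work.

```ruby
# Request/reply fan-out sketch. Thread::Queue stands in for Redis lists
# (assumption: no Redis server); threads stand in for Polyphony fibers.

# Hypothetical placeholder for the "do some IO work" step.
def handle(payload)
  payload * 2
end

# A fixed pool of workers pulls jobs from one shared queue.
def start_workers(jobs, count)
  Array.new(count) do
    Thread.new do
      while (job = jobs.pop)     # nil once the queue is closed and drained
        payload, reply = job
        reply << handle(payload) # like LPUSH onto the per-request key
      end
    end
  end
end

# Enqueue one job and block until its answer arrives.
def request(jobs, payload)
  reply = Thread::Queue.new # per-request channel, like "redis-channel-#{i}"
  jobs << [payload, reply]
  reply.pop                 # blocks like BLPOP until a worker replies
end

jobs = Thread::Queue.new
workers = start_workers(jobs, 4)
results = Array.new(10) { |i| Thread.new { request(jobs, i) } }.map(&:value)
jobs.close
workers.each(&:join)
p results # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The design point is that each request carries its own reply channel, so an answer can only be picked up by the requester waiting for it; a fiber-based version would keep the same structure with spun fibers and a fiber-aware queue.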

gottlike • Jun 15 '24 14:06

I just found a tool that does exactly what I needed: https://docs.nats.io/nats-concepts/core-nats/reqreply/reqreply_walkthrough

Maybe Polyphony can improve things here too (Ruby client: https://github.com/nats-io/nats-pure.rb)? In any case, I think this answers my question. I'll leave the issue open in case Polyphony can make things even better with NATS 😁

gottlike • Jun 16 '24 19:06

Alright, I'm indeed pretty happy with NATS for my use case. Now I'd like to use Polyphony instead of threads, but I've run into issues. Code for reproduction is here: https://gist.github.com/gottlike/e9cfed216ea7637c1c9a4ef031eb4c9e

You can run everything like this:

  1. docker run --rm -p 4222:4222 --name nats nats
  2. ruby worker_sub.rb
  3. ruby worker.rb
  4. ruby benchmark.rb

There are two issues:

  • When using Polyphony, I lose messages (visible in the worker process logs)
  • If there's an exception inside a Fiber (spin), it doesn't bubble up to the NATS client

When using threads, everything works fine.
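On the second issue: whatever the exact cause in nats-pure's callback dispatch (not analyzed here), a common way to make failures visible is to wait on the unit of work and re-raise its exception in the caller. The following stdlib-only sketch uses a thread and Thread#value, which re-raises the child's exception in the joining thread. `run_and_propagate` is a hypothetical name, and Polyphony's own way of awaiting a fiber is not shown.

```ruby
# Run a unit of work in a child thread and surface its failure in the caller.
def run_and_propagate(&work)
  child = Thread.new do
    # Skip the default "terminated with exception" report; the caller re-raises instead.
    Thread.current.report_on_exception = false
    work.call
  end
  child.value # joins the child and re-raises its exception here, if any
end

ok = run_and_propagate { 21 * 2 }
puts ok # prints 42

caught = nil
begin
  run_and_propagate { raise ArgumentError, 'bad message' }
rescue ArgumentError => e
  caught = e
  puts "caught in caller: #{e.message}"
end
```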

gottlike • Jun 18 '24 11:06

OK, so you're integrating with nats.io. I saw your gist uses the nats-pure gem (I guess because the nats.rb gem is based on EventMachine?). Anyway, I took a look at its socket read code: https://github.com/nats-io/nats-pure.rb/blob/0f8ca2f4416f53864e663862c3a5b5def0ac2758/lib/nats/io/client.rb#L1922-L1938

      def read(max_bytes, deadline=nil)
        begin
          return @socket.read_nonblock(max_bytes)
        rescue ::IO::WaitReadable
          if ::IO.select([@socket], nil, nil, deadline)
            retry
          else
            raise NATS::IO::SocketTimeoutError
          end
        rescue ::IO::WaitWritable
          if ::IO.select(nil, [@socket], nil, deadline)
            retry
          else
            raise NATS::IO::SocketTimeoutError
          end
        ...

They do a non-blocking read, and if no data is ready they call ::IO.select to wait until the socket becomes readable (or writable) or the deadline passes. This most likely won't work properly with Polyphony, presumably because that wait happens outside Polyphony's own IO backend. There might be other places in their code where they take liberties, so to speak, with the underlying concurrency model.
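For comparison, here is a sketch of the same read loop that waits with IO#wait_readable / IO#wait_writable (from io/wait) instead of ::IO.select. The assumption, to be checked against the Polyphony version in use, is that a fiber-aware runtime can intercept these per-IO waits more readily than IO.select. `read_with_deadline` and `SocketTimeoutError` are hypothetical names standing in for the nats-pure ones, and a pipe stands in for the socket.

```ruby
require 'io/wait'

# Hypothetical stand-in for NATS::IO::SocketTimeoutError.
class SocketTimeoutError < StandardError; end

# Same contract as the excerpt: non-blocking read, wait up to `deadline`
# seconds (nil = forever) when the IO isn't ready, then retry the read.
def read_with_deadline(io, max_bytes, deadline = nil)
  io.read_nonblock(max_bytes)
rescue IO::WaitReadable
  # wait_readable returns a falsy value when the timeout expires
  raise SocketTimeoutError unless io.wait_readable(deadline)
  retry
rescue IO::WaitWritable
  raise SocketTimeoutError unless io.wait_writable(deadline)
  retry
end

reader, writer = IO.pipe
writer.write('PING')
first = read_with_deadline(reader, 1024, 1)
puts first # prints PING

timed_out = false
begin
  read_with_deadline(reader, 1024, 0.05) # nothing left in the pipe
rescue SocketTimeoutError
  timed_out = true
end
puts timed_out ? 'timed out' : 'got data'
```

An adapter would still need to cover every place the client waits on its socket, not just this method.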

I'll try to take another look tonight and see if I can make an adapter for it.

noteflakes • Jun 18 '24 11:06

Thanks @noteflakes! I use nats-pure because the nats gem is no longer maintained. In their Slack they said that nats-pure will eventually replace nats.

gottlike • Jun 18 '24 11:06

Something else came up regarding running my script in Docker:

/root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `initialize': Operation not permitted - Operation not permitted (Errno::EPERM)
	from /root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `new'
	from /root/vendor/bundle/ruby/3.2.0/gems/polyphony-1.6/lib/polyphony.rb:8:in `<top (required)>'
	from <internal:/usr/lib/ruby/3.2.0/rubygems/core_ext/kernel_require.rb>:38:in `require'
	from <internal:/usr/lib/ruby/3.2.0/rubygems/core_ext/kernel_require.rb>:38:in `require'
	from /root/mq.rb:6:in `<top (required)>'
	from function.rb:8:in `require_relative'
	from function.rb:8:in `<main>'

When I run the script locally via ruby script.rb it works, but in Docker with ENTRYPOINT ["ruby", "script.rb"] it doesn't.

gottlike • Jun 18 '24 18:06

What kernel version are you on?

noteflakes • Jun 18 '24 19:06

Looking at https://www.man7.org/linux/man-pages/man2/io_uring_setup.2.html:

EPERM /proc/sys/kernel/io_uring_disabled has the value 2, or it has the value 1 and the calling process does not hold the CAP_SYS_ADMIN capability or is not a member of /proc/sys/kernel/io_uring_group.
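The EPERM conditions quoted above can be checked from Ruby. The small sketch below reads the sysctl and applies the man page's rules; the method names are hypothetical. This sysctl only exists on newer kernels (it was added in Linux 6.6), so a missing file means it is not the cause. Inside containers, a seccomp profile that filters the io_uring syscalls is another possible source of EPERM, and this check does not detect that.

```ruby
IO_URING_SYSCTL = '/proc/sys/kernel/io_uring_disabled'

# Returns 0, 1 or 2, or nil when the sysctl doesn't exist (kernels before 6.6).
def io_uring_disabled_value(path = IO_URING_SYSCTL)
  File.exist?(path) ? Integer(File.read(path).strip) : nil
end

# Applies the EPERM rules from io_uring_setup(2):
#   2            -> disabled for everyone
#   1            -> allowed only with CAP_SYS_ADMIN or io_uring_group membership
#   0 or missing -> this sysctl does not block io_uring
def sysctl_permits_io_uring?(value, privileged:)
  case value
  when nil, 0 then true
  when 1 then privileged
  else false
  end
end

value = io_uring_disabled_value
puts "io_uring_disabled = #{value.inspect}"
puts "sysctl permits io_uring (unprivileged): #{sysctl_permits_io_uring?(value, privileged: false)}"
```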

noteflakes • Jun 18 '24 19:06

You may want to prevent Polyphony from using io_uring by setting POLYPHONY_LIBEV=1 in your environment.

noteflakes • Jun 18 '24 19:06

I'm using Alpine 3.18 (kernel 5.15). I just passed this to my Docker container with --env POLYPHONY_LIBEV=1, but the error persists.

gottlike • Jun 18 '24 19:06

Correction: kernel 6.1. Shouldn't have trusted the AI's answer 😆

gottlike • Jun 18 '24 19:06

Ah, never mind. The variable has to be set when the gem is installed, not just at runtime, so I put it before bundle install in the Dockerfile, like so: POLYPHONY_LIBEV=1 bundle install

Now it works 👍

gottlike • Jun 18 '24 19:06