
In-memory pipe implementation

Open straight-shoota opened this issue 3 years ago • 14 comments

This discussion came up in https://github.com/crystal-lang/crystal/issues/5430#issuecomment-1179500955

IO.pipe is currently implemented based on file descriptors, which has a couple downsides:

  • It needs to be backed by a system-level implementation
  • You can run out of file descriptors when using many of them
  • There are blocking issues (#8152)

A user-space implementation would avoid these problems and be more performant because there's no switch to the kernel. It can't be used for inter-process communication, though. So we should retain a pipe implementation based on file-descriptors for this use case.
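For context, this is roughly how the current file-descriptor-based API is used (illustrative snippet, not from the issue itself):

# Minimal example of today's IO.pipe; both ends are IO::FileDescriptor
# instances backed by a kernel pipe, which is where the downsides above come from.
reader, writer = IO.pipe
writer.puts "hello"
writer.close
puts reader.gets # => "hello"
reader.close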

straight-shoota avatar Jul 09 '22 22:07 straight-shoota

So we should retain a pipe implementation based on file-descriptors for this use case.

Indeed, this is needed for Process and Signal.

In #5430 I mentioned I had some code for this. I posted it in #12245 to show where I was going with it. I also have another implementation that isn't based on Deque (it just uses Slice directly), but I realized while messing with it this evening that Deque simplified the implementation a lot.

One thing that might be nice is to allow the buffer to be elastic — shrink it back down when the size:capacity ratio is reduced below some threshold. I have no idea if that's feasible or what the real-world performance looks like but a user-space queue won't yield the CPU on I/O because we're not doing any kernel-level I/O, so a whoooole lot of writes can happen before a single read does, potentially ballooning memory until OOM.
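As a rough illustration of that shrinking idea (purely a sketch; Deque doesn't expose its capacity, so this assumes the pipe tracks how large it has let the buffer grow):

# Purely illustrative: rebuild the deque with a smaller capacity once most of
# the previously grown space is unused. `grown_to` is a value the pipe would
# have to track itself, since Deque has no public capacity getter.
SHRINK_RATIO = 0.25

def maybe_shrink(buffer : Deque(UInt8), grown_to : Int32) : Deque(UInt8)
  return buffer if buffer.size >= grown_to * SHRINK_RATIO
  shrunk = Deque(UInt8).new(buffer.size)
  buffer.each { |byte| shrunk.push byte }
  shrunk
end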

Another mitigation strategy for that is to replace the Deque with a Channel. My assumption is that, since Deque provides the underlying implementation for Channel, Channel is slower, but I haven't done any benchmarking yet. Using a Channel would definitely avoid unwanted memory bloat since it provides a hard limit on memory usage and would yield the CPU when the buffer fills, so it might be closer to real pipe functionality.
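A toy example of the backpressure a bounded Channel would give (names and sizes are arbitrary):

# A Channel with a fixed capacity caps buffered bytes and suspends the writing
# fiber once the buffer is full, instead of growing without bound.
channel = Channel(UInt8).new(8192)

spawn do
  data = "hello".to_slice
  data.each { |byte| channel.send byte } # blocks if 8192 bytes are already queued
  channel.close
end

output = IO::Memory.new
while byte = channel.receive?
  output.write_byte byte
end
puts output.to_s # => "hello"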

jgaskins avatar Jul 10 '22 07:07 jgaskins

We'll need to handle concurrent access correctly, so using a channel would be a good idea. The Go implementation can serve as inspiration: https://go.dev/src/io/pipe.go

straight-shoota avatar Jul 11 '22 06:07 straight-shoota

If I stick with the Deque, the plan was to wrap a mutex around the code in read and write specifically to account for that.
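A sketch of that shape (class and method names are made up for illustration; a real version would also need to block readers on an empty buffer instead of returning 0):

# Illustrative only: a user-space pipe backed by Deque(UInt8) with a Mutex
# around read and write. Not a full IO.pipe replacement; it never blocks.
class MemoryPipe < IO
  def initialize
    @buffer = Deque(UInt8).new
    @mutex = Mutex.new
  end

  def write(slice : Bytes) : Nil
    @mutex.synchronize do
      slice.each { |byte| @buffer.push byte }
    end
  end

  def read(slice : Bytes) : Int32
    @mutex.synchronize do
      count = Math.min(slice.size, @buffer.size)
      count.times { |i| slice[i] = @buffer.shift }
      count
    end
  end
end

The rest of the IO interface (gets, puts, etc.) then comes for free from the Bytes-based read and write.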

jgaskins avatar Jul 11 '22 12:07 jgaskins

Yeah, Channel does that for you already

straight-shoota avatar Jul 11 '22 18:07 straight-shoota

Yes, but that's not the only difference between the two. The way you're talking about it implies you are favoring one implementation based on a single factor.

jgaskins avatar Jul 11 '22 20:07 jgaskins

Managing concurrent access is hard. So I would start with channels, just because it's easier. And there's no point in reimplementing that just for the sake of it. Do you think there is a considerable downside to using channels?

straight-shoota avatar Jul 12 '22 05:07 straight-shoota

there's no point in reimplementing that just for the sake of it.

There is more to what I said than “for the sake of it”.

Do you think there is a considerable downside to using channels?

I pointed out tradeoffs in this comment.

jgaskins avatar Jul 13 '22 02:07 jgaskins

Forgive me, I read that comment as a perfect argument for using channels. The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified. We probably need pretty much everything that Channel provides for making the pipe implementation concurrency-safe. Re-implementing that is unlikely to yield any better results. Anyway, we should do the simple thing first, which also gives more confidence in correctness. We can always optimize once there is a solid base.

straight-shoota avatar Jul 13 '22 07:07 straight-shoota

I wasn't making a case one way or the other, really. Just offering discussion. I don't think there was enough information at the time to draw any conclusions about a preferred implementation. There may still not be even after this post.

The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified.

Basing a decision on my assumption would be a mistake, certainly. I would never suggest making decisions about performance without benchmarks. But I'd argue that your dismissal of my note about performance is a mistake, as well.

And while I agree that correctness is more important than performance, channels provide little to no opportunity for optimization here. The API design explicitly prohibits that:

  • There's no way to send or receive a batch of elements, so every byte passed through it would lock and unlock a mutex on both the sending and receiving side
  • There's no introspection into channel state because that's not thread-safe, so to do any nonblocking I/O whatsoever, each byte would need to be read inside a select/else
  • There's no way to implement peek

These were all intentional design choices for channels. So your comment about optimizing a channel-based implementation later seems a lot more optimistic than reality allows. I wrote up a quick benchmark passing 8KB at a time through a channel vs a deque, plus the current IO::FileDescriptor-based pipe as the "time to beat":

channel 819.25  (  1.22ms) (± 1.00%)  2.13MB/op  9768.59× slower
  deque   8.00M (124.95ns) (± 0.91%)    0.0B/op          fastest
FD pipe   1.10M (907.29ns) (± 1.51%)    0.0B/op     7.26× slower
Benchmark code
require "benchmark"

size = 8192

channel = Channel(UInt8).new(size)
queue = Deque(UInt8).new(size)
p_read, p_write = IO.pipe(false, false)
slice = Bytes.new(size, '.'.ord.to_u8)

Benchmark.ips do |x|
  x.report "channel" do
    size.times { |i| channel.send slice[i] }
    size.times do |i|
      select
      when byte = channel.receive?
        if byte
          slice[i] = byte
        else
          break
        end
      else
      end
    end
  end

  mutex = Mutex.new
  x.report "deque" do
    mutex.synchronize do
      queue.copy_from slice
      queue.copy_to slice
      queue.pop(size)
    end
  end

  x.report "FD pipe" do
    p_write.write slice
    p_read.read slice
  end
end

class Deque(T)
  # Bulk-copy helpers for the benchmark above. They assume slice.size == @capacity
  # (the whole buffer is copied every time, as noted below).
  def copy_from(slice : Slice(T))
    if @buffer.null?
      initialize slice.size
    end

    if @start == 0
      slice.copy_to @buffer, @capacity
    else
      # The ring buffer wraps around: copy the tail first, then the wrapped head.
      head = @capacity - @start
      slice.copy_to @buffer + @start, head
      (slice + head).copy_to @buffer, @start
    end
  end

  def copy_to(slice : Slice(T))
    if @start == 0
      slice.copy_from @buffer, @capacity
    else
      head = @capacity - @start
      slice.copy_from @buffer + @start, head
      (slice + head).copy_from @buffer, @start
    end
  end
end

Since each operation here is 8KB, that's:

Implementation  Throughput
Channel         6.4 MB/sec
Deque           62.5 GB/sec
IO.pipe         8.6 GB/sec
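(These follow from the iteration rates above, throughput ≈ iterations/sec × 8192 bytes; the table values were presumably computed from unrounded rates, so a check from the rounded figures lands slightly off:)

# Rough sanity check of the table, using the rounded iteration rates above.
puts 819.25 * 8192 / 1024 ** 2 # => ~6.4 (MiB/sec, Channel)
puts 8.00e6 * 8192 / 1024 ** 3 # => ~61  (GiB/sec, Deque)
puts 1.10e6 * 8192 / 1024 ** 3 # => ~8.4 (GiB/sec, FD pipe)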

This means passing bytes over a Channel isn't just slower than over a Deque or even the IO::FileDescriptor-based implementation, it's slower than my internet connection.

Getting the Deque implementation this fast did require an optimization because copying byte-by-byte was slow. I couldn't come up with a way to do the same for Channel because it's a lot more complicated internally and its API design doesn't really seem to allow for it — Deque has plenty of introspection available. Also, the Deque optimization is still incomplete — it copies the entire buffer every time, even if it's not full. It was fine for my purposes because I guaranteed the buffer was full, but in real-world scenarios it can probably short-circuit to save cycles, and Deque#pop(n : Int) could possibly also be optimized to avoid having to iterate.

Maybe a similar optimization can be done for channels (since they're implemented on top of deques), and I'd be curious if it's feasible without bypassing a lot of Channel's safety mechanisms. I couldn't come up with a way to do it, though. Feel free to give it a shot.

TL;DR: I'd caution against zeroing in on a preferred implementation with too little information considered.

jgaskins avatar Jul 14 '22 04:07 jgaskins

Regarding the blocking issues in #8152, isn't that using a different way of creating the pipe than IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.

Another example of where pipes are currently used is multithreaded crystal, to enqueue fibers between threads.

yxhuvud avatar Jul 14 '22 10:07 yxhuvud

Regarding the benchmark: 1: Don't involve select in benchmarking channels - that introduces overhead that is different from what you are measuring. Either that, or use select in all of the benchmarks. Select needs to support reading from pipes, after all. 2: Any benchmark that tries to measure this without involving the scheduler at any point, where nothing ever has to wait (preferably waiting on both the reader and the writer side), is pretty much nonsense. We already know raw Deques and Arrays are faster than interprocess/fiber communication.

FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue. But there is definitely a question about how

yxhuvud avatar Jul 14 '22 10:07 yxhuvud

Regarding the blocking issues in #8152, isn't that using a different way of creating the pipe than IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.

The purpose of this discussion is to have an implementation that doesn't use file descriptors at all, for the same reason you might use IO::Memory over a tempfile.
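(In the same way that both of these give you an IO, but only one consumes a file descriptor; illustrative snippet:)

# Both are IOs; only the tempfile goes through the kernel and uses a descriptor.
mem = IO::Memory.new
mem << "scratch data"

File.tempfile("scratch") do |file|
  file << "scratch data"
end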

1: Don't involve select in benchmarking channels - that introduces overhead that is different from what you are measuring. Either that, or use select in all of the benchmarks. Select needs to support reading from pipes, after all.

Please re-read the part of that post where I mentioned why select was needed for the Channel implementation — which does not apply to Deque. The overhead of select is the reason I brought it up. It allocated about 272 bytes of heap memory for every single byte passed through.

2: Any benchmark that tries to measure this without involving the scheduler at any point, where nothing ever has to wait (preferably waiting on both the reader and the writer side), is pretty much nonsense. We already know raw Deques and Arrays are faster than interprocess/fiber communication.

If you think it's not up to scratch, feel free to show me how it's done.

FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue.

Please see this comment. You're more than welcome to keep using the implementation backed by file descriptors. It isn't going to be removed.

jgaskins avatar Jul 14 '22 12:07 jgaskins

This issue stemmed from the fact that the use of non-blocking IO.pipes blocks certain standard library specs on Windows (see what I did there?), and those specs obviously do not involve any inter-process communication. IIRC this affects the OAuth2 client specs. Any working implementation is fine by me even if it is restricted to spec/support/io.cr.

HertzDevil avatar Jul 16 '22 10:07 HertzDevil