In-memory pipe implementation
This discussion came up in https://github.com/crystal-lang/crystal/issues/5430#issuecomment-1179500955
IO.pipe is currently implemented based on file descriptors, which has a couple downsides:
- It requires being backed by a system implementation
- You can run out of file descriptors when using lots of them
- There are blocking issues (#8152)
A user-space implementation would avoid these problems and be more performant because there's no switch to the kernel. It can't be used for inter-process communication, though. So we should retain a pipe implementation based on file-descriptors for this use case.
So we should retain a pipe implementation based on file-descriptors for this use case.
Indeed, this is needed for Process and Signal.
In #5430 I mentioned I had some code for this. I posted it in #12245 to show where I was going with it. I also have another implementation that isn't based on Deque (it just uses Slice directly), but I realized while messing with it this evening that Deque simplified the implementation a lot.
One thing that might be nice is to allow the buffer to be elastic — shrink it back down when the size:capacity ratio is reduced below some threshold. I have no idea if that's feasible or what the real-world performance looks like but a user-space queue won't yield the CPU on I/O because we're not doing any kernel-level I/O, so a whoooole lot of writes can happen before a single read does, potentially ballooning memory until OOM.
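As a rough illustration of that elastic-buffer idea, here is a minimal sketch. Everything here is invented for illustration (`ElasticBuffer`, `SHRINK_RATIO`, the high-water tracking): Crystal's Deque doesn't expose its capacity publicly, so this tracks a high-water mark and rebuilds the deque once occupancy falls well below it.

```crystal
# Hypothetical sketch: shrink memory back down by rebuilding the deque when
# occupancy drops far below its observed peak. Not real stdlib API.
class ElasticBuffer
  SHRINK_RATIO = 4

  def initialize
    @deque = Deque(UInt8).new
    @high_water = 0
  end

  def push(byte : UInt8) : Nil
    @deque.push byte
    @high_water = @deque.size if @deque.size > @high_water
  end

  def shift? : UInt8?
    byte = @deque.shift?
    shrink_if_needed
    byte
  end

  def size
    @deque.size
  end

  # Rebuild with a right-sized backing store once we're far below peak usage.
  private def shrink_if_needed
    return if @high_water < 1024 # don't bother shrinking tiny buffers
    if @deque.size < @high_water // SHRINK_RATIO
      fresh = Deque(UInt8).new(@deque.size)
      @deque.each { |b| fresh.push b }
      @deque = fresh
      @high_water = Math.max(@deque.size, 1)
    end
  end
end
```

Whether the rebuild cost pays for itself is exactly the kind of thing that would need the benchmarking mentioned above.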
Another mitigation strategy for that is to replace the Deque with a Channel. My assumption is that, since Deque provides the underlying implementation for Channel, Channel is slower, but I haven't done any benchmarking yet. Using a Channel would definitely avoid unwanted memory bloat since it provides a hard limit on memory usage and would yield the CPU when the buffer fills, so it might be closer to real pipe behavior.
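For illustration, a Channel-backed pipe along those lines might look like this sketch. `ChannelPipe` and its default capacity are invented names/values, and close/EOF handling is omitted entirely.

```crystal
# Hypothetical sketch of a Channel-backed pipe. The bounded channel gives a
# hard memory cap and yields the fiber when full; `read` uses select/else to
# drain without blocking after the first byte arrives.
class ChannelPipe < IO
  def initialize(capacity : Int32 = 8192)
    @channel = Channel(UInt8).new(capacity)
  end

  def write(slice : Bytes) : Nil
    # Blocks (yielding the fiber) once the channel's capacity is reached.
    slice.each { |byte| @channel.send byte }
  end

  def read(slice : Bytes) : Int32
    return 0 if slice.empty?
    slice[0] = @channel.receive # wait for at least one byte
    count = 1
    while count < slice.size
      select
      when byte = @channel.receive
        slice[count] = byte
        count += 1
      else
        break # nothing buffered right now; return what we have
      end
    end
    count
  end
end
```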
We'll need to handle concurrent access correctly, so using a channel would be a good idea. The Go implementation can serve as inspiration: https://go.dev/src/io/pipe.go
If I stick with the Deque, the plan was to wrap a mutex around the code in read and write specifically to account for that.
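The Deque-plus-mutex plan described there might look roughly like this sketch. `MemoryPipe` is an invented name; a real version would need to park the reading fiber when the buffer is empty instead of returning 0, and handle close/EOF.

```crystal
# Hypothetical sketch of the Deque-based pipe with a mutex wrapped around
# read and write, as discussed above. Blocking semantics are omitted.
class MemoryPipe < IO
  def initialize
    @buffer = Deque(UInt8).new
    @mutex = Mutex.new
  end

  def read(slice : Bytes) : Int32
    @mutex.synchronize do
      count = Math.min(slice.size, @buffer.size)
      count.times { |i| slice[i] = @buffer.shift }
      count
    end
  end

  def write(slice : Bytes) : Nil
    @mutex.synchronize do
      slice.each { |byte| @buffer.push byte }
    end
  end
end
```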
Yeah, Channel does that for you already
Yes, but that's not the only difference between the two. The way you're talking about it implies you are favoring one implementation based on a single factor.
Managing concurrent access is hard. So I would start with channels, just because it's easier. And there's no point in reimplementing that just for the sake of it. Do you think there is a considerable downside to using channels?
there's no point in reimplementing that just for the sake of it.
There is more to what I said than “for the sake of it”.
Do you think there is a considerable downside to using channels?
I pointed out tradeoffs in this comment.
Forgive me, I read that comment as a perfect argument for using channels.
The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified. We probably need pretty much everything that Channel provides for making the pipe implementation concurrency safe. Re-implementing that is unlikely to yield any better results.
Anyways, we should do the simple thing first which also gives more confidence for correctness. We can always optimize once there is a solid base.
I wasn't making a case one way or the other, really. Just offering discussion. I don't think there was enough information at the time to draw any conclusions about a preferred implementation. There may still not be even after this post.
The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified.
Basing a decision on my assumption would be a mistake, certainly. I would never suggest making decisions about performance without benchmarks. But I'd argue that your dismissal of my note about performance is a mistake, as well.
And while I agree that correctness is more important than performance, channels provide little to no opportunity for optimization here. The API design explicitly prohibits that:
- There's no way to send or receive a batch of elements, so every byte passed through it would lock and unlock a mutex on both the sending and receiving side
- There's no introspection into channel state because that's not thread-safe, so to do any nonblocking I/O whatsoever, each byte would need to be read inside a select/else
- There's no way to implement peek
These were all intentional design choices for channels. So your comment about optimizing a channel-based implementation later seems a lot more optimistic than reality allows. I wrote up a quick benchmark passing 8KB at a time through a channel vs a deque, plus the current IO::FileDescriptor-based pipe as the "time to beat":
channel 819.25 ( 1.22ms) (± 1.00%) 2.13MB/op 9768.59× slower
deque 8.00M (124.95ns) (± 0.91%) 0.0B/op fastest
FD pipe 1.10M (907.29ns) (± 1.51%) 0.0B/op 7.26× slower
Benchmark code
require "benchmark"

size = 8192
channel = Channel(UInt8).new(size)
queue = Deque(UInt8).new(size)
p_read, p_write = IO.pipe(false, false)
slice = Bytes.new(size, '.'.ord.to_u8)

Benchmark.ips do |x|
  x.report "channel" do
    size.times { |i| channel.send slice[i] }
    size.times do |i|
      select
      when byte = channel.receive?
        if byte
          slice[i] = byte
        else
          break
        end
      else
      end
    end
  end

  mutex = Mutex.new
  x.report "deque" do
    mutex.synchronize do
      queue.copy_from slice
      queue.copy_to slice
      queue.pop(size)
    end
  end

  x.report "FD pipe" do
    p_write.write slice
    p_read.read slice
  end
end

class Deque(T)
  def copy_from(slice : Slice(T))
    if @buffer.null?
      initialize slice.size
    end

    if @start == 0
      slice.copy_to @buffer, @capacity
    else
      slice.copy_to @buffer + @start, @capacity - @start
      slice.copy_to @buffer, @start - @capacity
    end
  end

  def copy_to(slice : Slice(T))
    if @start == 0
      slice.copy_from @buffer, @capacity
    else
      slice.copy_from @buffer + @start, @capacity - @start
      slice.copy_from @buffer, @start - @capacity
    end
  end
end
Since each operation here is 8KB, that's:
| Implementation | Throughput |
|---|---|
| Channel | 6.4MB/sec |
| Deque | 62.5GB/sec |
| IO.pipe | 8.6GB/sec |
This means passing bytes over a Channel isn't just slower than over a Deque or even the IO::FileDescriptor-based implementation, it's slower than my internet connection.
Getting the Deque implementation this fast did require an optimization because copying byte-by-byte was slow. I couldn't come up with a way to do the same for Channel because it's a lot more complicated internally and its API design doesn't really seem to allow for it — Deque has plenty of introspection available. Also, the Deque optimization is still incomplete — it copies the entire buffer every time, even if it's not full. It was fine for my purposes because I guaranteed the buffer was full, but in real-world scenarios it can probably short-circuit to save cycles, and Deque#pop(n : Int) could possibly also be optimized to avoid having to iterate.
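To illustrate the Deque#pop(n) point, dropping from the back can in principle be O(1) by adjusting internal bookkeeping rather than popping element by element. This reaches into Deque's internals, so treat it as a sketch, not a proposal for the actual patch.

```crystal
class Deque(T)
  # Illustrative only: drop up to `n` elements from the back without
  # iterating, by shrinking the internal size counter. Fine for value types
  # like UInt8; for reference types this would keep the dropped slots alive
  # until they are overwritten.
  def drop_back(n : Int) : Nil
    @size -= Math.min(n.to_i, @size)
  end
end
```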
Maybe a similar optimization can be done for channels (since they're implemented on top of deques), and I'd be curious if it's feasible without bypassing a lot of Channel's safety mechanisms. I couldn't come up with a way to do it, though. Feel free to give it a shot.
TL;DR: I'd caution against zeroing in on a preferred implementation with too little information considered.
Regarding the blocking issues in #8152, isn't that using a pipe creation that is different from IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.
Another example of where pipes are currently used is multithreaded crystal, to enqueue fibers between threads.
Regarding the benchmark:
1: Don't involve select in benchmarking channels - that introduces overhead that is different from what you are measuring. Either that, or use select in all of the benchmarks. Select needs to support reading from pipes, after all.
2: Any benchmark that tries to measure this without ever involving the scheduler, where nothing ever has to wait (preferably waiting should happen on both the reader and writer sides), is pretty much nonsense. We already know raw Deques and Arrays are faster than inter-process/fiber communication.
FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue. But there is definitely a question about how
Regarding the blocking issues in #8152, isn't that using a pipe creation that is different from IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.
The purpose of this discussion is to have an implementation that doesn't use file descriptors at all, for the same reason you might use IO::Memory over a tempfile.
1: Don't involve select in benchmarking channels - that introduces overhead that is different from what you are measuring. Either that, or use select in all of the benchmarks. Select needs to support reading from pipes, after all.
Please re-read the part of that post where I mentioned why select was needed for the Channel implementation — which does not apply to Deque. The overhead of select is the reason I brought it up. It allocated about 272 bytes of heap memory for every single byte passed through.
2: Any benchmark that tries to measure this without ever involving the scheduler, where nothing ever has to wait (preferably waiting should happen on both the reader and writer sides), is pretty much nonsense. We already know raw Deques and Arrays are faster than inter-process/fiber communication.
If you think it's not up to scratch, feel free to show me how it's done.
FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue.
Please see this comment. You're more than welcome to keep using the implementation backed by file descriptors. It isn't going to be removed.
This issue stemmed from the fact that the use of non-blocking IO.pipes blocks certain standard library specs on Windows (see what I did there?), and those specs obviously do not involve any inter-process communication. IIRC this affects the OAuth2 client specs. Any working implementation is fine by me even if it is restricted to spec/support/io.cr.