MedallionShell
MedallionShell copied to clipboard
Reduce thread usage in pipelining
The following test runs very quickly with the min thread count elevated (e. g. 32) and very slowly without (e. g. 16). I believe that this is because the underlying streams open in a non-async way which means that our piping consumes thread, possibly more threads than it needs to due to how async is implemented:
[Test]
public void TestPipeline()
{
ThreadPool.SetMinThreads(100, 100);
const int ProcessCount = 10;
var pipeline = Enumerable.Range(0, ProcessCount)
.Select(_ => TestShell.Run(SampleCommand, "pipebytes"))
.Aggregate((first, second) => first | second);
try
{
for (var i = 0; i < 10; ++i)
{
char @char = (char)('a' + i);
pipeline.StandardInput.AutoFlush.ShouldEqual(true);
var writeTask = pipeline.StandardInput.WriteAsync(@char);
writeTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"write {i} should complete");
var buffer = new char[10];
var readTask = pipeline.StandardOutput.ReadAsync(buffer, 0, buffer.Length);
readTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"read {i} should complete");
readTask.Result.ShouldEqual(1);
buffer[0].ShouldEqual(@char);
}
pipeline.StandardInput.Dispose();
pipeline.Task.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, "pipeline should exit");
}
finally
{
pipeline.Kill();
}
}
// in program.cs
case "pipebytes":
using (var standardInput = Console.OpenStandardInput())
using (var standardOutput = Console.OpenStandardOutput())
{
var buffer = new byte[10];
while (true)
{
var bytesRead = standardInput.Read(buffer, 0, buffer.Length);
if (bytesRead == 0) { break; }
standardOutput.Write(buffer, 0, bytesRead);
standardOutput.Flush();
}
}
break;
Ways to fix could be trying to get the pipes to switch over to async, or even just using non-async reads/flushes to avoid extra thread use.
Possibly relevant: https://devblogs.microsoft.com/oldnewthing/20130812-00/?p=3533
Sample code fails with "pipe busy" because a pipe can't have more handles open:
void Main()
{
var path = @"C:\Users\mikea_000\Documents\Interests\CS\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
var proc = new Process { StartInfo = { FileName = path, Arguments = "pipe", UseShellExecute = false, RedirectStandardInput = true, RedirectStandardOutput = true, RedirectStandardError = true } };
try
{
proc.Start();
var stdinHandle = ((FileStream)proc.StandardInput.BaseStream).SafeFileHandle;
var newHandle = ReOpenFile(stdinHandle, 0x40000000, 1 | 2| 4, 0x40000000 | 0x08000000 | 0x04000000);
Marshal.ThrowExceptionForHR(Marshal.GetHRForLastWin32Error());
newHandle.Dump();
}
finally {
proc.Kill();
}
}
[DllImport("kernel32.dll", SetLastError = true)]
private static extern SafeFileHandle ReOpenFile(SafeFileHandle hOriginalFile, uint dwDesiredAccess, uint dwShareMode, uint dwFlagsAndAttributes);
Some initial (looking unsuccessful) work to use async IO on linux: https://github.com/madelson/MedallionShell/tree/asyncify-on-linux
Tried playing around with ReOpenFile a bit more for this on Windows, to no avail:
https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-reopenfile
void Main()
{
var sc = @"C:\Users\...\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
var process = new Process { StartInfo = { FileName = sc, RedirectStandardOutput = true, RedirectStandardInput = true, UseShellExecute = false, CreateNoWindow = true } };
process.Start();
try
{
var standardOutput = (FileStream)process.StandardOutput.BaseStream;
var newHandle = ReOpenFile(standardOutput.SafeFileHandle.DangerousGetHandle(), FileAccess.Read, FileShare.Write, ((FileAttributes)0x40000000).Dump());
newHandle.Dump();
Marshal.GetLastWin32Error().Dump();
var newSafeHandle = new SafeFileHandle(newHandle, ownsHandle: true);
var stream = new FileStream(newSafeHandle, FileAccess.Read, 1024, isAsync: true);
}
finally {
process.Kill();
}
}
[DllImport("kernel32.dll", SetLastError = true)]
public static extern IntPtr ReOpenFile(IntPtr originalFile, FileAccess dwDesiredAccess, FileShare dwShareMode, FileAttributes dwFlagsAndAttributes);
One option here on linux would be to use poll (https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll/#.XfoZdOhKiUk). We could then have one thread polling all 3 stdio streams for a process, or even across multiple processes (getting this exactly right would be tricky because of the need to sync up poll times, but could probably be done with a clever handling of timeouts).
On Windows there is PeekNamedPipe (https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe) which lets us scan a pipe without blocking. The problem is that this doesn't block at all, so we just end up busy-waiting unless we add artificial sleeps.