MedallionShell icon indicating copy to clipboard operation
MedallionShell copied to clipboard

Reduce thread usage in pipelining

Open madelson opened this issue 5 years ago • 4 comments

The following test runs very quickly with the min thread count elevated (e. g. 32) and very slowly without (e. g. 16). I believe that this is because the underlying streams open in a non-async way which means that our piping consumes thread, possibly more threads than it needs to due to how async is implemented:

        [Test]
        public void TestPipeline()
        {
            ThreadPool.SetMinThreads(100, 100);
            const int ProcessCount = 10;

            var pipeline = Enumerable.Range(0, ProcessCount)
                .Select(_ => TestShell.Run(SampleCommand, "pipebytes"))
                .Aggregate((first, second) => first | second);
            try
            {
                for (var i = 0; i < 10; ++i)
                {
                    char @char = (char)('a' + i);

                    pipeline.StandardInput.AutoFlush.ShouldEqual(true);
                    var writeTask = pipeline.StandardInput.WriteAsync(@char);
                    writeTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"write {i} should complete");

                    var buffer = new char[10];
                    var readTask = pipeline.StandardOutput.ReadAsync(buffer, 0, buffer.Length);
                    readTask.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, $"read {i} should complete");
                    readTask.Result.ShouldEqual(1);
                    buffer[0].ShouldEqual(@char);
                }

                pipeline.StandardInput.Dispose();
                pipeline.Task.Wait(TimeSpan.FromSeconds(30)).ShouldEqual(true, "pipeline should exit");
            }
            finally
            {
                pipeline.Kill();
            }
        }

// in program.cs

                case "pipebytes":
                    using (var standardInput = Console.OpenStandardInput())
                    using (var standardOutput = Console.OpenStandardOutput())
                    {
                        var buffer = new byte[10];
                        while (true)
                        {
                            var bytesRead = standardInput.Read(buffer, 0, buffer.Length);
                            if (bytesRead == 0) { break; }
                            standardOutput.Write(buffer, 0, bytesRead);
                            standardOutput.Flush();
                        }
                    }
                    break;

Ways to fix could be trying to get the pipes to switch over to async, or even just using non-async reads/flushes to avoid extra thread use.

madelson avatar Apr 26 '19 10:04 madelson

Possibly relevant: https://devblogs.microsoft.com/oldnewthing/20130812-00/?p=3533

Sample code fails with "pipe busy" because a pipe can't have more handles open:

void Main()
{
	var path = @"C:\Users\mikea_000\Documents\Interests\CS\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
	
	var proc = new Process { StartInfo = { FileName = path, Arguments = "pipe", UseShellExecute = false, RedirectStandardInput = true, RedirectStandardOutput = true, RedirectStandardError = true } };
	try
	{
		proc.Start();
		
		var stdinHandle = ((FileStream)proc.StandardInput.BaseStream).SafeFileHandle;
		var newHandle = ReOpenFile(stdinHandle, 0x40000000, 1 | 2| 4, 0x40000000 | 0x08000000 | 0x04000000);
		Marshal.ThrowExceptionForHR(Marshal.GetHRForLastWin32Error());
		newHandle.Dump();
	}
	finally {
		proc.Kill();
	}
}

[DllImport("kernel32.dll", SetLastError = true)]
private static extern SafeFileHandle ReOpenFile(SafeFileHandle hOriginalFile, uint dwDesiredAccess, uint dwShareMode, uint dwFlagsAndAttributes);

madelson avatar Apr 26 '19 11:04 madelson

Some initial (looking unsuccessful) work to use async IO on linux: https://github.com/madelson/MedallionShell/tree/asyncify-on-linux

madelson avatar Apr 27 '19 13:04 madelson

Tried playing around with ReOpenFile a bit more for this on Windows, to no avail:

https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-reopenfile

void Main()
{
	var sc = @"C:\Users\...\MedallionShell\SampleCommand\bin\Debug\net46\SampleCommand.exe";
	var process = new Process { StartInfo = { FileName = sc, RedirectStandardOutput = true, RedirectStandardInput = true, UseShellExecute = false, CreateNoWindow = true } };
	process.Start();
	try
	{
		var standardOutput = (FileStream)process.StandardOutput.BaseStream;
		var newHandle = ReOpenFile(standardOutput.SafeFileHandle.DangerousGetHandle(), FileAccess.Read, FileShare.Write, ((FileAttributes)0x40000000).Dump());
		newHandle.Dump();
		Marshal.GetLastWin32Error().Dump();
		var newSafeHandle = new SafeFileHandle(newHandle, ownsHandle: true);
		var stream = new FileStream(newSafeHandle, FileAccess.Read, 1024, isAsync: true);
	}
	finally {
		process.Kill();
	}
}

[DllImport("kernel32.dll", SetLastError = true)]
public static extern IntPtr ReOpenFile(IntPtr originalFile, FileAccess dwDesiredAccess, FileShare dwShareMode, FileAttributes dwFlagsAndAttributes);

madelson avatar Dec 18 '19 11:12 madelson

One option here on linux would be to use poll (https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll/#.XfoZdOhKiUk). We could then have one thread polling all 3 stdio streams for a process, or even across multiple processes (getting this exactly right would be tricky because of the need to sync up poll times, but could probably be done with a clever handling of timeouts).

On Windows there is PeekNamedPipe (https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe) which lets us scan a pipe without blocking. The problem is that this doesn't block at all, so we just end up busy-waiting unless we add artificial sleeps.

madelson avatar Dec 18 '19 12:12 madelson