zig icon indicating copy to clipboard operation
zig copied to clipboard

std.ChildProcess doesn't allow to pipe two processes together.

Open vesim987 opened this issue 3 years ago • 9 comments

Right now there is no way to pipe the stdout of one process into the stdin of another process, because spawnPosix always creates pipes (overwriting .stdin/.stdout/.stderr) when std*_behavior is set to Pipe. Example code (provided by g_w1) that is affected by this and should work:

const std = @import("std");
const ChildProcess = std.ChildProcess;
/// Reproduction from the issue report: tries to feed `child1`'s stdout into
/// `child`'s stdin by assigning `child.stdin` before spawning `child`.
/// Per the report, this does not work because spawning with `.Pipe` always
/// creates fresh pipes and overwrites the `.stdin`/`.stdout`/`.stderr` fields.
pub fn main() !void {
    // Producer: `cat test.txt`.
    const child1 = try ChildProcess.init(&[_][]const u8{ "cat", "test.txt" }, std.heap.page_allocator);
    defer child1.deinit();

    // Consumer: plain `cat`, intended to read whatever child1 writes.
    const child = try ChildProcess.init(&[_][]const u8{"cat"}, std.heap.page_allocator);
    defer child.deinit();

    child1.stdout_behavior = .Pipe;
    child.stdin_behavior = .Pipe;

    try child1.spawn();
    // Hand child1's stdout to child as its stdin; `.?` requires stdout to be non-null here.
    child.stdin = child1.stdout.?;
    // Per the issue text, this spawn replaces the stdin assigned above with a newly created pipe.
    try child.spawn();

    _ = try child1.wait();
    _ = try child.wait();
}

One fix that I see for this is to check whether the fields stdin/stdout/stderr are already set, and not create pipes when they are valid Files. That would affect this. The other fix is to create a new StdIo mode, something like Passthrough, and handle it in setUpChildIo by just calling os.dup2, as in the .Pipe case.

vesim987 avatar Jan 10 '21 05:01 vesim987

I tried working on that, but I have no idea how to handle ownership of the pipes. In a PoC I used StdIo.Ignore to pass the pipe's file descriptor.

diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig
index d37dd9fdf..77a1c8f39 100644
--- a/lib/std/child_process.zig
+++ b/lib/std/child_process.zig
@@ -492,9 +492,9 @@ pub const ChildProcess = struct {
         const pid_result = try os.fork();
         if (pid_result == 0) {
             // we are the child
-            setUpChildIo(self.stdin_behavior, stdin_pipe[0], os.STDIN_FILENO, dev_null_fd) catch |err| forkChildErrReport(err_pipe[1], err);
-            setUpChildIo(self.stdout_behavior, stdout_pipe[1], os.STDOUT_FILENO, dev_null_fd) catch |err| forkChildErrReport(err_pipe[1], err);
-            setUpChildIo(self.stderr_behavior, stderr_pipe[1], os.STDERR_FILENO, dev_null_fd) catch |err| forkChildErrReport(err_pipe[1], err);
+            setUpChildIo(self.stdin_behavior, stdin_pipe[0], os.STDIN_FILENO, self.stdin) catch |err| forkChildErrReport(err_pipe[1], err);
+            setUpChildIo(self.stdout_behavior, stdout_pipe[1], os.STDOUT_FILENO, self.stdout) catch |err| forkChildErrReport(err_pipe[1], err);
+            setUpChildIo(self.stderr_behavior, stderr_pipe[1], os.STDERR_FILENO, self.stderr) catch |err| forkChildErrReport(err_pipe[1], err);

             if (self.stdin_behavior == .Pipe) {
                 os.close(stdin_pipe[0]);
@@ -792,12 +792,12 @@ pub const ChildProcess = struct {
         }
     }

-    fn setUpChildIo(stdio: StdIo, pipe_fd: i32, std_fileno: i32, dev_null_fd: i32) !void {
+    fn setUpChildIo(stdio: StdIo, pipe_fd: i32, std_fileno: i32, file: ?File) !void {
         switch (stdio) {
             .Pipe => try os.dup2(pipe_fd, std_fileno),
             .Close => os.close(std_fileno),
             .Inherit => {},
-            .Ignore => try os.dup2(dev_null_fd, std_fileno),
+            .Ignore => try os.dup2(file.?.handle, std_fileno),
         }
     }
 };

And the usage looks like:

const std = @import("std");
const ChildProcess = std.ChildProcess;
/// Usage example for the proof-of-concept patch: with the patched `setUpChildIo`,
/// `.Ignore` dup2s the handle stored in the matching `stdin`/`stdout`/`stderr`
/// field onto the child's standard descriptor, so a manually created pipe can
/// connect two children.
pub fn main() !void {
    // Writer side of the pipeline: `cat test.txt`.
    const child1 = try ChildProcess.init(&[_][]const u8{ "cat", "test.txt" }, std.heap.page_allocator);
    defer child1.deinit();

    // Reader side of the pipeline: plain `cat`.
    const child = try ChildProcess.init(&[_][]const u8{"cat"}, std.heap.page_allocator);
    defer child.deinit();

    // `.Ignore` is repurposed by the patch to mean "use the File assigned below".
    child1.stdout_behavior = .Ignore;
    child.stdin_behavior = .Ignore;

    // One pipe shared by both children; pipe[0] is given to the reader and
    // pipe[1] to the writer below.
    var pipe = try std.os.pipe2(std.os.O_CLOEXEC);

    // The reader is spawned first, with pipe[0] as its stdin.
    child.stdin = std.fs.File{ .handle = pipe[0] };
    try child.spawn();

    // The writer gets pipe[1] as its stdout.
    child1.stdout = std.fs.File{ .handle = pipe[1] };
    try child1.spawn();
    // Close the parent's copy of pipe[1] once both children are spawned.
    // NOTE(review): presumably so the reader can see end-of-file once child1
    // exits — confirm. pipe[0] is not closed anywhere in this function.
    std.os.close(pipe[1]);

    _ = try child1.wait();
    _ = try child.wait();
}```

Maybe someone else has an idea how to create a proper API for such a case?

vesim987 avatar Jan 16 '21 00:01 vesim987

@vesim987 Specifying how data is passed through pipes sounds very much like how build.zig works, i.e. by a declarative description of the data flow/setup before execution (or even pipe dependencies).

The remaining question is whether this can and should be done at comptime (for performance) or at runtime.

matu3ba avatar Apr 04 '22 08:04 matu3ba

~~Looks like I need to fix this (partially) for a non-hacky solution to #1356.~~

~~From a quick glimpse must be OS-specific functions to distinguish process1 -> process2 and process2 -> process1 pipes via comptime map of each pipe descriptor (starting with 0) like how shells are doing stuff~~

    0: stdin
    1: stdout
    2: stderr

~~The user-friendliest solution is to offer a comptime-optional (null being the current default behavior) and the other being a default prefilled list (pipenr(0/1/..), out(true/false), pipe_behavior with current behavior and user-provided one.~~

~~For efficiency, the default comptime-size should be 3 (stdin,stdout,stderr) like currently and user-configurable. Also, there should be advice to run the init code at comptime, if the non-standard solution is taken.~~

~~The passthrough behavior would then be then another streams_behavior option.~~

There are 2 related problems:

  1. There is no access to ChildProcess startup code before spawning process or one needs to hack around to get access (writing output to file and back or use another side channel)
  2. The user can not modify the startup code of both parent and child process. (what pipes to create etc)

Both problems can and likely must be fixed via function pointers and pointers to data to allow user flexibility. Reason: The child process gets the data via VM_CLONE, which allows complete access to the parent data. Unclear: How to design a good API.

matu3ba avatar Apr 06 '22 11:04 matu3ba

I'm currently running into an issue with child processes not correctly dealing with pipes. I believe I find myself here looking for a solution that allows child processes to create/dup/close and redirect standard in as they please.

Any thoughts on how I can get my child process pipes working?

MikoverseAllar avatar Sep 22 '22 11:09 MikoverseAllar

I've just run into this problem. Is there currently no way in Zig to pipe output from one process to another?

I'm happy to use a workaround; I just really need to get this working, as it's in an application where I'm dealing with many-gigabyte intermediate values, so piping is rather crucial.

This is what I've tried:

/// Attempted workaround from the thread: connect `ls -l` to `cat` through a
/// manually created pipe by setting the relevant stdio behaviors to `.Ignore`
/// and assigning the pipe's descriptors to the `stdout`/`stdin` fields before
/// spawning.
/// NOTE(review): this mirrors the usage shown for the proof-of-concept patch
/// earlier in the thread. In the unpatched `setUpChildIo` shown in that diff,
/// `.Ignore` dup2s `dev_null_fd` rather than the assigned File — confirm
/// against the std version actually in use.
pub fn testpipe() !void {
    var child1 = std.ChildProcess.init(&.{"ls", "-l"}, std.heap.page_allocator);
    var child2 = std.ChildProcess.init(&.{"cat"}, std.heap.page_allocator);

    child1.stdout_behavior = .Ignore;
    child2.stdin_behavior = .Ignore;

    // Raw Linux syscall wrapper; its return value is discarded here, so a
    // failure to create the pipe is not detected.
    var fds: [2]i32 = undefined;
    _ = std.os.linux.pipe2(&fds, .{.CLOEXEC = true});

    // Reader (`cat`) is spawned first with fds[0] assigned as its stdin.
    child2.stdin = std.fs.File{ .handle = fds[0] };
    child2.stdout = std.io.getStdOut();
    try child2.spawn();

    // Writer (`ls -l`) gets fds[1] assigned as its stdout.
    child1.stdout = std.fs.File{ .handle = fds[1] };
    try child1.spawn();
    // Close the parent's copy of fds[1]; the result of close is discarded.
    // fds[0] is not closed anywhere in this function.
    _ = std.os.linux.close(fds[1]);

    const status1 = try child1.wait();
    const status2 = try child2.wait();

    std.debug.print("Child 1 exited with status: {}\n", .{status1});
    std.debug.print("Child 2 exited with status: {}\n", .{status2});
}

tecosaur avatar May 01 '24 07:05 tecosaur

@tecosaur I've done a simple port of piping processes from a C example, which appears to work if you don't need Windows support

// Quick and dirty port from <https://people.cs.rutgers.edu/~pxk/416/notes/c-tutorials/pipe.html>

const std = @import("std");

/// Creates a pipe, forks once, and lets the child build the `ls | tr` pipeline
/// via `runPipe`, while the parent waits for one child to exit and prints its
/// pid and raw wait status.
pub fn main() !void {
    const fd = try std.posix.pipe();

    const pid = try std.posix.fork();
    switch (pid) {
        // Child
        0 => {
            try runPipe(fd);
        },
        // Parent
        else => {
            // The parent never uses the pipe itself. Release both ends before
            // blocking: an open copy of the write end would keep the reader from
            // seeing end-of-file, and an open copy of the read end would keep the
            // writer from noticing that the reader has gone away.
            std.posix.close(fd[0]);
            std.posix.close(fd[1]);

            // -1 waits for any child of this process.
            const ret = std.posix.waitpid(-1, 0);
            if (ret.pid != -1) {
                std.debug.print("Process {d} exited with {d}\n", .{ ret.pid, ret.status });
            }
        },
    }
}

/// Forks again and turns the two resulting processes into the two ends of a
/// pipeline over `pfd`: the newly forked process reads from the pipe as
/// `tr a-z A-Z`, and the calling process writes to it as `ls -al /`.
///
/// `pfd[0]` is used as the read end and `pfd[1]` as the write end.
///
/// On success this function does not return, because both branches replace the
/// process image. If `fork`, `dup2`, or `execve` fails, the error is returned to
/// the caller rather than being silently discarded.
///
/// NOTE(review): the std documentation for the allocating `std.process.execve`
/// cautions against calling it in a fork child and points to
/// `std.posix.execvpeZ` instead — verify against the targeted Zig version.
fn runPipe(pfd: [2]i32) !void {
    const pid = try std.posix.fork();

    const allocator = std.heap.page_allocator;

    switch (pid) {
        0 => {
            // Make the pipe's read end this process's stdin.
            try std.posix.dup2(pfd[0], 0);
            // The child does not need this end of the pipe
            std.posix.close(pfd[1]);
            // execve only returns on failure; propagate that failure.
            return std.process.execve(allocator, &.{
                "tr",
                "a-z",
                "A-Z",
            }, null);
        },
        else => {
            // Make the pipe's write end this process's stdout.
            try std.posix.dup2(pfd[1], 1);
            // The parent does not need this end of the pipe
            std.posix.close(pfd[0]);
            // execve only returns on failure; propagate that failure.
            return std.process.execve(allocator, &.{
                "ls",
                "-al",
                "/",
            }, null);
        },
    }
}

mgord9518 avatar May 14 '24 10:05 mgord9518