From ffae8821d80912b38696d7ac046f77e2fd80cc6b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 17 Dec 2023 11:05:24 +0100 Subject: [PATCH 01/11] Shush the linter --- pipe/scanner.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pipe/scanner.go b/pipe/scanner.go index b56b58c..5ec16e8 100644 --- a/pipe/scanner.go +++ b/pipe/scanner.go @@ -56,11 +56,7 @@ func ScannerFunction( return err } } - if err := scanner.Err(); err != nil { - return err - } - - return nil + return scanner.Err() // `p.AddFunction()` arranges for `stdout` to be closed. }, ) From 95dc2e8211dca6ee31b2a285645d8084961e2351 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 16 Dec 2023 17:06:01 +0100 Subject: [PATCH 02/11] pipeline_test.go: get rid of a bunch of unnecessary tmpdirs --- pipe/pipeline_test.go | 71 ++++++++++++------------------------------- 1 file changed, 20 insertions(+), 51 deletions(-) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index d925aee..2cd76a3 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -159,9 +159,7 @@ func TestNontrivialPipeline(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("echo", "hello world"), pipe.Command("sed", "s/hello/goodbye/"), @@ -211,8 +209,6 @@ func TestPipelineReadFromSlowly2(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - dir := t.TempDir() - r, w := io.Pipe() var buf []byte @@ -236,7 +232,7 @@ func TestPipelineReadFromSlowly2(t *testing.T) { } }() - p := pipe.New(pipe.WithDir(dir), pipe.WithStdout(w)) + p := pipe.New(pipe.WithStdout(w)) p.Add(pipe.Command("seq", "100")) assert.NoError(t, p.Run(ctx)) @@ -253,9 +249,7 @@ func TestPipelineTwoCommandsPiping(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add(pipe.Command("echo", "hello world")) assert.Panics(t, func() { p.Add(pipe.Command("")) }) out, err := p.Output(ctx) @@ -283,9 +277,7 @@ func TestPipelineExit(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("false"), pipe.Command("true"), @@ -316,11 +308,10 @@ func TestPipelineInterrupted(t *testing.T) { } t.Parallel() - dir := t.TempDir() stdout := &bytes.Buffer{} - p := pipe.New(pipe.WithDir(dir), pipe.WithStdout(stdout)) + p := pipe.New(pipe.WithStdout(stdout)) p.Add(pipe.Command("sleep", "10")) ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) @@ -339,11 +330,10 @@ func TestPipelineCanceled(t *testing.T) { } t.Parallel() - dir := t.TempDir() stdout := &bytes.Buffer{} - p := pipe.New(pipe.WithDir(dir), pipe.WithStdout(stdout)) + p := pipe.New(pipe.WithStdout(stdout)) p.Add(pipe.Command("sleep", "10")) ctx, cancel := context.WithCancel(context.Background()) @@ -367,9 +357,8 @@ func TestLittleEPIPE(t *testing.T) { } t.Parallel() - dir := t.TempDir() - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("sh", "-c", "sleep 1; echo foo"), pipe.Command("true"), @@ -391,9 +380,8 @@ func TestBigEPIPE(t *testing.T) { } t.Parallel() - dir := t.TempDir() - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("seq", "100000"), pipe.Command("true"), @@ -415,9 +403,8 @@ func TestIgnoredSIGPIPE(t *testing.T) { } t.Parallel() - dir := t.TempDir() - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.IgnoreError(pipe.Command("seq", "100000"), pipe.IsSIGPIPE), pipe.Command("echo", "foo"), @@ -434,9 +421,7 @@ func TestFunction(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Print("hello world"), pipe.Function( @@ -464,9 +449,7 @@ func TestPipelineWithFunction(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("echo", "-n", "hello world"), pipe.Function( @@ -528,9 +511,7 @@ func TestPipelineWithLinewiseFunction(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() // Print the numbers from 1 to 20 (generated from scratch): p.Add( seqFunction(20), @@ -581,7 +562,7 @@ func TestScannerAlwaysFlushes(t *testing.T) { var length int64 - p := pipe.New(pipe.WithDir(".")) + p := pipe.New() // Print the numbers from 1 to 20 (generated from scratch): p.Add( pipe.IgnoreError( @@ -629,7 +610,7 @@ func TestScannerFinishEarly(t *testing.T) { var length int64 - p := pipe.New(pipe.WithDir(".")) + p := pipe.New() // Print the numbers from 1 to 20 (generated from scratch): p.Add( pipe.IgnoreError( @@ -670,9 +651,7 @@ func TestPrintln(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add(pipe.Println("Look Ma, no hands!")) out, err := p.Output(ctx) if assert.NoError(t, err) { @@ -684,9 +663,7 @@ func TestPrintf(t *testing.T) { t.Parallel() ctx := context.Background() - dir := t.TempDir() - - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add(pipe.Printf("Strangely recursive: %T", p)) out, err := p.Output(ctx) if assert.NoError(t, err) { @@ -880,10 +857,8 @@ func TestErrors(t *testing.T) { func BenchmarkSingleProgram(b *testing.B) { ctx := context.Background() - dir := b.TempDir() - for i := 0; i < b.N; i++ { - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("true"), ) @@ -894,10 +869,8 @@ func BenchmarkSingleProgram(b *testing.B) { func BenchmarkTenPrograms(b *testing.B) { ctx := context.Background() - dir := b.TempDir() - for i := 0; i < b.N; i++ { - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("echo", "hello world"), pipe.Command("cat"), @@ -920,15 +893,13 @@ func BenchmarkTenPrograms(b *testing.B) { func BenchmarkTenFunctions(b *testing.B) { ctx := context.Background() - dir := b.TempDir() - cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { _, err := io.Copy(stdout, stdin) return err } for i := 0; i < b.N; i++ { - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Println("hello world"), pipe.Function("copy1", cp), @@ -951,15 +922,13 @@ func BenchmarkTenFunctions(b *testing.B) { func BenchmarkTenMixedStages(b *testing.B) { ctx := context.Background() - dir := b.TempDir() - cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { _, err := io.Copy(stdout, stdin) return err } for i := 0; i < b.N; i++ { - p := pipe.New(pipe.WithDir(dir)) + p := pipe.New() p.Add( pipe.Command("echo", "hello world"), pipe.Function("copy1", cp), From 5fdc22a3d5665e2d6a3f4852e8385eefcc857349 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 16 Dec 2023 16:25:43 +0100 Subject: [PATCH 03/11] TestPipelineStdinThatIsNeverClosed(): create stdin more simply We want a stdin that is a pipe but not an `*os.File`? Just use `io.Pipe()` to create it. --- pipe/pipeline_test.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 2cd76a3..53280f7 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -131,8 +131,7 @@ func TestPipelineStdinThatIsNeverClosed(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - r, w, err := os.Pipe() - require.NoError(t, err) + r, w := io.Pipe() t.Cleanup(func() { _ = w.Close() _ = r.Close() @@ -140,10 +139,8 @@ func TestPipelineStdinThatIsNeverClosed(t *testing.T) { var stdout bytes.Buffer - // The point here is to wrap `r` so that `exec.Cmd` doesn't - // recognize that it's an `*os.File`: p := pipe.New( - pipe.WithStdin(io.NopCloser(r)), + pipe.WithStdin(r), pipe.WithStdout(&stdout), ) // Note that this command doesn't read from its stdin, so it will From c2c98026701acc618cffc81384e75e74dd7f24fd Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 16 Dec 2023 19:15:30 +0100 Subject: [PATCH 04/11] pipeline_test.go: use `WithStdoutCloser()` to close stdout pipes `WithStdoutCloser()` is a thing now. No need to do it by hand. --- pipe/pipeline_test.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 53280f7..28172e7 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -184,15 +184,10 @@ func TestPipelineReadFromSlowly(t *testing.T) { readErr <- err }() - p := pipe.New(pipe.WithStdout(w)) + p := pipe.New(pipe.WithStdoutCloser(w)) p.Add(pipe.Command("echo", "hello world")) assert.NoError(t, p.Run(ctx)) - time.Sleep(100 * time.Millisecond) - // It's not super-intuitive, but `w` has to be closed here so that - // the `io.ReadAll()` call above knows that it's done: - _ = w.Close() - assert.NoError(t, <-readErr) assert.Equal(t, "hello world\n", string(buf)) } @@ -229,15 +224,10 @@ func TestPipelineReadFromSlowly2(t *testing.T) { } }() - p := pipe.New(pipe.WithStdout(w)) + p := pipe.New(pipe.WithStdoutCloser(w)) p.Add(pipe.Command("seq", "100")) assert.NoError(t, p.Run(ctx)) - time.Sleep(200 * time.Millisecond) - // It's not super-intuitive, but `w` has to be closed here so that - // the `io.ReadAll()` call above knows that it's done: - _ = w.Close() - assert.NoError(t, <-readErr) assert.Equal(t, 292, len(buf)) } From f595c9e2eb630a4eee3e11fdf9a3309930186d67 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sat, 16 Dec 2023 17:56:03 +0100 Subject: [PATCH 05/11] Add some benchmarks of moving a bunch of data through a pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add some benchmarks that move MB-scale data through pipelines consisting of alternating commands and functions, one in small writes, and one buffered into larger writes, then processing it one line at a time. This is not so efficient, because every transition from `Function` → `Command` requires an extra (hidden) goroutine that copies the data from an `io.Reader` to a `*os.File`. We can make this faster! --- pipe/pipeline_test.go | 91 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 28172e7..30080a0 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -935,6 +935,97 @@ func BenchmarkTenMixedStages(b *testing.B) { } } +func BenchmarkMoreDataUnbuffered(b *testing.B) { + ctx := context.Background() + + cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { + _, err := io.Copy(stdout, stdin) + return err + } + + for i := 0; i < b.N; i++ { + count := 0 + p := pipe.New() + p.Add( + pipe.Function( + "seq", + func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { + for i := 1; i <= 100000; i++ { + fmt.Fprintln(stdout, i) + } + return nil + }, + ), + pipe.Command("cat"), + pipe.Function("copy2", cp), + pipe.Command("cat"), + pipe.Function("copy3", cp), + pipe.Command("cat"), + pipe.Function("copy4", cp), + pipe.Command("cat"), + pipe.Function("copy5", cp), + pipe.Command("cat"), + pipe.LinewiseFunction( + "count", + func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error { + count++ + return nil + }, + ), + ) + err := p.Run(ctx) + if assert.NoError(b, err) { + assert.EqualValues(b, 100000, count) + } + } +} + +func BenchmarkMoreDataBuffered(b *testing.B) { + ctx := context.Background() + + cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { + _, err := io.Copy(stdout, stdin) + return err + } + + for i := 0; i < b.N; i++ { + count := 0 + p := pipe.New() + p.Add( + pipe.Function( + "seq", + func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error { + out := bufio.NewWriter(stdout) + for i := 1; i <= 1000000; i++ { + fmt.Fprintln(out, i) + } + return out.Flush() + }, + ), + pipe.Command("cat"), + pipe.Function("copy2", cp), + pipe.Command("cat"), + pipe.Function("copy3", cp), + pipe.Command("cat"), + pipe.Function("copy4", cp), + pipe.Command("cat"), + pipe.Function("copy5", cp), + pipe.Command("cat"), + pipe.LinewiseFunction( + "count", + func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error { + count++ + return nil + }, + ), + ) + err := p.Run(ctx) + if assert.NoError(b, err) { + assert.EqualValues(b, 1000000, count) + } + } +} + func genErr(err error) pipe.StageFunc { return func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error { return err From 385f6c74893205d6aaa79ea2d95f1259cef2986b Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 15 Dec 2023 10:58:55 +0100 Subject: [PATCH 06/11] Simplify the `NopCloser`s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rename * `newNopCloser()` → `newReaderNopCloser()` * `nopCloser` → `readerNopCloser` * `nopCloserWriterTo` → `readerWriterToNopCloser` * `nopWriteCloser` → `writerNopCloser` to help keep readers and writers straight and because only the `Close()` part is a NOP. * Move `writerNopCloser` to `nop_closer.go` to be with its siblings. --- pipe/command.go | 4 ++-- pipe/nop_closer.go | 45 ++++++++++++++++++++++++++++++++------------- pipe/pipeline.go | 14 +++----------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 2c465e9..8f6a882 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -72,9 +72,9 @@ func (s *commandStage) Start( // See the long comment in `Pipeline.Start()` for the // explanation of this special case. switch stdin := stdin.(type) { - case nopCloser: + case readerNopCloser: s.cmd.Stdin = stdin.Reader - case nopCloserWriterTo: + case readerWriterToNopCloser: s.cmd.Stdin = stdin.Reader default: s.cmd.Stdin = stdin diff --git a/pipe/nop_closer.go b/pipe/nop_closer.go index d435d0a..8c66c72 100644 --- a/pipe/nop_closer.go +++ b/pipe/nop_closer.go @@ -6,29 +6,48 @@ package pipe import "io" -// newNopCloser returns a ReadCloser with a no-op Close method wrapping -// the provided io.Reader r. -// If r implements io.WriterTo, the returned io.ReadCloser will implement io.WriterTo -// by forwarding calls to r. -func newNopCloser(r io.Reader) io.ReadCloser { +// newReaderNopCloser returns a ReadCloser with a no-op Close method, +// wrapping the provided io.Reader `r`. If `r` implements +// `io.WriterTo`, the returned `io.ReadCloser` will also implement +// `io.WriterTo` by forwarding calls to `r`. +func newReaderNopCloser(r io.Reader) io.ReadCloser { if _, ok := r.(io.WriterTo); ok { - return nopCloserWriterTo{r} + return readerWriterToNopCloser{r} } - return nopCloser{r} + return readerNopCloser{r} } -type nopCloser struct { +// readerNopCloser is a ReadCloser that wraps a provided `io.Reader`, +// but whose `Close()` method does nothing. We don't need to check +// whether the wrapped reader also implements `io.WriterTo`, since +// it's always unwrapped before use. +type readerNopCloser struct { io.Reader } -func (nopCloser) Close() error { return nil } +func (readerNopCloser) Close() error { + return nil +} -type nopCloserWriterTo struct { +// readerWriterToNopCloser is like `readerNopCloser` except that it +// also implements `io.WriterTo` by delegating `WriteTo()` to the +// wrapped `io.Reader` (which must also implement `io.WriterTo`). +type readerWriterToNopCloser struct { io.Reader } -func (nopCloserWriterTo) Close() error { return nil } +func (readerWriterToNopCloser) Close() error { return nil } + +func (r readerWriterToNopCloser) WriteTo(w io.Writer) (n int64, err error) { + return r.Reader.(io.WriterTo).WriteTo(w) +} + +// writerNopCloser is a WriteCloser that wraps a provided `io.Writer`, +// but whose `Close()` method does nothing. +type writerNopCloser struct { + io.Writer +} -func (c nopCloserWriterTo) WriteTo(w io.Writer) (n int64, err error) { - return c.Reader.(io.WriterTo).WriteTo(w) +func (w writerNopCloser) Close() error { + return nil } diff --git a/pipe/pipeline.go b/pipe/pipeline.go index e591c63..d7ea41d 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -68,14 +68,6 @@ type Pipeline struct { var emptyEventHandler = func(e *Event) {} -type nopWriteCloser struct { - io.Writer -} - -func (w nopWriteCloser) Close() error { - return nil -} - type NewPipeFn func(opts ...Option) *Pipeline // NewPipeline returns a Pipeline struct with all of the `options` @@ -112,7 +104,7 @@ func WithStdin(stdin io.Reader) Option { // WithStdout assigns stdout to the last command in the pipeline. func WithStdout(stdout io.Writer) Option { return func(p *Pipeline) { - p.stdout = nopWriteCloser{stdout} + p.stdout = writerNopCloser{stdout} } } @@ -261,7 +253,7 @@ func (p *Pipeline) Start(ctx context.Context) error { // own `nopCloser`, which behaves like `io.NopCloser`, except // that `pipe.CommandStage` knows how to unwrap it before // passing it to `exec.Cmd`. - nextStdin = newNopCloser(p.stdin) + nextStdin = newReaderNopCloser(p.stdin) } for i, s := range p.stages { @@ -305,7 +297,7 @@ func (p *Pipeline) Start(ctx context.Context) error { func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { var buf bytes.Buffer - p.stdout = nopWriteCloser{&buf} + p.stdout = writerNopCloser{&buf} err := p.Run(ctx) return buf.Bytes(), err } From 6820dbbb1db29dd4d94e1af85a6a7fa2c3677e7e Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 15 Dec 2023 21:03:26 +0100 Subject: [PATCH 07/11] Implement Stage2 with a more flexible way of starting stages The old `Stage` interface, and in particular its `Start()` method, is not ideal. `Start()` is responsible for creating its own stdout, without knowledge of what will be consuming it. In practice, there are only two main stages: * `commandStage` ultimately runs a subprocess, which needs an `*os.File` as both stdin and stdout. The old code created its stdout using `cmd.StdoutPipe()`, which creates an `*os.File`. * `goStage` runs a Go function, which is happy with any kind of `io.ReadCloser` / `io.WriteCloser` for its stdin and stdout. The old code created its stdout using `io.Pipe()`, which _doesn't_ return an `*os.File`. There are some scenarios where the old behavior was not ideal: 1. If a `goStage` was followed by a `commandStage`, the `commandStage` would had to consume the non-`*os.File` stdin that was created by the former. But since an external command requires an `*os.File`, `exec.Cmd` had to create an `os.Pipe()` internally and create an extra goroutine to copy from the `io.Reader` to the pipe. This is not only wasteful, but also meant that the `goStage` was not informed when the subprocess terminated or closed its stdin. (For example, the copy goroutine could block waiting to read from the `io.Reader`.) 2. If `Pipeline.stdout` was set to an `*os.File` and the last stage was a `commandStage`, then an extra stage was needed to copy the output of the subprocess to `Pipeline.stdout`, when the subprocess could instead have written directly to the corresponding file descriptor. This was wasteful, and also lead to cases where the subprocess couldn't detect that `Pipeline.stdout` had been closed. Problem (1) could have been fixed by changing `goStage` to always use `os.Pipe()` to create its stdout pipe. But that would be wasteful if two `goStage`s were adjacent, in which case they could use a cheaper `io.Pipe()` instead. And it wouldn't solve problem (2) at all. The problem can only be solved by considering both the producer _and_ the consumer of the stdin and stdout of any stage. If either end is a `commandStage`, then it is preferable to us `os.Pipe()`. If both ends are `goStage`s, then it is preferable to use `io.Pipe()`. And if `Pipeline.Stdout` is set, the last stage should write directly into it whenever possible. This PR solves the problem by adding a new interface, `Stage2`, that can optionally be implemented by a `Stage`. The new interface includes two new methods, Preferences() StagePreferences Start2( ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error The first indicates what kind of stdin/stdout the stage prefers, and the second starts up the stage with a `stdin` and `stdout` that are provided by the caller, rather than letting the stage return its own stdout. If a stage that implements `Stage2` is added to a `Pipeline`, then `Pipeline.Start()` uses the first method to figure out what kind of pipes are preferred between this stage and its neighbors, and the second starts the stage with the preferred type of pipe if possible. It also passes `Pipeline.stdout` into the last stage rather than copying the data an extra time. All of the stages that are defined in this package now implement both `Stage` and `Stage2`, so they get the benefit of this new behavior. Therefore, any callers that create stages in the usual way (using `pipe.Command()`, `pipe.CommandStage()`, `pipe.Function()`, `pipe.LinewiseFunction()`, etc.) will also get the benefit of the new behavior. For example, the benchmarks `BenchmarkMoreDataBuffered` and `BenchmarkMoreDataUnbuffered` (admittedly, worst cases for the old code) are sped up by roughly 2.25x and 6x, respectively: ``` snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go goos: linux goarch: amd64 cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz BenchmarkSingleProgram-20 8254 1384888 ns/op BenchmarkTenPrograms-20 2174 5454223 ns/op BenchmarkTenFunctions-20 37846 327601 ns/op BenchmarkTenMixedStages-20 3298 3548630 ns/op BenchmarkMoreDataUnbuffered-20 28 400316217 ns/op BenchmarkMoreDataBuffered-20 45 259220902 ns/op PASS ok command-line-arguments 76.254s 172.01user 92.35system 1:16.73elapsed 344%CPU (0avgtext+0avgdata 107680maxresident)k 0inputs+7792outputs (42major+3771289minor)pagefaults 0swaps snare:~/github/proj/go-pipe/git(stage2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go goos: linux goarch: amd64 cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz BenchmarkSingleProgram-20 8586 1362019 ns/op BenchmarkTenPrograms-20 2234 5308280 ns/op BenchmarkTenFunctions-20 43003 291655 ns/op BenchmarkTenMixedStages-20 3441 3468454 ns/op BenchmarkMoreDataUnbuffered-20 175 67083563 ns/op BenchmarkMoreDataBuffered-20 100 113872376 ns/op PASS ok command-line-arguments 83.116s 177.30user 143.48system 1:23.54elapsed 383%CPU (0avgtext+0avgdata 114560maxresident)k 0inputs+7808outputs (40major+3921427minor)pagefaults 0swaps ``` Also, look how much simpler `testMemoryLimit()` has become without the awkward workaround that was previously required. Callers that define their own `Stage` types, on the other hand, will only benefit from the new behavior if they change their stages to _also_ implement `Stage2`. Even if they don't do that, however, their old stages should continue to work as before. In terms of backwards compatibility, some applications might notice a difference with the new pipe structure. The difference should usually be an improvement, for example lower resource consumption and less risk of deadlock. It is conceivable that some applications were in some way relying on the delayed completion of pipelines when an `io.Pipe` was closed, though I'm having trouble imagining scenarios like that in the real world. The amount of code needed to support backwards compatibility is rather substantial, not to mention that any new `Stage` types would have to implement both `Start()` and `Start2()` to take advantage of the new system. That might be an argument for revving the package's major version number and getting rid of the old interface entirely. Most clients would not need changes because there's not much reason for a client to implement its own `Stage` type. --- pipe/command.go | 133 ++++++++++++++++---- pipe/command_linux.go | 2 +- pipe/command_test.go | 3 +- pipe/filter-error.go | 12 ++ pipe/function.go | 51 +++++++- pipe/iocopier.go | 49 +++++++- pipe/memorylimit.go | 59 ++++++--- pipe/memorylimit_test.go | 61 ++++------ pipe/pipeline.go | 255 +++++++++++++++++++++++++++------------ pipe/pipeline_test.go | 37 +++++- pipe/stage.go | 141 ++++++++++++++++++++-- 11 files changed, 623 insertions(+), 180 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 8f6a882..1b83a15 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -15,12 +15,17 @@ import ( "golang.org/x/sync/errgroup" ) -// commandStage is a pipeline `Stage` based on running an external -// command and piping the data through its stdin and stdout. +// commandStage is a pipeline `Stage2` based on running an external +// command and piping the data through its stdin and stdout. It also +// implements `Stage2`. type commandStage struct { - name string - stdin io.Closer - cmd *exec.Cmd + name string + cmd *exec.Cmd + + // lateClosers is a list of things that have to be closed once the + // command has finished. + lateClosers []io.Closer + done chan struct{} wg errgroup.Group stderr bytes.Buffer @@ -30,11 +35,15 @@ type commandStage struct { ctxErr atomic.Value } -// Command returns a pipeline `Stage` based on the specified external +var ( + _ Stage2 = (*commandStage)(nil) +) + +// Command returns a pipeline `Stage2` based on the specified external // `command`, run with the given command-line `args`. Its stdin and // stdout are handled as usual, and its stderr is collected and // included in any `*exec.ExitError` that the command might emit. -func Command(command string, args ...string) Stage { +func Command(command string, args ...string) Stage2 { if len(command) == 0 { panic("attempt to create command with empty command") } @@ -47,7 +56,7 @@ func Command(command string, args ...string) Stage { // the specified `cmd`. Its stdin and stdout are handled as usual, and // its stderr is collected and included in any `*exec.ExitError` that // the command might emit. -func CommandStage(name string, cmd *exec.Cmd) Stage { +func CommandStage(name string, cmd *exec.Cmd) Stage2 { return &commandStage{ name: name, cmd: cmd, @@ -62,30 +71,101 @@ func (s *commandStage) Name() string { func (s *commandStage) Start( ctx context.Context, env Env, stdin io.ReadCloser, ) (io.ReadCloser, error) { + pr, pw, err := os.Pipe() + if err != nil { + return nil, err + } + + if err := s.Start2(ctx, env, stdin, pw); err != nil { + _ = pr.Close() + _ = pw.Close() + return nil, err + } + + // Now close our copy of the write end of the pipe (the subprocess + // has its own copy now and will keep it open as long as it is + // running). There's not much we can do now in the case of an + // error, so just ignore them. + _ = pw.Close() + + // The caller is responsible for closing `pr`. + return pr, nil +} + +func (s *commandStage) Preferences() StagePreferences { + prefs := StagePreferences{ + StdinPreference: IOPreferenceFile, + StdoutPreference: IOPreferenceFile, + } + if s.cmd.Stdin != nil { + prefs.StdinPreference = IOPreferenceNil + } + if s.cmd.Stdout != nil { + prefs.StdoutPreference = IOPreferenceNil + } + + return prefs +} + +func (s *commandStage) Start2( + ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, +) error { if s.cmd.Dir == "" { s.cmd.Dir = env.Dir } s.setupEnv(ctx, env) + // Things that have to be closed as soon as the command has + // started: + var earlyClosers []io.Closer + + // See the type command for `Stage` and the long comment in + // `Pipeline.WithStdin()` for the explanation of this unwrapping + // and closing behavior. + if stdin != nil { - // See the long comment in `Pipeline.Start()` for the - // explanation of this special case. switch stdin := stdin.(type) { case readerNopCloser: + // In this case, we shouldn't close it. But unwrap it for + // efficiency's sake: s.cmd.Stdin = stdin.Reader case readerWriterToNopCloser: + // In this case, we shouldn't close it. But unwrap it for + // efficiency's sake: s.cmd.Stdin = stdin.Reader + case *os.File: + // In this case, we can close stdin as soon as the command + // has started: + s.cmd.Stdin = stdin + earlyClosers = append(earlyClosers, stdin) default: + // In this case, we need to close `stdin`, but we should + // only do so after the command has finished: s.cmd.Stdin = stdin + s.lateClosers = append(s.lateClosers, stdin) } - // Also keep a copy so that we can close it when the command exits: - s.stdin = stdin } - stdout, err := s.cmd.StdoutPipe() - if err != nil { - return nil, err + if stdout != nil { + // See the long comment in `Pipeline.Start()` for the + // explanation of this special case. + switch stdout := stdout.(type) { + case writerNopCloser: + // In this case, we shouldn't close it. But unwrap it for + // efficiency's sake: + s.cmd.Stdout = stdout.Writer + case *os.File: + // In this case, we can close stdout as soon as the command + // has started: + s.cmd.Stdout = stdout + earlyClosers = append(earlyClosers, stdout) + default: + // In this case, we need to close `stdout`, but we should + // only do so after the command has finished: + s.cmd.Stdout = stdout + s.lateClosers = append(s.lateClosers, stdout) + } } // If the caller hasn't arranged otherwise, read the command's @@ -97,7 +177,7 @@ func (s *commandStage) Start( // can be sure. p, err := s.cmd.StderrPipe() if err != nil { - return nil, err + return err } s.wg.Go(func() error { _, err := io.Copy(&s.stderr, p) @@ -114,7 +194,11 @@ func (s *commandStage) Start( s.runInOwnProcessGroup() if err := s.cmd.Start(); err != nil { - return nil, err + return err + } + + for _, closer := range earlyClosers { + _ = closer.Close() } // Arrange for the process to be killed (gently) if the context @@ -128,7 +212,7 @@ func (s *commandStage) Start( } }() - return stdout, nil + return nil } // setupEnv sets or modifies the environment that will be passed to @@ -217,19 +301,18 @@ func (s *commandStage) Wait() error { // Make sure that any stderr is copied before `s.cmd.Wait()` // closes the read end of the pipe: - wErr := s.wg.Wait() + wgErr := s.wg.Wait() err := s.cmd.Wait() err = s.filterCmdError(err) - if err == nil && wErr != nil { - err = wErr + if err == nil && wgErr != nil { + err = wgErr } - if s.stdin != nil { - cErr := s.stdin.Close() - if cErr != nil && err == nil { - return cErr + for _, closer := range s.lateClosers { + if closeErr := closer.Close(); closeErr != nil && err == nil { + err = closeErr } } diff --git a/pipe/command_linux.go b/pipe/command_linux.go index c32ebc7..27c50fb 100644 --- a/pipe/command_linux.go +++ b/pipe/command_linux.go @@ -10,7 +10,7 @@ import ( ) // On linux, we can limit or observe memory usage in command stages. -var _ LimitableStage = (*commandStage)(nil) +var _ LimitableStage2 = (*commandStage)(nil) var ( errProcessInfoMissing = errors.New("cmd.Process is nil") diff --git a/pipe/command_test.go b/pipe/command_test.go index 92fd37a..67cd55e 100644 --- a/pipe/command_test.go +++ b/pipe/command_test.go @@ -79,7 +79,8 @@ func TestCopyEnvWithOverride(t *testing.T) { ex := ex t.Run(ex.label, func(t *testing.T) { assert.ElementsMatch(t, ex.expectedResult, - copyEnvWithOverrides(ex.env, ex.overrides)) + copyEnvWithOverrides(ex.env, ex.overrides), + ) }) } } diff --git a/pipe/filter-error.go b/pipe/filter-error.go index 654796a..7d143d2 100644 --- a/pipe/filter-error.go +++ b/pipe/filter-error.go @@ -14,6 +14,9 @@ import ( type ErrorFilter func(err error) error func FilterError(s Stage, filter ErrorFilter) Stage { + if s, ok := s.(Stage2); ok { + return efStage2{Stage2: s, filter: filter} + } return efStage{Stage: s, filter: filter} } @@ -26,6 +29,15 @@ func (s efStage) Wait() error { return s.filter(s.Stage.Wait()) } +type efStage2 struct { + Stage2 + filter ErrorFilter +} + +func (s efStage2) Wait() error { + return s.filter(s.Stage2.Wait()) +} + // ErrorMatcher decides whether its argument matches some class of // errors (e.g., errors that we want to ignore). The function will // only be invoked for non-nil errors. diff --git a/pipe/function.go b/pipe/function.go index bc5d0bd..6470965 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -9,7 +9,7 @@ import ( // StageFunc is a function that can be used to power a `goStage`. It // should read its input from `stdin` and write its output to // `stdout`. `stdin` and `stdout` will be closed automatically (if -// necessary) once the function returns. +// non-nil) once the function returns. // // Neither `stdin` nor `stdout` are necessarily buffered. If the // `StageFunc` requires buffering, it needs to arrange that itself. @@ -38,26 +38,65 @@ type goStage struct { err error } +var ( + _ Stage2 = (*goStage)(nil) +) + func (s *goStage) Name() string { return s.name } +func (s *goStage) Preferences() StagePreferences { + return StagePreferences{ + StdinPreference: IOPreferenceUndefined, + StdoutPreference: IOPreferenceUndefined, + } +} + func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) { - r, w := io.Pipe() + pr, pw := io.Pipe() + + if err := s.Start2(ctx, env, stdin, pw); err != nil { + _ = pr.Close() + _ = pw.Close() + return nil, err + } + + return pr, nil +} + +func (s *goStage) Start2( + ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, +) error { + var r io.Reader = stdin + if stdin, ok := stdin.(readerNopCloser); ok { + r = stdin.Reader + } + + var w io.Writer = stdout + if stdout, ok := stdout.(writerNopCloser); ok { + w = stdout.Writer + } + go func() { - s.err = s.f(ctx, env, stdin, w) - if err := w.Close(); err != nil && s.err == nil { - s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err) + s.err = s.f(ctx, env, r, w) + + if stdout != nil { + if err := stdout.Close(); err != nil && s.err == nil { + s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err) + } } + if stdin != nil { if err := stdin.Close(); err != nil && s.err == nil { s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err) } } + close(s.done) }() - return r, nil + return nil } func (s *goStage) Wait() error { diff --git a/pipe/iocopier.go b/pipe/iocopier.go index 78a9143..6f8b78e 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -7,8 +7,8 @@ import ( "os" ) -// ioCopier is a stage that copies its stdin to a specified -// `io.Writer`. It generates no stdout itself. +// ioCopier is a stage that copies its stdin to `w` then closes it. It +// generates no stdout itself. type ioCopier struct { w io.WriteCloser done chan struct{} @@ -56,6 +56,51 @@ func (s *ioCopier) Start(_ context.Context, _ Env, r io.ReadCloser) (io.ReadClos return nil, nil } +func (s *ioCopier) Preferences() StagePreferences { + return StagePreferences{ + StdinPreference: IOPreferenceUndefined, + StdoutPreference: IOPreferenceNil, + } +} + +// This method always returns `nil`. +func (s *ioCopier) Start2( + _ context.Context, _ Env, stdin io.ReadCloser, stdout io.WriteCloser, +) error { + if stdout != nil { + // We won't write anything to the supplied stdout, so if for + // some reason it wasn't nil, close it immediately: + _ = stdout.Close() + } + + go func() { + _, err := io.Copy(s.w, stdin) + // We don't consider `ErrClosed` an error (FIXME: is this + // correct?): + if err != nil && !errors.Is(err, os.ErrClosed) { + s.err = err + } + if err := stdin.Close(); err != nil && s.err == nil { + s.err = err + } + if err := s.w.Close(); err != nil && s.err == nil { + s.err = err + } + close(s.done) + }() + + // FIXME: if `s.w.Write()` is blocking (e.g., because there is a + // downstream process that is not reading from the other side), + // there's no way to terminate the copy when the context expires. + // This is not too bad, because the `io.Copy()` call will exit by + // itself when its input is closed. + // + // We could, however, be smarter about exiting more quickly if the + // context expires but `s.w.Write()` is not blocking. + + return nil +} + func (s *ioCopier) Wait() error { <-s.done return s.err diff --git a/pipe/memorylimit.go b/pipe/memorylimit.go index f21ee15..cee82a0 100644 --- a/pipe/memorylimit.go +++ b/pipe/memorylimit.go @@ -11,14 +11,14 @@ import ( const memoryPollInterval = time.Second -// ErrMemoryLimitExceeded is the error that will be used to kill a process, if -// necessary, from MemoryLimit. +// ErrMemoryLimitExceeded is the error that will be used to kill a +// process, if necessary, from MemoryLimit. var ErrMemoryLimitExceeded = errors.New("memory limit exceeded") -// LimitableStage is the superset of Stage that must be implemented by stages -// passed to MemoryLimit and MemoryObserver. -type LimitableStage interface { - Stage +// LimitableStage2 is the superset of Stage2 that must be implemented +// by stages passed to MemoryLimit and MemoryObserver. +type LimitableStage2 interface { + Stage2 GetRSSAnon(context.Context) (uint64, error) Kill(error) @@ -26,9 +26,9 @@ type LimitableStage interface { // MemoryLimit watches the memory usage of the stage and stops it if it // exceeds the given limit. -func MemoryLimit(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Stage { +func MemoryLimit(stage Stage2, byteLimit uint64, eventHandler func(e *Event)) Stage { - limitableStage, ok := stage.(LimitableStage) + limitableStage, ok := stage.(LimitableStage2) if !ok { eventHandler(&Event{ Command: stage.Name(), @@ -46,7 +46,7 @@ func MemoryLimit(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Sta } func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc { - return func(ctx context.Context, stage LimitableStage) { + return func(ctx context.Context, stage LimitableStage2) { var consecutiveErrors int t := time.NewTicker(memoryPollInterval) @@ -91,8 +91,8 @@ func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc // MemoryObserver watches memory use of the stage and logs the maximum // value when the stage exits. -func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage { - limitableStage, ok := stage.(LimitableStage) +func MemoryObserver(stage Stage2, eventHandler func(e *Event)) Stage { + limitableStage, ok := stage.(LimitableStage2) if !ok { eventHandler(&Event{ Command: stage.Name(), @@ -110,7 +110,7 @@ func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage { func logMaxRSS(eventHandler func(e *Event)) memoryWatchFunc { - return func(ctx context.Context, stage LimitableStage) { + return func(ctx context.Context, stage LimitableStage2) { var ( maxRSS uint64 samples, errors, consecutiveErrors int @@ -161,26 +161,51 @@ func logMaxRSS(eventHandler func(e *Event)) memoryWatchFunc { type memoryWatchStage struct { nameSuffix string - stage LimitableStage + stage LimitableStage2 watch memoryWatchFunc cancel context.CancelFunc wg sync.WaitGroup } -type memoryWatchFunc func(context.Context, LimitableStage) +type memoryWatchFunc func(context.Context, LimitableStage2) -var _ LimitableStage = (*memoryWatchStage)(nil) +var _ LimitableStage2 = (*memoryWatchStage)(nil) func (m *memoryWatchStage) Name() string { return m.stage.Name() + m.nameSuffix } -func (m *memoryWatchStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) { +func (m *memoryWatchStage) Start( + ctx context.Context, env Env, stdin io.ReadCloser, +) (io.ReadCloser, error) { io, err := m.stage.Start(ctx, env, stdin) if err != nil { return nil, err } + m.monitor(ctx) + + return io, nil +} + +func (m *memoryWatchStage) Preferences() StagePreferences { + return m.stage.Preferences() +} + +func (m *memoryWatchStage) Start2( + ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, +) error { + if err := m.stage.Start2(ctx, env, stdin, stdout); err != nil { + return err + } + + m.monitor(ctx) + + return nil +} + +// monitor starts up a goroutine that monitors the memory of `m`. +func (m *memoryWatchStage) monitor(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) m.cancel = cancel m.wg.Add(1) @@ -189,8 +214,6 @@ func (m *memoryWatchStage) Start(ctx context.Context, env Env, stdin io.ReadClos m.watch(ctx, m.stage) m.wg.Done() }() - - return io, nil } func (m *memoryWatchStage) Wait() error { diff --git a/pipe/memorylimit_test.go b/pipe/memorylimit_test.go index 7501c80..ac675f8 100644 --- a/pipe/memorylimit_test.go +++ b/pipe/memorylimit_test.go @@ -8,6 +8,7 @@ import ( "log" "os" "strings" + "syscall" "testing" "time" @@ -48,7 +49,7 @@ func TestMemoryObserverTreeMem(t *testing.T) { require.Greater(t, rss, 400_000_000) } -func testMemoryObserver(t *testing.T, mbs int, stage pipe.Stage) int { +func testMemoryObserver(t *testing.T, mbs int, stage pipe.Stage2) int { ctx := context.Background() stdinReader, stdinWriter := io.Pipe() @@ -112,54 +113,36 @@ func TestMemoryLimitTreeMem(t *testing.T) { require.ErrorContains(t, err, "memory limit exceeded") } -type closeWrapper struct { - io.Writer - close func() error -} - -func (w closeWrapper) Close() error { - return w.close() -} - -func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.Stage) (string, error) { +func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.Stage2) (string, error) { ctx := context.Background() - stdinReader, stdinWriter := io.Pipe() - devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0) require.NoError(t, err) - // io.Pipe doesn't know if anything is listening on the other end, so once - // our process is expectedly killed then we'll end up blocked trying to - // write to it. To workaround this, make sure we close the pipe reader when - // we've detected that the process has exited (i.e. when stdout has been - // closed). This will cause our write to immediately fail with this error. - closedErr := fmt.Errorf("stdout was closed") - stdout := closeWrapper{ - Writer: devNull, - close: func() error { - require.NoError(t, stdinReader.CloseWithError(closedErr)) - return nil - }, - } - buf := &bytes.Buffer{} logger := log.New(buf, "testMemoryObserver", log.Ldate|log.Ltime) - p := pipe.New(pipe.WithDir("/"), pipe.WithStdin(stdinReader), pipe.WithStdoutCloser(stdout)) - p.Add(pipe.MemoryLimit(stage, limit, LogEventHandler(logger))) + p := pipe.New(pipe.WithDir("/"), pipe.WithStdoutCloser(devNull)) + p.Add( + pipe.Function( + "write-to-less", + func(ctx context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error { + // Write some nonsense data to less. + var bytes [1_000_000]byte + for i := 0; i < mbs; i++ { + _, err := stdout.Write(bytes[:]) + if err != nil { + require.ErrorIs(t, err, syscall.EPIPE) + } + } + + return nil + }, + ), + pipe.MemoryLimit(stage, limit, LogEventHandler(logger)), + ) require.NoError(t, p.Start(ctx)) - // Write some nonsense data to less. - var bytes [1_000_000]byte - for i := 0; i < mbs; i++ { - _, err := stdinWriter.Write(bytes[:]) - if err != nil { - require.ErrorIs(t, err, closedErr) - } - } - - require.NoError(t, stdinWriter.Close()) err = p.Wait() return buf.String(), err diff --git a/pipe/pipeline.go b/pipe/pipeline.go index d7ea41d..1484f75 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "os" "sync/atomic" ) @@ -53,7 +54,7 @@ type ContextValuesFunc func(context.Context) []EnvVar type Pipeline struct { env Env - stdin io.Reader + stdin io.ReadCloser stdout io.WriteCloser stages []Stage cancel func() @@ -97,7 +98,51 @@ func WithDir(dir string) Option { // WithStdin assigns stdin to the first command in the pipeline. func WithStdin(stdin io.Reader) Option { return func(p *Pipeline) { - p.stdin = stdin + // We don't want the first stage to close `stdin`, and it is + // not even necessarily an `io.ReadCloser`. So wrap it in a + // fake `io.ReadCloser` whose `Close()` method doesn't do + // anything. + // + // We could use `io.NopCloser()` for this purpose, but that + // would have a subtle problem. If the first stage is a + // `Command`, then it wants to set the `exec.Cmd`'s `Stdin` to + // an `io.Reader` corresponding to `p.stdin`. If `Cmd.Stdin` + // is an `*os.File`, then `exec.Cmd` will pass the file + // descriptor to the subcommand directly; there is no need to + // create a pipe and copy the data into the input side of the + // pipe. But if `p.stdin` is not an `*os.File`, then this + // optimization is prevented. And even worse, it also has the + // side effect that the goroutine that copies from `Cmd.Stdin` + // into the pipe doesn't terminate until that fd is closed by + // the writing side. + // + // That isn't always what we want. Consider, for example, the + // following snippet, where the subcommand's stdin is set to + // the stdin of the enclosing Go program, but wrapped with + // `io.NopCloser`: + // + // cmd := exec.Command("ls") + // cmd.Stdin = io.NopCloser(os.Stdin) + // cmd.Stdout = os.Stdout + // cmd.Stderr = os.Stderr + // cmd.Run() + // + // In this case, we don't want the Go program to wait for + // `os.Stdin` to close (because `ls` isn't even trying to read + // from its stdin). But it does: `exec.Cmd` doesn't recognize + // that `Cmd.Stdin` is an `*os.File`, so it sets up a pipe and + // copies the data itself, and this goroutine doesn't + // terminate until `cmd.Stdin` (i.e., the Go program's own + // stdin) is closed. But if, for example, the Go program is + // run from an interactive shell session, that might never + // happen, in which case the program will fail to terminate, + // even after `ls` exits. + // + // So instead, in this special case, we wrap `stdin` in our + // own `nopCloser`, which behaves like `io.NopCloser`, except + // that `pipe.CommandStage` knows how to unwrap it before + // passing it to `exec.Cmd`. + p.stdin = newReaderNopCloser(stdin) } } @@ -196,6 +241,13 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { } } +type stageStarter struct { + stage2 Stage2 + prefs StagePreferences + stdin io.ReadCloser + stdout io.WriteCloser +} + // Start starts the commands in the pipeline. If `Start()` exits // without an error, `Wait()` must also be called, to allow all // resources to be freed. @@ -207,89 +259,138 @@ func (p *Pipeline) Start(ctx context.Context) error { atomic.StoreUint32(&p.started, 1) ctx, p.cancel = context.WithCancel(ctx) - var nextStdin io.ReadCloser - if p.stdin != nil { - // We don't want the first stage to actually close this, and - // `p.stdin` is not even necessarily an `io.ReadCloser`. So - // wrap it in a fake `io.ReadCloser` whose `Close()` method - // doesn't do anything. - // - // We could use `io.NopCloser()` for this purpose, but it has - // a subtle problem. If the first stage is a `Command`, then - // it wants to set the `exec.Cmd`'s `Stdin` to an `io.Reader` - // corresponding to `p.stdin`. If `Cmd.Stdin` is an - // `*os.File`, then the file descriptor can be passed to the - // subcommand directly; there is no need for this process to - // create a pipe and copy the data into the input side of the - // pipe. But if `p.stdin` is not an `*os.File`, then this - // optimization is prevented. And even worse, it also has the - // side effect that the goroutine that copies from `Cmd.Stdin` - // into the pipe doesn't terminate until that fd is closed by - // the writing side. - // - // That isn't always what we want. Consider, for example, the - // following snippet, where the subcommand's stdin is set to - // the stdin of the enclosing Go program, but wrapped with - // `io.NopCloser`: - // - // cmd := exec.Command("ls") - // cmd.Stdin = io.NopCloser(os.Stdin) - // cmd.Stdout = os.Stdout - // cmd.Stderr = os.Stderr - // cmd.Run() - // - // In this case, we don't want the Go program to wait for - // `os.Stdin` to close (because `ls` isn't even trying to read - // from its stdin). But it does: `exec.Cmd` doesn't recognize - // that `Cmd.Stdin` is an `*os.File`, so it sets up a pipe and - // copies the data itself, and this goroutine doesn't - // terminate until `cmd.Stdin` (i.e., the Go program's own - // stdin) is closed. But if, for example, the Go program is - // run from an interactive shell session, that might never - // happen, in which case the program will fail to terminate, - // even after `ls` exits. - // - // So instead, in this special case, we wrap `p.stdin` in our - // own `nopCloser`, which behaves like `io.NopCloser`, except - // that `pipe.CommandStage` knows how to unwrap it before - // passing it to `exec.Cmd`. - nextStdin = newReaderNopCloser(p.stdin) - } + // We need to decide how to start the stages, especially whether + // to use `Stage.Start()` vs. `Stage.Start2()`, and, if the + // latter, what pipes to use to connect adjacent stages + // (`os.Pipe()` vs. `io.Pipe()`) based on the two stages' + // preferences. + stageStarters := make([]stageStarter, len(p.stages), len(p.stages)+1) + // Collect information about each stage's type and preferences: for i, s := range p.stages { - var err error - stdout, err := s.Start(ctx, p.env, nextStdin) - if err != nil { - // Close the pipe that the previous stage was writing to. - // That should cause it to exit even if it's not minding - // its context. - if nextStdin != nil { - _ = nextStdin.Close() + ss := &stageStarters[i] + if s, ok := s.(Stage2); ok { + ss.stage2 = s + ss.prefs = s.Preferences() + } else { + ss.prefs = StagePreferences{ + StdinPreference: IOPreferenceUndefined, } + } + } - // Kill and wait for any stages that have been started - // already to finish: - p.cancel() - for _, s := range p.stages[:i] { - _ = s.Wait() - } - p.eventHandler(&Event{ - Command: s.Name(), - Msg: "failed to start pipeline stage", - Err: err, + if p.stdin != nil { + // Arrange for the input of the 0th stage to come from + // `p.stdin`: + stageStarters[0].stdin = p.stdin + } + + // The handling of the last stage depends on whether it is a + // `Stage` or a `Stage2`. + if p.stdout != nil { + i := len(p.stages) - 1 + ss := &stageStarters[i] + + if ss.stage2 != nil { + ss.stdout = p.stdout + } else { + // If `p.stdout` is set but the last stage is not a + // `Stage2`, then we need to add an extra, synthetic stage + // to copy its output to `p.stdout`. + c := newIOCopier(p.stdout) + p.stages = append(p.stages, c) + stageStarters = append(stageStarters, stageStarter{ + stage2: c, + prefs: c.Preferences(), }) - return fmt.Errorf("starting pipeline stage %q: %w", s.Name(), err) } - nextStdin = stdout } - // If the pipeline was configured with a `stdout`, add a synthetic - // stage to copy the last stage's stdout to that writer: - if p.stdout != nil { - c := newIOCopier(p.stdout) - p.stages = append(p.stages, c) - // `ioCopier.Start()` never fails: - _, _ = c.Start(ctx, p.env, nextStdin) + // Clean up any processes and pipes that have been created. `i` is + // the index of the stage that failed to start (whose output pipe + // has already been cleaned up if necessary). + abort := func(i int, err error) error { + // Close the pipe that the previous stage was writing to. + // That should cause it to exit even if it's not minding + // its context. + if stageStarters[i].stdin != nil { + _ = stageStarters[i].stdin.Close() + } + + // Kill and wait for any stages that have been started + // already to finish: + p.cancel() + for _, s := range p.stages[:i] { + _ = s.Wait() + } + p.eventHandler(&Event{ + Command: p.stages[i].Name(), + Msg: "failed to start pipeline stage", + Err: err, + }) + return fmt.Errorf( + "starting pipeline stage %q: %w", p.stages[i].Name(), err, + ) + } + + // Loop over all but the last stage, starting them. By the time we + // get to a stage, its stdin will have already been determined, + // but we still need to figure out its stdout and set the stdin + // that will be used for the subsequent stage. + for i, s := range p.stages[:len(p.stages)-1] { + ss := &stageStarters[i] + + nextSS := &stageStarters[i+1] + + if ss.stage2 != nil { + // We need to generate a pipe pair for this stage to use + // to communicate with its successor: + if ss.prefs.StdoutPreference == IOPreferenceFile || + nextSS.prefs.StdinPreference == IOPreferenceFile { + // Use an OS-level pipe for the communication: + var err error + nextSS.stdin, ss.stdout, err = os.Pipe() + if err != nil { + return abort(i, err) + } + } else { + nextSS.stdin, ss.stdout = io.Pipe() + } + if err := ss.stage2.Start2(ctx, p.env, ss.stdin, ss.stdout); err != nil { + nextSS.stdin.Close() + ss.stdout.Close() + return abort(i, err) + } + } else { + // The stage will create its own stdout when we start + // it: + var err error + nextSS.stdin, err = s.Start(ctx, p.env, ss.stdin) + if err != nil { + return abort(i, err) + } + } + } + + // The last stage needs special handling, because its stdout + // doesn't need to flow into another stage (it's already set in + // `ss.stdout` if it's needed). + { + i := len(p.stages) - 1 + s := p.stages[i] + ss := &stageStarters[i] + + if ss.stage2 != nil { + if err := ss.stage2.Start2(ctx, p.env, ss.stdin, ss.stdout); err != nil { + return abort(i, err) + } + } else { + var err error + _, err = s.Start(ctx, p.env, ss.stdin) + if err != nil { + return abort(i, err) + } + } } return nil diff --git a/pipe/pipeline_test.go b/pipe/pipeline_test.go index 30080a0..e85d7d1 100644 --- a/pipe/pipeline_test.go +++ b/pipe/pipeline_test.go @@ -87,7 +87,7 @@ func TestPipelineSingleCommandWithStdout(t *testing.T) { } } -func TestPipelineStdinFileThatIsNeverClosed(t *testing.T) { +func TestPipelineStdinOSPipeThatIsNeverClosed(t *testing.T) { t.Parallel() // Make sure that the subprocess terminates on its own, as opposed @@ -105,7 +105,10 @@ func TestPipelineStdinFileThatIsNeverClosed(t *testing.T) { var stdout bytes.Buffer - p := pipe.New(pipe.WithStdin(r), pipe.WithStdout(&stdout)) + p := pipe.New( + pipe.WithStdin(r), + pipe.WithStdout(&stdout), + ) // Note that this command doesn't read from its stdin, so it will // terminate regardless of whether `w` gets closed: p.Add(pipe.Command("true")) @@ -115,7 +118,7 @@ func TestPipelineStdinFileThatIsNeverClosed(t *testing.T) { assert.NoError(t, p.Run(ctx)) } -func TestPipelineStdinThatIsNeverClosed(t *testing.T) { +func TestPipelineIOPipeStdinThatIsNeverClosed(t *testing.T) { t.Skip("test not run because it currently deadlocks") t.Parallel() @@ -167,7 +170,33 @@ func TestNontrivialPipeline(t *testing.T) { } } -func TestPipelineReadFromSlowly(t *testing.T) { +func TestOSPipePipelineReadFromSlowly(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + r, w, err := os.Pipe() + require.NoError(t, err) + + var buf []byte + readErr := make(chan error, 1) + + go func() { + time.Sleep(200 * time.Millisecond) + var err error + buf, err = io.ReadAll(r) + readErr <- err + }() + + p := pipe.New(pipe.WithStdoutCloser(w)) + p.Add(pipe.Command("echo", "hello world")) + assert.NoError(t, p.Run(ctx)) + + assert.NoError(t, <-readErr) + assert.Equal(t, "hello world\n", string(buf)) +} + +func TestIOPipePipelineReadFromSlowly(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/pipe/stage.go b/pipe/stage.go index f3d74d9..e162a98 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -5,7 +5,86 @@ import ( "io" ) -// Stage is an element of a `Pipeline`. +// +// From the point of view of the pipeline as a whole, if stdin is +// provided by the user (`WithStdin()`), then we don't want to close +// it at all, whether it's an `*os.File` or not. For this reason, +// stdin has to be wrapped using a `readerNopCloser` before being +// passed into the first stage. For efficiency reasons, it's +// advantageous for the first stage should ideally unwrap its stdin +// argument before actually using it. If the wrapped value is an +// `*os.File` and the stage is a command stage, then unwrapping is +// also important to get the right semantics. +// +// For stdout, it depends on whether the user supplied it using +// `WithStdout()` or `WithStdoutCloser()`. If the former, then the +// considerations are the same as for stdin. +// +// [1] It's theoretically possible for a command to pass the open file +// descriptor to another, longer-lived process, in which case the +// file descriptor wouldn't necessarily get closed when the +// command finishes. But that's ill-behaved in a command that is +// being used in a pipeline, so we'll ignore that possibility. + +// Stage is an element of a `Pipeline`. It reads from standard input +// and writes to standard output. +// +// Who closes stdin and stdout? +// +// A `Stage` as a whole needs to be responsible for closing its end of +// stdin and stdout (assuming that `Start()` / `Start2()` returns +// successfully). Its doing so tells the previous/next stage that it +// is done reading/writing data, which can affect their behavior. +// Therefore, it should close each one as soon as it is done with it. +// (If the caller wants to suppress the closing of stdin/stdout, it +// can always wrap the corresponding argument in a `nopCloser`.) +// +// Specifically, if a stage is started using `Start()`, then it is +// responsible for closing the stdin that is passed to it, and also +// for closing its end of the `io.Reader` that the method returns. If +// a stage implements `Stage2` and is started using `Start2()`, then +// it is responsible for closing both the stdin and stdout that are +// passed in as arguments. How this should be done depends on the kind +// of stage and whether stdin/stdout are of type `*os.File`. +// +// If a stage is an external command, it the subprocess ultimately +// needs its own copies of `*os.File` file descriptors for its stdin +// and stdout. The external command will "always" [1] close those when +// it exits. +// +// If the stage is an external command and one of the arguments is an +// `*os.File`, then it can set the corresponding field of `exec.Cmd` +// to that argument directly. This has the result that `exec.Cmd` +// duplicates that file descriptor and passes the dup to the +// subprocess. Therefore, the stage can close its copy of that +// argument as soon as the external command has started, because the +// external command will keep its own copy open as long as necessary +// (and no longer!), in roughly the following sequence: +// +// cmd.Stdin = f // Similarly for stdout +// cmd.Start(…) +// f.Close() // close our copy +// cmd.Wait() +// +// If the stage is an external command and one of its arguments is not +// an `*os.File`, then `exec.Cmd` will take care of creating an +// `os.Pipe()`, copying from the provided argument in/out of the pipe, +// and eventually closing both ends of the pipe. The stage must close +// the argument itself, but only _after_ the external command has +// finished: +// +// cmd.Stdin = r // Similarly for stdout +// cmd.Start(…) +// cmd.Wait() +// r.Close() +// +// If the stage is a Go function, then it holds the only copy of +// stdin/stdout, so it must wait until the function is done before +// closing them (regardless of their underlying type: +// +// f(…, stdin, stdout) +// stdin.Close() +// stdout.Close() type Stage interface { // Name returns the name of the stage. Name() string @@ -16,12 +95,9 @@ type Stage interface { // might be the case for the first stage in a pipeline.) It // returns an `io.ReadCloser` from which the stage's output can be // read (or `nil` if it generates no output, which should only be - // the case for the last stage in a pipeline). It is the stages' - // responsibility to close `stdin` (if it is not nil) when it has - // read all of the input that it needs, and to close the write end - // of its output reader when it is done, as that is generally how - // the subsequent stage knows that it has received all of its - // input and can finish its work, too. + // the case for the last stage in a pipeline). See the `Stage` + // type comment for more information about responsibility for + // closing stdin and stdout. // // If `Start()` returns without an error, `Wait()` must also be // called, to allow all resources to be freed. @@ -32,3 +108,54 @@ type Stage interface { // the context passed to `Start()`. Wait() error } + +// StagePreferences is the way that a Stage2 indicates its preferences +// about how it is run. This is used within `pipe.Pipeline` to decide +// when to use `os.Pipe()` vs. `io.Pipe()` for creating the pipes +// between stages. +type StagePreferences struct { + StdinPreference IOPreference + StdoutPreference IOPreference +} + +// Stage2 is a `Stage` that can accept both stdin and stdout arguments +// when it is started. +type Stage2 interface { + Stage + + // Preferences() returns this stage's preferences regarding how it + // should be run. + Preferences() StagePreferences + + // Start2 starts the stage (like `Stage.Start()`), except that it + // allows the caller to pass in both stdin and stdout. + Start2(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error +} + +// IOPreference describes what type of stdin / stdout a stage would +// prefer. +// +// External commands prefer `*os.File`s (such as those produced by +// `os.Pipe()`) as their stdin and stdout, because those can be passed +// directly by the external process without any extra copying and also +// simplify the semantics around process termination. Go function +// stages are typically happy with any `io.ReadCloser` (such as one +// produced by `io.Pipe()`), which can be more efficient because +// traffic through an `io.Pipe()` happens entirely in userspace. +type IOPreference int + +const ( + // IOPreferenceUndefined indicates that the stage doesn't care + // what form the specified stdin / stdout takes (i.e., any old + // `io.ReadCloser` / `io.WriteCloser` is just fine). + IOPreferenceUndefined IOPreference = iota + + // IOPreferenceFile indicates that the stage would prefer for the + // specified stdin / stdout to be an `*os.File`, to avoid copying. + IOPreferenceFile + + // IOPreferenceNil indicates that the stage does not use the + // specified stdin / stdout, so `nil` should be passed in. This + // should only happen at the beginning / end of a pipeline. + IOPreferenceNil +) From 34621356648d5ef468f077de13c652a159a4f195 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Sun, 17 Dec 2023 02:11:58 +0100 Subject: [PATCH 08/11] Add some tests that `Pipeline.Start()` picks the right stdin/stdout The most complicated code dealing with `Stage2` is the selection of which types of stdin/stderr to pass to stages, and that's also the main advantage of the `Stage2` interface. So add a bunch of tests that the correct types (especially, `io.Pipe()` vs. `os.Pipe()`) are indeed being selected. --- pipe/export_test.go | 4 + pipe/nop_closer.go | 16 ++ pipe/pipe_matching_test.go | 553 +++++++++++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+) create mode 100644 pipe/export_test.go create mode 100644 pipe/pipe_matching_test.go diff --git a/pipe/export_test.go b/pipe/export_test.go new file mode 100644 index 0000000..2812292 --- /dev/null +++ b/pipe/export_test.go @@ -0,0 +1,4 @@ +package pipe + +// This file exports a functions to be used only for testing. +var UnwrapNopCloser = unwrapNopCloser diff --git a/pipe/nop_closer.go b/pipe/nop_closer.go index 8c66c72..18cf7a9 100644 --- a/pipe/nop_closer.go +++ b/pipe/nop_closer.go @@ -51,3 +51,19 @@ type writerNopCloser struct { func (w writerNopCloser) Close() error { return nil } + +// unwrapNopCloser unwraps the object if it is some kind of nop +// closer, and returns the underlying object. This function is used +// only for testing. +func unwrapNopCloser(obj any) (any, bool) { + switch obj := obj.(type) { + case readerNopCloser: + return obj.Reader, true + case readerWriterToNopCloser: + return obj.Reader, true + case writerNopCloser: + return obj.Writer, true + default: + return nil, false + } +} diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go new file mode 100644 index 0000000..a0f4c59 --- /dev/null +++ b/pipe/pipe_matching_test.go @@ -0,0 +1,553 @@ +package pipe_test + +import ( + "context" + "fmt" + "io" + "os" + "testing" + + "github.com/github/go-pipe/pipe" + "github.com/stretchr/testify/assert" +) + +// Tests that `Pipeline.Start()` uses the correct types of pipes in +// various situations. +// +// The type of pipe to use depends on both the source and the consumer +// of the data, including the overall pipeline's stdin and stdout. So +// there are a lot of possibilities to consider. + +// Additional values used for the expected types of stdin/stdout: +const ( + IOPreferenceUndefinedNopCloser pipe.IOPreference = iota + 100 + IOPreferenceFileNopCloser +) + +func file(t *testing.T) *os.File { + f, err := os.Open(os.DevNull) + assert.NoError(t, err) + return f +} + +func readCloser() io.ReadCloser { + r, w := io.Pipe() + w.Close() + return r +} + +func writeCloser() io.WriteCloser { + r, w := io.Pipe() + r.Close() + return w +} + +func newPipeSniffingStage1( + retval io.ReadCloser, stdinExpectation pipe.IOPreference, +) *pipeSniffingStage1 { + return &pipeSniffingStage1{ + StdinExpectation: stdinExpectation, + retval: retval, + } +} + +type pipeSniffingStage1 struct { + StdinExpectation pipe.IOPreference + retval io.ReadCloser + stdin io.ReadCloser +} + +func newPipeSniffingFunc1(stdinExpectation pipe.IOPreference) *pipeSniffingStage1 { + return newPipeSniffingStage1(readCloser(), stdinExpectation) +} + +func newPipeSniffingCmd1(t *testing.T, stdinExpectation pipe.IOPreference) *pipeSniffingStage1 { + return newPipeSniffingStage1(file(t), stdinExpectation) +} + +func (*pipeSniffingStage1) Name() string { + return "pipe-sniffer" +} + +func (s *pipeSniffingStage1) Start( + _ context.Context, _ pipe.Env, stdin io.ReadCloser, +) (io.ReadCloser, error) { + s.stdin = stdin + if stdin != nil { + _ = stdin.Close() + } + + return s.retval, nil +} + +func (s *pipeSniffingStage1) Wait() error { + return nil +} + +func (s *pipeSniffingStage1) check(t *testing.T, i int) { + t.Helper() + + checkStdinExpectation(t, i, s.StdinExpectation, s.stdin) +} + +func newPipeSniffingStage2( + stdinPreference, stdinExpectation pipe.IOPreference, + stdoutPreference, stdoutExpectation pipe.IOPreference, +) *pipeSniffingStage2 { + return &pipeSniffingStage2{ + prefs: pipe.StagePreferences{ + StdinPreference: stdinPreference, + StdoutPreference: stdoutPreference, + }, + expect: pipe.StagePreferences{ + StdinPreference: stdinExpectation, + StdoutPreference: stdoutExpectation, + }, + } +} + +func newPipeSniffingFunc2( + stdinExpectation, stdoutExpectation pipe.IOPreference, +) *pipeSniffingStage2 { + return newPipeSniffingStage2( + pipe.IOPreferenceUndefined, stdinExpectation, + pipe.IOPreferenceUndefined, stdoutExpectation, + ) +} + +func newPipeSniffingCmd2( + stdinExpectation, stdoutExpectation pipe.IOPreference, +) *pipeSniffingStage2 { + return newPipeSniffingStage2( + pipe.IOPreferenceFile, stdinExpectation, + pipe.IOPreferenceFile, stdoutExpectation, + ) +} + +type pipeSniffingStage2 struct { + prefs pipe.StagePreferences + expect pipe.StagePreferences + stdin io.ReadCloser + stdout io.WriteCloser +} + +func (*pipeSniffingStage2) Name() string { + return "pipe-sniffer" +} + +func (s *pipeSniffingStage2) Start( + _ context.Context, _ pipe.Env, _ io.ReadCloser, +) (io.ReadCloser, error) { + panic("Start() called for a Stage2") +} + +func (s *pipeSniffingStage2) Preferences() pipe.StagePreferences { + return s.prefs +} + +func (s *pipeSniffingStage2) Start2( + _ context.Context, _ pipe.Env, stdin io.ReadCloser, stdout io.WriteCloser, +) error { + s.stdin = stdin + if stdin != nil { + _ = stdin.Close() + } + s.stdout = stdout + if stdout != nil { + _ = stdout.Close() + } + return nil +} + +func (s *pipeSniffingStage2) check(t *testing.T, i int) { + t.Helper() + + checkStdinExpectation(t, i, s.expect.StdinPreference, s.stdin) + checkStdoutExpectation(t, i, s.expect.StdoutPreference, s.stdout) +} + +func (s *pipeSniffingStage2) Wait() error { + return nil +} + +var _ pipe.Stage2 = (*pipeSniffingStage2)(nil) + +func ioTypeString(f any) string { + if f == nil { + return "nil" + } + if f, ok := pipe.UnwrapNopCloser(f); ok { + return fmt.Sprintf("nopCloser(%s)", ioTypeString(f)) + } + switch f := f.(type) { + case *os.File: + return "*os.File" + case io.Reader: + return "other" + case io.Writer: + return "other" + default: + return fmt.Sprintf("%T", f) + } +} + +func prefString(pref pipe.IOPreference) string { + switch pref { + case pipe.IOPreferenceUndefined: + return "other" + case pipe.IOPreferenceFile: + return "*os.File" + case pipe.IOPreferenceNil: + return "nil" + case IOPreferenceUndefinedNopCloser: + return "nopCloser(other)" + case IOPreferenceFileNopCloser: + return "nopCloser(*os.File)" + default: + panic(fmt.Sprintf("invalid IOPreference: %d", pref)) + } +} + +type ReaderNopCloser interface { + NopCloserReader() io.Reader +} + +func checkStdinExpectation(t *testing.T, i int, pref pipe.IOPreference, stdin io.ReadCloser) { + t.Helper() + + ioType := ioTypeString(stdin) + expType := prefString(pref) + assert.Equalf( + t, expType, ioType, + "stage %d stdin: expected %s, got %s (%T)", i, expType, ioType, stdin, + ) +} + +type WriterNopCloser interface { + NopCloserWriter() io.Writer +} + +func checkStdoutExpectation(t *testing.T, i int, pref pipe.IOPreference, stdout io.WriteCloser) { + t.Helper() + + ioType := ioTypeString(stdout) + expType := prefString(pref) + assert.Equalf( + t, expType, ioType, + "stage %d stdout: expected %s, got %s (%T)", i, expType, ioType, stdout, + ) +} + +type checker interface { + check(t *testing.T, i int) +} + +func TestPipeTypes(t *testing.T) { + ctx := context.Background() + + t.Parallel() + + for _, tc := range []struct { + name string + opts []pipe.Option + stages []pipe.Stage + stdin io.Reader + stdout io.Writer + }{ + { + name: "func2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceNil), + }, + }, + { + name: "func2-file-stdin", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), + }, + }, + { + name: "func2-file-stdout", + opts: []pipe.Option{ + pipe.WithStdout(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), + }, + }, + { + name: "func2-file-stdout-closer", + opts: []pipe.Option{ + pipe.WithStdoutCloser(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + }, + }, + { + name: "func2-file-stdin-other-stdout-closer-other", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + pipe.WithStdoutCloser(writeCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), + }, + }, + { + name: "cmd2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceNil), + }, + }, + { + name: "cmd2-file-stdin", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), + }, + }, + { + name: "cmd2-file-stdout", + opts: []pipe.Option{ + pipe.WithStdout(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), + }, + }, + { + name: "cmd2-file-stdout-closer", + opts: []pipe.Option{ + pipe.WithStdoutCloser(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + }, + }, + { + name: "cmd2-file-stdin-other-stdout-closer-other", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + pipe.WithStdoutCloser(writeCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), + }, + }, + { + name: "func1", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingFunc1(pipe.IOPreferenceNil), + }, + }, + { + name: "func1-file-stdin", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(IOPreferenceFileNopCloser), + }, + }, + { + name: "func1-file-stdout", + opts: []pipe.Option{ + pipe.WithStdout(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(pipe.IOPreferenceNil), + }, + }, + { + name: "func1-file-stdin-other-stdout-closer-other", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + pipe.WithStdoutCloser(writeCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(IOPreferenceUndefinedNopCloser), + }, + }, + { + name: "func2-func2", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + pipe.WithStdoutCloser(writeCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(IOPreferenceFileNopCloser, pipe.IOPreferenceUndefined), + newPipeSniffingFunc2(pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined), + }, + }, + { + name: "func2-cmd2", + opts: []pipe.Option{ + pipe.WithStdout(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingCmd2(pipe.IOPreferenceFile, IOPreferenceFileNopCloser), + }, + }, + { + name: "cmd2-func2", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), + newPipeSniffingFunc2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + }, + }, + { + name: "cmd2-cmd2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingCmd2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + }, + }, + { + name: "func1-func2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingFunc1(pipe.IOPreferenceNil), + newPipeSniffingFunc2(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), + }, + }, + { + name: "cmd1-func2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingCmd1(t, pipe.IOPreferenceNil), + newPipeSniffingFunc2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + }, + }, + { + name: "func1-cmd2", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(IOPreferenceUndefinedNopCloser), + newPipeSniffingCmd2(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), + }, + }, + { + name: "cmd1-cmd2", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd1(t, IOPreferenceUndefinedNopCloser), + newPipeSniffingCmd2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + }, + }, + { + name: "func1-func1", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(IOPreferenceFileNopCloser), + newPipeSniffingFunc1(pipe.IOPreferenceUndefined), + }, + }, + { + name: "cmd1-func1", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd1(t, IOPreferenceFileNopCloser), + newPipeSniffingFunc1(pipe.IOPreferenceFile), + }, + }, + { + name: "func1-cmd1", + opts: []pipe.Option{ + pipe.WithStdin(file(t)), + }, + stages: []pipe.Stage{ + newPipeSniffingFunc1(IOPreferenceFileNopCloser), + newPipeSniffingCmd1(t, pipe.IOPreferenceUndefined), + }, + }, + { + name: "func2-func1", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceUndefined), + newPipeSniffingFunc1(pipe.IOPreferenceUndefined), + }, + }, + { + name: "cmd2-func1", + opts: []pipe.Option{ + pipe.WithStdin(readCloser()), + }, + stages: []pipe.Stage{ + newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), + newPipeSniffingFunc1(pipe.IOPreferenceFile), + }, + }, + { + name: "hybrid1", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingStage2( + pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, + pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, + ), + newPipeSniffingStage2( + pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, + pipe.IOPreferenceFile, pipe.IOPreferenceFile, + ), + newPipeSniffingStage2( + pipe.IOPreferenceUndefined, pipe.IOPreferenceFile, + pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, + ), + }, + }, + { + name: "hybrid2", + opts: []pipe.Option{}, + stages: []pipe.Stage{ + newPipeSniffingStage2( + pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, + pipe.IOPreferenceUndefined, pipe.IOPreferenceFile, + ), + newPipeSniffingStage2( + pipe.IOPreferenceFile, pipe.IOPreferenceFile, + pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, + ), + newPipeSniffingStage2( + pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, + pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, + ), + }, + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + p := pipe.New(tc.opts...) + p.Add(tc.stages...) + assert.NoError(t, p.Run(ctx)) + for i, s := range tc.stages { + s.(checker).check(t, i) + } + }) + } +} From 413069fef67832975aa350e6cc7d7c9d39b3f231 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 2 Aug 2024 19:51:28 -0700 Subject: [PATCH 09/11] Improve some names related to the new interface: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * `Stage2` → `StageWithIO` * `Stage2.Start()` → `StageWithIO.StartWithIO()` Rename some other miscellaneous things analogously. --- pipe/command.go | 24 +++++----- pipe/command_linux.go | 2 +- pipe/filter-error.go | 12 ++--- pipe/function.go | 6 +-- pipe/iocopier.go | 2 +- pipe/memorylimit.go | 30 ++++++------ pipe/memorylimit_test.go | 4 +- pipe/pipe_matching_test.go | 96 +++++++++++++++++++------------------- pipe/pipeline.go | 32 ++++++------- pipe/stage.go | 27 +++++------ 10 files changed, 118 insertions(+), 117 deletions(-) diff --git a/pipe/command.go b/pipe/command.go index 1b83a15..73d94c1 100644 --- a/pipe/command.go +++ b/pipe/command.go @@ -15,9 +15,9 @@ import ( "golang.org/x/sync/errgroup" ) -// commandStage is a pipeline `Stage2` based on running an external -// command and piping the data through its stdin and stdout. It also -// implements `Stage2`. +// commandStage is a pipeline `Stage` based on running an +// external command and piping the data through its stdin and stdout. +// It also implements `StageWithIO`. type commandStage struct { name string cmd *exec.Cmd @@ -36,14 +36,14 @@ type commandStage struct { } var ( - _ Stage2 = (*commandStage)(nil) + _ StageWithIO = (*commandStage)(nil) ) -// Command returns a pipeline `Stage2` based on the specified external -// `command`, run with the given command-line `args`. Its stdin and -// stdout are handled as usual, and its stderr is collected and -// included in any `*exec.ExitError` that the command might emit. -func Command(command string, args ...string) Stage2 { +// Command returns a pipeline `StageWithIO` based on the specified +// external `command`, run with the given command-line `args`. Its +// stdin and stdout are handled as usual, and its stderr is collected +// and included in any `*exec.ExitError` that the command might emit. +func Command(command string, args ...string) StageWithIO { if len(command) == 0 { panic("attempt to create command with empty command") } @@ -56,7 +56,7 @@ func Command(command string, args ...string) Stage2 { // the specified `cmd`. Its stdin and stdout are handled as usual, and // its stderr is collected and included in any `*exec.ExitError` that // the command might emit. -func CommandStage(name string, cmd *exec.Cmd) Stage2 { +func CommandStage(name string, cmd *exec.Cmd) StageWithIO { return &commandStage{ name: name, cmd: cmd, @@ -76,7 +76,7 @@ func (s *commandStage) Start( return nil, err } - if err := s.Start2(ctx, env, stdin, pw); err != nil { + if err := s.StartWithIO(ctx, env, stdin, pw); err != nil { _ = pr.Close() _ = pw.Close() return nil, err @@ -107,7 +107,7 @@ func (s *commandStage) Preferences() StagePreferences { return prefs } -func (s *commandStage) Start2( +func (s *commandStage) StartWithIO( ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error { if s.cmd.Dir == "" { diff --git a/pipe/command_linux.go b/pipe/command_linux.go index 27c50fb..69cd82e 100644 --- a/pipe/command_linux.go +++ b/pipe/command_linux.go @@ -10,7 +10,7 @@ import ( ) // On linux, we can limit or observe memory usage in command stages. -var _ LimitableStage2 = (*commandStage)(nil) +var _ LimitableStageWithIO = (*commandStage)(nil) var ( errProcessInfoMissing = errors.New("cmd.Process is nil") diff --git a/pipe/filter-error.go b/pipe/filter-error.go index 7d143d2..fa7a3f7 100644 --- a/pipe/filter-error.go +++ b/pipe/filter-error.go @@ -14,8 +14,8 @@ import ( type ErrorFilter func(err error) error func FilterError(s Stage, filter ErrorFilter) Stage { - if s, ok := s.(Stage2); ok { - return efStage2{Stage2: s, filter: filter} + if s, ok := s.(StageWithIO); ok { + return efStageWithIO{StageWithIO: s, filter: filter} } return efStage{Stage: s, filter: filter} } @@ -29,13 +29,13 @@ func (s efStage) Wait() error { return s.filter(s.Stage.Wait()) } -type efStage2 struct { - Stage2 +type efStageWithIO struct { + StageWithIO filter ErrorFilter } -func (s efStage2) Wait() error { - return s.filter(s.Stage2.Wait()) +func (s efStageWithIO) Wait() error { + return s.filter(s.StageWithIO.Wait()) } // ErrorMatcher decides whether its argument matches some class of diff --git a/pipe/function.go b/pipe/function.go index 6470965..627c036 100644 --- a/pipe/function.go +++ b/pipe/function.go @@ -39,7 +39,7 @@ type goStage struct { } var ( - _ Stage2 = (*goStage)(nil) + _ StageWithIO = (*goStage)(nil) ) func (s *goStage) Name() string { @@ -56,7 +56,7 @@ func (s *goStage) Preferences() StagePreferences { func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) { pr, pw := io.Pipe() - if err := s.Start2(ctx, env, stdin, pw); err != nil { + if err := s.StartWithIO(ctx, env, stdin, pw); err != nil { _ = pr.Close() _ = pw.Close() return nil, err @@ -65,7 +65,7 @@ func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.R return pr, nil } -func (s *goStage) Start2( +func (s *goStage) StartWithIO( ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error { var r io.Reader = stdin diff --git a/pipe/iocopier.go b/pipe/iocopier.go index 6f8b78e..d48df78 100644 --- a/pipe/iocopier.go +++ b/pipe/iocopier.go @@ -64,7 +64,7 @@ func (s *ioCopier) Preferences() StagePreferences { } // This method always returns `nil`. -func (s *ioCopier) Start2( +func (s *ioCopier) StartWithIO( _ context.Context, _ Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error { if stdout != nil { diff --git a/pipe/memorylimit.go b/pipe/memorylimit.go index cee82a0..7be7fe1 100644 --- a/pipe/memorylimit.go +++ b/pipe/memorylimit.go @@ -15,10 +15,10 @@ const memoryPollInterval = time.Second // process, if necessary, from MemoryLimit. var ErrMemoryLimitExceeded = errors.New("memory limit exceeded") -// LimitableStage2 is the superset of Stage2 that must be implemented -// by stages passed to MemoryLimit and MemoryObserver. -type LimitableStage2 interface { - Stage2 +// LimitableStageWithIO is the superset of StageWithIO that must be +// implemented by stages passed to MemoryLimit and MemoryObserver. +type LimitableStageWithIO interface { + StageWithIO GetRSSAnon(context.Context) (uint64, error) Kill(error) @@ -26,9 +26,9 @@ type LimitableStage2 interface { // MemoryLimit watches the memory usage of the stage and stops it if it // exceeds the given limit. -func MemoryLimit(stage Stage2, byteLimit uint64, eventHandler func(e *Event)) Stage { +func MemoryLimit(stage StageWithIO, byteLimit uint64, eventHandler func(e *Event)) Stage { - limitableStage, ok := stage.(LimitableStage2) + limitableStage, ok := stage.(LimitableStageWithIO) if !ok { eventHandler(&Event{ Command: stage.Name(), @@ -46,7 +46,7 @@ func MemoryLimit(stage Stage2, byteLimit uint64, eventHandler func(e *Event)) St } func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc { - return func(ctx context.Context, stage LimitableStage2) { + return func(ctx context.Context, stage LimitableStageWithIO) { var consecutiveErrors int t := time.NewTicker(memoryPollInterval) @@ -91,8 +91,8 @@ func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc // MemoryObserver watches memory use of the stage and logs the maximum // value when the stage exits. -func MemoryObserver(stage Stage2, eventHandler func(e *Event)) Stage { - limitableStage, ok := stage.(LimitableStage2) +func MemoryObserver(stage StageWithIO, eventHandler func(e *Event)) Stage { + limitableStage, ok := stage.(LimitableStageWithIO) if !ok { eventHandler(&Event{ Command: stage.Name(), @@ -110,7 +110,7 @@ func MemoryObserver(stage Stage2, eventHandler func(e *Event)) Stage { func logMaxRSS(eventHandler func(e *Event)) memoryWatchFunc { - return func(ctx context.Context, stage LimitableStage2) { + return func(ctx context.Context, stage LimitableStageWithIO) { var ( maxRSS uint64 samples, errors, consecutiveErrors int @@ -161,15 +161,15 @@ func logMaxRSS(eventHandler func(e *Event)) memoryWatchFunc { type memoryWatchStage struct { nameSuffix string - stage LimitableStage2 + stage LimitableStageWithIO watch memoryWatchFunc cancel context.CancelFunc wg sync.WaitGroup } -type memoryWatchFunc func(context.Context, LimitableStage2) +type memoryWatchFunc func(context.Context, LimitableStageWithIO) -var _ LimitableStage2 = (*memoryWatchStage)(nil) +var _ LimitableStageWithIO = (*memoryWatchStage)(nil) func (m *memoryWatchStage) Name() string { return m.stage.Name() + m.nameSuffix @@ -192,10 +192,10 @@ func (m *memoryWatchStage) Preferences() StagePreferences { return m.stage.Preferences() } -func (m *memoryWatchStage) Start2( +func (m *memoryWatchStage) StartWithIO( ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error { - if err := m.stage.Start2(ctx, env, stdin, stdout); err != nil { + if err := m.stage.StartWithIO(ctx, env, stdin, stdout); err != nil { return err } diff --git a/pipe/memorylimit_test.go b/pipe/memorylimit_test.go index ac675f8..ffecae2 100644 --- a/pipe/memorylimit_test.go +++ b/pipe/memorylimit_test.go @@ -49,7 +49,7 @@ func TestMemoryObserverTreeMem(t *testing.T) { require.Greater(t, rss, 400_000_000) } -func testMemoryObserver(t *testing.T, mbs int, stage pipe.Stage2) int { +func testMemoryObserver(t *testing.T, mbs int, stage pipe.StageWithIO) int { ctx := context.Background() stdinReader, stdinWriter := io.Pipe() @@ -113,7 +113,7 @@ func TestMemoryLimitTreeMem(t *testing.T) { require.ErrorContains(t, err, "memory limit exceeded") } -func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.Stage2) (string, error) { +func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.StageWithIO) (string, error) { ctx := context.Background() devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0) diff --git a/pipe/pipe_matching_test.go b/pipe/pipe_matching_test.go index a0f4c59..4ff1c2c 100644 --- a/pipe/pipe_matching_test.go +++ b/pipe/pipe_matching_test.go @@ -90,11 +90,11 @@ func (s *pipeSniffingStage1) check(t *testing.T, i int) { checkStdinExpectation(t, i, s.StdinExpectation, s.stdin) } -func newPipeSniffingStage2( +func newPipeSniffingStageWithIO( stdinPreference, stdinExpectation pipe.IOPreference, stdoutPreference, stdoutExpectation pipe.IOPreference, -) *pipeSniffingStage2 { - return &pipeSniffingStage2{ +) *pipeSniffingStageWithIO { + return &pipeSniffingStageWithIO{ prefs: pipe.StagePreferences{ StdinPreference: stdinPreference, StdoutPreference: stdoutPreference, @@ -106,46 +106,46 @@ func newPipeSniffingStage2( } } -func newPipeSniffingFunc2( +func newPipeSniffingFuncWithIO( stdinExpectation, stdoutExpectation pipe.IOPreference, -) *pipeSniffingStage2 { - return newPipeSniffingStage2( +) *pipeSniffingStageWithIO { + return newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, stdinExpectation, pipe.IOPreferenceUndefined, stdoutExpectation, ) } -func newPipeSniffingCmd2( +func newPipeSniffingCmdWithIO( stdinExpectation, stdoutExpectation pipe.IOPreference, -) *pipeSniffingStage2 { - return newPipeSniffingStage2( +) *pipeSniffingStageWithIO { + return newPipeSniffingStageWithIO( pipe.IOPreferenceFile, stdinExpectation, pipe.IOPreferenceFile, stdoutExpectation, ) } -type pipeSniffingStage2 struct { +type pipeSniffingStageWithIO struct { prefs pipe.StagePreferences expect pipe.StagePreferences stdin io.ReadCloser stdout io.WriteCloser } -func (*pipeSniffingStage2) Name() string { +func (*pipeSniffingStageWithIO) Name() string { return "pipe-sniffer" } -func (s *pipeSniffingStage2) Start( +func (s *pipeSniffingStageWithIO) Start( _ context.Context, _ pipe.Env, _ io.ReadCloser, ) (io.ReadCloser, error) { - panic("Start() called for a Stage2") + panic("Start() called for a StageWithIO") } -func (s *pipeSniffingStage2) Preferences() pipe.StagePreferences { +func (s *pipeSniffingStageWithIO) Preferences() pipe.StagePreferences { return s.prefs } -func (s *pipeSniffingStage2) Start2( +func (s *pipeSniffingStageWithIO) StartWithIO( _ context.Context, _ pipe.Env, stdin io.ReadCloser, stdout io.WriteCloser, ) error { s.stdin = stdin @@ -159,18 +159,18 @@ func (s *pipeSniffingStage2) Start2( return nil } -func (s *pipeSniffingStage2) check(t *testing.T, i int) { +func (s *pipeSniffingStageWithIO) check(t *testing.T, i int) { t.Helper() checkStdinExpectation(t, i, s.expect.StdinPreference, s.stdin) checkStdoutExpectation(t, i, s.expect.StdoutPreference, s.stdout) } -func (s *pipeSniffingStage2) Wait() error { +func (s *pipeSniffingStageWithIO) Wait() error { return nil } -var _ pipe.Stage2 = (*pipeSniffingStage2)(nil) +var _ pipe.StageWithIO = (*pipeSniffingStageWithIO)(nil) func ioTypeString(f any) string { if f == nil { @@ -258,7 +258,7 @@ func TestPipeTypes(t *testing.T) { name: "func2", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceNil), + newPipeSniffingFuncWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceNil), }, }, { @@ -267,7 +267,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdin(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), + newPipeSniffingFuncWithIO(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), }, }, { @@ -276,7 +276,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdout(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), + newPipeSniffingFuncWithIO(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), }, }, { @@ -285,7 +285,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdoutCloser(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingFuncWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceFile), }, }, { @@ -295,14 +295,14 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdoutCloser(writeCloser()), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), + newPipeSniffingFuncWithIO(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), }, }, { name: "cmd2", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceNil), }, }, { @@ -311,7 +311,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdin(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(IOPreferenceFileNopCloser, pipe.IOPreferenceNil), }, }, { @@ -320,7 +320,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdout(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), + newPipeSniffingCmdWithIO(pipe.IOPreferenceNil, IOPreferenceFileNopCloser), }, }, { @@ -329,7 +329,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdoutCloser(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingCmdWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceFile), }, }, { @@ -339,7 +339,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdoutCloser(writeCloser()), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), + newPipeSniffingCmdWithIO(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceUndefined), }, }, { @@ -384,8 +384,8 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdoutCloser(writeCloser()), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(IOPreferenceFileNopCloser, pipe.IOPreferenceUndefined), - newPipeSniffingFunc2(pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined), + newPipeSniffingFuncWithIO(IOPreferenceFileNopCloser, pipe.IOPreferenceUndefined), + newPipeSniffingFuncWithIO(pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined), }, }, { @@ -394,8 +394,8 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdout(file(t)), }, stages: []pipe.Stage{ - newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), - newPipeSniffingCmd2(pipe.IOPreferenceFile, IOPreferenceFileNopCloser), + newPipeSniffingFuncWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingCmdWithIO(pipe.IOPreferenceFile, IOPreferenceFileNopCloser), }, }, { @@ -404,16 +404,16 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdin(readCloser()), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), - newPipeSniffingFunc2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), + newPipeSniffingFuncWithIO(pipe.IOPreferenceFile, pipe.IOPreferenceNil), }, }, { name: "cmd2-cmd2", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingCmd2(pipe.IOPreferenceNil, pipe.IOPreferenceFile), - newPipeSniffingCmd2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceFile), + newPipeSniffingCmdWithIO(pipe.IOPreferenceFile, pipe.IOPreferenceNil), }, }, { @@ -421,7 +421,7 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingFunc1(pipe.IOPreferenceNil), - newPipeSniffingFunc2(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), + newPipeSniffingFuncWithIO(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), }, }, { @@ -429,7 +429,7 @@ func TestPipeTypes(t *testing.T) { opts: []pipe.Option{}, stages: []pipe.Stage{ newPipeSniffingCmd1(t, pipe.IOPreferenceNil), - newPipeSniffingFunc2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + newPipeSniffingFuncWithIO(pipe.IOPreferenceFile, pipe.IOPreferenceNil), }, }, { @@ -439,7 +439,7 @@ func TestPipeTypes(t *testing.T) { }, stages: []pipe.Stage{ newPipeSniffingFunc1(IOPreferenceUndefinedNopCloser), - newPipeSniffingCmd2(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(pipe.IOPreferenceUndefined, pipe.IOPreferenceNil), }, }, { @@ -449,7 +449,7 @@ func TestPipeTypes(t *testing.T) { }, stages: []pipe.Stage{ newPipeSniffingCmd1(t, IOPreferenceUndefinedNopCloser), - newPipeSniffingCmd2(pipe.IOPreferenceFile, pipe.IOPreferenceNil), + newPipeSniffingCmdWithIO(pipe.IOPreferenceFile, pipe.IOPreferenceNil), }, }, { @@ -486,7 +486,7 @@ func TestPipeTypes(t *testing.T) { name: "func2-func1", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingFunc2(pipe.IOPreferenceNil, pipe.IOPreferenceUndefined), + newPipeSniffingFuncWithIO(pipe.IOPreferenceNil, pipe.IOPreferenceUndefined), newPipeSniffingFunc1(pipe.IOPreferenceUndefined), }, }, @@ -496,7 +496,7 @@ func TestPipeTypes(t *testing.T) { pipe.WithStdin(readCloser()), }, stages: []pipe.Stage{ - newPipeSniffingCmd2(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), + newPipeSniffingCmdWithIO(IOPreferenceUndefinedNopCloser, pipe.IOPreferenceFile), newPipeSniffingFunc1(pipe.IOPreferenceFile), }, }, @@ -504,15 +504,15 @@ func TestPipeTypes(t *testing.T) { name: "hybrid1", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, ), - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, pipe.IOPreferenceFile, pipe.IOPreferenceFile, ), - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, pipe.IOPreferenceFile, pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, ), @@ -522,15 +522,15 @@ func TestPipeTypes(t *testing.T) { name: "hybrid2", opts: []pipe.Option{}, stages: []pipe.Stage{ - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, pipe.IOPreferenceUndefined, pipe.IOPreferenceFile, ), - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceFile, pipe.IOPreferenceFile, pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, ), - newPipeSniffingStage2( + newPipeSniffingStageWithIO( pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, pipe.IOPreferenceUndefined, pipe.IOPreferenceNil, ), diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 1484f75..0bd3d6a 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -242,10 +242,10 @@ func (p *Pipeline) AddWithIgnoredError(em ErrorMatcher, stages ...Stage) { } type stageStarter struct { - stage2 Stage2 - prefs StagePreferences - stdin io.ReadCloser - stdout io.WriteCloser + stageWithIO StageWithIO + prefs StagePreferences + stdin io.ReadCloser + stdout io.WriteCloser } // Start starts the commands in the pipeline. If `Start()` exits @@ -260,7 +260,7 @@ func (p *Pipeline) Start(ctx context.Context) error { ctx, p.cancel = context.WithCancel(ctx) // We need to decide how to start the stages, especially whether - // to use `Stage.Start()` vs. `Stage.Start2()`, and, if the + // to use `Stage.Start()` vs. `Stage.StartWithIO()`, and, if the // latter, what pipes to use to connect adjacent stages // (`os.Pipe()` vs. `io.Pipe()`) based on the two stages' // preferences. @@ -269,8 +269,8 @@ func (p *Pipeline) Start(ctx context.Context) error { // Collect information about each stage's type and preferences: for i, s := range p.stages { ss := &stageStarters[i] - if s, ok := s.(Stage2); ok { - ss.stage2 = s + if s, ok := s.(StageWithIO); ok { + ss.stageWithIO = s ss.prefs = s.Preferences() } else { ss.prefs = StagePreferences{ @@ -286,22 +286,22 @@ func (p *Pipeline) Start(ctx context.Context) error { } // The handling of the last stage depends on whether it is a - // `Stage` or a `Stage2`. + // `Stage` or a `StageWithIO`. if p.stdout != nil { i := len(p.stages) - 1 ss := &stageStarters[i] - if ss.stage2 != nil { + if ss.stageWithIO != nil { ss.stdout = p.stdout } else { // If `p.stdout` is set but the last stage is not a - // `Stage2`, then we need to add an extra, synthetic stage + // `StageWithIO`, then we need to add an extra, synthetic stage // to copy its output to `p.stdout`. c := newIOCopier(p.stdout) p.stages = append(p.stages, c) stageStarters = append(stageStarters, stageStarter{ - stage2: c, - prefs: c.Preferences(), + stageWithIO: c, + prefs: c.Preferences(), }) } } @@ -342,7 +342,7 @@ func (p *Pipeline) Start(ctx context.Context) error { nextSS := &stageStarters[i+1] - if ss.stage2 != nil { + if ss.stageWithIO != nil { // We need to generate a pipe pair for this stage to use // to communicate with its successor: if ss.prefs.StdoutPreference == IOPreferenceFile || @@ -356,7 +356,7 @@ func (p *Pipeline) Start(ctx context.Context) error { } else { nextSS.stdin, ss.stdout = io.Pipe() } - if err := ss.stage2.Start2(ctx, p.env, ss.stdin, ss.stdout); err != nil { + if err := ss.stageWithIO.StartWithIO(ctx, p.env, ss.stdin, ss.stdout); err != nil { nextSS.stdin.Close() ss.stdout.Close() return abort(i, err) @@ -380,8 +380,8 @@ func (p *Pipeline) Start(ctx context.Context) error { s := p.stages[i] ss := &stageStarters[i] - if ss.stage2 != nil { - if err := ss.stage2.Start2(ctx, p.env, ss.stdin, ss.stdout); err != nil { + if ss.stageWithIO != nil { + if err := ss.stageWithIO.StartWithIO(ctx, p.env, ss.stdin, ss.stdout); err != nil { return abort(i, err) } } else { diff --git a/pipe/stage.go b/pipe/stage.go index e162a98..b68713c 100644 --- a/pipe/stage.go +++ b/pipe/stage.go @@ -32,7 +32,7 @@ import ( // Who closes stdin and stdout? // // A `Stage` as a whole needs to be responsible for closing its end of -// stdin and stdout (assuming that `Start()` / `Start2()` returns +// stdin and stdout (assuming that `Start()` / `StartWithIO()` returns // successfully). Its doing so tells the previous/next stage that it // is done reading/writing data, which can affect their behavior. // Therefore, it should close each one as soon as it is done with it. @@ -42,10 +42,11 @@ import ( // Specifically, if a stage is started using `Start()`, then it is // responsible for closing the stdin that is passed to it, and also // for closing its end of the `io.Reader` that the method returns. If -// a stage implements `Stage2` and is started using `Start2()`, then -// it is responsible for closing both the stdin and stdout that are -// passed in as arguments. How this should be done depends on the kind -// of stage and whether stdin/stdout are of type `*os.File`. +// a stage implements `StageWithIO` and is started using +// `StartWithIO()`, then it is responsible for closing both the stdin +// and stdout that are passed in as arguments. How this should be done +// depends on the kind of stage and whether stdin/stdout are of type +// `*os.File`. // // If a stage is an external command, it the subprocess ultimately // needs its own copies of `*os.File` file descriptors for its stdin @@ -109,27 +110,27 @@ type Stage interface { Wait() error } -// StagePreferences is the way that a Stage2 indicates its preferences -// about how it is run. This is used within `pipe.Pipeline` to decide -// when to use `os.Pipe()` vs. `io.Pipe()` for creating the pipes -// between stages. +// StagePreferences is the way that a `StageWithIO` indicates its +// preferences about how it is run. This is used within +// `pipe.Pipeline` to decide when to use `os.Pipe()` vs. `io.Pipe()` +// for creating the pipes between stages. type StagePreferences struct { StdinPreference IOPreference StdoutPreference IOPreference } -// Stage2 is a `Stage` that can accept both stdin and stdout arguments +// StageWithIO is a `Stage` that can accept both stdin and stdout arguments // when it is started. -type Stage2 interface { +type StageWithIO interface { Stage // Preferences() returns this stage's preferences regarding how it // should be run. Preferences() StagePreferences - // Start2 starts the stage (like `Stage.Start()`), except that it + // StartWithIO starts the stage (like `Stage.Start()`), except that it // allows the caller to pass in both stdin and stdout. - Start2(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error + StartWithIO(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error } // IOPreference describes what type of stdin / stdout a stage would From 81487bf7d7a96673d09ea892faa916c4d6625e87 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 2 Aug 2024 19:54:34 -0700 Subject: [PATCH 10/11] Pipeline.Start(): initialize both stage preferences For symmetry and explicitness. --- pipe/pipeline.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipe/pipeline.go b/pipe/pipeline.go index 0bd3d6a..af1219d 100644 --- a/pipe/pipeline.go +++ b/pipe/pipeline.go @@ -274,7 +274,8 @@ func (p *Pipeline) Start(ctx context.Context) error { ss.prefs = s.Preferences() } else { ss.prefs = StagePreferences{ - StdinPreference: IOPreferenceUndefined, + StdinPreference: IOPreferenceUndefined, + StdoutPreference: IOPreferenceUndefined, } } } From 585617b7a39cce792b4ac5a28b1c8c817568f881 Mon Sep 17 00:00:00 2001 From: Michael Haggerty Date: Fri, 2 Aug 2024 19:59:13 -0700 Subject: [PATCH 11/11] Makefile: fix linting rule --- Makefile | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 6cb365d..731839d 100644 --- a/Makefile +++ b/Makefile @@ -16,14 +16,13 @@ vet: go vet ./... BIN := $(CURDIR)/bin -GO := GO $(BIN): - mkdir -p $(BIN) + mkdir -p $(BIN) # Run golang-ci lint on all source files: GOLANGCILINT := $(BIN)/golangci-lint $(BIN)/golangci-lint: - GOBIN=$(BIN) $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + GOBIN=$(BIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest .PHONY: fmt lint: | $(GOLANGCILINT)