Skip to content
Open
Prev Previous commit
Next Next commit
Add some benchmarks of moving a bunch of data through a pipeline
Add some benchmarks that move MB-scale data through pipelines
consisting of alternating commands and functions, one in small writes,
and one buffered into larger writes, then processing it one line at a
time. This is not so efficient, because every transition from
`Function` → `Command` requires an extra (hidden) goroutine that
copies the data from an `io.Reader` to a `*os.File`.

We can make this faster!
  • Loading branch information
mhagger committed Dec 17, 2023
commit f595c9e2eb630a4eee3e11fdf9a3309930186d67
91 changes: 91 additions & 0 deletions pipe/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,97 @@ func BenchmarkTenMixedStages(b *testing.B) {
}
}

func BenchmarkMoreDataUnbuffered(b *testing.B) {
ctx := context.Background()

cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
_, err := io.Copy(stdout, stdin)
return err
}

for i := 0; i < b.N; i++ {
count := 0
p := pipe.New()
p.Add(
pipe.Function(
"seq",
func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
for i := 1; i <= 100000; i++ {
fmt.Fprintln(stdout, i)
}
return nil
},
),
pipe.Command("cat"),
pipe.Function("copy2", cp),
pipe.Command("cat"),
pipe.Function("copy3", cp),
pipe.Command("cat"),
pipe.Function("copy4", cp),
pipe.Command("cat"),
pipe.Function("copy5", cp),
pipe.Command("cat"),
pipe.LinewiseFunction(
"count",
func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error {
count++
return nil
},
),
)
err := p.Run(ctx)
if assert.NoError(b, err) {
assert.EqualValues(b, 100000, count)
}
}
}

func BenchmarkMoreDataBuffered(b *testing.B) {
ctx := context.Background()

cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
_, err := io.Copy(stdout, stdin)
return err
}

for i := 0; i < b.N; i++ {
count := 0
p := pipe.New()
p.Add(
pipe.Function(
"seq",
func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
out := bufio.NewWriter(stdout)
for i := 1; i <= 1000000; i++ {
fmt.Fprintln(out, i)
}
return out.Flush()
},
),
pipe.Command("cat"),
pipe.Function("copy2", cp),
pipe.Command("cat"),
pipe.Function("copy3", cp),
pipe.Command("cat"),
pipe.Function("copy4", cp),
pipe.Command("cat"),
pipe.Function("copy5", cp),
pipe.Command("cat"),
pipe.LinewiseFunction(
"count",
func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error {
count++
return nil
},
),
)
err := p.Run(ctx)
if assert.NoError(b, err) {
assert.EqualValues(b, 1000000, count)
}
}
}

func genErr(err error) pipe.StageFunc {
return func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error {
return err
Expand Down