diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index a8a2b80d..06a60ae8 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -6,6 +6,7 @@ package main import ( "context" + "sync" "time" "github.com/go-vela/worker/executor" @@ -91,6 +92,27 @@ func (w *Worker) exec(index int) error { // add the executor to the worker w.Executors[index] = _executor + // This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes. + var wg sync.WaitGroup + + // this gets deferred first so that DestroyBuild runs AFTER the + // new contexts (buildCtx and timeoutCtx) have been canceled + defer func() { + // if exec() exits before starting StreamBuild, this returns immediately. + wg.Wait() + + logger.Info("destroying build") + + // destroy the build with the executor (pass a background + // context to guarantee all build resources are destroyed). + err = _executor.DestroyBuild(context.Background()) + if err != nil { + logger.Errorf("unable to destroy build: %v", err) + } + + logger.Info("completed build") + }() + // capture the configured build timeout t := w.Config.Build.Timeout // check if the repository has a custom timeout @@ -109,19 +131,6 @@ func (w *Worker) exec(index int) error { timeoutCtx, timeout := context.WithTimeout(buildCtx, t) defer timeout() - defer func() { - logger.Info("destroying build") - - // destroy the build with the executor (pass a background - // context to guarantee all build resources are destroyed). - err = _executor.DestroyBuild(context.Background()) - if err != nil { - logger.Errorf("unable to destroy build: %v", err) - } - - logger.Info("completed build") - }() - logger.Info("creating build") // create the build with the executor err = _executor.CreateBuild(timeoutCtx) @@ -138,8 +147,12 @@ func (w *Worker) exec(index int) error { return nil } + // add StreamBuild goroutine to WaitGroup + wg.Add(1) + // log/event streaming uses buildCtx so that it is not subject to the timeout. go func() { + defer wg.Done() logger.Info("streaming build logs") // execute the build with the executor err = _executor.StreamBuild(buildCtx) diff --git a/executor/linux/build.go b/executor/linux/build.go index e4193e05..6f77539b 100644 --- a/executor/linux/build.go +++ b/executor/linux/build.go @@ -471,10 +471,17 @@ func (c *client) AssembleBuild(ctx context.Context) error { // ExecBuild runs a pipeline for a build. func (c *client) ExecBuild(ctx context.Context) error { - // defer an upload of the build - // - // https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload - defer func() { build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo) }() + defer func() { + // Exec* calls are responsible for sending StreamRequest messages. + // close the channel at the end of ExecBuild to signal that + // nothing else will send more StreamRequest messages. + close(c.streamRequests) + + // defer an upload of the build + // + // https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload + build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo) + }() // execute the services for the pipeline for _, _service := range c.pipeline.Services { @@ -599,6 +606,10 @@ func (c *client) StreamBuild(ctx context.Context) error { } cancelStreaming() + // wait for context to be done before reporting that everything has returned. + <-delayedCtx.Done() + // there might be one more log message from WithDelayedCancelPropagation + // but there's not a good way to wait for that goroutine to finish. c.Logger.Info("all stream functions have returned") }() @@ -612,7 +623,13 @@ func (c *client) StreamBuild(ctx context.Context) error { for { select { - case req := <-c.streamRequests: + case req, ok := <-c.streamRequests: + if !ok { + // ExecBuild is done requesting streams + c.Logger.Debug("not accepting any more stream requests as channel is closed") + return nil + } + streams.Go(func() error { // update engine logger with step metadata // @@ -629,7 +646,7 @@ func (c *client) StreamBuild(ctx context.Context) error { return nil }) case <-delayedCtx.Done(): - c.Logger.Debug("streaming context canceled") + c.Logger.Debug("not accepting any more stream requests as streaming context is canceled") // build done or canceled return nil } diff --git a/executor/linux/build_test.go b/executor/linux/build_test.go index ade7766d..e733f8ec 100644 --- a/executor/linux/build_test.go +++ b/executor/linux/build_test.go @@ -1146,9 +1146,6 @@ func TestLinux_ExecBuild(t *testing.T) { t.Errorf("unable to create docker runtime engine: %v", err) } - streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) - defer done() - tests := []struct { name string failure bool @@ -1200,6 +1197,9 @@ func TestLinux_ExecBuild(t *testing.T) { t.Errorf("unable to compile %s pipeline %s: %v", test.name, test.pipeline, err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + _engine, err := New( WithBuild(_build), WithPipeline(_pipeline), @@ -1301,13 +1301,16 @@ func TestLinux_StreamBuild(t *testing.T) { } tests := []struct { - name string - failure bool - pipeline string - messageKey string - ctn *pipeline.Container - streamFunc func(*client) message.StreamFunc - planFunc func(*client) planFuncType + name string + failure bool + earlyExecExit bool + earlyBuildDone bool + pipeline string + msgCount int + messageKey string + ctn *pipeline.Container + streamFunc func(*client) message.StreamFunc + planFunc func(*client) planFuncType }{ { name: "docker-basic services pipeline", @@ -1442,6 +1445,72 @@ func TestLinux_StreamBuild(t *testing.T) { Pull: "not_present", }, }, + { + name: "docker-early exit from ExecBuild", + failure: false, + earlyExecExit: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "docker-build complete before ExecBuild called", + failure: false, + earlyBuildDone: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "docker-early exit from ExecBuild and build complete signaled", + failure: false, + earlyExecExit: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -1483,21 +1552,38 @@ func TestLinux_StreamBuild(t *testing.T) { // simulate ExecBuild() which runs concurrently with StreamBuild() go func() { - // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() - // (ExecStage() calls PlanStep() before ExecStep()). - _engine.err = test.planFunc(_engine)(buildCtx, test.ctn) - - // ExecService()/ExecStep()/secret.exec() send this message - streamRequests <- message.StreamRequest{ - Key: test.messageKey, - Stream: test.streamFunc(_engine), - Container: test.ctn, + if test.earlyBuildDone { + // imitate build getting canceled or otherwise finishing before ExecBuild gets called. + done() + } + if test.earlyExecExit { + // imitate a failure after ExecBuild starts and before it sends a StreamRequest. + close(streamRequests) + } + if test.earlyBuildDone || test.earlyExecExit { + return } - // simulate exec build duration - time.Sleep(100 * time.Microsecond) + // simulate two messages of the same type + for i := 0; i < 2; i++ { + // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() + // (ExecStage() calls PlanStep() before ExecStep()). + _engine.err = test.planFunc(_engine)(buildCtx, test.ctn) + + // ExecService()/ExecStep()/secret.exec() send this message + streamRequests <- message.StreamRequest{ + Key: test.messageKey, + Stream: test.streamFunc(_engine), + // in a real pipeline, the second message would be for a different container + Container: test.ctn, + } + + // simulate exec build duration + time.Sleep(100 * time.Microsecond) + } - // signal the end of the build so StreamBuild can terminate + // signal the end of ExecBuild so StreamBuild can finish up + close(streamRequests) done() }() diff --git a/executor/linux/linux.go b/executor/linux/linux.go index 4a78d2fb..c3df35c8 100644 --- a/executor/linux/linux.go +++ b/executor/linux/linux.go @@ -107,6 +107,7 @@ func New(opts ...Opt) (*client, error) { c.Logger = logrus.NewEntry(logger) // instantiate streamRequests channel (which may be overridden using withStreamRequests()). + // messages get sent during ExecBuild, then ExecBuild closes this on exit. c.streamRequests = make(chan message.StreamRequest) // apply all provided configuration options