Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
enhancement(executor): log streaming is not subject to build timeout
This makes the log streaming use the parent buildCtx.
Only Executing steps counts towards the build timeout.

To do this, introduce a new StreamBuild function that manages
running StreamStep or StreamService for each of the containers.
  • Loading branch information
cognifloyd committed Apr 22, 2022
commit b3c9f89996280e693def9a95e0763b96e9209c41
19 changes: 16 additions & 3 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error {
}

// create a background context
ctx := context.Background()
buildCtx, done := context.WithCancel(context.Background())
defer done()

// add to the background context with a timeout
// built in for ensuring a build doesn't run forever
ctx, timeout := context.WithTimeout(ctx, t)
ctx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
Expand Down Expand Up @@ -136,9 +137,21 @@ func (w *Worker) exec(index int) error {
return nil
}

streamRequests := make(chan executor.StreamRequest)

// log streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx, streamRequests)
if err != nil {
logger.Errorf("unable to stream build logs: %v", err)
}
}()

logger.Info("executing build")
// execute the build with the executor
err = _executor.ExecBuild(ctx)
err = _executor.ExecBuild(ctx, streamRequests)
if err != nil {
logger.Errorf("unable to execute build: %v", err)
return nil
Expand Down
25 changes: 21 additions & 4 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type Engine interface {
AssembleBuild(context.Context) error
// ExecBuild defines a function that
// runs a pipeline for a build.
ExecBuild(context.Context) error
ExecBuild(context.Context, chan<- StreamRequest) error
// StreamBuild defines a function that receives a StreamRequest
// and then runs StreamService or StreamStep in a goroutine.
StreamBuild(context.Context, <-chan StreamRequest) error
// DestroyBuild defines a function that
// cleans up the build after execution.
DestroyBuild(context.Context) error
Expand All @@ -67,7 +70,7 @@ type Engine interface {
PlanService(context.Context, *pipeline.Container) error
// ExecService defines a function that
// runs a service.
ExecService(context.Context, *pipeline.Container) error
ExecService(context.Context, *pipeline.Container, chan<- StreamRequest) error
// StreamService defines a function that
// tails the output for a service.
StreamService(context.Context, *pipeline.Container) error
Expand All @@ -85,7 +88,7 @@ type Engine interface {
PlanStage(context.Context, *pipeline.Stage, *sync.Map) error
// ExecStage defines a function that
// runs a stage.
ExecStage(context.Context, *pipeline.Stage, *sync.Map) error
ExecStage(context.Context, *pipeline.Stage, *sync.Map, chan<- StreamRequest) error
// DestroyStage defines a function that
// cleans up the stage after execution.
DestroyStage(context.Context, *pipeline.Stage) error
Expand All @@ -100,11 +103,25 @@ type Engine interface {
PlanStep(context.Context, *pipeline.Container) error
// ExecStep defines a function that
// runs a step.
ExecStep(context.Context, *pipeline.Container) error
ExecStep(context.Context, *pipeline.Container, chan<- StreamRequest) error
// StreamStep defines a function that
// tails the output for a step.
StreamStep(context.Context, *pipeline.Container) error
// DestroyStep defines a function that
// cleans up the step after execution.
DestroyStep(context.Context, *pipeline.Container) error
}

// StreamFunc is either Engine.StreamService or Engine.StreamStep.
type StreamFunc = func(context.Context, *pipeline.Container) error

// StreamRequest is the message used to begin streaming for a container
// (requests goes from Engine.ExecService / Engine.ExecStep to Engine.StreamBuild).
type StreamRequest struct {
// Key is either "service" or "step".
Key string
// Stream is either Engine.StreamService or Engine.StreamStep.
Stream StreamFunc
// Container is the container for the service or step to stream logs for.
Container *pipeline.Container
}
53 changes: 49 additions & 4 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/build"
"github.com/go-vela/worker/internal/step"
)
Expand Down Expand Up @@ -384,7 +385,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context) error {
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
Expand All @@ -401,7 +402,7 @@ func (c *client) ExecBuild(ctx context.Context) error {

c.Logger.Infof("executing %s service", _service.Name)
// execute the service
c.err = c.ExecService(ctx, _service)
c.err = c.ExecService(ctx, _service, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute service: %w", c.err)
}
Expand Down Expand Up @@ -430,7 +431,7 @@ func (c *client) ExecBuild(ctx context.Context) error {

c.Logger.Infof("executing %s step", _step.Name)
// execute the step
c.err = c.ExecStep(ctx, _step)
c.err = c.ExecStep(ctx, _step, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute step: %w", c.err)
}
Expand Down Expand Up @@ -470,7 +471,7 @@ func (c *client) ExecBuild(ctx context.Context) error {

c.Logger.Infof("executing %s stage", stage.Name)
// execute the stage
c.err = c.ExecStage(stageCtx, stage, stageMap)
c.err = c.ExecStage(stageCtx, stage, stageMap, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute stage: %w", c.err)
}
Expand All @@ -491,6 +492,50 @@ func (c *client) ExecBuild(ctx context.Context) error {
return c.err
}

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor.StreamRequest) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")

err := logs.Wait()
if err != nil {
c.Logger.Errorf("error in a stream request, %v", err)
}

c.Logger.Trace("all stream functions have returned")
}()

for {
select {
case req := <-streamRequests:
logs.Go(func() error {
// update engine logger with step metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
logger := c.Logger.WithField(req.Key, req.Container.Name)

logger.Debugf("streaming logs for %s container %s", req.Key, req.Container.ID)
// stream logs from container
err := req.Stream(logCtx, req.Container)
if err != nil {
logger.Error(err)
}

return nil
})
case <-ctx.Done():
// build done or cancelled
return nil
}
}
}

// DestroyBuild cleans up the build after execution.
func (c *client) DestroyBuild(ctx context.Context) error {
var err error
Expand Down
25 changes: 8 additions & 17 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/service"
"golang.org/x/sync/errgroup"
)

// CreateService configures the service for execution.
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *client) PlanService(ctx context.Context, ctn *pipeline.Container) error
}

// ExecService runs a service.
func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error {
func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- executor.StreamRequest) error {
// update engine logger with service metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand All @@ -143,21 +143,12 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error
return err
}

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamService(logCtx, ctn)
if err != nil {
logger.Error(err)
}

return nil
})
// trigger StreamService goroutine with logging context
streamRequests <- executor.StreamRequest{
Key: "service",
Stream: c.StreamService,
Container: ctn,
}

return nil
}
Expand Down
5 changes: 3 additions & 2 deletions executor/linux/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -97,7 +98,7 @@ func (c *client) PlanStage(ctx context.Context, s *pipeline.Stage, m *sync.Map)
}

// ExecStage runs a stage.
func (c *client) ExecStage(ctx context.Context, s *pipeline.Stage, m *sync.Map) error {
func (c *client) ExecStage(ctx context.Context, s *pipeline.Stage, m *sync.Map, streamRequests chan<- executor.StreamRequest) error {
// update engine logger with stage metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand Down Expand Up @@ -136,7 +137,7 @@ func (c *client) ExecStage(ctx context.Context, s *pipeline.Stage, m *sync.Map)

logger.Infof("executing %s step", _step.Name)
// execute the step
err = c.ExecStep(ctx, _step)
err = c.ExecStep(ctx, _step, streamRequests)
if err != nil {
return fmt.Errorf("unable to exec step %s: %w", _step.Name, err)
}
Expand Down
25 changes: 8 additions & 17 deletions executor/linux/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/step"
"golang.org/x/sync/errgroup"
)

// CreateStep configures the step for execution.
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *client) PlanStep(ctx context.Context, ctn *pipeline.Container) error {
}

// ExecStep runs a step.
func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- executor.StreamRequest) error {
// TODO: remove hardcoded reference
if ctn.Name == "init" {
return nil
Expand Down Expand Up @@ -155,21 +155,12 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error {
return err
}

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

logs.Go(func() error {
logger.Debug("streaming logs for container")
// stream logs from container
err := c.StreamStep(logCtx, ctn)
if err != nil {
logger.Error(err)
}

return nil
})
// trigger StreamStep goroutine with logging context
streamRequests <- executor.StreamRequest{
Key: "step",
Stream: c.StreamStep,
Container: ctn,
}

// do not wait for detached containers
if ctn.Detach {
Expand Down
48 changes: 44 additions & 4 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/build"
"github.com/go-vela/worker/internal/step"
)
Expand Down Expand Up @@ -245,7 +246,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context) error {
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
Expand All @@ -260,7 +261,7 @@ func (c *client) ExecBuild(ctx context.Context) error {
}

// execute the service
c.err = c.ExecService(ctx, _service)
c.err = c.ExecService(ctx, _service, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute service: %w", c.err)
}
Expand All @@ -287,7 +288,7 @@ func (c *client) ExecBuild(ctx context.Context) error {
}

// execute the step
c.err = c.ExecStep(ctx, _step)
c.err = c.ExecStep(ctx, _step, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute step: %w", c.err)
}
Expand Down Expand Up @@ -324,7 +325,7 @@ func (c *client) ExecBuild(ctx context.Context) error {
}

// execute the stage
c.err = c.ExecStage(stageCtx, stage, stageMap)
c.err = c.ExecStage(stageCtx, stage, stageMap, streamRequests)
if c.err != nil {
return fmt.Errorf("unable to execute stage: %w", c.err)
}
Expand All @@ -344,6 +345,45 @@ func (c *client) ExecBuild(ctx context.Context) error {
return c.err
}

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor.StreamRequest) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
logs, logCtx := errgroup.WithContext(ctx)

defer func() {
fmt.Fprintln(os.Stdout, "waiting for stream functions to return")

err := logs.Wait()
if err != nil {
fmt.Fprintln(os.Stdout, "error in a stream request:", err)
}

fmt.Fprintln(os.Stdout, "all stream functions have returned")
}()

for {
select {
case req := <-streamRequests:
logs.Go(func() error {
fmt.Fprintf(os.Stdout, "streaming logs for %s container %s", req.Key, req.Container.ID)
// stream logs from container
err := req.Stream(logCtx, req.Container)
if err != nil {
fmt.Fprintln(os.Stdout, "error streaming logs:", err)
}

return nil
})
case <-ctx.Done():
// build done or cancelled
return nil
}
}
}

// DestroyBuild cleans up the build after execution.
func (c *client) DestroyBuild(ctx context.Context) error {
var err error
Expand Down
Loading