Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
enhance: add internal/message pkg for StreamRequest to fix import cycle
  • Loading branch information
cognifloyd committed Apr 22, 2022
commit 47cda897dd8c8c3687b197dd9e383869972b217a
3 changes: 2 additions & 1 deletion cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime"
"github.com/go-vela/worker/version"

Expand Down Expand Up @@ -129,7 +130,7 @@ func (w *Worker) exec(index int) error {
return nil
}

streamRequests := make(chan executor.StreamRequest)
streamRequests := make(chan message.StreamRequest)

// log streaming uses buildCtx so that it is not subject to the timeout.
go func() {
Expand Down
46 changes: 7 additions & 39 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/internal/message"
)

// Engine represents the interface for Vela integrating
Expand Down Expand Up @@ -49,13 +50,13 @@ type Engine interface {
// AssembleBuild defines a function that
// prepares the containers within a build
// for execution.
AssembleBuild(context.Context, chan<- StreamRequest) error
AssembleBuild(context.Context, chan<- message.StreamRequest) error
// ExecBuild defines a function that
// runs a pipeline for a build.
ExecBuild(context.Context, chan<- StreamRequest) error
ExecBuild(context.Context, chan<- message.StreamRequest) error
// StreamBuild defines a function that receives a StreamRequest
// and then runs StreamService or StreamStep in a goroutine.
StreamBuild(context.Context, <-chan StreamRequest) error
StreamBuild(context.Context, <-chan message.StreamRequest) error
// DestroyBuild defines a function that
// cleans up the build after execution.
DestroyBuild(context.Context) error
Expand All @@ -70,7 +71,7 @@ type Engine interface {
PlanService(context.Context, *pipeline.Container) error
// ExecService defines a function that
// runs a service.
ExecService(context.Context, *pipeline.Container, chan<- StreamRequest) error
ExecService(context.Context, *pipeline.Container, chan<- message.StreamRequest) error
// StreamService defines a function that
// tails the output for a service.
StreamService(context.Context, *pipeline.Container) error
Expand All @@ -88,7 +89,7 @@ type Engine interface {
PlanStage(context.Context, *pipeline.Stage, *sync.Map) error
// ExecStage defines a function that
// runs a stage.
ExecStage(context.Context, *pipeline.Stage, *sync.Map, chan<- StreamRequest) error
ExecStage(context.Context, *pipeline.Stage, *sync.Map, chan<- message.StreamRequest) error
// DestroyStage defines a function that
// cleans up the stage after execution.
DestroyStage(context.Context, *pipeline.Stage) error
Expand All @@ -103,44 +104,11 @@ type Engine interface {
PlanStep(context.Context, *pipeline.Container) error
// ExecStep defines a function that
// runs a step.
ExecStep(context.Context, *pipeline.Container, chan<- StreamRequest) error
ExecStep(context.Context, *pipeline.Container, chan<- message.StreamRequest) error
// StreamStep defines a function that
// tails the output for a step.
StreamStep(context.Context, *pipeline.Container) error
// DestroyStep defines a function that
// cleans up the step after execution.
DestroyStep(context.Context, *pipeline.Container) error
}

// StreamFunc is either Engine.StreamService or Engine.StreamStep.
type StreamFunc = func(context.Context, *pipeline.Container) error

// StreamRequest is the message used to begin streaming for a container
// (requests goes from Engine.ExecService / Engine.ExecStep to Engine.StreamBuild).
type StreamRequest struct {
// Key is either "service" or "step".
Key string
// Stream is either Engine.StreamService or Engine.StreamStep.
Stream StreamFunc
// Container is the container for the service or step to stream logs for.
Container *pipeline.Container
}

// MockStreamRequestsWithCancel discards all requests until you call the cancel function.
func MockStreamRequestsWithCancel(ctx context.Context) (chan<- StreamRequest, context.CancelFunc) {
cancelCtx, done := context.WithCancel(ctx)
streamRequests := make(chan StreamRequest)

// discard all stream requests
go func() {
for {
select {
case <-streamRequests:
case <-cancelCtx.Done():
return
}
}
}()

return streamRequests, done
}
10 changes: 5 additions & 5 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"golang.org/x/sync/errgroup"

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/build"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -191,7 +191,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
// AssembleBuild prepares the containers within a build for execution.
//
// nolint: funlen // ignore function length due to comments and logging messages
func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- message.StreamRequest) error {
// defer taking a snapshot of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Snapshot
Expand Down Expand Up @@ -385,7 +385,7 @@ func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- execut
}

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- message.StreamRequest) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
Expand Down Expand Up @@ -494,7 +494,7 @@ func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.S

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor.StreamRequest) error {
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan message.StreamRequest) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
Expand Down Expand Up @@ -530,7 +530,7 @@ func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor
return nil
})
case <-ctx.Done():
// build done or cancelled
// build done or canceled
return nil
}
}
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/go-vela/server/mock/server"
"github.com/urfave/cli/v2"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime/docker"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestLinux_AssembleBuild(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

tests := []struct {
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestLinux_ExecBuild(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/step"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *secretSvc) destroy(ctx context.Context, ctn *pipeline.Container) error
}

// exec runs a secret plugins for a pipeline.
func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice, streamRequests chan<- executor.StreamRequest) error {
func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice, streamRequests chan<- message.StreamRequest) error {
// stream all the logs to the init step
_init, err := step.Load(s.client.init, &s.client.steps)
if err != nil {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice, streamReq
}

// trigger StreamStep goroutine with logging context
streamRequests <- executor.StreamRequest{
streamRequests <- message.StreamRequest{
Key: "secret",
Stream: s.stream,
Container: _secret.Origin,
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/go-vela/server/compiler/native"
"github.com/go-vela/server/mock/server"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime/docker"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestLinux_Secret_exec(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

// setup tests
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/service"
)

Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *client) PlanService(ctx context.Context, ctn *pipeline.Container) error
}

// ExecService runs a service.
func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- executor.StreamRequest) error {
func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- message.StreamRequest) error {
// update engine logger with service metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand All @@ -144,7 +144,7 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container, strea
}

// trigger StreamService goroutine with logging context
streamRequests <- executor.StreamRequest{
streamRequests <- message.StreamRequest{
Key: "service",
Stream: c.StreamService,
Container: ctn,
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/go-vela/server/mock/server"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime/docker"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestLinux_ExecService(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

// setup tests
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync"

"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *client) PlanStage(ctx context.Context, s *pipeline.Stage, m *sync.Map)
}

// ExecStage runs a stage.
func (c *client) ExecStage(ctx context.Context, s *pipeline.Stage, m *sync.Map, streamRequests chan<- executor.StreamRequest) error {
func (c *client) ExecStage(ctx context.Context, s *pipeline.Stage, m *sync.Map, streamRequests chan<- message.StreamRequest) error {
// update engine logger with stage metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/go-vela/server/compiler/native"
"github.com/go-vela/server/mock/server"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime/docker"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestLinux_ExecStage(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

// setup tests
Expand Down
6 changes: 3 additions & 3 deletions executor/linux/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *client) PlanStep(ctx context.Context, ctn *pipeline.Container) error {
}

// ExecStep runs a step.
func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- executor.StreamRequest) error {
func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container, streamRequests chan<- message.StreamRequest) error {
// TODO: remove hardcoded reference
if ctn.Name == "init" {
return nil
Expand Down Expand Up @@ -156,7 +156,7 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container, streamRe
}

// trigger StreamStep goroutine with logging context
streamRequests <- executor.StreamRequest{
streamRequests <- message.StreamRequest{
Key: "step",
Stream: c.StreamStep,
Container: ctn,
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/go-vela/server/mock/server"

"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime/docker"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestLinux_ExecStep(t *testing.T) {
t.Errorf("unable to create runtime engine: %v", err)
}

streamRequests, done := executor.MockStreamRequestsWithCancel(context.Background())
streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

// setup tests
Expand Down
10 changes: 5 additions & 5 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"golang.org/x/sync/errgroup"

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/internal/build"
"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// AssembleBuild prepares the containers within a build for execution.
func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- message.StreamRequest) error {
// defer taking a snapshot of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Snapshot
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *client) AssembleBuild(ctx context.Context, streamRequests chan<- execut
}

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.StreamRequest) error {
func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- message.StreamRequest) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
Expand Down Expand Up @@ -347,7 +347,7 @@ func (c *client) ExecBuild(ctx context.Context, streamRequests chan<- executor.S

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor.StreamRequest) error {
func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan message.StreamRequest) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
Expand Down Expand Up @@ -378,7 +378,7 @@ func (c *client) StreamBuild(ctx context.Context, streamRequests <-chan executor
return nil
})
case <-ctx.Done():
// build done or cancelled
// build done or canceled
return nil
}
}
Expand Down
Loading