-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathoperate.go
More file actions
131 lines (112 loc) · 3.65 KB
/
operate.go
File metadata and controls
131 lines (112 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.
package main
import (
"context"
"time"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
// operate is a helper function to initiate all
// subprocesses for the operator to poll the
// queue and execute Vela pipelines.
//
// It sets up the Vela client, spawns one goroutine that periodically
// checks the worker in with the server, sets up the queue and then
// spawns w.Config.Build.Limit executor goroutines. It blocks until all
// spawned goroutines have returned and yields the first non-nil error
// reported by any of them. An error from setting up the client or the
// queue is returned immediately.
func (w *Worker) operate(ctx context.Context) error {
	var err error

	// setup the client
	w.VelaClient, err = setupClient(w.Config.Server)
	if err != nil {
		return err
	}

	// derive a cancelable context so that every subprocess spawned below
	// is told to stop on any return path, including the early return
	// taken when the queue can not be set up
	opCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// create the errgroup for managing operator subprocesses
	//
	// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group
	executors, gctx := errgroup.WithContext(opCtx)

	// Define the database representation of the worker
	// and register itself in the database
	registryWorker := new(library.Worker)
	registryWorker.SetHostname(w.Config.API.Address.Hostname())
	registryWorker.SetAddress(w.Config.API.Address.String())
	registryWorker.SetRoutes(w.Config.Queue.Routes)
	registryWorker.SetActive(true)
	registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())
	registryWorker.SetBuildLimit(int64(w.Config.Build.Limit))

	// spawn goroutine for phoning home
	executors.Go(func() error {
		for {
			select {
			case <-gctx.Done():
				logrus.Info("Completed looping on worker registration")

				return nil
			default:
				// set checking time to now and call the server
				registryWorker.SetLastCheckedIn(time.Now().UTC().Unix())

				// register or update the worker
				//
				// the error is kept local to this goroutine: sharing the
				// function-scoped err with the executors would be a data race
				//nolint:contextcheck // ignore passing context
				checkInErr := w.checkIn(registryWorker)
				// if unable to update the worker, log the error but allow the worker to continue running
				if checkInErr != nil {
					logrus.Errorf("unable to update worker %s on the server: %v", registryWorker.GetHostname(), checkInErr)
				}

				// wait for the configured time, waking up early on
				// cancellation so shutdown is not held up by the interval;
				// the next loop iteration handles the cancellation
				timer := time.NewTimer(w.Config.CheckIn)
				select {
				case <-gctx.Done():
					timer.Stop()
				case <-timer.C:
				}
			}
		}
	})

	// setup the queue
	//
	// https://pkg.go.dev/github.com/go-vela/server/queue?tab=doc#New
	//nolint:contextcheck // ignore passing context
	w.Queue, err = queue.New(w.Config.Queue)
	if err != nil {
		return err
	}

	// iterate till the configured build limit
	for i := 0; i < w.Config.Build.Limit; i++ {
		// evaluate and capture i at each iteration
		//
		// https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
		id := i

		// log a message indicating the start of an operator thread
		//
		// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Info
		logrus.Infof("Thread ID %d listening to queue...", id)

		// spawn errgroup routine for operator subprocess
		//
		// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group.Go
		executors.Go(func() error {
			// create an infinite loop to poll for builds
			for {
				select {
				case <-gctx.Done():
					logrus.WithFields(logrus.Fields{
						"id": id,
					}).Info("Completed looping on worker executor")

					return nil
				default:
					// exec operator subprocess to poll and execute builds
					// (do not pass the context to avoid errors in one
					// executor+build inadvertently canceling other builds)
					//
					// the error is kept local to this goroutine so that one
					// executor can never observe or overwrite the result
					// of another executor or of the check-in loop
					//nolint:contextcheck // ignore passing context
					execErr := w.exec(id)
					if execErr != nil {
						// log the error received from the executor
						//
						// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Errorf
						logrus.Errorf("failing worker executor: %v", execErr)

						return execErr
					}
				}
			}
		})
	}

	// wait for errors from operator subprocesses
	//
	// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group.Wait
	return executors.Wait()
}