26 changes: 24 additions & 2 deletions cmd/common/helpers.go
@@ -2,6 +2,8 @@ package common

import (
"os"
"os/signal"
"syscall"

"github.com/golang/glog"
"github.com/openshift/machine-config-operator/internal/clients"
@@ -50,7 +52,7 @@ func CreateResourceLock(cb *clients.Builder, componentNamespace, componentName s
}

// GetLeaderElectionConfig returns leader election configs defaults based on the cluster topology
func GetLeaderElectionConfig(restcfg *rest.Config) configv1.LeaderElection {
func GetLeaderElectionConfig(ctx context.Context, restcfg *rest.Config) configv1.LeaderElection {

// Defaults follow conventions
// https://github.com/openshift/enhancements/blob/master/CONVENTIONS.md#high-availability
@@ -59,7 +61,7 @@ func GetLeaderElectionConfig(restcfg *rest.Config) configv1.LeaderElection {
"", "",
)

if infra, err := clusterstatus.GetClusterInfraStatus(context.TODO(), restcfg); err == nil && infra != nil {
if infra, err := clusterstatus.GetClusterInfraStatus(ctx, restcfg); err == nil && infra != nil {
if infra.ControlPlaneTopology == configv1.SingleReplicaTopologyMode {
return leaderelection.LeaderElectionSNOConfig(defaultLeaderElection)
}
@@ -69,3 +71,23 @@ func GetLeaderElectionConfig(restcfg *rest.Config) configv1.LeaderElection {

return defaultLeaderElection
}

// SignalHandler catches SIGINT/SIGTERM signals and makes sure the passed context gets cancelled when those signals arrive. This allows us to use a
// context to shut down our operations cleanly when we are signalled to shut down.
func SignalHandler(runCancel context.CancelFunc) {

// make a signal handling channel for os signals
ch := make(chan os.Signal, 1)
// stop listening for signals when we leave this function
defer func() { signal.Stop(ch) }()
// catch SIGINT and SIGTERM
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
sig := <-ch
glog.Infof("Shutting down due to: %s", sig)
// if we're shutting down, cancel the context so everything else will stop
runCancel()
glog.Infof("Context cancelled")
sig = <-ch
glog.Fatalf("Received shutdown signal twice, exiting: %s", sig)

}
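
As a usage note for the new helper: below is a minimal sketch of how a command is expected to wire SignalHandler to a cancellable run context, roughly as the start commands further down do. The work loop here is illustrative, not the operator's actual code.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/openshift/machine-config-operator/cmd/common"
)

func main() {
	// runCtx gates all of the process's work; SignalHandler cancels it on the
	// first SIGINT/SIGTERM and force-exits on the second.
	runCtx, runCancel := context.WithCancel(context.Background())
	go common.SignalHandler(runCancel)

	// Stand-in for the operator's work loop: run until the context is cancelled.
	for {
		select {
		case <-runCtx.Done():
			log.Print("run context cancelled, shutting down cleanly")
			return
		case <-time.After(time.Second):
			log.Print("working...")
		}
	}
}
```

The first signal cancels the context so work can wind down; a second signal skips the graceful path and exits immediately.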
3 changes: 2 additions & 1 deletion cmd/machine-config-controller/bootstrap.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"

"github.com/golang/glog"
@@ -43,7 +44,7 @@ func runbootstrapCmd(cmd *cobra.Command, args []string) {
glog.Fatalf("--dest-dir or --manifest-dir not set")
}

if err := bootstrap.New(rootOpts.templates, bootstrapOpts.manifestsDir, bootstrapOpts.pullSecretFile).Run(bootstrapOpts.destinationDir); err != nil {
if err := bootstrap.New(rootOpts.templates, bootstrapOpts.manifestsDir, bootstrapOpts.pullSecretFile).Run(context.TODO(), bootstrapOpts.destinationDir); err != nil {
glog.Fatalf("error running MCC[BOOTSTRAP]: %v", err)
}
}
122 changes: 103 additions & 19 deletions cmd/machine-config-controller/start.go
@@ -4,6 +4,8 @@ import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/golang/glog"
"github.com/openshift/machine-config-operator/cmd/common"
@@ -18,6 +20,7 @@ import (
"github.com/openshift/machine-config-operator/pkg/version"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/leaderelection"
)

@@ -44,29 +47,50 @@ func init() {
startCmd.PersistentFlags().StringVar(&startOpts.promMetricsListenAddress, "metrics-listen-address", "127.0.0.1:8797", "Listen address for prometheus metrics listener")
}

type asyncResult struct {
name string
error error
}

func runStartCmd(cmd *cobra.Command, args []string) {
flag.Set("logtostderr", "true")
flag.Parse()

// This is the context that signals whether the operator should be running and doing work
runContext, runCancel := context.WithCancel(context.Background())
// This is the context that signals whether we should release our leader lease
leaderContext, leaderCancel := context.WithCancel(context.Background())

// To help debugging, immediately log version
glog.Infof("Version: %+v (%s)", version.Raw, version.Hash)

cb, err := clients.NewBuilder(startOpts.kubeconfig)
if err != nil {
ctrlcommon.WriteTerminationError(fmt.Errorf("creating clients: %w", err))
}
run := func(ctx context.Context) {
ctrlctx := ctrlcommon.CreateControllerContext(cb, ctx.Done(), componentName)

resultChannel := make(chan asyncResult, 1)
resultChannelCount := 0

// The context that gets passed into this function by the leader election machinery is the leaderContext, so we don't use it.
// In the one case where we would have used it (where we've lost the leader election for some reason), we really just need to die
// immediately rather than shut down cleanly.
run := func(_ context.Context) {

// Set up the signal handler
go common.SignalHandler(runCancel)

ctrlctx := ctrlcommon.CreateControllerContext(cb, runContext.Done(), componentName)

// Start the metrics handler
go ctrlcommon.StartMetricsListener(startOpts.promMetricsListenAddress, ctrlctx.Stop)
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
err := ctrlcommon.StartMetricsListener(startOpts.promMetricsListenAddress, ctrlctx.Stop)
resultChannel <- asyncResult{name: "metrics handler", error: err}
}()

controllers := createControllers(ctrlctx)
draincontroller := drain.New(
ctrlctx.KubeInformerFactory.Core().V1().Nodes(),
ctrlctx.ClientBuilder.KubeClientOrDie("node-update-controller"),
ctrlctx.ClientBuilder.MachineConfigClientOrDie("node-update-controller"),
)

// Start the shared factory informers that you need to use in your controller
ctrlctx.InformerFactory.Start(ctrlctx.Stop)
@@ -75,27 +99,81 @@ func runStartCmd(cmd *cobra.Command, args []string) {
ctrlctx.ConfigInformerFactory.Start(ctrlctx.Stop)
ctrlctx.OperatorInformerFactory.Start(ctrlctx.Stop)

// Make sure the informers have started
close(ctrlctx.InformersStarted)

for _, c := range controllers {
go c.Run(2, ctrlctx.Stop)
// Start the actual controllers
for num := range controllers {
resultChannelCount++
glog.Infof("Starring %s controller", controllers[num].Name())
// Closure, need to make sure we don't grab the loop reference
controller := controllers[num]
go func() {
defer utilruntime.HandleCrash()
if controller.Name() == "DrainController" {
controller.Run(runContext, 5)
} else {
controller.Run(runContext, 2)
}
resultChannel <- asyncResult{name: controller.Name() + " controller", error: err}
}()
}
go draincontroller.Run(5, ctrlctx.Stop)

select {}
var shutdownTimer *time.Timer
for resultChannelCount > 0 {
glog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
if shutdownTimer == nil { // running
select {
case <-runContext.Done():
glog.Info("Run context completed; beginning two-minute graceful shutdown period.")
shutdownTimer = time.NewTimer(2 * time.Minute)

case result := <-resultChannel:
// TODO(jkyros): one of our goroutines exited early, so we shut down everything.
resultChannelCount--
if result.error == nil {
glog.Infof("Collected %s goroutine.", result.name)
} else {
glog.Errorf("Collected %s goroutine: %v", result.name, result.error)
runCancel() // this will cause shutdownTimer initialization in the next loop
}
}
} else { // shutting down
select {
case <-shutdownTimer.C: // never triggers after the timer is stopped, and it would not matter much if it did because subsequent cancel calls do nothing.
leaderCancel()
shutdownTimer.Stop()
case result := <-resultChannel:
resultChannelCount--
if result.error == nil {
glog.Infof("Collected %s goroutine.", result.name)
} else {
glog.Errorf("Collected %s goroutine: %v", result.name, result.error)
}
if resultChannelCount == 0 {
glog.Info("That was the last one, cancelling the leader lease.")

}
}
}
}
leaderCancel()
glog.Info("Finished collecting operator goroutines.")
}

leaderElectionCfg := common.GetLeaderElectionConfig(cb.GetBuilderConfig())
leaderElectionCfg := common.GetLeaderElectionConfig(runContext, cb.GetBuilderConfig())

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: common.CreateResourceLock(cb, startOpts.resourceLockNamespace, componentName),
LeaseDuration: leaderElectionCfg.LeaseDuration.Duration,
RenewDeadline: leaderElectionCfg.RenewDeadline.Duration,
RetryPeriod: leaderElectionCfg.RetryPeriod.Duration,
leaderelection.RunOrDie(leaderContext, leaderelection.LeaderElectionConfig{
Lock: common.CreateResourceLock(cb, startOpts.resourceLockNamespace, componentName),
ReleaseOnCancel: true,
LeaseDuration: leaderElectionCfg.LeaseDuration.Duration,
RenewDeadline: leaderElectionCfg.RenewDeadline.Duration,
RetryPeriod: leaderElectionCfg.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
glog.Infof("Stopped leading. Terminating.")
os.Exit(0)
},
},
})
@@ -160,6 +238,12 @@ func createControllers(ctx *ctrlcommon.ControllerContext) []ctrlcommon.Controlle
ctx.ClientBuilder.KubeClientOrDie("node-update-controller"),
ctx.ClientBuilder.MachineConfigClientOrDie("node-update-controller"),
),
// The drain controller drains pods from nodes
drain.New(
ctx.KubeInformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.KubeClientOrDie("node-update-controller"),
ctx.ClientBuilder.MachineConfigClientOrDie("node-update-controller"),
),
)

return controllers
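
To summarize the shutdown flow introduced in runStartCmd above, here is a condensed, self-contained sketch of the two-context pattern: runContext gates the work, leaderContext gates the lease, and ReleaseOnCancel releases the lease once shutdown finishes. The namespace, lock name, identity, and durations are placeholders rather than the MCO's real values, and exact client-go signatures (e.g. resourcelock.New) can vary between versions.

```go
package main

import (
	"context"
	"log"
	"os"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	// runCtx gates the actual work; leaderCtx gates the leader lease.
	// Cancelling runCtx begins a graceful shutdown; cancelling leaderCtx
	// releases the lease (via ReleaseOnCancel) and lets RunOrDie return.
	runCtx, runCancel := context.WithCancel(context.Background())
	leaderCtx, leaderCancel := context.WithCancel(context.Background())

	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("building kubeconfig: %v", err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Placeholder namespace/name/identity, for illustration only.
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock,
		"example-namespace", "example-lock",
		client.CoreV1(), client.CoordinationV1(),
		resourcelock.ResourceLockConfig{Identity: "example-id"})
	if err != nil {
		log.Fatalf("creating resource lock: %v", err)
	}

	run := func(_ context.Context) {
		// Stand-in for the work: wait until something (e.g. SignalHandler)
		// cancels runCtx, allow a bounded grace period, then give up the lease.
		// The real code cuts the window short once all goroutines are collected.
		<-runCtx.Done()
		timer := time.NewTimer(2 * time.Minute)
		defer timer.Stop()
		<-timer.C
		leaderCancel()
	}

	// Stand-in for SignalHandler: cancel the run context after a while.
	go func() {
		time.Sleep(10 * time.Second)
		runCancel()
	}()

	leaderelection.RunOrDie(leaderCtx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,               // release the lease when leaderCtx is cancelled
		LeaseDuration:   137 * time.Second,  // illustrative durations; the MCO
		RenewDeadline:   107 * time.Second,  // derives its values from
		RetryPeriod:     26 * time.Second,   // GetLeaderElectionConfig
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				log.Print("stopped leading, terminating")
				os.Exit(0)
			},
		},
	})
}
```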
88 changes: 77 additions & 11 deletions cmd/machine-config-operator/start.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"os"
"time"

"github.com/golang/glog"
"github.com/openshift/machine-config-operator/cmd/common"
@@ -12,6 +13,7 @@ import (
"github.com/openshift/machine-config-operator/pkg/operator"
"github.com/openshift/machine-config-operator/pkg/version"
"github.com/spf13/cobra"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/leaderelection"
)

@@ -35,10 +37,24 @@ func init() {
startCmd.PersistentFlags().StringVar(&startOpts.imagesFile, "images-json", "", "images.json file for MCO.")
}

type asyncResult struct {
name string
error error
}

func runStartCmd(cmd *cobra.Command, args []string) {
flag.Set("logtostderr", "true")
flag.Parse()

// This is the context that signals whether the operator should be running and doing work
runContext, runCancel := context.WithCancel(context.Background())
// This is the context that signals whether we should release our leader lease
leaderContext, leaderCancel := context.WithCancel(context.Background())

// So we can collect status of our goroutines
resultChannel := make(chan asyncResult, 1)
resultChannelCount := 0

// To help debugging, immediately log version
glog.Infof("Version: %s (Raw: %s, Hash: %s)", os.Getenv("RELEASE_VERSION"), version.Raw, version.Hash)

@@ -50,8 +66,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
if err != nil {
glog.Fatalf("error creating clients: %v", err)
}
run := func(ctx context.Context) {
ctrlctx := ctrlcommon.CreateControllerContext(cb, ctx.Done(), ctrlcommon.MCONamespace)
run := func(_ context.Context) {

go common.SignalHandler(runCancel)

ctrlctx := ctrlcommon.CreateControllerContext(cb, runContext.Done(), ctrlcommon.MCONamespace)
controller := operator.New(
ctrlcommon.MCONamespace, componentName,
startOpts.imagesFile,
@@ -89,22 +108,69 @@ func runStartCmd(cmd *cobra.Command, args []string) {
ctrlctx.KubeMAOSharedInformer.Start(ctrlctx.Stop)
close(ctrlctx.InformersStarted)

go controller.Run(2, ctrlctx.Stop)
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
controller.Run(runContext, 2)
resultChannel <- asyncResult{name: "main operator", error: err}
}()

// TODO(jkyros): This might be overkill for the operator; it only has one goroutine
var shutdownTimer *time.Timer
for resultChannelCount > 0 {
glog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
if shutdownTimer == nil { // running
select {
case <-runContext.Done():
glog.Info("Run context completed; beginning two-minute graceful shutdown period.")
shutdownTimer = time.NewTimer(2 * time.Minute)

select {}
case result := <-resultChannel:
// TODO(jkyros): one of our goroutines exited early, so we shut down everything.
resultChannelCount--
if result.error == nil {
glog.Infof("Collected %s goroutine.", result.name)
} else {
glog.Errorf("Collected %s goroutine: %v", result.name, result.error)
runCancel() // this will cause shutdownTimer initialization in the next loop
}
}
} else { // shutting down
select {
case <-shutdownTimer.C: // never triggers after the timer is stopped, and it would not matter much if it did because subsequent cancel calls do nothing.
leaderCancel()
shutdownTimer.Stop()
case result := <-resultChannel:
resultChannelCount--
if result.error == nil {
glog.Infof("Collected %s goroutine.", result.name)
} else {
glog.Errorf("Collected %s goroutine: %v", result.name, result.error)
}
if resultChannelCount == 0 {
glog.Info("That was the last one, cancelling the leader lease.")
leaderCancel()
}
}
}
}
glog.Info("Finished collecting operator goroutines.")
}

leaderElectionCfg := common.GetLeaderElectionConfig(cb.GetBuilderConfig())
// TODO(jkyros): should this be a different "pre-run" context here?
leaderElectionCfg := common.GetLeaderElectionConfig(runContext, cb.GetBuilderConfig())

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: common.CreateResourceLock(cb, ctrlcommon.MCONamespace, componentName),
LeaseDuration: leaderElectionCfg.LeaseDuration.Duration,
RenewDeadline: leaderElectionCfg.RenewDeadline.Duration,
RetryPeriod: leaderElectionCfg.RetryPeriod.Duration,
leaderelection.RunOrDie(leaderContext, leaderelection.LeaderElectionConfig{
Lock: common.CreateResourceLock(cb, ctrlcommon.MCONamespace, componentName),
ReleaseOnCancel: true,
LeaseDuration: leaderElectionCfg.LeaseDuration.Duration,
RenewDeadline: leaderElectionCfg.RenewDeadline.Duration,
RetryPeriod: leaderElectionCfg.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
glog.Infof("Stopped leading. Terminating.")
os.Exit(0)
},
},
})
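
The collection loop added to both start commands reduces to the pattern below, a stdlib-only sketch (the worker and names are illustrative): wait for every goroutine to report back, start a bounded two-minute grace period once the run context is cancelled, and release the leader lease only when the goroutines are collected or the window expires.

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

type asyncResult struct {
	name string
	err  error
}

// collect mirrors the loop above: wait for every goroutine to report back,
// and once the run context is cancelled, allow a bounded grace period before
// giving up the leader lease.
func collect(runCtx context.Context, runCancel context.CancelFunc, leaderCancel func(),
	results chan asyncResult, outstanding int) {

	var shutdownTimer *time.Timer
	for outstanding > 0 {
		if shutdownTimer == nil { // still running
			select {
			case <-runCtx.Done():
				log.Print("run context cancelled; starting graceful shutdown window")
				shutdownTimer = time.NewTimer(2 * time.Minute)
			case r := <-results:
				outstanding--
				if r.err != nil {
					log.Printf("%s failed: %v", r.name, r.err)
					runCancel() // a single failure shuts everything down
				}
			}
		} else { // shutting down
			select {
			case <-shutdownTimer.C:
				leaderCancel() // grace period expired; give up the lease anyway
				shutdownTimer.Stop()
			case r := <-results:
				outstanding--
				log.Printf("collected %s (err: %v)", r.name, r.err)
			}
		}
	}
	leaderCancel() // everything collected; safe to release the lease
}

func main() {
	runCtx, runCancel := context.WithCancel(context.Background())
	leaderCancel := func() { log.Print("releasing leader lease") } // stand-in for the real cancel

	results := make(chan asyncResult, 1)
	go func() {
		time.Sleep(2 * time.Second)
		results <- asyncResult{name: "worker", err: errors.New("simulated failure")}
	}()

	collect(runCtx, runCancel, leaderCancel, results, 1)
}
```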