Skip to content

Commit ad374d2

Browse files
authored
Fix: after a controller restart, a healthcheck whose workflow had already executed could never be rescheduled (#236)
Signed-off-by: Yu Jiang <[email protected]>
1 parent 52b6ad0 commit ad374d2

File tree

6 files changed

+98
-25
lines changed

6 files changed

+98
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*.so
77
*.dylib
88
testbin/*
9+
active-monitor-controller
910

1011
# Temporary or metadata files
1112
*.yaml-e

Dockerfile-local

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Use distroless as minimal base image to package the manager binary
2+
# Refer to https://github.com/GoogleContainerTools/distroless for more details
3+
FROM gcr.io/distroless/static:latest
4+
WORKDIR /
5+
COPY active-monitor-controller .
6+
ENTRYPOINT [ "/active-monitor-controller" ]

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ test: manifests generate fmt vet envtest ## Run tests.
7272
build: manifests generate fmt vet ## Build manager binary.
7373
go build -o bin/manager cmd/main.go
7474

75+
.PHONY: build-amd64
76+
build-amd64: manifests generate fmt vet ## Build a static linux/amd64 manager binary (active-monitor-controller) for Dockerfile-local.
77+
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o active-monitor-controller cmd/main.go
78+
7579
.PHONY: run
7680
run: manifests generate fmt vet ## Run a controller from your host.
7781
go run ./cmd/main.go
@@ -83,6 +87,10 @@ run: manifests generate fmt vet ## Run a controller from your host.
8387
docker-build: ## Build docker image with the manager.
8488
$(CONTAINER_TOOL) build -t ${IMG} .
8589

90+
.PHONY: docker-build-local
91+
docker-build-local: ## Build docker image from Dockerfile-local, which copies a prebuilt active-monitor-controller binary.
91+
$(CONTAINER_TOOL) build -t ${IMG} -f Dockerfile-local .
93+
8694
.PHONY: docker-push
8795
docker-push: ## Push docker image with the manager.
8896
$(CONTAINER_TOOL) push ${IMG}

internal/controllers/healthcheck_controller.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ func (r *HealthCheckReconciler) processHealthCheck(ctx context.Context, log logr
189189
var finishedAtTime int64
190190
if healthCheck.Status.FinishedAt != nil {
191191
finishedAtTime = healthCheck.Status.FinishedAt.Time.Unix()
192+
log.Info("FinishedAtTime", "finishedAtTime", finishedAtTime)
192193
}
193194

194195
// workflows can be paused by setting repeatAfterSec to <= 0 and not specifying the schedule for cron.
@@ -217,8 +218,8 @@ func (r *HealthCheckReconciler) processHealthCheck(ctx context.Context, log logr
217218
// we need to update the spec so have to healthCheck.Spec.RepeatAfterSec instead of local variable hcSpec
218219
healthCheck.Spec.RepeatAfterSec = int(schedule.Next(time.Now()).Sub(time.Now())/time.Second) + 1
219220
log.Info("spec.RepeatAfterSec value is set", "RepeatAfterSec", healthCheck.Spec.RepeatAfterSec)
220-
} else if int(time.Now().Unix()-finishedAtTime) < hcSpec.RepeatAfterSec {
221-
log.Info("Workflow already executed", "finishedAtTime", finishedAtTime)
221+
} else if int(time.Now().Unix()-finishedAtTime) < hcSpec.RepeatAfterSec && r.RepeatTimersByName[healthCheck.GetName()] != nil {
222+
log.Info("Workflow already executed, and there is repeat schedule has been added to RepeatTimersByName map", "finishedAtTime", finishedAtTime)
222223
return ctrl.Result{}, nil
223224
}
224225

@@ -421,18 +422,25 @@ func (r *HealthCheckReconciler) deleteRBACForWorkflow(ctx context.Context, log l
421422
// this function exists to assist with how a function called by the timer.AfterFunc() method operates to call a
422423
// function which takes parameters, it is easiest to build this closure which holds access to the parameters we need.
423424
// the helper returns a function object taking no parameters directly, this is what we want to give AfterFunc
424-
func (r *HealthCheckReconciler) createSubmitWorkflowHelper(ctx context.Context, log logr.Logger, wfNamespace string, hc *activemonitorv1alpha1.HealthCheck) func() {
425+
func (r *HealthCheckReconciler) createSubmitWorkflowHelper(ctx context.Context, log logr.Logger, wfNamespace string, prevHealthCheck *activemonitorv1alpha1.HealthCheck) func() {
425426
return func() {
426427
log.Info("Creating and Submitting Workflow...")
427-
wfName, err := r.createSubmitWorkflow(ctx, log, hc)
428+
429+
healthCheckInstance := &activemonitorv1alpha1.HealthCheck{}
430+
if err := r.Get(ctx, client.ObjectKey{Name: prevHealthCheck.Name, Namespace: prevHealthCheck.Namespace}, healthCheckInstance); err != nil {
431+
log.Error(err, "Error getting healthcheck resource")
432+
return
433+
}
434+
435+
wfName, err := r.createSubmitWorkflow(ctx, log, healthCheckInstance)
428436
if err != nil {
429437
log.Error(err, "Error creating or submitting workflow")
430-
r.Recorder.Event(hc, v1.EventTypeWarning, "Warning", "Error creating or submitting workflow")
438+
r.Recorder.Event(healthCheckInstance, v1.EventTypeWarning, "Warning", "Error creating or submitting workflow")
431439
}
432-
err = r.watchWorkflowReschedule(ctx, ctrl.Request{}, log, wfNamespace, wfName, hc)
440+
err = r.watchWorkflowReschedule(ctx, ctrl.Request{}, log, wfNamespace, wfName, healthCheckInstance)
433441
if err != nil {
434442
log.Error(err, "Error watching or rescheduling workflow")
435-
r.Recorder.Event(hc, v1.EventTypeWarning, "Warning", "Error watching or rescheduling workflow")
443+
r.Recorder.Event(healthCheckInstance, v1.EventTypeWarning, "Warning", "Error watching or rescheduling workflow")
436444
}
437445
}
438446
}
@@ -652,6 +660,8 @@ func (r *HealthCheckReconciler) watchWorkflowReschedule(ctx context.Context, req
652660
break
653661
}
654662
}
663+
664+
log.Info("waiting for workflow to complete", "namespace", wfNamespace, "name", wfName)
655665
}
656666

657667
// since the workflow has taken an unknown duration of time to complete, it's possible that its parent

internal/controllers/healthcheck_controller_test.go

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"io/ioutil"
7+
"os"
88
"testing"
99
"time"
1010

@@ -21,15 +21,19 @@ import (
2121
)
2222

2323
var (
24-
healthCheckNamespace = "health"
25-
healthCheckName = "inline-monitor-remedy"
26-
healthCheckKey = types.NamespacedName{Name: healthCheckName, Namespace: healthCheckNamespace}
27-
healthCheckNameNs = "inline-monitor-remedy-namespace"
28-
healthCheckKeyNs = types.NamespacedName{Name: healthCheckNameNs, Namespace: healthCheckNamespace}
29-
healthCheckNamePause = "inline-hello-pause"
30-
healthCheckKeyPause = types.NamespacedName{Name: healthCheckNamePause, Namespace: healthCheckNamespace}
31-
healthCheckNameRetry = "inline-hello-custom-retry"
32-
healthCheckKeyRetry = types.NamespacedName{Name: healthCheckNameRetry, Namespace: healthCheckNamespace}
24+
healthCheckNamespace = "health"
25+
healthCheckName = "inline-monitor-remedy"
26+
healthCheckKey = types.NamespacedName{Name: healthCheckName, Namespace: healthCheckNamespace}
27+
healthCheckNameNs = "inline-monitor-remedy-namespace"
28+
healthCheckKeyNs = types.NamespacedName{Name: healthCheckNameNs, Namespace: healthCheckNamespace}
29+
healthCheckNamePause = "inline-hello-pause"
30+
healthCheckKeyPause = types.NamespacedName{Name: healthCheckNamePause, Namespace: healthCheckNamespace}
31+
healthCheckNameRetry = "inline-hello-custom-retry"
32+
healthCheckKeyRetry = types.NamespacedName{Name: healthCheckNameRetry, Namespace: healthCheckNamespace}
33+
healthCheckAlreadyScheduled = "inline-monitor-remedy-already-scheduled"
34+
healthCheckKeyAlreadyScheduled = types.NamespacedName{Name: healthCheckAlreadyScheduled, Namespace: healthCheckNamespace}
35+
36+
sharedCtrl *HealthCheckReconciler
3337
)
3438

3539
const timeout = time.Second * 60
@@ -38,8 +42,7 @@ var _ = Describe("Active-Monitor Controller", func() {
3842
Describe("healthCheck CR can be reconciled at cluster level", func() {
3943
var instance *activemonitorv1alpha1.HealthCheck
4044
It("instance should be parsable", func() {
41-
// healthCheckYaml, err := ioutil.ReadFile("../examples/inlineHello.yaml")
42-
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest.yaml")
45+
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest.yaml")
4346
Expect(err).ToNot(HaveOccurred())
4447
instance, err = parseHealthCheckYaml(healthCheckYaml)
4548
Expect(err).ToNot(HaveOccurred())
@@ -77,8 +80,8 @@ var _ = Describe("Active-Monitor Controller", func() {
7780
var instance *activemonitorv1alpha1.HealthCheck
7881

7982
It("instance should be parsable", func() {
80-
// healthCheckYaml, err := ioutil.ReadFile("../examples/inlineHello.yaml")
81-
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest_Namespace.yaml")
83+
// healthCheckYaml, err := os.ReadFile("../examples/inlineHello.yaml")
84+
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineMemoryRemedyUnitTest_Namespace.yaml")
8285
Expect(err).ToNot(HaveOccurred())
8386

8487
instance, err = parseHealthCheckYaml(healthCheckYaml)
@@ -117,7 +120,7 @@ var _ = Describe("Active-Monitor Controller", func() {
117120
var instance *activemonitorv1alpha1.HealthCheck
118121

119122
It("instance should be parsable", func() {
120-
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
123+
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
121124
Expect(err).ToNot(HaveOccurred())
122125

123126
instance, err = parseHealthCheckYaml(healthCheckYaml)
@@ -152,11 +155,55 @@ var _ = Describe("Active-Monitor Controller", func() {
152155
})
153156
})
154157

158+
Describe("healthCheck CR will be reconcile if it is executed and rescheduled", func() {
159+
var instance *activemonitorv1alpha1.HealthCheck
160+
161+
It("instance should be parsable", func() {
162+
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineHelloTest.yaml")
163+
Expect(err).ToNot(HaveOccurred())
164+
165+
instance, err = parseHealthCheckYaml(healthCheckYaml)
166+
Expect(err).ToNot(HaveOccurred())
167+
Expect(instance).To(BeAssignableToTypeOf(&activemonitorv1alpha1.HealthCheck{}))
168+
Expect(instance.GetName()).To(Equal(healthCheckNamePause))
169+
})
170+
171+
It("instance should be reconciled", func() {
172+
instance.SetNamespace(healthCheckNamespace)
173+
instance.SetName(healthCheckAlreadyScheduled)
174+
instance.Spec.RepeatAfterSec = 60
175+
sharedCtrl.RepeatTimersByName[instance.Name] = time.AfterFunc(time.Second*60, func() {})
176+
177+
err := k8sClient.Create(context.TODO(), instance)
178+
if apierrors.IsInvalid(err) {
179+
log.Error(err, "failed to create object, got an invalid object error")
180+
return
181+
}
182+
Expect(err).NotTo(HaveOccurred())
183+
defer k8sClient.Delete(context.TODO(), instance)
184+
185+
Eventually(func() error {
186+
if err := k8sClient.Get(context.TODO(), healthCheckKeyAlreadyScheduled, instance); err != nil {
187+
return err
188+
}
189+
190+
if instance.Status.StartedAt != nil {
191+
return nil
192+
}
193+
return fmt.Errorf("HealthCheck is not valid")
194+
// return nil
195+
}, timeout).Should(Succeed())
196+
197+
By("Verify healthCheck has been reconciled by checking for status")
198+
Expect(instance.Status.ErrorMessage).ShouldNot(BeEmpty())
199+
})
200+
})
201+
155202
Describe("healthCheck CR will properly parse backoff customizations", func() {
156203
var instance *activemonitorv1alpha1.HealthCheck
157204

158205
It("instance should be parsable", func() {
159-
healthCheckYaml, err := ioutil.ReadFile("../../examples/bdd/inlineCustomBackoffTest.yaml")
206+
healthCheckYaml, err := os.ReadFile("../../examples/bdd/inlineCustomBackoffTest.yaml")
160207
Expect(err).ToNot(HaveOccurred())
161208

162209
instance, err = parseHealthCheckYaml(healthCheckYaml)

internal/controllers/suite_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,15 @@ var _ = BeforeSuite(func() {
106106
})
107107
Expect(err).ToNot(HaveOccurred())
108108

109-
err = (&HealthCheckReconciler{
109+
sharedCtrl = &HealthCheckReconciler{
110110
Client: k8sManager.GetClient(),
111111
DynClient: dynamic.NewForConfigOrDie(k8sManager.GetConfig()),
112112
Recorder: k8sManager.GetEventRecorderFor("HealthCheck"),
113113
kubeclient: kubernetes.NewForConfigOrDie(k8sManager.GetConfig()),
114114
Log: log,
115115
TimerLock: sync.RWMutex{},
116-
}).SetupWithManager(k8sManager)
116+
}
117+
err = sharedCtrl.SetupWithManager(k8sManager)
117118
Expect(err).ToNot(HaveOccurred())
118119

119120
go func() {

0 commit comments

Comments
 (0)