diff --git a/multi_arch_Dockerfile b/multi_arch_Dockerfile index 1ba286334..304c44b2a 100644 --- a/multi_arch_Dockerfile +++ b/multi_arch_Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM --platform=$BUILDPLATFORM golang:1.14.2 as builder +FROM --platform=$BUILDPLATFORM golang:1.16.7 as builder ARG TARGETPLATFORM ARG BUILDPLATFORM ARG BUILDARCH diff --git a/pkg/controller/cronexecutor.go b/pkg/controller/cronexecutor.go index 91b57a8e7..95f49b1ef 100644 --- a/pkg/controller/cronexecutor.go +++ b/pkg/controller/cronexecutor.go @@ -49,11 +49,11 @@ func (ce *CronHPAExecutor) FindJob(job CronJob) (bool, FailedFindJobReason) { entries := ce.Engine.Entries() for _, e := range entries { if e.Job.ID() == job.ID() { - // clean up out of date jobs when it reach maxOutOfDateTimeout + // clean up out of date jobs when it reached maxOutOfDateTimeout if e.Next.Add(maxOutOfDateTimeout).After(time.Now()) { return true, "" } - log.Warningf("The job %s is out of date and need to be clean up.", job.Name()) + log.Warningf("The job %s(job id %s) in cronhpa %s namespace %s is out of date.", job.Name(), job.ID(), job.CronHPAMeta().Name, job.CronHPAMeta().Namespace) return false, JobTimeOut } } diff --git a/pkg/controller/cronhorizontalpodautoscaler_controller.go b/pkg/controller/cronhorizontalpodautoscaler_controller.go index 10f740ded..2cc118a4e 100644 --- a/pkg/controller/cronhorizontalpodautoscaler_controller.go +++ b/pkg/controller/cronhorizontalpodautoscaler_controller.go @@ -79,6 +79,7 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque if errors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. // For additional cleanup logic use finalizers. 
+ log.Infof("GC start for: cronHPA %s in %s namespace is not found", request.Name, request.Namespace) go r.CronManager.GC() return reconcile.Result{}, nil } @@ -95,7 +96,7 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque for _, cJob := range conditions { err := r.CronManager.delete(cJob.JobId) if err != nil { - log.Errorf("Failed to delete job %s,because of %v", cJob.Name, err) + log.Errorf("Failed to delete job %s in cronHPA %s namespace %s, because of %v", cJob.Name, instance.Name, instance.Namespace, err) } } // update scaleTargetRef and excludeDates @@ -113,25 +114,27 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque if cJob.JobId != "" { err := r.CronManager.delete(cJob.JobId) if err != nil { - log.Errorf("Failed to delete expired job %s,because of %v", cJob.Name, err) + log.Errorf("Failed to delete expired job %s in cronHPA %s namespace %s,because of %v", cJob.Name, instance.Name, instance.Namespace, err) } } continue } + // if nothing changed skip = true } } + // need remove this condition because this is not job spec if !skip { if cJob.JobId != "" { err := r.CronManager.delete(cJob.JobId) if err != nil { - log.Errorf("Failed to delete expired job %s,because of %v", cJob.Name, err) + log.Errorf("Failed to delete expired job %s in cronHPA %s namespace %s, because of %v", cJob.Name, instance.Name, instance.Namespace, err) } } } - // need remove this condition because this is not job spec + // if job nothing changed then append to left conditions if skip { leftConditions = append(leftConditions, cJob) } @@ -156,7 +159,8 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque if err != nil { jobCondition.State = v1beta1.Failed - jobCondition.Message = fmt.Sprintf("Failed to create cron hpa job %s,because of %v", job.Name, err) + jobCondition.Message = fmt.Sprintf("Failed to create cron hpa job %s in %s namespace %s,because of %v", + job.Name, instance.Name, 
instance.Namespace, err) log.Errorf("Failed to create cron hpa job %s,because of %v", job.Name, err) } else { name := job.Name @@ -168,7 +172,8 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque if runOnce(job) && (c.State == v1beta1.Succeed || c.State == v1beta1.Failed) { err := r.CronManager.delete(jobId) if err != nil { - log.Errorf("cron hpa %s(%s) has ran once but fail to exit,because of %v", name, jobId, err) + log.Errorf("cron hpa runonce job %s(%s) in %s namespace %s has ran once but fail to exit,because of %v", + name, jobId, instance.Name, instance.Namespace, err) } continue } @@ -190,11 +195,11 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque noNeedUpdateStatus = false instance.Status.Conditions = updateConditions(instance.Status.Conditions, jobCondition) } - // conditions doesn't changed and no need to update. + // conditions are not changed and no need to update. if !noNeedUpdateStatus || len(leftConditions) != len(conditions) { err := r.Update(context.Background(), instance) if err != nil { - log.Errorf("Failed to update cron hpa %s status,because of %v", instance.Name, err) + log.Errorf("Failed to update cron hpa %s in namespace %s status, because of %v", instance.Name, instance.Namespace, err) } } diff --git a/pkg/controller/cronjob.go b/pkg/controller/cronjob.go index d25da77be..0395f65e0 100644 --- a/pkg/controller/cronjob.go +++ b/pkg/controller/cronjob.go @@ -147,7 +147,7 @@ func (ch *CronJobHPA) ScaleHPA() (msg string, err error) { targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion) if err != nil { - return "", fmt.Errorf("Failed to get TargetGroup of HPA %s,because of %v", hpa.Name, err) + return "", fmt.Errorf("Failed to get TargetGroup of HPA %s in namespace %s ,because of %v", hpa.Name, hpa.Namespace, err) } targetGK := schema.GroupKind{ @@ -207,7 +207,8 @@ func (ch *CronJobHPA) ScaleHPA() (msg string, err error) { if hpa.Status.CurrentReplicas >= ch.DesiredSize 
{ // skip change replicas and exit - return fmt.Sprintf("Skip scale replicas because HPA %s current replicas:%d >= desired replicas:%d.", hpa.Name, scale.Spec.Replicas, ch.DesiredSize), nil + return fmt.Sprintf("Skip scale replicas because HPA %s in namespace %s current replicas:%d >= desired replicas:%d.", + hpa.Name, hpa.Namespace, scale.Spec.Replicas, ch.DesiredSize), nil } msg = fmt.Sprintf("current replicas:%d, desired replicas:%d.", scale.Spec.Replicas, ch.DesiredSize) @@ -239,7 +240,7 @@ func (ch *CronJobHPA) ScalePlainRef() (msg string, err error) { scale, err = ch.scaler.Scales(ch.TargetRef.RefNamespace).Get(context.Background(), targetGR, ch.TargetRef.RefName, v1.GetOptions{}) if err == nil { found = true - log.Infof("%s %s in namespace %s has been scaled successfully. job: %s replicas: %d", ch.TargetRef.RefKind, ch.TargetRef.RefName, ch.TargetRef.RefNamespace, ch.Name(), ch.DesiredSize) + log.Infof("%s %s in namespace %s has been scaled successfully. job: %s replicas: %d id: %s", ch.TargetRef.RefKind, ch.TargetRef.RefName, ch.TargetRef.RefNamespace, ch.Name(), ch.DesiredSize, ch.ID()) break } } diff --git a/pkg/controller/cronmanager.go b/pkg/controller/cronmanager.go index 379882068..e6c46346d 100644 --- a/pkg/controller/cronmanager.go +++ b/pkg/controller/cronmanager.go @@ -38,7 +38,7 @@ type CronManager struct { sync.Mutex cfg *rest.Config client client.Client - jobQueue map[string]CronJob + jobQueue *sync.Map //cronProcessor CronProcessor cronExecutor CronExecutor mapper meta.RESTMapper @@ -47,26 +47,28 @@ type CronManager struct { } func (cm *CronManager) createOrUpdate(j CronJob) error { - cm.Lock() - defer cm.Unlock() - if _, ok := cm.jobQueue[j.ID()]; !ok { + if _, ok := cm.jobQueue.Load(j.ID()); !ok { err := cm.cronExecutor.AddJob(j) if err != nil { return fmt.Errorf("Failed to add job to cronExecutor,because of %v", err) } - cm.jobQueue[j.ID()] = j + cm.jobQueue.Store(j.ID(), j) log.Infof("cronHPA job %s of cronHPA %s in %s created, %d active 
jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, - len(cm.jobQueue)) + queueLength(cm.jobQueue)) } else { - job := cm.jobQueue[j.ID()] + loadJob, _ := cm.jobQueue.Load(j.ID()) + job, convert := loadJob.(*CronJobHPA) + if !convert { + return fmt.Errorf("failed to convert job %v to CronJobHPA", loadJob) + } if ok := job.Equals(j); !ok { err := cm.cronExecutor.Update(j) if err != nil { return fmt.Errorf("failed to update job %s of cronHPA %s in %s to cronExecutor, because of %v", job.Name(), job.CronHPAMeta().Name, job.CronHPAMeta().Namespace, err) } //update job queue - cm.jobQueue[j.ID()] = j - log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue)) + cm.jobQueue.Store(j.ID(), j) + log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue)) } else { return &NoNeedUpdate{} } @@ -75,15 +77,14 @@ func (cm *CronManager) createOrUpdate(j CronJob) error { } func (cm *CronManager) delete(id string) error { - cm.Lock() - defer cm.Unlock() - if j, ok := cm.jobQueue[id]; ok { + if loadJob, ok := cm.jobQueue.Load(id); ok { + j, _ := loadJob.(*CronJobHPA) err := cm.cronExecutor.RemoveJob(j) if err != nil { return fmt.Errorf("Failed to remove job from cronExecutor,because of %v", err) } - delete(cm.jobQueue, id) - log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue)) + cm.jobQueue.Delete(id) + log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue)) } return nil } @@ -98,7 +99,7 @@ func (cm *CronManager) JobResultHandler(js *cron.JobResult) { }, instance) if e != nil { - log.Errorf("Failed to fetch cronHPA job %s of cronHPA %s in %s 
namespace,because of %v", job.Name(), cronHpa.Name, cronHpa.Namespace, e) + log.Errorf("Failed to fetch cronHPA job %s of cronHPA %s in namespace %s, because of %v", job.Name(), cronHpa.Name, cronHpa.Namespace, e) return } @@ -206,14 +207,7 @@ func (cm *CronManager) gcLoop() { // GC will collect all jobs which ref is not exists and recycle. func (cm *CronManager) GC() { - log.Infof("Start GC") - m := make(map[string]CronJob) - cm.Lock() - for k, v := range cm.jobQueue { - m[k] = v - } - cm.Unlock() - current := len(cm.jobQueue) + current := queueLength(cm.jobQueue) log.V(2).Infof("Current active jobs: %d,try to clean up the abandon ones.", current) // clean up all metrics @@ -222,57 +216,59 @@ func (cm *CronManager) GC() { KubeFailedJobsInCronEngineTotal.Set(0) KubeExpiredJobsInCronEngineTotal.Set(0) - for _, job := range m { - hpa := job.(*CronJobHPA).HPARef + gcJobFunc := func(key, j interface{}) bool { + hpa := j.(*CronJobHPA).HPARef + job := j.(*CronJobHPA) + exitsts := true found, reason := cm.cronExecutor.FindJob(job) - if !found { - if reason == JobTimeOut { - instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{} - if err := cm.client.Get(context.Background(), types.NamespacedName{ - Namespace: hpa.Namespace, - Name: hpa.Name, - }, instance); err != nil { - log.Errorf("Failed to run time out job %s due to failed to get cronHPA %s in %s namespace,err: %v", job.Name(), hpa.Name, hpa.Namespace, err) - continue - } - cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name())) - if msg, reRunErr := job.Run(); reRunErr != nil { - log.Errorf("failed to rerun out of date job %s, msg:%s, err %v", job.Name(), msg, reRunErr) + instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{} + + // check exists first + if err := cm.client.Get(context.Background(), types.NamespacedName{ + Namespace: hpa.Namespace, + Name: hpa.Name, + }, instance); err != nil { + exitsts = false + if errors.IsNotFound(err) 
{ + log.Infof("remove job %s(%s) of cronHPA %s in namespace %s", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace) + if found { + err := cm.cronExecutor.RemoveJob(job) + if err != nil { + log.Errorf("Failed to gc job %s(%s) of cronHPA %s in namespace %s", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace) + return true + } } + cm.delete(job.ID()) + // metrics update + // when a job is in cron engine but not in crd. + // that means the job has been expired and need to be clean up. + KubeExpiredJobsInCronEngineTotal.Add(1) } - log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace) - cm.cronExecutor.AddJob(job) + // metrics update + // ignore other errors + } + if !found { + if exitsts { + if reason == JobTimeOut { + cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name())) + log.Warningf("Failed to find job %s (job id: %s, plan %s) in cronHPA %s in %s in cron engine and rerun the job.", job.Name(), job.ID(), job.SchedulePlan(), hpa.Name, hpa.Namespace) + if msg, reRunErr := job.Run(); reRunErr != nil { + log.Errorf("failed to rerun out of date job %s (job id: %s, plan %s) in cronHPA %s in %s, msg:%s, err %v", + job.Name(), job.ID(), job.SchedulePlan(), hpa.Name, hpa.Namespace, msg, reRunErr) + } + } + + log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace) + cm.cronExecutor.AddJob(job) + } // metrics update // when one job is not in cron engine but in crd. // That means the job is failed and need to be resubmitted. 
KubeFailedJobsInCronEngineTotal.Add(1) KubeSubmittedJobsInCronEngineTotal.Add(1) - continue } else { - instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{} - if err := cm.client.Get(context.Background(), types.NamespacedName{ - Namespace: hpa.Namespace, - Name: hpa.Name, - }, instance); err != nil { - if errors.IsNotFound(err) { - log.Infof("remove job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace) - err := cm.cronExecutor.RemoveJob(job) - if err != nil { - log.Errorf("Failed to gc job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace) - continue - } - cm.delete(job.ID()) - // metrics update - // when a job is in cron engine but not in crd. - // that means the job has been expired and need to be clean up. - KubeExpiredJobsInCronEngineTotal.Add(1) - } - - // metrics update - // ignore other errors - } conditions := instance.Status.Conditions for _, c := range conditions { if c.JobId != job.ID() { @@ -291,8 +287,12 @@ func (cm *CronManager) GC() { } } + return true } - left := len(cm.jobQueue) + + cm.jobQueue.Range(gcJobFunc) + + left := queueLength(cm.jobQueue) // metrics update // set total jobs in cron engine @@ -305,7 +305,7 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even cm := &CronManager{ cfg: cfg, client: client, - jobQueue: make(map[string]CronJob), + jobQueue: &sync.Map{}, eventRecorder: recorder, } @@ -330,3 +330,19 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even cm.cronExecutor = NewCronHPAExecutor(nil, cm.JobResultHandler) return cm } + +// queueLength returns the number of entries currently stored in que. +// sync.Map has no length method, so the entries are counted with Range; +// the count may be inexact if the map is modified concurrently. +// A nil map is reported as empty. +func queueLength(que *sync.Map) int64 { + if que == nil { + return 0 + } + count := int64(0) + que.Range(func(_, _ interface{}) bool { + count++ + return true + }) + return count +}