Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion multi_arch_Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM --platform=$BUILDPLATFORM golang:1.14.2 as builder
FROM --platform=$BUILDPLATFORM golang:1.16.7 as builder
ARG TARGETPLATFORM
ARG BUILDPLATFORM
ARG BUILDARCH
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/cronexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func (ce *CronHPAExecutor) FindJob(job CronJob) (bool, FailedFindJobReason) {
entries := ce.Engine.Entries()
for _, e := range entries {
if e.Job.ID() == job.ID() {
// clean up out of date jobs when it reach maxOutOfDateTimeout
// clean up out of date jobs when it reached maxOutOfDateTimeout
if e.Next.Add(maxOutOfDateTimeout).After(time.Now()) {
return true, ""
}
log.Warningf("The job %s is out of date and need to be clean up.", job.Name())
log.Warningf("The job %s(job id %s) in cronhpa %s namespace %s is out of date.", job.Name(), job.ID(), job.CronHPAMeta().Name, job.CronHPAMeta().Namespace)
return false, JobTimeOut
}
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/controller/cronhorizontalpodautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
log.Infof("GC start for: cronHPA %s in %s namespace is not found", request.Name, request.Namespace)
go r.CronManager.GC()
return reconcile.Result{}, nil
}
Expand All @@ -95,7 +96,7 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque
for _, cJob := range conditions {
err := r.CronManager.delete(cJob.JobId)
if err != nil {
log.Errorf("Failed to delete job %s,because of %v", cJob.Name, err)
log.Errorf("Failed to delete job %s in cronHPA %s namespace %s, because of %v", cJob.Name, instance.Name, instance.Namespace, err)
}
}
// update scaleTargetRef and excludeDates
Expand All @@ -113,25 +114,27 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque
if cJob.JobId != "" {
err := r.CronManager.delete(cJob.JobId)
if err != nil {
log.Errorf("Failed to delete expired job %s,because of %v", cJob.Name, err)
log.Errorf("Failed to delete expired job %s in cronHPA %s namespace %s,because of %v", cJob.Name, instance.Name, instance.Namespace, err)
}
}
continue
}
// if nothing changed
skip = true
}
}

// need remove this condition because this is not job spec
if !skip {
if cJob.JobId != "" {
err := r.CronManager.delete(cJob.JobId)
if err != nil {
log.Errorf("Failed to delete expired job %s,because of %v", cJob.Name, err)
log.Errorf("Failed to delete expired job %s in cronHPA %s namespace %s, because of %v", cJob.Name, instance.Name, instance.Namespace, err)
}
}
}

// need remove this condition because this is not job spec
// if job nothing changed then append to left conditions
if skip {
leftConditions = append(leftConditions, cJob)
}
Expand All @@ -156,7 +159,8 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque

if err != nil {
jobCondition.State = v1beta1.Failed
jobCondition.Message = fmt.Sprintf("Failed to create cron hpa job %s,because of %v", job.Name, err)
jobCondition.Message = fmt.Sprintf("Failed to create cron hpa job %s in %s namespace %s,because of %v",
job.Name, instance.Name, instance.Namespace, err)
log.Errorf("Failed to create cron hpa job %s,because of %v", job.Name, err)
} else {
name := job.Name
Expand All @@ -168,7 +172,8 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque
if runOnce(job) && (c.State == v1beta1.Succeed || c.State == v1beta1.Failed) {
err := r.CronManager.delete(jobId)
if err != nil {
log.Errorf("cron hpa %s(%s) has ran once but fail to exit,because of %v", name, jobId, err)
log.Errorf("cron hpa runonce job %s(%s) in %s namespace %s has ran once but fail to exit,because of %v",
name, jobId, instance.Name, instance.Namespace, err)
}
continue
}
Expand All @@ -190,11 +195,11 @@ func (r *ReconcileCronHorizontalPodAutoscaler) Reconcile(request reconcile.Reque
noNeedUpdateStatus = false
instance.Status.Conditions = updateConditions(instance.Status.Conditions, jobCondition)
}
// conditions doesn't changed and no need to update.
// conditions are not changed and no need to update.
if !noNeedUpdateStatus || len(leftConditions) != len(conditions) {
err := r.Update(context.Background(), instance)
if err != nil {
log.Errorf("Failed to update cron hpa %s status,because of %v", instance.Name, err)
log.Errorf("Failed to update cron hpa %s in namespace %s status, because of %v", instance.Name, instance.Namespace, err)
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (ch *CronJobHPA) ScaleHPA() (msg string, err error) {

targetGV, err := schema.ParseGroupVersion(targetRef.APIVersion)
if err != nil {
return "", fmt.Errorf("Failed to get TargetGroup of HPA %s,because of %v", hpa.Name, err)
return "", fmt.Errorf("Failed to get TargetGroup of HPA %s in namespace %s ,because of %v", hpa.Name, hpa.Namespace, err)
}

targetGK := schema.GroupKind{
Expand Down Expand Up @@ -207,7 +207,8 @@ func (ch *CronJobHPA) ScaleHPA() (msg string, err error) {

if hpa.Status.CurrentReplicas >= ch.DesiredSize {
// skip change replicas and exit
return fmt.Sprintf("Skip scale replicas because HPA %s current replicas:%d >= desired replicas:%d.", hpa.Name, scale.Spec.Replicas, ch.DesiredSize), nil
return fmt.Sprintf("Skip scale replicas because HPA %s in namespace %s current replicas:%d >= desired replicas:%d.",
hpa.Name, hpa.Namespace, scale.Spec.Replicas, ch.DesiredSize), nil
}

msg = fmt.Sprintf("current replicas:%d, desired replicas:%d.", scale.Spec.Replicas, ch.DesiredSize)
Expand Down Expand Up @@ -239,7 +240,7 @@ func (ch *CronJobHPA) ScalePlainRef() (msg string, err error) {
scale, err = ch.scaler.Scales(ch.TargetRef.RefNamespace).Get(context.Background(), targetGR, ch.TargetRef.RefName, v1.GetOptions{})
if err == nil {
found = true
log.Infof("%s %s in namespace %s has been scaled successfully. job: %s replicas: %d", ch.TargetRef.RefKind, ch.TargetRef.RefName, ch.TargetRef.RefNamespace, ch.Name(), ch.DesiredSize)
log.Infof("%s %s in namespace %s has been scaled successfully. job: %s replicas: %d id: %s", ch.TargetRef.RefKind, ch.TargetRef.RefName, ch.TargetRef.RefNamespace, ch.Name(), ch.DesiredSize, ch.ID())
break
}
}
Expand Down
139 changes: 74 additions & 65 deletions pkg/controller/cronmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type CronManager struct {
sync.Mutex
cfg *rest.Config
client client.Client
jobQueue map[string]CronJob
jobQueue *sync.Map
//cronProcessor CronProcessor
cronExecutor CronExecutor
mapper meta.RESTMapper
Expand All @@ -47,26 +47,28 @@ type CronManager struct {
}

func (cm *CronManager) createOrUpdate(j CronJob) error {
cm.Lock()
defer cm.Unlock()
if _, ok := cm.jobQueue[j.ID()]; !ok {
if _, ok := cm.jobQueue.Load(j.ID()); !ok {
err := cm.cronExecutor.AddJob(j)
if err != nil {
return fmt.Errorf("Failed to add job to cronExecutor,because of %v", err)
}
cm.jobQueue[j.ID()] = j
cm.jobQueue.Store(j.ID(), j)
log.Infof("cronHPA job %s of cronHPA %s in %s created, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace,
len(cm.jobQueue))
queueLength(cm.jobQueue))
} else {
job := cm.jobQueue[j.ID()]
loadJob, _ := cm.jobQueue.Load(j.ID())
job, convert := loadJob.(*CronJobHPA)
if !convert {
return fmt.Errorf("failed to convert job %v to CronJobHPA", loadJob)
}
if ok := job.Equals(j); !ok {
err := cm.cronExecutor.Update(j)
if err != nil {
return fmt.Errorf("failed to update job %s of cronHPA %s in %s to cronExecutor, because of %v", job.Name(), job.CronHPAMeta().Name, job.CronHPAMeta().Namespace, err)
}
//update job queue
cm.jobQueue[j.ID()] = j
log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue))
cm.jobQueue.Store(j.ID(), j)
log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue))
} else {
return &NoNeedUpdate{}
}
Expand All @@ -75,15 +77,14 @@ func (cm *CronManager) createOrUpdate(j CronJob) error {
}

func (cm *CronManager) delete(id string) error {
cm.Lock()
defer cm.Unlock()
if j, ok := cm.jobQueue[id]; ok {
if loadJob, ok := cm.jobQueue.Load(id); ok {
j, _ := loadJob.(*CronJobHPA)
err := cm.cronExecutor.RemoveJob(j)
if err != nil {
return fmt.Errorf("Failed to remove job from cronExecutor,because of %v", err)
}
delete(cm.jobQueue, id)
log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue))
cm.jobQueue.Delete(id)
log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue))
}
return nil
}
Expand All @@ -98,7 +99,7 @@ func (cm *CronManager) JobResultHandler(js *cron.JobResult) {
}, instance)

if e != nil {
log.Errorf("Failed to fetch cronHPA job %s of cronHPA %s in %s namespace,because of %v", job.Name(), cronHpa.Name, cronHpa.Namespace, e)
log.Errorf("Failed to fetch cronHPA job %s of cronHPA %s in namespace %s, because of %v", job.Name(), cronHpa.Name, cronHpa.Namespace, e)
return
}

Expand Down Expand Up @@ -206,14 +207,7 @@ func (cm *CronManager) gcLoop() {

// GC will collect all jobs which ref is not exists and recycle.
func (cm *CronManager) GC() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GC func will be called multiple times when multiple cronhpa CRs are deleted at the same time.
There will be many goroutines running the GC function at the same time.
Maybe the GC function only needs to run once in this case.

log.Infof("Start GC")
m := make(map[string]CronJob)
cm.Lock()
for k, v := range cm.jobQueue {
m[k] = v
}
cm.Unlock()
current := len(cm.jobQueue)
current := queueLength(cm.jobQueue)
log.V(2).Infof("Current active jobs: %d,try to clean up the abandon ones.", current)

// clean up all metrics
Expand All @@ -222,57 +216,59 @@ func (cm *CronManager) GC() {
KubeFailedJobsInCronEngineTotal.Set(0)
KubeExpiredJobsInCronEngineTotal.Set(0)

for _, job := range m {
hpa := job.(*CronJobHPA).HPARef
gcJobFunc := func(key, j interface{}) bool {
hpa := j.(*CronJobHPA).HPARef
job := j.(*CronJobHPA)
exitsts := true
found, reason := cm.cronExecutor.FindJob(job)
if !found {
if reason == JobTimeOut {
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
log.Errorf("Failed to run time out job %s due to failed to get cronHPA %s in %s namespace,err: %v", job.Name(), hpa.Name, hpa.Namespace, err)
continue
}
cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name()))
if msg, reRunErr := job.Run(); reRunErr != nil {
log.Errorf("failed to rerun out of date job %s, msg:%s, err %v", job.Name(), msg, reRunErr)
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}

// check exists first
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
exitsts = false
if errors.IsNotFound(err) {
log.Infof("remove job %s(%s) of cronHPA %s in namespace %s", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
if found {
err := cm.cronExecutor.RemoveJob(job)
if err != nil {
log.Errorf("Failed to gc job %s(%s) of cronHPA %s in namespace %s", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
return true
}
}
cm.delete(job.ID())
// metrics update
// when a job is in cron engine but not in crd.
// that means the job has been expired and need to be clean up.
KubeExpiredJobsInCronEngineTotal.Add(1)
}

log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace)
cm.cronExecutor.AddJob(job)
// metrics update
// ignore other errors
}
if !found {
if exitsts {
if reason == JobTimeOut {
cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name()))
log.Warningf("Failed to find job %s (job id: %s, plan %s) in cronHPA %s in %s in cron engine and rerun the job.", job.Name(), job.ID(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
if msg, reRunErr := job.Run(); reRunErr != nil {
log.Errorf("failed to rerun out of date job %s (job id: %s, plan %s) in cronHPA %s in %s, msg:%s, err %v",
job.Name(), job.ID(), job.SchedulePlan(), hpa.Name, hpa.Namespace, msg, reRunErr)
}
}

log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace)
cm.cronExecutor.AddJob(job)
}

// metrics update
// when one job is not in cron engine but in crd.
// That means the job is failed and need to be resubmitted.
KubeFailedJobsInCronEngineTotal.Add(1)
KubeSubmittedJobsInCronEngineTotal.Add(1)
continue
} else {
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
if errors.IsNotFound(err) {
log.Infof("remove job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace)
err := cm.cronExecutor.RemoveJob(job)
if err != nil {
log.Errorf("Failed to gc job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace)
continue
}
cm.delete(job.ID())
// metrics update
// when a job is in cron engine but not in crd.
// that means the job has been expired and need to be clean up.
KubeExpiredJobsInCronEngineTotal.Add(1)
}

// metrics update
// ignore other errors
}
conditions := instance.Status.Conditions
for _, c := range conditions {
if c.JobId != job.ID() {
Expand All @@ -291,8 +287,12 @@ func (cm *CronManager) GC() {
}

}
return true
}
left := len(cm.jobQueue)

cm.jobQueue.Range(gcJobFunc)

left := queueLength(cm.jobQueue)

// metrics update
// set total jobs in cron engine
Expand All @@ -305,7 +305,7 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even
cm := &CronManager{
cfg: cfg,
client: client,
jobQueue: make(map[string]CronJob),
jobQueue: &sync.Map{},
eventRecorder: recorder,
}

Expand All @@ -330,3 +330,12 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even
cm.cronExecutor = NewCronHPAExecutor(nil, cm.JobResultHandler)
return cm
}

// queueLength returns the number of entries currently stored in que.
// The count is obtained by visiting every entry with Range and tallying
// one per visit; the callback always returns true so iteration never
// stops early.
func queueLength(que *sync.Map) int64 {
	var count int64
	que.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	return count
}