Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
change jobQueue to sync.Map & check that the cronHPA job exists first in GC
  • Loading branch information
hexi.ghx committed Feb 21, 2023
commit 2d8553871637df5e4840c7a96d60f562e7641e7a
137 changes: 73 additions & 64 deletions pkg/controller/cronmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type CronManager struct {
sync.Mutex
cfg *rest.Config
client client.Client
jobQueue map[string]CronJob
jobQueue sync.Map
//cronProcessor CronProcessor
cronExecutor CronExecutor
mapper meta.RESTMapper
Expand All @@ -47,26 +47,28 @@ type CronManager struct {
}

func (cm *CronManager) createOrUpdate(j CronJob) error {
cm.Lock()
defer cm.Unlock()
if _, ok := cm.jobQueue[j.ID()]; !ok {
if _, ok := cm.jobQueue.Load(j.ID()); !ok {
err := cm.cronExecutor.AddJob(j)
if err != nil {
return fmt.Errorf("Failed to add job to cronExecutor,because of %v", err)
}
cm.jobQueue[j.ID()] = j
cm.jobQueue.Store(j.ID(), j)
log.Infof("cronHPA job %s of cronHPA %s in %s created, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace,
len(cm.jobQueue))
queueLength(cm.jobQueue))
} else {
job := cm.jobQueue[j.ID()]
loadJob, _ := cm.jobQueue.Load(j.ID())
job, convert := loadJob.(*CronJobHPA)
if !convert {
return fmt.Errorf("failed to convert job %v to CronJobHPA", loadJob)
}
if ok := job.Equals(j); !ok {
err := cm.cronExecutor.Update(j)
if err != nil {
return fmt.Errorf("failed to update job %s of cronHPA %s in %s to cronExecutor, because of %v", job.Name(), job.CronHPAMeta().Name, job.CronHPAMeta().Namespace, err)
}
//update job queue
cm.jobQueue[j.ID()] = j
log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue))
cm.jobQueue.Store(j.ID(), j)
log.Infof("cronHPA job %s of cronHPA %s in %s updated, %d active jobs exist", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue))
} else {
return &NoNeedUpdate{}
}
Expand All @@ -75,15 +77,14 @@ func (cm *CronManager) createOrUpdate(j CronJob) error {
}

func (cm *CronManager) delete(id string) error {
cm.Lock()
defer cm.Unlock()
if j, ok := cm.jobQueue[id]; ok {
if loadJob, ok := cm.jobQueue.Load(id); ok {
j, _ := loadJob.(*CronJobHPA)
err := cm.cronExecutor.RemoveJob(j)
if err != nil {
return fmt.Errorf("Failed to remove job from cronExecutor,because of %v", err)
}
delete(cm.jobQueue, id)
log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, len(cm.jobQueue))
cm.jobQueue.Delete(id)
log.Infof("Remove cronHPA job %s of cronHPA %s in %s from jobQueue,%d active jobs left", j.Name(), j.CronHPAMeta().Name, j.CronHPAMeta().Namespace, queueLength(cm.jobQueue))
}
return nil
}
Expand Down Expand Up @@ -206,14 +207,7 @@ func (cm *CronManager) gcLoop() {

// GC will collect all jobs which ref is not exists and recycle.
func (cm *CronManager) GC() {
log.Infof("Start GC")
m := make(map[string]CronJob)
cm.Lock()
for k, v := range cm.jobQueue {
m[k] = v
}
cm.Unlock()
current := len(cm.jobQueue)
current := queueLength(cm.jobQueue)
log.V(2).Infof("Current active jobs: %d,try to clean up the abandon ones.", current)

// clean up all metrics
Expand All @@ -222,57 +216,59 @@ func (cm *CronManager) GC() {
KubeFailedJobsInCronEngineTotal.Set(0)
KubeExpiredJobsInCronEngineTotal.Set(0)

for _, job := range m {
hpa := job.(*CronJobHPA).HPARef
gcJobFunc := func(key, j interface{}) bool {
hpa := j.(*CronJobHPA).HPARef
job := j.(*CronJobHPA)
exitsts := true
found, reason := cm.cronExecutor.FindJob(job)
if !found {
if reason == JobTimeOut {
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
log.Errorf("Failed to run time out job %s due to failed to get cronHPA %s in %s namespace,err: %v", job.Name(), hpa.Name, hpa.Namespace, err)
continue
}
cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name()))
if msg, reRunErr := job.Run(); reRunErr != nil {
log.Errorf("failed to rerun out of date job %s, msg:%s, err %v", job.Name(), msg, reRunErr)
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}

// check exists first
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
exitsts = false
if errors.IsNotFound(err) {
log.Infof("remove job %s(%s) of cronHPA %s in %s namespace", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
if found {
err := cm.cronExecutor.RemoveJob(job)
if err != nil {
log.Errorf("Failed to gc job %s(%s) of cronHPA %s in %s namespace", job.Name(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
return true
}
}
cm.delete(job.ID())
// metrics update
// when a job is in cron engine but not in crd.
// that means the job has been expired and need to be clean up.
KubeExpiredJobsInCronEngineTotal.Add(1)
}

log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace)
cm.cronExecutor.AddJob(job)
// metrics update
// ignore other errors
}
if !found {
if exitsts {
if reason == JobTimeOut {
cm.eventRecorder.Event(instance, v1.EventTypeWarning, "OutOfDate", fmt.Sprintf("rerun out of date job: %s", job.Name()))
log.Warningf("Failed to find job %s (job id: %s, plan %s) of cronHPA %s in %s in cron engine and rerun the job.", job.Name(), job.ID(), job.SchedulePlan(), hpa.Name, hpa.Namespace)
if msg, reRunErr := job.Run(); reRunErr != nil {
log.Errorf("failed to rerun out of date job %s, msg:%s, err %v", job.Name(), msg, reRunErr)
}
return true
}

log.Warningf("Failed to find job %s of cronHPA %s in %s in cron engine and resubmit the job.", job.Name(), hpa.Name, hpa.Namespace)
cm.cronExecutor.AddJob(job)
}

// metrics update
// when one job is not in cron engine but in crd.
// That means the job is failed and need to be resubmitted.
KubeFailedJobsInCronEngineTotal.Add(1)
KubeSubmittedJobsInCronEngineTotal.Add(1)
continue
} else {
instance := &autoscalingv1beta1.CronHorizontalPodAutoscaler{}
if err := cm.client.Get(context.Background(), types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}, instance); err != nil {
if errors.IsNotFound(err) {
log.Infof("remove job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace)
err := cm.cronExecutor.RemoveJob(job)
if err != nil {
log.Errorf("Failed to gc job %s of cronHPA %s in %s namespace", job.Name(), hpa.Name, hpa.Namespace)
continue
}
cm.delete(job.ID())
// metrics update
// when a job is in cron engine but not in crd.
// that means the job has been expired and need to be clean up.
KubeExpiredJobsInCronEngineTotal.Add(1)
}

// metrics update
// ignore other errors
}
conditions := instance.Status.Conditions
for _, c := range conditions {
if c.JobId != job.ID() {
Expand All @@ -291,8 +287,12 @@ func (cm *CronManager) GC() {
}

}
return true
}
left := len(cm.jobQueue)

cm.jobQueue.Range(gcJobFunc)

left := queueLength(cm.jobQueue)

// metrics update
// set total jobs in cron engine
Expand All @@ -305,7 +305,7 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even
cm := &CronManager{
cfg: cfg,
client: client,
jobQueue: make(map[string]CronJob),
jobQueue: sync.Map{},
eventRecorder: recorder,
}

Expand All @@ -330,3 +330,12 @@ func NewCronManager(cfg *rest.Config, client client.Client, recorder record.Even
cm.cronExecutor = NewCronHPAExecutor(nil, cm.JobResultHandler)
return cm
}

func queueLength(que sync.Map) int64 {
len := int64(0)
que.Range(func(k, v interface{}) bool {
len++
return true
})
return len
}