Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.cond.Broadcast()
cancel()
}
if p.hasNoWorkerScaling() {
select {
case <-p.baseCtx.Done():
// Don't warn if the baseCtx is shutdown
default:
log.Warn(
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
}
p.pause()
}
select {
case <-p.baseCtx.Done():
// this worker queue is shut-down don't reboost
// Don't warn or check for ongoing work if the baseCtx is shutdown
case <-p.paused:
// Don't warn or check for ongoing work if the pool is paused
default:
if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
if p.hasNoWorkerScaling() {
log.Warn(
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
p.pause()
} else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
// OK there are no workers but... there's still work to be done -> Reboost
p.zeroBoost()
// p.lock will be unlocked by zeroBoost
Expand Down