Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: update
Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy committed Mar 27, 2022
commit 2c37dd18084b11b61ee8a0cd6e8735e7f0df3573
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ func (s *Consumer) Usage() int {
}

// Queue send notification to queue
func (s *Consumer) Queue(job QueuedMessage) error {
func (s *Consumer) Queue(task QueuedMessage) error {
if atomic.LoadInt32(&s.stopFlag) == 1 {
return ErrQueueShutdown
}

select {
case s.taskQueue <- job:
case s.taskQueue <- task:
return nil
default:
return errMaxCapacity
Expand Down
12 changes: 6 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,6 @@ func TestCancelJobAfterShutdown(t *testing.T) {
}

func TestGoroutineLeak(t *testing.T) {
m := mockMessage{
message: "foo",
}
w := NewConsumer(
WithLogger(NewEmptyLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
Expand All @@ -181,13 +178,16 @@ func TestGoroutineLeak(t *testing.T) {
}),
)
q, err := NewQueue(
WithLogger(NewEmptyLogger()),
WithLogger(NewLogger()),
WithWorker(w),
WithWorkerCount(10),
)
assert.NoError(t, err)
for i := 0; i < 500; i++ {
m.message = fmt.Sprintf("foobar: %d", i+1)
for i := 0; i < 400; i++ {
m := mockMessage{
message: fmt.Sprintf("new message: %d", i+1),
}

assert.NoError(t, q.Queue(m))
}

Expand Down
10 changes: 5 additions & 5 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ func (q *Queue) schedule() {

// start handle job
func (q *Queue) start() {
var task QueuedMessage
tasks := make(chan QueuedMessage, 1)

for {
var task QueuedMessage
if atomic.LoadInt32(&q.stopFlag) == 1 {
return
}
Expand All @@ -227,8 +227,8 @@ func (q *Queue) start() {
case <-q.quit:
return
default:
task, err := q.worker.Request()
if task == nil || err != nil {
t, err := q.worker.Request()
if t == nil || err != nil {
if err != nil {
select {
case <-q.quit:
Expand All @@ -238,8 +238,8 @@ func (q *Queue) start() {
}
}
}
if task != nil {
tasks <- task
if t != nil {
tasks <- t
break loop
}
}
Expand Down
2 changes: 0 additions & 2 deletions worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ type taskWorker struct {
func (w *taskWorker) BeforeRun() error { return nil }
func (w *taskWorker) AfterRun() error { return nil }
func (w *taskWorker) Run(task QueuedMessage) error {
// for msg := range w.messages {
if v, ok := task.(Job); ok {
if v.Task != nil {
_ = v.Task(context.Background())
}
}
// }
return nil
}

Expand Down