Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: move message to job folder
Signed-off-by: Bo-Yi.Wu <[email protected]>
  • Loading branch information
appleboy committed Oct 29, 2022
commit aaa1241ba1ba84d2bc01cea83b50356ab9033e00
15 changes: 8 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
)

var _ core.Worker = (*Consumer)(nil)
Expand All @@ -26,12 +27,12 @@ type Consumer struct {
stopFlag int32
}

func (s *Consumer) handle(job *Job) error {
func (s *Consumer) handle(m *job.Message) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
defer func() {
cancel()
}()
Expand All @@ -46,10 +47,10 @@ func (s *Consumer) handle(job *Job) error {
}()

// run custom process function
if job.Task != nil {
done <- job.Task(ctx)
if m.Task != nil {
done <- m.Task(ctx)
} else {
done <- s.runFunc(ctx, job)
done <- s.runFunc(ctx, m)
}
}()

Expand All @@ -62,7 +63,7 @@ func (s *Consumer) handle(job *Job) error {
// cancel job
cancel()

leftTime := job.Timeout - time.Since(startTime)
leftTime := m.Timeout - time.Since(startTime)
// wait job
select {
case <-time.After(leftTime):
Expand All @@ -79,7 +80,7 @@ func (s *Consumer) handle(job *Job) error {

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
data := task.(*Job)
data := task.(*job.Message)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}
Expand Down
35 changes: 18 additions & 17 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
"github.com/golang-queue/queue/mocks"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
assert.NoError(t, q.Queue(m, WithTimeout(30*time.Millisecond)))
assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond)))
q.Start()
time.Sleep(50 * time.Millisecond)
q.Release()
Expand Down Expand Up @@ -137,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond)))
assert.NoError(t, q.Queue(m, WithTimeout(100*time.Millisecond)))
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond)))
q.Start()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 2, int(q.metric.busyWorkers))
Expand Down Expand Up @@ -207,7 +208,7 @@ func TestGoroutinePanic(t *testing.T) {
}

func TestHandleTimeout(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -218,11 +219,11 @@ func TestHandleTimeout(t *testing.T) {
}),
)

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

job = &Job{
m = &job.Message{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -236,7 +237,7 @@ func TestHandleTimeout(t *testing.T) {

done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
Expand All @@ -245,7 +246,7 @@ func TestHandleTimeout(t *testing.T) {
}

func TestJobComplete(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -255,11 +256,11 @@ func TestJobComplete(t *testing.T) {
}),
)

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = &Job{
m = &job.Message{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -273,7 +274,7 @@ func TestJobComplete(t *testing.T) {

done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
Expand All @@ -282,19 +283,19 @@ func TestJobComplete(t *testing.T) {
}

func TestTaskJobComplete(t *testing.T) {
job := &Job{
m := &job.Message{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
},
}
w := NewConsumer()

err := w.handle(job)
err := w.handle(m)
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = &Job{
m = &job.Message{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
Expand All @@ -304,21 +305,21 @@ func TestTaskJobComplete(t *testing.T) {
w = NewConsumer()
done := make(chan error)
go func() {
done <- w.handle(job)
done <- w.handle(m)
}()

err = <-done
assert.NoError(t, err)

// job timeout
job = &Job{
m = &job.Message{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
return nil
},
}
assert.Equal(t, context.DeadlineExceeded, w.handle(job))
assert.Equal(t, context.DeadlineExceeded, w.handle(m))
}

func TestIncreaseWorkerCount(t *testing.T) {
Expand Down
20 changes: 12 additions & 8 deletions job.go → job/job.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package queue
package job

import (
"context"
"time"

"github.com/goccy/go-json"
)

// Job describes a task and its metadata.
type Job struct {
// TaskFunc is the task function
type TaskFunc func(context.Context) error

// Message describes a task and its metadata.
type Message struct {
Task TaskFunc `json:"-"`

// Timeout is the duration the task can be processed by Handler.
Expand All @@ -25,16 +29,16 @@ type Job struct {
}

// Bytes get string body
func (j *Job) Bytes() []byte {
if j.Task != nil {
func (m *Message) Bytes() []byte {
if m.Task != nil {
return nil
}
return j.Payload
return m.Payload
}

// Encode for encoding the structure
func (j *Job) encode() []byte {
b, _ := json.Marshal(j)
func (m *Message) Encode() []byte {
b, _ := json.Marshal(m)

return b
}
55 changes: 55 additions & 0 deletions job/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package job

import "time"

type Config struct {
RetryCount int64
RetryDelay time.Duration
Timeout time.Duration
}

// An Option configures a mutex.
type Option interface {
Apply(*Config)
}

// OptionFunc is a function that configures a job.
type OptionFunc func(*Config)

// Apply calls f(option)
func (f OptionFunc) Apply(option *Config) {
f(option)
}

func DefaultOptions(opts ...Option) *Config {
o := &Config{
RetryCount: 0,
RetryDelay: 100 * time.Millisecond,
}

// Loop through each option
for _, opt := range opts {
// Call the option giving the instantiated
opt.Apply(o)
}

return o
}

func WithRetryCount(count int64) Option {
return OptionFunc(func(q *Config) {
q.RetryCount = count
})
}

func WithRetryDelay(t time.Duration) Option {
return OptionFunc(func(q *Config) {
q.RetryDelay = t
})
}

func WithTimeout(t time.Duration) Option {
return OptionFunc(func(q *Config) {
q.Timeout = t
})
}
40 changes: 0 additions & 40 deletions job_option.go

This file was deleted.

Loading