Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
chore: update json library
Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy committed May 29, 2022
commit 15c43eca41b777539a59eff02f47a1274c6d02b3
9 changes: 1 addition & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand Down Expand Up @@ -79,13 +79,6 @@ func (s *Consumer) handle(job *Job) error {

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
// var data Job
// _ = json.Unmarshal(task.Bytes(), &data)
// if v, ok := task.(Job); ok {
// if v.Task != nil {
// data.Task = v.Task
// }
// }
data := task.(*Job)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/goccy/go-json v0.9.7
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/goleak v1.1.12
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
31 changes: 14 additions & 17 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand Down Expand Up @@ -47,16 +47,17 @@ type (
)

// Bytes get string body
func (j Job) Bytes() []byte {
func (j *Job) Bytes() []byte {
if j.Task != nil {
return nil
}
return j.Payload
}

// Encode for encoding the structure
func (j Job) Encode() []byte {
func (j *Job) Encode() []byte {
b, _ := json.Marshal(j)

return b
}

Expand Down Expand Up @@ -100,11 +101,11 @@ func (q *Queue) Shutdown() {
return
}

if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

q.stopOnce.Do(func() {
if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

if err := q.worker.Shutdown(); err != nil {
q.logger.Error(err)
}
Expand Down Expand Up @@ -158,13 +159,11 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error
return ErrQueueShutdown
}

data := &Job{
Timeout: timeout,
Payload: job.Bytes(),
}

if err := q.worker.Queue(&Job{
Payload: data.Encode(),
Payload: (&Job{
Timeout: timeout,
Payload: job.Bytes(),
}).Encode(),
}); err != nil {
return err
}
Expand All @@ -189,12 +188,10 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
return ErrQueueShutdown
}

data := &Job{
if err := q.worker.Queue(&Job{
Timeout: timeout,
Task: task,
}

if err := q.worker.Queue(data); err != nil {
}); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,18 @@ func TestCloseQueueAfterShutdown(t *testing.T) {
}

func BenchmarkQueueTask(b *testing.B) {
b.ReportAllocs()
q := NewPool(5)
defer q.Release()
for n := 0; n < b.N; n++ {
_ = q.QueueTask(func(context.Context) error {
time.Sleep(10 * time.Millisecond)
return nil
})
}
}

func BenchmarkQueue(b *testing.B) {
b.ReportAllocs()
m := &mockMessage{
message: "foo",
}
Expand Down
2 changes: 1 addition & 1 deletion worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type taskWorker struct {
}

func (w *taskWorker) Run(task core.QueuedMessage) error {
if v, ok := task.(Job); ok {
if v, ok := task.(*Job); ok {
if v.Task != nil {
_ = v.Task(context.Background())
}
Expand Down