Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b45c3ae
move shutdownfns, terminatefns and hammerfns out of separate goroutines
zeripath Apr 30, 2021
105a121
oops windows;
zeripath Apr 30, 2021
7976e99
comment changes
zeripath Apr 30, 2021
8815d90
Update manager_windows.go
zeripath Apr 30, 2021
159c3a1
The LevelDB queues can actually wait on empty instead of polling
zeripath May 1, 2021
dd18577
Merge branch 'master' into wait-on-empty
zeripath May 1, 2021
86b597f
Merge branch 'master' into wait-on-empty
zeripath May 2, 2021
12f2f45
Merge branch 'wait-on-empty' of github.com:zeripath/gitea into wait-o…
zeripath May 2, 2021
ec99e10
Shutdown the shadow level queue once it is empty
zeripath May 2, 2021
0c107aa
Remove bytefifo additional goroutine for readToChan as it can just be…
zeripath May 2, 2021
c41b52a
Remove additional removeWorkers goroutine for workers
zeripath May 2, 2021
5e35339
simplify zero boost
zeripath May 2, 2021
f546070
Merge branch 'graceful-context' into wait-on-empty
zeripath May 3, 2021
eda962d
Simplify the AtShutdown and AtTerminate functions and add Channel Flu…
zeripath May 3, 2021
5fc1c80
add logging
zeripath May 3, 2021
5ea5882
Add shutdown flusher to CUQ
zeripath May 3, 2021
fb4b1c9
move persistable channel shutdown stuff to Shutdown Fn
zeripath May 3, 2021
b0bbd0d
Ensure that UPCQ has the correct config
zeripath May 3, 2021
803dc56
Merge branch 'master' into wait-on-empty
zeripath May 3, 2021
6b95a89
placate lint
zeripath May 3, 2021
0878aa3
Merge branch 'master' into wait-on-empty
zeripath May 4, 2021
293839c
Merge branch 'main' into wait-on-empty
zeripath May 4, 2021
dae99ea
pass down context
zeripath May 4, 2021
a58b7ed
Merge branch 'main' into wait-on-empty
6543 May 5, 2021
0f9aea8
fix test
zeripath May 5, 2021
7b18173
Merge branch 'wait-on-empty' of github.com:zeripath/gitea into wait-o…
zeripath May 5, 2021
795000b
handle shutdown during the flushing
zeripath May 5, 2021
a5809e9
reduce risk of race between zeroBoost and addWorkers
zeripath May 7, 2021
b14af1f
update comment as per 6543
zeripath May 8, 2021
a9872d2
Merge branch 'main' into wait-on-empty
zeripath May 8, 2021
24ca154
prevent double shutdown
zeripath May 8, 2021
e876fec
rename contexts and their cancel fns
zeripath May 8, 2021
6a60b2a
few missed commits
zeripath May 8, 2021
a5b2de5
Merge remote-tracking branch 'origin/main' into wait-on-empty
zeripath May 8, 2021
cff032f
fix-windows
zeripath May 8, 2021
68ffb7a
Merge branch 'main' into wait-on-empty
zeripath May 9, 2021
3bbfdb6
Merge branch 'main' into wait-on-empty
zeripath May 10, 2021
cbd2c3e
Merge branch 'main' into wait-on-empty
lafriks May 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pass down context
Signed-off-by: Andrew Thornton <[email protected]>
  • Loading branch information
zeripath committed May 4, 2021
commit dae99ea4068b2565505ec428da7c5526ed738c8e
18 changes: 10 additions & 8 deletions modules/queue/bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

package queue

import "context"

// ByteFIFO defines a FIFO that takes a byte array
type ByteFIFO interface {
// Len returns the length of the fifo
Len() int64
Len(ctx context.Context) int64
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
PushFunc(data []byte, fn func() error) error
PushFunc(ctx context.Context, data []byte, fn func() error) error
// Pop pops data from the start of the fifo
Pop() ([]byte, error)
Pop(ctx context.Context) ([]byte, error)
// Close this fifo
Close() error
}
Expand All @@ -20,7 +22,7 @@ type ByteFIFO interface {
type UniqueByteFIFO interface {
ByteFIFO
// Has returns whether the fifo contains this data
Has(data []byte) (bool, error)
Has(ctx context.Context, data []byte) (bool, error)
}

var _ ByteFIFO = &DummyByteFIFO{}
Expand All @@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{}
type DummyByteFIFO struct{}

// PushFunc returns nil
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return nil
}

// Pop returns nil
func (*DummyByteFIFO) Pop() ([]byte, error) {
func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) {
return []byte{}, nil
}

Expand All @@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error {
}

// Len is always 0
func (*DummyByteFIFO) Len() int64 {
func (*DummyByteFIFO) Len(ctx context.Context) int64 {
return 0
}

Expand All @@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct {
}

// Has always returns false
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return false, nil
}
18 changes: 11 additions & 7 deletions modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
}
}()
}
return q.byteFIFO.PushFunc(bs, fn)
return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
}

// IsEmpty checks if the queue is empty
Expand All @@ -106,7 +106,7 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
if !q.WorkerPool.IsEmpty() {
return false
}
return q.byteFIFO.Len() == 0
return q.byteFIFO.Len(q.terminateCtx) == 0
}

// Run runs the bytefifo queue
Expand Down Expand Up @@ -140,16 +140,20 @@ func (q *ByteFIFOQueue) readToChan() {
return
default:
q.lock.Lock()
bs, err := q.byteFIFO.Pop()
bs, err := q.byteFIFO.Pop(q.shutdownCtx)
if err != nil {
q.lock.Unlock()
if err == context.Canceled {
q.cancel()
return
}
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
time.Sleep(time.Millisecond * 100)
continue
}

if len(bs) == 0 {
if q.waitOnEmpty && q.byteFIFO.Len() == 0 {
if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 {
q.lock.Unlock()
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
select {
Expand Down Expand Up @@ -205,10 +209,10 @@ func (q *ByteFIFOQueue) Terminate() {
return
default:
}
q.terminateCancel()
if log.IsDebug() {
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx))
}
q.terminateCancel()
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
Expand Down Expand Up @@ -263,5 +267,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if err != nil {
return false, err
}
return q.byteFIFO.(UniqueByteFIFO).Has(bs)
return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs)
}
8 changes: 5 additions & 3 deletions modules/queue/queue_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package queue

import (
"context"

"code.gitea.io/gitea/modules/nosql"

"gitea.com/lunny/levelqueue"
Expand Down Expand Up @@ -83,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro
}

// PushFunc will push data into the fifo
func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil {
if err := fn(); err != nil {
return err
Expand All @@ -93,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
}

// Pop pops data from the start of the fifo
func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound {
return nil, err
Expand All @@ -109,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error {
}

// Len returns the length of the fifo
func (fifo *LevelQueueByteFIFO) Len() int64 {
func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len()
}

Expand Down
2 changes: 1 addition & 1 deletion modules/queue/queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(q.Shutdown)
atTerminate(q.Terminate)

if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len() != 0 {
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go func() {
Expand Down
20 changes: 8 additions & 12 deletions modules/queue/queue_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package queue

import (
"context"
"fmt"

"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
Expand Down Expand Up @@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err
}

byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))

queue := &RedisQueue{
ByteFIFOQueue: byteFIFOQueue,
}
Expand All @@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{}

// RedisByteFIFO represents a ByteFIFO formed from a redisClient
type RedisByteFIFO struct {
ctx context.Context
client redisClient
client redisClient

queueName string
}

Expand All @@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
fifo := &RedisByteFIFO{
queueName: config.QueueName,
}
fifo.ctx = graceful.GetManager().TerminateContext()
fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString)
if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil {
return nil, err
Expand All @@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
}

// PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error {
func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
if fn != nil {
if err := fn(); err != nil {
return err
}
}
return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
return fifo.client.RPush(ctx, fifo.queueName, data).Err()
}

// Pop pops data from the start of the fifo
func (fifo *RedisByteFIFO) Pop() ([]byte, error) {
data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err == nil || err == redis.Nil {
return data, nil
}
Expand All @@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error {
}

// Len returns the length of the fifo
func (fifo *RedisByteFIFO) Len() int64 {
val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result()
func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 {
val, err := fifo.client.LLen(ctx, fifo.queueName).Result()
if err != nil {
log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err)
return -1
Expand Down
10 changes: 6 additions & 4 deletions modules/queue/unique_queue_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package queue

import (
"context"

"code.gitea.io/gitea/modules/nosql"

"gitea.com/lunny/levelqueue"
Expand Down Expand Up @@ -87,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy
}

// PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
return fifo.internal.LPushFunc(data, fn)
}

// Pop pops data from the start of the fifo
func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.internal.RPop()
if err != nil && err != levelqueue.ErrNotFound {
return nil, err
Expand All @@ -101,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) {
}

// Len returns the length of the fifo
func (fifo *LevelUniqueQueueByteFIFO) Len() int64 {
func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 {
return fifo.internal.Len()
}

// Has returns whether the fifo contains this data
func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) {
func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return fifo.internal.Has(data)
}

Expand Down
2 changes: 1 addition & 1 deletion modules/queue/unique_queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
atShutdown(q.Shutdown)
atTerminate(q.Terminate)

if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len() != 0 {
if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go func() {
Expand Down
21 changes: 9 additions & 12 deletions modules/queue/unique_queue_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
package queue

import (
"fmt"
"context"

"code.gitea.io/gitea/modules/graceful"
"github.com/go-redis/redis/v8"
)

Expand Down Expand Up @@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
return nil, err
}

byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated"))

queue := &RedisUniqueQueue{
ByteFIFOUniqueQueue: byteFIFOQueue,
}
Expand Down Expand Up @@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq
}

// PushFunc pushes data to the end of the fifo and calls the callback if it is added
func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result()
func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error {
added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
if err != nil {
return err
}
Expand All @@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
return err
}
}
return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err()
return fifo.client.RPush(ctx, fifo.queueName, data).Err()
}

// Pop pops data from the start of the fifo
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes()
func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
if err != nil && err != redis.Nil {
return data, err
}
Expand All @@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
return data, nil
}

err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err()
err = fifo.client.SRem(ctx, fifo.setName, data).Err()
return data, err
}

// Has returns whether the fifo contains this data
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result()
func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) {
return fifo.client.SIsMember(ctx, fifo.setName, data).Result()
}

func init() {
Expand Down