Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3a7e682
Move process to create contexts
zeripath Sep 18, 2021
47bda44
display children processes
zeripath Sep 18, 2021
cd16cbb
Make requests a process
zeripath Sep 18, 2021
12039b9
Add context to repo and add ctx to OpenRepository
zeripath Sep 18, 2021
d2b01e4
minor comments
zeripath Sep 22, 2021
659fcf6
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Sep 23, 2021
9598eca
fix lint and children lock
zeripath Sep 23, 2021
3344e26
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 10, 2021
7cf7749
separate remove and cancel functions
zeripath Oct 10, 2021
a8e228e
associate repo functions with the repo context
zeripath Oct 10, 2021
08b77d1
fix lint
zeripath Oct 13, 2021
47b0614
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 13, 2021
518b79e
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 15, 2021
9bb820b
Simplify PID to strings using the time of start plus/minus a counter
zeripath Oct 15, 2021
9895680
extract process out of manager.go
zeripath Oct 15, 2021
afc5b41
fix test
zeripath Oct 15, 2021
7446c87
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 15, 2021
86ab980
Merge branch 'main' into request-as-process
zeripath Oct 17, 2021
4ce4614
Make the Mirror Queue a queue (#17326)
zeripath Oct 17, 2021
633b041
make mirroring a process
zeripath Oct 17, 2021
377a384
Ensure that mirrors are al within the same context
zeripath Oct 17, 2021
32c58ee
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 17, 2021
71dec99
add clarity to the difference between cancel and remove
zeripath Oct 19, 2021
9e95fdb
add explanatory notes for remove and close
zeripath Oct 19, 2021
3010f59
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 19, 2021
9cc97d0
explicitly name the arguments in the blame reader
zeripath Oct 19, 2021
e4aebfb
Change remove to finished
zeripath Oct 20, 2021
217fbf7
update blame documentation
zeripath Oct 20, 2021
6b6ac80
as per review
zeripath Oct 20, 2021
0fcbc38
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 27, 2021
e06216b
Close the cat-file batch and checks after the context cancellation
zeripath Oct 27, 2021
2062e43
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Oct 28, 2021
46c2b7a
Merge branch 'main' into request-as-process
zeripath Nov 1, 2021
37bfa14
Merge branch 'main' into request-as-process
zeripath Nov 5, 2021
ad2e278
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Nov 16, 2021
6062b04
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Nov 20, 2021
59dc919
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Nov 26, 2021
347e6a8
Merge branch 'main' into request-as-process
lafriks Nov 28, 2021
fd86412
Merge branch 'main' into request-as-process
lunny Nov 28, 2021
0d1ae72
Merge remote-tracking branch 'origin/main' into request-as-process
zeripath Nov 28, 2021
1d565bb
Merge branch 'main' into request-as-process
zeripath Nov 29, 2021
bbe69c8
Merge branch 'main' into request-as-process
zeripath Nov 30, 2021
1203fa9
Ensure that http requests use the same context as the request
zeripath Nov 30, 2021
772d31d
use the repo context in the diff
zeripath Nov 30, 2021
37f0716
improve code documentation
zeripath Nov 30, 2021
ec6b663
use the gitrepo context
zeripath Nov 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make the Mirror Queue a queue (#17326)
Convert the old mirror syncing queue to the more modern queue format.

Fix a bug in the from the repo-archive queue PR - the assumption was made that uniqueness could be enforced with by checking equality in a map in channel unique queues - however this only works for primitive types - which was the initial intention but is an imperfect. This is fixed by marshalling the data and placing the martialled data in the unique map instead.

The documentation is also updated to add information about the deprecated configuration values.

Signed-off-by: Andrew Thornton <[email protected]>
  • Loading branch information
zeripath committed Oct 17, 2021
commit 4ce4614a3c88b94a0c4bef99f2de3c57fb0c0ec2
4 changes: 2 additions & 2 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,10 @@ PATH =
;; Global limit of repositories per user, applied at creation time. -1 means no limit
;MAX_CREATION_LIMIT = -1
;;
;; Mirror sync queue length, increase if mirror syncing starts hanging
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
;MIRROR_QUEUE_LENGTH = 1000
;;
;; Patch test queue length, increase if pull request patch testing starts hanging
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
;PULL_REQUEST_QUEUE_LENGTH = 1000
;;
;; Preferred Licenses to place at the top of the List
Expand Down
39 changes: 36 additions & 3 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
`-1` means no limit.
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
as large as possible. Use caution when editing this value.
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
testing starts hanging.
testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
the top of the list. Name must match file name in options/license or custom/options/license.
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
Expand Down Expand Up @@ -382,6 +382,8 @@ relation to port exhaustion.

## Queue (`queue` and `queue.*`)

Configuration at `[queue]` will set defaults for queues with overrides for individual queues at `[queue.*]`. (However see below.)

- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
- `LENGTH`: **20**: Maximal queue size before channel queues block
Expand All @@ -400,6 +402,37 @@ relation to port exhaustion.
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.

Gitea creates the following non-unique queues:

- `code_indexer`
- `issue_indexer`
- `notification-service`
- `task`
- `mail`
- `push_update`

And the following unique queues:

- `repo_stats_update`
- `repo-archive`
- `mirror`
- `pr_patch_checker`

Certain queues have defaults that override the defaults set in `[queue]` (this occurs mostly to support older configuration):

- `[queue.issue_indexer]`
- `TYPE` this will default to `[queue]` `TYPE` if it is set but if not it will appropriately convert `[indexer]` `ISSUE_INDEXER_QUEUE_TYPE` if that is set.
- `LENGTH` will default to `[indexer]` `UPDATE_BUFFER_LEN` if that is set.
- `BATCH_LENGTH` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_BATCH_NUMBER` if that is set.
- `DATADIR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_DIR` if that is set.
- `CONN_STR` will default to `[indexer]` `ISSUE_INDEXER_QUEUE_CONN_STR` if that is set.
- `[queue.mailer]`
- `LENGTH` will default to **100** or whatever `[mailer]` `SEND_BUFFER_LEN` is.
- `[queue.pr_patch_checker]`
- `LENGTH` will default to **1000** or whatever `[repository]` `PULL_REQUEST_QUEUE_LENGTH` is.
- `[queue.mirror]`
- `LENGTH` will default to **1000** or whatever `[repository]` `MIRROR_QUEUE_LENGTH` is.

## Admin (`admin`)

- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
Expand Down Expand Up @@ -588,7 +621,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
command or full path).
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`

## Cache (`cache`)

Expand Down
29 changes: 22 additions & 7 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)

Expand All @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
table map[Data]bool
table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
Expand All @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)

queue := &ChannelUniqueQueue{
table: map[Data]bool{},
table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
Expand All @@ -65,9 +66,13 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
// No error is possible here because PushFunc ensures that this can be marshalled
bs, _ := json.Marshal(datum)

queue.lock.Lock()
delete(queue.table, datum)
delete(queue.table, string(bs))
queue.lock.Unlock()

handle(datum)
}
}, config.WorkerPoolConfiguration)
Expand All @@ -94,23 +99,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}

bs, err := json.Marshal(data)
if err != nil {
return err
}
q.lock.Lock()
locked := true
defer func() {
if locked {
q.lock.Unlock()
}
}()
if _, ok := q.table[data]; ok {
if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
q.table[data] = true
q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
delete(q.table, data)
delete(q.table, string(bs))
return err
}
}
Expand All @@ -122,9 +132,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {

// Has checks if the data is in the queue
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
bs, err := json.Marshal(data)
if err != nil {
return false, err
}

q.lock.Lock()
defer q.lock.Unlock()
_, has := q.table[data]
_, has := q.table[string(bs)]
return has, nil
}

Expand Down
2 changes: 0 additions & 2 deletions modules/setting/mailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// Mailer represents mail service.
type Mailer struct {
// Mailer
QueueLength int
Name string
From string
FromName string
Expand Down Expand Up @@ -54,7 +53,6 @@ func newMailService() {
}

MailService = &Mailer{
QueueLength: sec.Key("SEND_BUFFER_LEN").MustInt(100),
Name: sec.Key("NAME").MustString(AppName),
SendAsPlainText: sec.Key("SEND_AS_PLAIN_TEXT").MustBool(false),
MailerType: sec.Key("MAILER_TYPE").In("", []string{"smtp", "sendmail", "dummy"}),
Expand Down
64 changes: 39 additions & 25 deletions modules/setting/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package setting

import (
"fmt"
"path/filepath"
"strconv"
"time"

"code.gitea.io/gitea/modules/log"
ini "gopkg.in/ini.v1"
)

// QueueSettings represent the settings for a queue from the ini
Expand Down Expand Up @@ -106,11 +107,8 @@ func NewQueueService() {

// Now handle the old issue_indexer configuration
section := Cfg.Section("queue.issue_indexer")
sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["TYPE"]; !ok && defaultType == "" {
directlySet := toDirectlySetKeysMap(section)
if !directlySet["TYPE"] && defaultType == "" {
switch Indexer.IssueQueueType {
case LevelQueueType:
_, _ = section.NewKey("TYPE", "level")
Expand All @@ -125,37 +123,53 @@ func NewQueueService() {
Indexer.IssueQueueType)
}
}
if _, ok := sectionMap["LENGTH"]; !ok && Indexer.UpdateQueueLength != 0 {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Indexer.UpdateQueueLength))
if !directlySet["LENGTH"] && Indexer.UpdateQueueLength != 0 {
_, _ = section.NewKey("LENGTH", strconv.Itoa(Indexer.UpdateQueueLength))
}
if _, ok := sectionMap["BATCH_LENGTH"]; !ok && Indexer.IssueQueueBatchNumber != 0 {
_, _ = section.NewKey("BATCH_LENGTH", fmt.Sprintf("%d", Indexer.IssueQueueBatchNumber))
if !directlySet["BATCH_LENGTH"] && Indexer.IssueQueueBatchNumber != 0 {
_, _ = section.NewKey("BATCH_LENGTH", strconv.Itoa(Indexer.IssueQueueBatchNumber))
}
if _, ok := sectionMap["DATADIR"]; !ok && Indexer.IssueQueueDir != "" {
if !directlySet["DATADIR"] && Indexer.IssueQueueDir != "" {
_, _ = section.NewKey("DATADIR", Indexer.IssueQueueDir)
}
if _, ok := sectionMap["CONN_STR"]; !ok && Indexer.IssueQueueConnStr != "" {
if !directlySet["CONN_STR"] && Indexer.IssueQueueConnStr != "" {
_, _ = section.NewKey("CONN_STR", Indexer.IssueQueueConnStr)
}

// Handle the old mailer configuration
section = Cfg.Section("queue.mailer")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100)))
}
handleOldLengthConfiguration("mailer", Cfg.Section("mailer").Key("SEND_BUFFER_LEN").MustInt(100))

// Handle the old test pull requests configuration
// Please note this will be a unique queue
section = Cfg.Section("queue.pr_patch_checker")
sectionMap = map[string]bool{}
handleOldLengthConfiguration("pr_patch_checker", Cfg.Section("repository").Key("PULL_REQUEST_QUEUE_LENGTH").MustInt(1000))

// Handle the old mirror queue configuration
// Please note this will be a unique queue
handleOldLengthConfiguration("mirror", Cfg.Section("repository").Key("MIRROR_QUEUE_LENGTH").MustInt(1000))
}

// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
func handleOldLengthConfiguration(queueName string, value int) {
// Don't override with 0
if value <= 0 {
return
}

section := Cfg.Section("queue." + queueName)
directlySet := toDirectlySetKeysMap(section)
if !directlySet["LENGTH"] {
_, _ = section.NewKey("LENGTH", strconv.Itoa(value))
}
}

// toDirectlySetKeysMap returns a bool map of keys directly set by this section
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
// but this section does not.
func toDirectlySetKeysMap(section *ini.Section) map[string]bool {
sectionMap := map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
}
return sectionMap
}
4 changes: 0 additions & 4 deletions modules/setting/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ var (
DefaultPrivate string
DefaultPushCreatePrivate bool
MaxCreationLimit int
MirrorQueueLength int
PullRequestQueueLength int
PreferredLicenses []string
DisableHTTPGit bool
AccessControlAllowOrigin string
Expand Down Expand Up @@ -142,8 +140,6 @@ var (
DefaultPrivate: RepoCreatingLastUserVisibility,
DefaultPushCreatePrivate: true,
MaxCreationLimit: -1,
MirrorQueueLength: 1000,
PullRequestQueueLength: 1000,
PreferredLicenses: []string{"Apache License 2.0", "MIT License"},
DisableHTTPGit: false,
AccessControlAllowOrigin: "",
Expand Down
Loading