Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
333aec5
Passes PSQL, SQLite and MSSQL
guillep2k Jan 20, 2020
70a2a8d
Move to upsert strategy; all tests work
guillep2k Jan 21, 2020
677e49f
Merge branch 'master' into bylock-indexes
guillep2k Jan 21, 2020
d834fb1
Use a LockedResource to numerate issues and prs
guillep2k Jan 21, 2020
58ac901
Fix tests and reserved keyword
guillep2k Jan 21, 2020
aa3797c
Fix unit tests
guillep2k Jan 21, 2020
95db48a
Fix export comments
guillep2k Jan 22, 2020
d8ad174
A little refactoring and better function naming
guillep2k Jan 22, 2020
6bec3d5
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
e747a2c
Support LockType == "" and LockKey == 0
guillep2k Jan 22, 2020
afeb6f0
Merge branch 'master' into bylock-indexes
guillep2k Jan 22, 2020
c503f0d
Prepare for merge
guillep2k Jan 28, 2020
9b7ec1d
Merge branch 'master' into bylock-indexes
guillep2k Jan 28, 2020
1792664
Go simple
guillep2k Jan 28, 2020
ce6c24f
Improve test legibility
guillep2k Jan 30, 2020
15ffbb4
Fix typo
guillep2k Jan 30, 2020
ea9c875
Remove dead code
guillep2k Jan 30, 2020
9cb79c9
Merge branch 'master' into bylock-indexes
guillep2k Jan 30, 2020
d185a4f
Prepare for merge
guillep2k Feb 1, 2020
f46eaf5
Merge branch 'master' into bylock-indexes
guillep2k Feb 1, 2020
621c9d6
Prepare to merge
guillep2k Feb 12, 2020
17fa5e1
Merge branch 'master' into bylock-indexes
guillep2k Feb 12, 2020
b30094b
Merge branch 'master' into bylock-indexes
guillep2k Feb 15, 2020
299d313
Merge branch 'master' into bylock-indexes
guillep2k Feb 16, 2020
cea7c4f
Merge branch 'master' into bylock-indexes
guillep2k Feb 20, 2020
2311de3
Merge branch 'master' into bylock-indexes
guillep2k Feb 29, 2020
7e280a4
Merge branch 'master' into bylock-indexes
guillep2k May 2, 2020
15e407b
Code review suggestions by @lunny
guillep2k May 2, 2020
dd85873
Ignore SQLite3 integration when _txlock=immediate
guillep2k May 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use a LockedResource to numerate issues and prs
  • Loading branch information
guillep2k committed Jan 21, 2020
commit d834fb1451dc8cb56f4c37f8b883adb08da1bb49
14 changes: 7 additions & 7 deletions integrations/locked_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ const (
// blockerDelay minus tolerance to complete.
// Note: these values might require tuning in order to avoid
// false negatives.
waiterDelay = 100 * time.Millisecond
waiterDelay = 100 * time.Millisecond
blockerDelay = 200 * time.Millisecond
tolerance = 50 * time.Millisecond // Should be <= (blockerDelay-waiterDelay)/2
tolerance = 50 * time.Millisecond // Should be <= (blockerDelay-waiterDelay)/2
)

type waitResult struct {
Waited time.Duration
Err error
Waited time.Duration
Err error
}

func TestLockedResource(t *testing.T) {
Expand Down Expand Up @@ -75,16 +75,16 @@ func blockTest(name string, f func(ctx models.DBContext) error) error {
cw <- blockTestFunc(name, false, ref, f)
}()

resb := <- cb
resw := <- cw
resb := <-cb
resw := <-cw
if resb.Err != nil {
return resb.Err
}
if resw.Err != nil {
return resw.Err
}

if resw.Waited < blockerDelay - tolerance {
if resw.Waited < blockerDelay-tolerance {
return fmt.Errorf("Waiter not blocked on %s; wait: %d ms, expected > %d ms",
name, resw.Waited.Milliseconds(), (blockerDelay - tolerance).Milliseconds())
}
Expand Down
45 changes: 17 additions & 28 deletions models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ var (

const issueTasksRegexpStr = `(^\s*[-*]\s\[[\sx]\]\s.)|(\n\s*[-*]\s\[[\sx]\]\s.)`
const issueTasksDoneRegexpStr = `(^\s*[-*]\s\[[x]\]\s.)|(\n\s*[-*]\s\[[x]\]\s.)`
const issueMaxDupIndexAttempts = 3

// IssueLockedEnumerator is the name of the locked_resource used to
// numerate issues in a repository.
const IssueLockedEnumerator = "repository-index"

func init() {
issueTasksPat = regexp.MustCompile(issueTasksRegexpStr)
Expand Down Expand Up @@ -898,19 +901,23 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {
}

// Milestone validation should happen before insert actual object.
if _, err := e.SetExpr("`index`", "coalesce(MAX(`index`),0)+1").
Where("repo_id=?", opts.Issue.RepoID).
Insert(opts.Issue); err != nil {
return ErrNewIssueInsert{err}
}

inserted, err := getIssueByID(e, opts.Issue.ID)
// Obtain the next issue number for this repository, which will be locked
// and reserved for the remaining of the transaction. Should the transaction
// be rolled back, the previous value will be restored.
locked, err := GetLockedResource(e, IssueLockedEnumerator, opts.Issue.RepoID)
if err != nil {
return err
return fmt.Errorf("GetLockedResource(%s)", IssueLockedEnumerator)
}
locked.Counter++
if err := UpdateLockedResource(e, locked); err != nil {
return fmt.Errorf("UpdateLockedResource(%s)", IssueLockedEnumerator)
}
opts.Issue.Index = locked.Counter

// Patch Index with the value calculated by the database
opts.Issue.Index = inserted.Index
if _, err = e.Insert(opts.Issue); err != nil {
return err
}

if opts.Issue.MilestoneID > 0 {
if _, err = e.Exec("UPDATE `milestone` SET num_issues=num_issues+1 WHERE id=?", opts.Issue.MilestoneID); err != nil {
Expand Down Expand Up @@ -988,24 +995,6 @@ func newIssue(e *xorm.Session, doer *User, opts NewIssueOptions) (err error) {

// NewIssue creates new issue with labels for repository.
func NewIssue(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
// Retry several times in case INSERT fails due to duplicate key for (repo_id, index); see #7887
i := 0
for {
if err = newIssueAttempt(repo, issue, labelIDs, uuids); err == nil {
return nil
}
if !IsErrNewIssueInsert(err) {
return err
}
if i++; i == issueMaxDupIndexAttempts {
break
}
log.Error("NewIssue: error attempting to insert the new issue; will retry. Original error: %v", err)
}
return fmt.Errorf("NewIssue: too many errors attempting to insert the new issue. Last error was: %v", err)
}

func newIssueAttempt(repo *Repository, issue *Issue, labelIDs []int64, uuids []string) (err error) {
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions models/locked_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

type LockedResource struct {
LockType string `xorm:"pk VARCHAR(30)"`
LockKey int64 `xorm:"pk"`
Counter int64 `xorm:"NOT NULL DEFAULT 0"`
LockType string `xorm:"pk VARCHAR(30)"`
LockKey int64 `xorm:"pk"`
Counter int64 `xorm:"NOT NULL DEFAULT 0"`
}

func GetLockedResource(e Engine, lockType string, lockKey int64) (*LockedResource, error) {
Expand All @@ -29,7 +29,7 @@ func GetLockedResource(e Engine, lockType string, lockKey int64) (*LockedResourc
} else if !has {
return nil, fmt.Errorf("unexpected upsert fail %s:%d", lockType, lockKey)
}

return locked, nil
}

Expand Down Expand Up @@ -72,16 +72,16 @@ func TempLockResourceCtx(ctx DBContext, lockType string, lockKey int64) error {

func upsertLockedResource(e Engine, resource *LockedResource) (err error) {
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation
// that ensures that the key is actually locked.
// that ensures that the key is actually locked.
switch {
case setting.Database.UseSQLite3 || setting.Database.UsePostgreSQL:
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+
"VALUES (?,?) ON CONFLICT(lock_type, lock_key) DO UPDATE SET lock_key = ?",
resource.LockType, resource.LockKey, resource.LockKey);
resource.LockType, resource.LockKey, resource.LockKey)
case setting.Database.UseMySQL:
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+
"VALUES (?,?) ON DUPLICATE KEY UPDATE lock_key = lock_key",
resource.LockType, resource.LockKey);
resource.LockType, resource.LockKey)
case setting.Database.UseMSSQL:
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
_, err = e.Exec("MERGE locked_resource WITH (HOLDLOCK) as target "+
Expand All @@ -90,9 +90,9 @@ func upsertLockedResource(e Engine, resource *LockedResource) (err error) {
"WHEN MATCHED THEN UPDATE SET target.lock_key = target.lock_key "+
"WHEN NOT MATCHED THEN INSERT (lock_type, lock_key) "+
"VALUES (src.lock_type, src.lock_key);",
resource.LockType, resource.LockKey);
resource.LockType, resource.LockKey)
default:
return fmt.Errorf("database type not supported")
}
return
}
}
13 changes: 6 additions & 7 deletions models/locked_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestLockedResource(t *testing.T) {

// Get lock, increment counter value
withSession(t, func(t *testing.T, sess *xorm.Session) bool {
lck1, err := GetLockedResource(sess, "test-1",1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, lck1) || !assert.Equal(t, int64(0), lck1.Counter) {
lck1, err := GetLockedResource(sess, "test-1", 1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, lck1) || !assert.Equal(t, int64(0), lck1.Counter) {
return false
}
lck1.Counter++
Expand All @@ -41,20 +41,20 @@ func TestLockedResource(t *testing.T) {

// Get lock, check counter value
withSession(t, func(t *testing.T, sess *xorm.Session) bool {
lck1, err := GetLockedResource(sess, "test-1",1)
lck1, err := GetLockedResource(sess, "test-1", 1)
return assert.NoError(t, err) && assert.NotEmpty(t, lck1) && assert.Equal(t, int64(1), lck1.Counter)
})

// Attempt temp lock on an existing key, expect error
withSession(t, func(t *testing.T, sess *xorm.Session) bool {
err := TempLockResource(sess, "test-1",1)
err := TempLockResource(sess, "test-1", 1)
// Must give error
return assert.Error(t, err)
})

// Delete lock
withSession(t, func(t *testing.T, sess *xorm.Session) bool {
lck1, err := GetLockedResource(sess, "test-1",1)
lck1, err := GetLockedResource(sess, "test-1", 1)
if !assert.NoError(t, err) || !assert.NotEmpty(t, lck1) {
return false
}
Expand All @@ -63,10 +63,9 @@ func TestLockedResource(t *testing.T) {

// Attempt temp lock on an valid key, expect success
withSession(t, func(t *testing.T, sess *xorm.Session) bool {
return assert.NoError(t, TempLockResource(sess, "test-1",1))
return assert.NoError(t, TempLockResource(sess, "test-1", 1))
})

// Note: testing the validity of the locking mechanism (i.e. whether it actually locks)
// is be done at the integration tests to ensure that all the supported databases are checked.
}

37 changes: 32 additions & 5 deletions models/migrations/v125.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,43 @@
package migrations

import (
"code.gitea.io/gitea/models"

"xorm.io/xorm"
)

func addLockedResourceTable(x *xorm.Engine) error {

type LockedResource struct {
LockType string `xorm:"pk VARCHAR(30)"`
LockKey int64 `xorm:"pk"`
Counter int64 `xorm:"NOT NULL DEFAULT 0"`
LockType string `xorm:"pk VARCHAR(30)"`
LockKey int64 `xorm:"pk"`
Counter int64 `xorm:"NOT NULL DEFAULT 0"`
}

sess := x.NewSession()
defer sess.Close()

if err := sess.Begin(); err != nil {
return err
}

if err := sess.Sync2(new(LockedResource)); err != nil {
return err
}

return x.Sync2(new(LockedResource))

// Remove data we're goint to rebuild
if _, err := sess.Delete(&LockedResource{LockType: models.IssueLockedEnumerator}); err != nil {
return err
}

// Create current data for all repositories with issues and PRs
if _, err := sess.Exec("INSERT INTO locked_resource (lock_type, lock_key, counter) "+
"SELECT ?, max_data.repo_id, max_data.max_index "+
"FROM ( SELECT issue.repo_id AS repo_id, max(issue.index) AS max_index "+
"FROM issue GROUP BY issue.repo_id) AS max_data",
models.IssueLockedEnumerator); err != nil {
return err
}

return sess.Commit()
}
18 changes: 0 additions & 18 deletions models/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,24 +531,6 @@ func (pr *PullRequest) SetMerged() (err error) {

// NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(repo *Repository, pull *Issue, labelIDs []int64, uuids []string, pr *PullRequest) (err error) {
// Retry several times in case INSERT fails due to duplicate key for (repo_id, index); see #7887
i := 0
for {
if err = newPullRequestAttempt(repo, pull, labelIDs, uuids, pr); err == nil {
return nil
}
if !IsErrNewIssueInsert(err) {
return err
}
if i++; i == issueMaxDupIndexAttempts {
break
}
log.Error("NewPullRequest: error attempting to insert the new issue; will retry. Original error: %v", err)
}
return fmt.Errorf("NewPullRequest: too many errors attempting to insert the new issue. Last error was: %v", err)
}

func newPullRequestAttempt(repo *Repository, pull *Issue, labelIDs []int64, uuids []string, pr *PullRequest) (err error) {
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
Expand Down