Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve archive queue
  • Loading branch information
lunny committed Jun 22, 2021
commit 732046da070e73713c11fc958a2ec7135fe3de33
4 changes: 4 additions & 0 deletions routers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"code.gitea.io/gitea/routers/common"
"code.gitea.io/gitea/routers/private"
web_routers "code.gitea.io/gitea/routers/web"
"code.gitea.io/gitea/services/archiver"
"code.gitea.io/gitea/services/auth"
"code.gitea.io/gitea/services/mailer"
mirror_service "code.gitea.io/gitea/services/mirror"
Expand Down Expand Up @@ -63,6 +64,9 @@ func NewServices() {
mailer.NewContext()
_ = cache.NewContext()
notification.NewContext()
if err := archiver.Init(); err != nil {
log.Fatal("archiver init failed: %v", err)
}
}

// GlobalInit is for global configuration reload-able.
Expand Down
58 changes: 52 additions & 6 deletions routers/web/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"fmt"
"net/http"
"strings"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
Expand Down Expand Up @@ -377,20 +379,58 @@ func Download(ctx *context.Context) {
return
}

archiver, err := archiver_service.ArchiveRepository(aReq)
archiver, err := models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID)
if err != nil {
ctx.ServerError("ArchiveRepository", err)
ctx.ServerError("models.GetRepoArchiver", err)
return
}
if archiver != nil && archiver.Status == models.RepoArchiverReady {
download(ctx, aReq.GetArchiveName(), archiver)
return
}

if err := archiver_service.StartArchive(aReq); err != nil {
ctx.ServerError("archiver_service.StartArchive", err)
return
}

var times int
var t = time.NewTicker(time.Second * 1)
defer t.Stop()

for {
select {
case <-graceful.GetManager().HammerContext().Done():
log.Warn("exit archive downlaod because system stop")
return
case <-t.C:
if times > 20 {
ctx.ServerError("wait download timeout", nil)
return
}
times++
archiver, err = models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID)
if err != nil {
ctx.ServerError("archiver_service.StartArchive", err)
return
}
if archiver != nil && archiver.Status == models.RepoArchiverReady {
download(ctx, aReq.GetArchiveName(), archiver)
return
}
}
}
}

func download(ctx *context.Context, archiveName string, archiver *models.RepoArchiver) {
downloadName := ctx.Repo.Repository.Name + "-" + archiveName

rPath, err := archiver.RelativePath()
if err != nil {
ctx.ServerError("archiver.RelativePath", err)
return
}

downloadName := ctx.Repo.Repository.Name + "-" + aReq.GetArchiveName()

if setting.RepoArchive.ServeDirect {
//If we have a signed url (S3, object storage), redirect to this directly.
u, err := storage.RepoArchives.URL(rPath, downloadName)
Expand Down Expand Up @@ -425,11 +465,17 @@ func InitiateDownload(ctx *context.Context) {
return
}

archiver, err := archiver_service.ArchiveRepository(aReq)
archiver, err := models.GetRepoArchiver(models.DefaultDBContext(), aReq.RepoID, aReq.Type, aReq.CommitID)
if err != nil {
ctx.ServerError("archiver_service.ArchiveRepository", err)
ctx.ServerError("archiver_service.StartArchive", err)
return
}
if archiver == nil || archiver.Status != models.RepoArchiverReady {
if err := archiver_service.StartArchive(aReq); err != nil {
ctx.ServerError("archiver_service.StartArchive", err)
return
}
}

var completed bool
if archiver != nil && archiver.Status == models.RepoArchiverReady {
Expand Down
86 changes: 54 additions & 32 deletions services/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ import (
// This is entirely opaque to external entities, though, and mostly used as a
// handle elsewhere.
type ArchiveRequest struct {
uri string
repoID int64
repo *git.Repository
refName string
ext string
archiveType git.ArchiveType
commitID string
RepoID int64
refName string
Type git.ArchiveType
CommitID string
}

// SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all
Expand All @@ -46,38 +43,37 @@ var shaRegex = regexp.MustCompile(`^[0-9a-f]{4,64}$`)
// if it's determined that the request still needs to be satisfied.
func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest, error) {
r := &ArchiveRequest{
repoID: repoID,
uri: uri,
repo: repo,
RepoID: repoID,
}

var ext string
switch {
case strings.HasSuffix(uri, ".zip"):
r.ext = ".zip"
r.archiveType = git.ZIP
ext = ".zip"
r.Type = git.ZIP
case strings.HasSuffix(uri, ".tar.gz"):
r.ext = ".tar.gz"
r.archiveType = git.TARGZ
ext = ".tar.gz"
r.Type = git.TARGZ
default:
return nil, fmt.Errorf("Unknown format: %s", uri)
}

r.refName = strings.TrimSuffix(r.uri, r.ext)
r.refName = strings.TrimSuffix(uri, ext)

var err error
// Get corresponding commit.
if r.repo.IsBranchExist(r.refName) {
r.commitID, err = r.repo.GetBranchCommitID(r.refName)
if repo.IsBranchExist(r.refName) {
r.CommitID, err = repo.GetBranchCommitID(r.refName)
if err != nil {
return nil, err
}
} else if r.repo.IsTagExist(r.refName) {
r.commitID, err = r.repo.GetTagCommitID(r.refName)
} else if repo.IsTagExist(r.refName) {
r.CommitID, err = repo.GetTagCommitID(r.refName)
if err != nil {
return nil, err
}
} else if shaRegex.MatchString(r.refName) {
r.commitID = r.refName
r.CommitID = r.refName
} else {
return nil, fmt.Errorf("Unknow ref %s type", r.refName)
}
Expand All @@ -88,7 +84,7 @@ func NewRequest(repoID int64, repo *git.Repository, uri string) (*ArchiveRequest
// GetArchiveName returns the name of the caller, based on the ref used by the
// caller to create this request.
func (aReq *ArchiveRequest) GetArchiveName() string {
return aReq.refName + aReq.ext
return aReq.refName + "." + aReq.Type.String()
}

func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) {
Expand All @@ -98,7 +94,7 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) {
}
defer commiter.Close()

archiver, err := models.GetRepoArchiver(ctx, r.repoID, r.archiveType, r.commitID)
archiver, err := models.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
if err != nil {
return nil, err
}
Expand All @@ -111,9 +107,9 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) {
}
} else {
archiver = &models.RepoArchiver{
RepoID: r.repoID,
Type: r.archiveType,
CommitID: r.commitID,
RepoID: r.RepoID,
Type: r.Type,
CommitID: r.CommitID,
Status: models.RepoArchiverGenerating,
}
if err := models.AddRepoArchiver(ctx, archiver); err != nil {
Expand Down Expand Up @@ -146,22 +142,34 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) {
}()
var done = make(chan error)

go func(done chan error, w *io.PipeWriter) {
go func(done chan error, w *io.PipeWriter, archiver *models.RepoArchiver) {
defer func() {
if r := recover(); r != nil {
done <- fmt.Errorf("%v", r)
}
}()
err := r.repo.CreateArchive(
repo, err := archiver.LoadRepo()
if err != nil {
done <- err
return
}

gitRepo, err := git.OpenRepository(repo.RepoPath())
if err != nil {
done <- err
return
}

err = gitRepo.CreateArchive(
graceful.GetManager().ShutdownContext(),
r.archiveType,
archiver.Type,
w,
setting.Repository.PrefixArchiveFiles,
r.commitID,
archiver.CommitID,
)
_ = w.CloseWithError(err)
done <- err
}(done, w)
}(done, w, archiver)

if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
return nil, fmt.Errorf("unable to write archive: %v", err)
Expand Down Expand Up @@ -192,7 +200,7 @@ func ArchiveRepository(request *ArchiveRequest) (*models.RepoArchiver, error) {
return doArchive(request)
}

var archiverQueue queue.Queue
var archiverQueue queue.UniqueQueue

// Init initlize archive
func Init() error {
Expand All @@ -210,10 +218,24 @@ func Init() error {
}
}

archiverQueue = queue.CreateQueue("repo-archive", handler, new(ArchiveRequest))
archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest))
if archiverQueue == nil {
return errors.New("unable to create codes indexer queue")
}

go graceful.GetManager().RunWithShutdownFns(archiverQueue.Run)

return nil
}

// StartArchive push the archive request to the queue
func StartArchive(request *ArchiveRequest) error {
has, err := archiverQueue.Has(request)
if err != nil {
return err
}
if has {
return nil
}
return archiverQueue.Push(request)
}