Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Cron and merge dashboard tasks #10745

Merged
merged 20 commits into from
May 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integrations/auth_ldap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestLDAPUserSync(t *testing.T) {
}
defer prepareTestEnv(t)()
addAuthSourceLDAP(t, "")
models.SyncExternalUsers(context.Background())
models.SyncExternalUsers(context.Background(), true)

session := loginUser(t, "user1")
// Check if users exists
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestLDAPUserSSHKeySync(t *testing.T) {
defer prepareTestEnv(t)()
addAuthSourceLDAP(t, "sshPublicKey")

models.SyncExternalUsers(context.Background())
models.SyncExternalUsers(context.Background(), true)

// Check if users has SSH keys synced
for _, u := range gitLDAPUsers {
Expand Down
15 changes: 10 additions & 5 deletions models/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type NoticeType int
const (
//NoticeRepository type
NoticeRepository NoticeType = iota + 1
// NoticeTask type
NoticeTask
)

// Notice represents a system notice for admin.
Expand All @@ -36,11 +38,14 @@ func (n *Notice) TrStr() string {
}

// CreateNotice creates new system notice.
func CreateNotice(tp NoticeType, desc string) error {
return createNotice(x, tp, desc)
func CreateNotice(tp NoticeType, desc string, args ...interface{}) error {
return createNotice(x, tp, desc, args...)
}

func createNotice(e Engine, tp NoticeType, desc string) error {
func createNotice(e Engine, tp NoticeType, desc string, args ...interface{}) error {
if len(args) > 0 {
desc = fmt.Sprintf(desc, args...)
}
n := &Notice{
Type: tp,
Description: desc,
Expand All @@ -50,8 +55,8 @@ func createNotice(e Engine, tp NoticeType, desc string) error {
}

// CreateRepositoryNotice creates new system notice with type NoticeRepository.
func CreateRepositoryNotice(desc string) error {
return createNotice(x, NoticeRepository, desc)
func CreateRepositoryNotice(desc string, args ...interface{}) error {
return createNotice(x, NoticeRepository, desc, args...)
}

// RemoveAllWithNotice removes all directories in given path and
Expand Down
5 changes: 2 additions & 3 deletions models/branches.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"

Expand Down Expand Up @@ -561,11 +560,11 @@ func RemoveDeletedBranch(repoID int64, branch string) error {
}

// RemoveOldDeletedBranches removes old deleted branches
func RemoveOldDeletedBranches(ctx context.Context) {
func RemoveOldDeletedBranches(ctx context.Context, olderThan time.Duration) {
// Nothing to do for shutdown or terminate
log.Trace("Doing: DeletedBranchesCleanup")

deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan)
deleteBefore := time.Now().Add(-olderThan)
_, err := x.Where("deleted_unix < ?", deleteBefore.Unix()).Delete(new(DeletedBranch))
if err != nil {
log.Error("DeletedBranchesCleanup: %v", err)
Expand Down
22 changes: 22 additions & 0 deletions models/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,28 @@ func (err ErrSSHDisabled) Error() string {
return "SSH is disabled"
}

// ErrCancelled represents an error due to context cancellation
type ErrCancelled struct {
Message string
}

// IsErrCancelled checks if an error is a ErrCancelled.
func IsErrCancelled(err error) bool {
_, ok := err.(ErrCancelled)
return ok
}

func (err ErrCancelled) Error() string {
return "Cancelled: " + err.Message
}

// ErrCancelledf returns an ErrCancelled for the provided format and args
func ErrCancelledf(format string, args ...interface{}) error {
return ErrCancelled{
fmt.Sprintf(format, args...),
}
}

// ____ ___
// | | \______ ___________
// | | / ___// __ \_ __ \
Expand Down
65 changes: 40 additions & 25 deletions models/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,35 +1853,44 @@ func GetPrivateRepositoryCount(u *User) (int64, error) {
}

// DeleteRepositoryArchives deletes all repositories' archives.
func DeleteRepositoryArchives() error {
func DeleteRepositoryArchives(ctx context.Context) error {
return x.
Where("id > 0").
Iterate(new(Repository),
func(idx int, bean interface{}) error {
repo := bean.(*Repository)
select {
case <-ctx.Done():
return ErrCancelledf("before deleting repository archives for %s", repo.FullName())
default:
}
return os.RemoveAll(filepath.Join(repo.RepoPath(), "archives"))
})
}

// DeleteOldRepositoryArchives deletes old repository archives.
func DeleteOldRepositoryArchives(ctx context.Context) {
func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error {
log.Trace("Doing: ArchiveCleanup")

if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error {
return deleteOldRepositoryArchives(ctx, idx, bean)
return deleteOldRepositoryArchives(ctx, olderThan, idx, bean)
}); err != nil {
log.Error("ArchiveClean: %v", err)
log.Trace("Error: ArchiveClean: %v", err)
return err
}

log.Trace("Finished: ArchiveCleanup")
return nil
}

func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error {
func deleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration, idx int, bean interface{}) error {
repo := bean.(*Repository)
basePath := filepath.Join(repo.RepoPath(), "archives")

for _, ty := range []string{"zip", "targz"} {
select {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty)
return ErrCancelledf("before deleting old repository archives with filetype %s for %s", ty, repo.FullName())
default:
}

Expand All @@ -1904,12 +1913,12 @@ func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{})
return err
}

minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan)
minimumOldestTime := time.Now().Add(-olderThan)
for _, info := range files {
if info.ModTime().Before(minimumOldestTime) && !info.IsDir() {
select {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name())
return ErrCancelledf("before deleting old repository archive file %s with filetype %s for %s", info.Name(), ty, repo.FullName())
default:
}
toDelete := filepath.Join(path, info.Name())
Expand All @@ -1936,13 +1945,13 @@ func repoStatsCheck(ctx context.Context, checker *repoChecker) {
return
}
for _, result := range results {
id := com.StrTo(result["id"]).MustInt64()
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
log.Warn("CheckRepoStats: Cancelled before checking %s for Repo[%d]", checker.desc, id)
return
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", checker.desc, id)
_, err = x.Exec(checker.correctSQL, id, id)
if err != nil {
Expand All @@ -1952,7 +1961,7 @@ func repoStatsCheck(ctx context.Context, checker *repoChecker) {
}

// CheckRepoStats checks the repository stats
func CheckRepoStats(ctx context.Context) {
func CheckRepoStats(ctx context.Context) error {
log.Trace("Doing: CheckRepoStats")

checkers := []*repoChecker{
Expand Down Expand Up @@ -1987,13 +1996,13 @@ func CheckRepoStats(ctx context.Context) {
"issue count 'num_comments'",
},
}
for i := range checkers {
for _, checker := range checkers {
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
log.Warn("CheckRepoStats: Cancelled before %s", checker.desc)
return ErrCancelledf("before checking %s", checker.desc)
default:
repoStatsCheck(ctx, checkers[i])
repoStatsCheck(ctx, checker)
}
}

Expand All @@ -2004,13 +2013,13 @@ func CheckRepoStats(ctx context.Context) {
log.Error("Select %s: %v", desc, err)
} else {
for _, result := range results {
id := com.StrTo(result["id"]).MustInt64()
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
log.Warn("CheckRepoStats: Cancelled during %s for repo ID %d", desc, id)
return ErrCancelledf("during %s for repo ID %d", desc, id)
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", desc, id)
_, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id)
if err != nil {
Expand All @@ -2027,13 +2036,13 @@ func CheckRepoStats(ctx context.Context) {
log.Error("Select %s: %v", desc, err)
} else {
for _, result := range results {
id := com.StrTo(result["id"]).MustInt64()
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
log.Warn("CheckRepoStats: Cancelled")
return ErrCancelledf("during %s for repo ID %d", desc, id)
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", desc, id)
_, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, true, id)
if err != nil {
Expand All @@ -2050,13 +2059,13 @@ func CheckRepoStats(ctx context.Context) {
log.Error("Select repository count 'num_forks': %v", err)
} else {
for _, result := range results {
id := com.StrTo(result["id"]).MustInt64()
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
log.Warn("CheckRepoStats: Cancelled")
return ErrCancelledf("during %s for repo ID %d", desc, id)
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating repository count 'num_forks': %d", id)

repo, err := GetRepositoryByID(id)
Expand All @@ -2079,6 +2088,7 @@ func CheckRepoStats(ctx context.Context) {
}
}
// ***** END: Repository.NumForks *****
return nil
}

// SetArchiveRepoState sets if a repo is archived
Expand Down Expand Up @@ -2189,12 +2199,17 @@ func (repo *Repository) generateRandomAvatar(e Engine) error {
}

// RemoveRandomAvatars removes the randomly generated avatars that were created for repositories
func RemoveRandomAvatars() error {
func RemoveRandomAvatars(ctx context.Context) error {
return x.
Where("id > 0").BufferSize(setting.Database.IterateBufferSize).
Iterate(new(Repository),
func(idx int, bean interface{}) error {
repository := bean.(*Repository)
select {
case <-ctx.Done():
return ErrCancelledf("before random avatars removed for %s", repository.FullName())
default:
}
stringifiedID := strconv.FormatInt(repository.ID, 10)
if repository.Avatar == stringifiedID {
return repository.DeleteAvatar()
Expand Down
Loading