Skip to content

Commit

Permalink
Fix repo sync status bug (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
pulltheflower authored Oct 12, 2024
1 parent 6ea423c commit 7b4f1c0
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 26 deletions.
16 changes: 14 additions & 2 deletions component/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,12 +1555,14 @@ func (c *RepoComponent) MirrorFromSaas(ctx context.Context, namespace, name, cur
CreatedAt: mirror.CreatedAt.Unix(),
MirrorToken: syncClientSetting.Token,
})
repo.SyncStatus = types.SyncStatusPending
} else {
repo.SyncStatus = types.SyncStatusInProgress
}

repo.SyncStatus = types.SyncStatusInProgress
_, err = c.repo.UpdateRepo(ctx, *repo)
if err != nil {
return fmt.Errorf("failed to update repo sync status")
return fmt.Errorf("failed to update repo sync status: %w", err)
}
return nil
}
Expand All @@ -1571,6 +1573,10 @@ func (c *RepoComponent) mirrorFromSaasSync(ctx context.Context, mirror *database
if err != nil {
return fmt.Errorf("failed to find sync client setting, error: %w", err)
}
repo, err := c.repo.FindById(ctx, mirror.RepositoryID)
if err != nil {
return fmt.Errorf("failed to find repo, error: %w", err)
}
if c.config.GitServer.Type == types.GitServerTypeGitea {
err = c.git.MirrorSync(ctx, gitserver.MirrorSyncReq{
Namespace: namespace,
Expand All @@ -1589,6 +1595,12 @@ func (c *RepoComponent) mirrorFromSaasSync(ctx context.Context, mirror *database
CreatedAt: mirror.CreatedAt.Unix(),
MirrorToken: syncClientSetting.Token,
})
repo.SyncStatus = types.SyncStatusPending
}

_, err = c.repo.UpdateRepo(ctx, *repo)
if err != nil {
return fmt.Errorf("failed to update repo sync status: %w", err)
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion mirror/lfs_sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package mirror
import (
"context"

"opencsg.com/csghub-server/builder/store/database"
"opencsg.com/csghub-server/common/config"
"opencsg.com/csghub-server/mirror/lfssyncer"
)

type LFSSyncWorker interface {
Run()
SyncLfs(ctx context.Context, workerID int, mirrorID int64) error
SyncLfs(ctx context.Context, workerID int, mirror *database.Mirror) error
}

func NewLFSSyncWorker(config *config.Config, numWorkers int) (LFSSyncWorker, error) {
Expand Down
32 changes: 25 additions & 7 deletions mirror/lfssyncer/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MinioLFSSyncWorker struct {
tasks chan queue.MirrorTask
wg sync.WaitGroup
mirrorStore *database.MirrorStore
repoStore *database.RepoStore
lfsMetaObjectStore *database.LfsMetaObjectStore
s3Client *minio.Client
config *config.Config
Expand All @@ -41,6 +42,7 @@ func NewMinioLFSSyncWorker(config *config.Config, numWorkers int) (*MinioLFSSync
return nil, newError
}
w.mirrorStore = database.NewMirrorStore()
w.repoStore = database.NewRepoStore()
w.lfsMetaObjectStore = database.NewLfsMetaObjectStore()
w.config = config
mq, err := queue.GetPriorityQueueInstance()
Expand Down Expand Up @@ -83,21 +85,37 @@ func (w *MinioLFSSyncWorker) worker(id int) {
for {
task := <-w.tasks
ctx := context.Background()
err := w.SyncLfs(ctx, id, task.MirrorID)
mirror, err := w.mirrorStore.FindByID(ctx, task.MirrorID)
if err != nil {
slog.Error("fail to get mirror", slog.Int("workerId", id), slog.String("error", err.Error()))
continue
}
repo, err := w.repoStore.FindById(ctx, mirror.RepositoryID)
if err != nil {
slog.Error("fail to get repository", slog.Int("workerId", id), slog.String("error", err.Error()))
continue
}
err = w.SyncLfs(ctx, id, mirror)
if err != nil {
repo.SyncStatus = types.SyncStatusFailed
_, repoErr := w.repoStore.UpdateRepo(ctx, *repo)
if repoErr != nil {
slog.Error("fail to update repo sync status to failed: %w", slog.Any("error", err))
}
slog.Error("fail to sync lfs", slog.Int("workerId", id), slog.String("error", err.Error()))
continue
}

repo.SyncStatus = types.SyncStatusCompleted
_, err = w.repoStore.UpdateRepo(ctx, *repo)
if err != nil {
slog.Error("fail to update repo sync status to complete: %w", slog.Any("error", err))
}
}
}

func (w *MinioLFSSyncWorker) SyncLfs(ctx context.Context, workerId int, mirrorID int64) error {
func (w *MinioLFSSyncWorker) SyncLfs(ctx context.Context, workerId int, mirror *database.Mirror) error {
var pointers []*types.Pointer
mirror, err := w.mirrorStore.FindByID(ctx, mirrorID)
if err != nil {
slog.Error("fail to get mirror", slog.Int("workerId", workerId), slog.String("error", err.Error()))
return fmt.Errorf("fail to get mirror: %w", err)
}
lfsMetaObjects, err := w.lfsMetaObjectStore.FindByRepoID(ctx, mirror.Repository.ID)
if err != nil {
slog.Error("fail to get lfs meta objects", slog.Int("workerId", workerId), slog.String("error", err.Error()))
Expand Down
46 changes: 30 additions & 16 deletions mirror/reposyncer/local_woker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,38 +147,50 @@ func (w *LocalMirrorWoker) SyncRepo(ctx context.Context, task queue.MirrorTask)
branch := parts[len(parts)-1]

mirror.Repository.DefaultBranch = branch
mirror.Repository.SyncStatus = types.SyncStatusInProgress
_, err = w.repoStore.UpdateRepo(ctx, *mirror.Repository)
if err != nil {
return fmt.Errorf("failed to update repo: %w", err)
return fmt.Errorf("failed to update repo sync status to in progress: %w", err)
}
slog.Info("Update repo default branch successfully", slog.Any("repo_type", mirror.Repository.RepositoryType), slog.Any("namespace", namespace), slog.Any("name", name))
slog.Info("Start to sync lfs files", "repo_type", mirror.Repository.RepositoryType, "namespace", namespace, "name", name)
err = w.generateLfsMetaObjects(ctx, mirror)
lfsFileCount, err := w.generateLfsMetaObjects(ctx, mirror)
if err != nil {
mirror.Status = types.MirrorIncomplete
mirror.LastMessage = err.Error()
err = w.mirrorStore.Update(ctx, mirror)
if err != nil {
return fmt.Errorf("failed to update mirror: %w", err)
}

mirror.Repository.SyncStatus = types.SyncStatusFailed
_, err = w.repoStore.UpdateRepo(ctx, *mirror.Repository)
if err != nil {
return fmt.Errorf("failed to update repo sync status to failed: %w", err)
}
return fmt.Errorf("failed to sync lfs files: %v", err)
}
mirror.Status = types.MirrorRepoSynced
if lfsFileCount > 0 {
mirror.Status = types.MirrorRepoSynced
w.mq.PushLfsMirror(&queue.MirrorTask{
MirrorID: mirror.ID,
Priority: queue.Priority(mirror.Priority),
CreatedAt: mirror.CreatedAt.Unix(),
MirrorToken: task.MirrorToken,
})
} else {
mirror.Status = types.MirrorFinished
}

err = w.mirrorStore.Update(ctx, mirror)
if err != nil {
return fmt.Errorf("failed to update mirror: %w", err)
}
w.mq.PushLfsMirror(&queue.MirrorTask{
MirrorID: mirror.ID,
Priority: queue.Priority(mirror.Priority),
CreatedAt: mirror.CreatedAt.Unix(),
MirrorToken: task.MirrorToken,
})

return nil
}

func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *database.Mirror) error {
func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *database.Mirror) (int, error) {
var lfsMetaObjects []database.LfsMetaObject
namespace := strings.Split(mirror.Repository.Path, "/")[0]
name := strings.Split(mirror.Repository.Path, "/")[1]
Expand All @@ -188,12 +200,12 @@ func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *d
RepoType: mirror.Repository.RepositoryType,
})
if err != nil {
return fmt.Errorf("failed to get repo branches: %v", err)
return 0, fmt.Errorf("failed to get repo branches: %v", err)
}
for _, branch := range branches {
lfsPointers, err := c.getAllLfsPointersByRef(ctx, mirror.Repository.RepositoryType, namespace, name, branch.Name)
if err != nil {
return fmt.Errorf("failed to get all lfs pointers: %v", err)
return 0, fmt.Errorf("failed to get all lfs pointers: %v", err)
}
for _, lfsPointer := range lfsPointers {
lfsMetaObjects = append(lfsMetaObjects, database.LfsMetaObject{
Expand All @@ -206,12 +218,14 @@ func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *d
}
lfsMetaObjects = removeDuplicateLfsMetaObject(lfsMetaObjects)

err = c.lfsMetaObjectStore.BulkUpdateOrCreate(ctx, lfsMetaObjects)
if err != nil {
return fmt.Errorf("failed to bulk update or create lfs meta objects: %v", err)
if len(lfsMetaObjects) > 0 {
err = c.lfsMetaObjectStore.BulkUpdateOrCreate(ctx, lfsMetaObjects)
if err != nil {
return 0, fmt.Errorf("failed to bulk update or create lfs meta objects: %v", err)
}
}

return nil
return len(lfsMetaObjects), nil
}

func (c *LocalMirrorWoker) getAllLfsPointersByRef(ctx context.Context, RepoType types.RepositoryType, namespace, name, ref string) ([]*types.LFSPointer, error) {
Expand Down

0 comments on commit 7b4f1c0

Please sign in to comment.