diff --git a/component/repo.go b/component/repo.go index b8c33e1c..72be7482 100644 --- a/component/repo.go +++ b/component/repo.go @@ -1555,12 +1555,14 @@ func (c *RepoComponent) MirrorFromSaas(ctx context.Context, namespace, name, cur CreatedAt: mirror.CreatedAt.Unix(), MirrorToken: syncClientSetting.Token, }) + repo.SyncStatus = types.SyncStatusPending + } else { + repo.SyncStatus = types.SyncStatusInProgress } - repo.SyncStatus = types.SyncStatusInProgress _, err = c.repo.UpdateRepo(ctx, *repo) if err != nil { - return fmt.Errorf("failed to update repo sync status") + return fmt.Errorf("failed to update repo sync status: %w", err) } return nil } @@ -1571,6 +1573,10 @@ func (c *RepoComponent) mirrorFromSaasSync(ctx context.Context, mirror *database if err != nil { return fmt.Errorf("failed to find sync client setting, error: %w", err) } + repo, err := c.repo.FindById(ctx, mirror.RepositoryID) + if err != nil { + return fmt.Errorf("failed to find repo, error: %w", err) + } if c.config.GitServer.Type == types.GitServerTypeGitea { err = c.git.MirrorSync(ctx, gitserver.MirrorSyncReq{ Namespace: namespace, @@ -1589,6 +1595,12 @@ func (c *RepoComponent) mirrorFromSaasSync(ctx context.Context, mirror *database CreatedAt: mirror.CreatedAt.Unix(), MirrorToken: syncClientSetting.Token, }) + repo.SyncStatus = types.SyncStatusPending + } + + _, err = c.repo.UpdateRepo(ctx, *repo) + if err != nil { + return fmt.Errorf("failed to update repo sync status: %w", err) } return nil } diff --git a/mirror/lfs_sync_worker.go b/mirror/lfs_sync_worker.go index 6940a07e..f48015c3 100644 --- a/mirror/lfs_sync_worker.go +++ b/mirror/lfs_sync_worker.go @@ -3,13 +3,14 @@ package mirror import ( "context" + "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/config" "opencsg.com/csghub-server/mirror/lfssyncer" ) type LFSSyncWorker interface { Run() - SyncLfs(ctx context.Context, workerID int, mirrorID int64) error + SyncLfs(ctx context.Context, workerID int, mirror *database.Mirror) error } func NewLFSSyncWorker(config *config.Config, numWorkers int) (LFSSyncWorker, error) { diff --git a/mirror/lfssyncer/minio.go b/mirror/lfssyncer/minio.go index 3051c140..b6da2349 100644 --- a/mirror/lfssyncer/minio.go +++ b/mirror/lfssyncer/minio.go @@ -24,6 +24,7 @@ type MinioLFSSyncWorker struct { tasks chan queue.MirrorTask wg sync.WaitGroup mirrorStore *database.MirrorStore + repoStore *database.RepoStore lfsMetaObjectStore *database.LfsMetaObjectStore s3Client *minio.Client config *config.Config @@ -41,6 +42,7 @@ func NewMinioLFSSyncWorker(config *config.Config, numWorkers int) (*MinioLFSSync return nil, newError } w.mirrorStore = database.NewMirrorStore() + w.repoStore = database.NewRepoStore() w.lfsMetaObjectStore = database.NewLfsMetaObjectStore() w.config = config mq, err := queue.GetPriorityQueueInstance() @@ -83,21 +85,37 @@ func (w *MinioLFSSyncWorker) worker(id int) { for { task := <-w.tasks ctx := context.Background() - err := w.SyncLfs(ctx, id, task.MirrorID) + mirror, err := w.mirrorStore.FindByID(ctx, task.MirrorID) if err != nil { + slog.Error("fail to get mirror", slog.Int("workerId", id), slog.String("error", err.Error())) + continue + } + repo, err := w.repoStore.FindById(ctx, mirror.RepositoryID) + if err != nil { + slog.Error("fail to get repository", slog.Int("workerId", id), slog.String("error", err.Error())) + continue + } + err = w.SyncLfs(ctx, id, mirror) + if err != nil { + repo.SyncStatus = types.SyncStatusFailed + _, repoErr := w.repoStore.UpdateRepo(ctx, *repo) + if repoErr != nil { + slog.Error("fail to update repo sync status to failed: %w", slog.Any("error", err)) + } slog.Error("fail to sync lfs", slog.Int("workerId", id), slog.String("error", err.Error())) continue } + + repo.SyncStatus = types.SyncStatusCompleted + _, err = w.repoStore.UpdateRepo(ctx, *repo) + if err != nil { + slog.Error("fail to update repo sync status to complete: %w", slog.Any("error", err)) + } } } -func (w *MinioLFSSyncWorker) SyncLfs(ctx context.Context, workerId int, mirrorID int64) error { +func (w *MinioLFSSyncWorker) SyncLfs(ctx context.Context, workerId int, mirror *database.Mirror) error { var pointers []*types.Pointer - mirror, err := w.mirrorStore.FindByID(ctx, mirrorID) - if err != nil { - slog.Error("fail to get mirror", slog.Int("workerId", workerId), slog.String("error", err.Error())) - return fmt.Errorf("fail to get mirror: %w", err) - } lfsMetaObjects, err := w.lfsMetaObjectStore.FindByRepoID(ctx, mirror.Repository.ID) if err != nil { slog.Error("fail to get lfs meta objects", slog.Int("workerId", workerId), slog.String("error", err.Error())) diff --git a/mirror/reposyncer/local_woker.go b/mirror/reposyncer/local_woker.go index 6c162236..121e9343 100644 --- a/mirror/reposyncer/local_woker.go +++ b/mirror/reposyncer/local_woker.go @@ -147,13 +147,14 @@ func (w *LocalMirrorWoker) SyncRepo(ctx context.Context, task queue.MirrorTask) branch := parts[len(parts)-1] mirror.Repository.DefaultBranch = branch + mirror.Repository.SyncStatus = types.SyncStatusInProgress _, err = w.repoStore.UpdateRepo(ctx, *mirror.Repository) if err != nil { - return fmt.Errorf("failed to update repo: %w", err) + return fmt.Errorf("failed to update repo sync status to in progress: %w", err) } slog.Info("Update repo default branch successfully", slog.Any("repo_type", mirror.Repository.RepositoryType), slog.Any("namespace", namespace), slog.Any("name", name)) slog.Info("Start to sync lfs files", "repo_type", mirror.Repository.RepositoryType, "namespace", namespace, "name", name) - err = w.generateLfsMetaObjects(ctx, mirror) + lfsFileCount, err := w.generateLfsMetaObjects(ctx, mirror) if err != nil { mirror.Status = types.MirrorIncomplete mirror.LastMessage = err.Error() @@ -161,24 +162,35 @@ func (w *LocalMirrorWoker) SyncRepo(ctx context.Context, task queue.MirrorTask) if err != nil { return fmt.Errorf("failed to update mirror: %w", err) } + + mirror.Repository.SyncStatus = types.SyncStatusFailed + _, err = w.repoStore.UpdateRepo(ctx, *mirror.Repository) + if err != nil { + return fmt.Errorf("failed to update repo sync status to failed: %w", err) + } return fmt.Errorf("failed to sync lfs files: %v", err) } - mirror.Status = types.MirrorRepoSynced + if lfsFileCount > 0 { + mirror.Status = types.MirrorRepoSynced + w.mq.PushLfsMirror(&queue.MirrorTask{ + MirrorID: mirror.ID, + Priority: queue.Priority(mirror.Priority), + CreatedAt: mirror.CreatedAt.Unix(), + MirrorToken: task.MirrorToken, + }) + } else { + mirror.Status = types.MirrorFinished + } + err = w.mirrorStore.Update(ctx, mirror) if err != nil { return fmt.Errorf("failed to update mirror: %w", err) } - w.mq.PushLfsMirror(&queue.MirrorTask{ - MirrorID: mirror.ID, - Priority: queue.Priority(mirror.Priority), - CreatedAt: mirror.CreatedAt.Unix(), - MirrorToken: task.MirrorToken, - }) return nil } -func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *database.Mirror) error { +func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *database.Mirror) (int, error) { var lfsMetaObjects []database.LfsMetaObject namespace := strings.Split(mirror.Repository.Path, "/")[0] name := strings.Split(mirror.Repository.Path, "/")[1] @@ -188,12 +200,12 @@ func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *d RepoType: mirror.Repository.RepositoryType, }) if err != nil { - return fmt.Errorf("failed to get repo branches: %v", err) + return 0, fmt.Errorf("failed to get repo branches: %v", err) } for _, branch := range branches { lfsPointers, err := c.getAllLfsPointersByRef(ctx, mirror.Repository.RepositoryType, namespace, name, branch.Name) if err != nil { - return fmt.Errorf("failed to get all lfs pointers: %v", err) + return 0, fmt.Errorf("failed to get all lfs pointers: %v", err) } for _, lfsPointer := range lfsPointers { lfsMetaObjects = append(lfsMetaObjects, database.LfsMetaObject{ @@ -206,12 +218,14 @@ func (c *LocalMirrorWoker) generateLfsMetaObjects(ctx context.Context, mirror *d } lfsMetaObjects = removeDuplicateLfsMetaObject(lfsMetaObjects) - err = c.lfsMetaObjectStore.BulkUpdateOrCreate(ctx, lfsMetaObjects) - if err != nil { - return fmt.Errorf("failed to bulk update or create lfs meta objects: %v", err) + if len(lfsMetaObjects) > 0 { + err = c.lfsMetaObjectStore.BulkUpdateOrCreate(ctx, lfsMetaObjects) + if err != nil { + return 0, fmt.Errorf("failed to bulk update or create lfs meta objects: %v", err) + } } - return nil + return len(lfsMetaObjects), nil } func (c *LocalMirrorWoker) getAllLfsPointersByRef(ctx context.Context, RepoType types.RepositoryType, namespace, name, ref string) ([]*types.LFSPointer, error) {