From 7fb9eb368283bdc63769d4a6167aa6cc8b0bb4f6 Mon Sep 17 00:00:00 2001 From: BarryTong65 Date: Wed, 24 Apr 2024 14:37:21 +0800 Subject: [PATCH] fix: pick new gvg when retry failed replicate piece task --- modular/manager/manager.go | 34 +++++++++++----------- modular/manager/manager_test.go | 5 +++- modular/manager/task_retry_scheduler.go | 38 ++++++++++++------------- 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/modular/manager/manager.go b/modular/manager/manager.go index 2a072aed1..c120ad1e9 100644 --- a/modular/manager/manager.go +++ b/modular/manager/manager.go @@ -508,23 +508,25 @@ func (m *ManageModular) LoadTaskFromDB() error { replicateTask.InitReplicatePieceTask(objectInfo, storageParams, m.baseApp.TaskPriority(replicateTask), m.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), m.baseApp.TaskMaxRetry(replicateTask), meta.IsAgentUpload) - if meta.GlobalVirtualGroupID == 0 { - bucketInfo, err := m.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true) - if err != nil || bucketInfo == nil { - log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err) - return err - } - gvgMeta, err := m.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams) - log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err) - if err != nil { - return err - } - replicateTask.GlobalVirtualGroupId = gvgMeta.ID - replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints - } else { - replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID - replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints + //retrieve objects from the database that have not completed the replicate piece, reselect gvg, and then add them to the replicate queue + bucketInfo, err := m.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true) + if err != nil || bucketInfo == nil { + log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err) + return err + } + gvgMeta, err := m.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams) + log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err) + if err != nil { + return err + } + replicateTask.GlobalVirtualGroupId = gvgMeta.ID + replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints + meta.GlobalVirtualGroupID = gvgMeta.ID + meta.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints + if err = m.baseApp.GfSpDB().UpdateUploadProgress(meta); err != nil { + log.Errorw("failed to update object task state", "task_info", replicateTask.Info(), "error", err) } + pushErr := m.replicateQueue.Push(replicateTask) if pushErr != nil { log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr) diff --git a/modular/manager/manager_test.go b/modular/manager/manager_test.go index a6d998c22..c590d6ca4 100644 --- a/modular/manager/manager_test.go +++ b/modular/manager/manager_test.go @@ -239,7 +239,10 @@ func TestManageModular_LoadTaskFromDB(t *testing.T) { m3.EXPECT().GetBucketByBucketName(gomock.Any(), gomock.Any(), gomock.Any()).Return( &types.Bucket{BucketInfo: &types0.BucketInfo{ GlobalVirtualGroupFamilyId: 1, - }}, nil) + }}, nil).AnyTimes() + + m1.EXPECT().UpdateUploadProgress(gomock.Any()).Return( + nil).AnyTimes() vgm := vgmgr.NewMockVirtualGroupManager(ctrl) manage.virtualGroupManager = vgm diff --git a/modular/manager/task_retry_scheduler.go b/modular/manager/task_retry_scheduler.go index a5a1eae8f..5ec7f4aaf 100644 --- a/modular/manager/task_retry_scheduler.go +++ b/modular/manager/task_retry_scheduler.go @@ -8,11 +8,10 @@ import ( "time" "github.com/bnb-chain/greenfield-common/go/hash" - "github.com/bnb-chain/greenfield-storage-provider/core/piecestore" - "github.com/bnb-chain/greenfield-storage-provider/base/gfspapp" "github.com/bnb-chain/greenfield-storage-provider/base/gfsptqueue" "github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask" + "github.com/bnb-chain/greenfield-storage-provider/core/piecestore" "github.com/bnb-chain/greenfield-storage-provider/core/spdb" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" @@ -212,24 +211,23 @@ func (s *TaskRetryScheduler) retryReplicateTask(meta *spdb.UploadObjectMeta) err replicateTask.InitReplicatePieceTask(objectInfo, storageParams, s.manager.baseApp.TaskPriority(replicateTask), s.manager.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), s.manager.baseApp.TaskMaxRetry(replicateTask), meta.IsAgentUpload) - // for objects that have been uploaded but not starting the replication yet, it doesn't have the GVG info the UploadObjectMeta, - // so it needs to pick one to start the replicate task. - if meta.GlobalVirtualGroupID == 0 { - bucketInfo, err := s.manager.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true) - if err != nil || bucketInfo == nil { - log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err) - return err - } - gvgMeta, err := s.manager.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams) - log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err) - if err != nil { - return err - } - replicateTask.GlobalVirtualGroupId = gvgMeta.ID - replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints - } else { - replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID - replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints + //retrieve objects from the database that have not completed the replicate piece, reselect gvg, and then add them to the replicate queue + bucketInfo, err := s.manager.baseApp.GfSpClient().GetBucketByBucketName(context.Background(), objectInfo.BucketName, true) + if err != nil || bucketInfo == nil { + log.Errorw("failed to get bucket by bucket name", "bucket", bucketInfo, "error", err) + return err + } + gvgMeta, err := s.manager.pickGlobalVirtualGroup(context.Background(), bucketInfo.BucketInfo.GlobalVirtualGroupFamilyId, storageParams) + log.Infow("pick global virtual group", "gvg_meta", gvgMeta, "error", err) + if err != nil { + return err + } + replicateTask.GlobalVirtualGroupId = gvgMeta.ID + replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints + meta.GlobalVirtualGroupID = gvgMeta.ID + meta.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints + if err = s.manager.baseApp.GfSpDB().UpdateUploadProgress(meta); err != nil { + log.Errorw("failed to update object task state", "task_info", replicateTask.Info(), "error", err) } err = s.manager.replicateQueue.Push(replicateTask)