Skip to content

Commit

Permalink
perf: receive done (#1428)
Browse files Browse the repository at this point in the history
* perf: receive done
  • Loading branch information
constwz committed Jul 5, 2024
1 parent 37e3aa3 commit c7a88d1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
54 changes: 33 additions & 21 deletions modular/receiver/receive_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,20 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
log.CtxErrorw(ctx, "failed to get checksum from db", "task", task, "error", err)
return nil, ErrGfSpDBWithDetail("failed to get checksum from db, error: " + err.Error())
}
// // If it already have integrity data,Avoid repetitive writing db
skipInsertIntegrityMeta := false
if len(pieceChecksums) != int(segmentCount) {
log.CtxErrorw(ctx, "replicate piece unfinished", "task", task)
err = ErrUnfinishedTask
return nil, ErrUnfinishedTask
// Interface idempotent processing. If it already have integrity data, can skip this check
integrityMeta, integrityErr := r.baseApp.GfSpDB().GetObjectIntegrity(task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx())
if integrityMeta != nil && integrityErr == nil {
// The checksum is obtained from integrityMeta
pieceChecksums = integrityMeta.PieceChecksumList
skipInsertIntegrityMeta = true
} else {
log.CtxErrorw(ctx, "replicate piece unfinished", "task", task)
err = ErrUnfinishedTask
return nil, ErrUnfinishedTask
}
}

expectedIntegrityHash := task.GetObjectInfo().GetChecksums()[task.GetRedundancyIdx()+1]
Expand All @@ -137,25 +147,27 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
}

setIntegrityTime := time.Now()
if task.GetObjectInfo().GetIsUpdating() {
integrityMeta := &corespdb.ShadowIntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
Version: task.GetObjectInfo().GetVersion(),
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetShadowObjectIntegrity(integrityMeta)
} else {
integrityMeta := &corespdb.IntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
if !skipInsertIntegrityMeta {
if task.GetObjectInfo().GetIsUpdating() {
integrityMeta := &corespdb.ShadowIntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
Version: task.GetObjectInfo().GetVersion(),
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetShadowObjectIntegrity(integrityMeta)
} else {
integrityMeta := &corespdb.IntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetObjectIntegrity(integrityMeta)
}
err = r.baseApp.GfSpDB().SetObjectIntegrity(integrityMeta)
}
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_set_integrity_time").Observe(time.Since(setIntegrityTime).Seconds())
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions modular/receiver/receive_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (

sdkmath "cosmossdk.io/math"
"github.com/bnb-chain/greenfield-common/go/hash"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"gorm.io/gorm"

"github.com/bnb-chain/greenfield-storage-provider/base/gfspclient"
"github.com/bnb-chain/greenfield-storage-provider/base/gfsppieceop"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask"
"github.com/bnb-chain/greenfield-storage-provider/core/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/core/spdb"
"github.com/bnb-chain/greenfield-storage-provider/core/taskqueue"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

func TestErrPieceStoreWithDetail(t *testing.T) {
Expand Down Expand Up @@ -236,6 +238,7 @@ func TestHandleDoneReceivePieceTask_PieceCountMismatch(t *testing.T) {
mockSPDB := spdb.NewMockSPDB(ctrl)
r.baseApp.SetGfSpDB(mockSPDB)
mockSPDB.EXPECT().GetAllReplicatePieceChecksumOptimized(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
mockSPDB.EXPECT().GetObjectIntegrity(gomock.Any(), gomock.Any()).Return(nil, gorm.ErrRecordNotFound).AnyTimes()
_, err := r.HandleDoneReceivePieceTask(context.TODO(), mockTask)
assert.NotNil(t, err)
}
Expand Down

0 comments on commit c7a88d1

Please sign in to comment.