Skip to content

Commit

Permalink
Merge pull request #1699 from 0chain/feat/cancel-repair
Browse files Browse the repository at this point in the history
Add cancel repair and add batch key to multi upload
  • Loading branch information
dabasov authored Dec 7, 2024
2 parents 65b096c + d34176c commit b5002b6
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 65 deletions.
106 changes: 77 additions & 29 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import (
const FileOperationInsert = "insert"

var (
downloadDirContextMap = make(map[string]context.CancelCauseFunc)
downloadDirLock = sync.Mutex{}
opCancelContextMap = make(map[string]context.CancelCauseFunc)
opCancelLock = sync.Mutex{}
ErrUnderRepair = errors.New("allocation is under repair")
)

// listObjects list allocation objects from its blobbers
Expand Down Expand Up @@ -72,27 +73,45 @@ func listObjectsFromAuthTicket(allocationID, authTicket, lookupHash string, offs
}

// cancelUpload cancel the upload operation of the file
// - allocationID is the allocation id
// - remotePath is the remote path of the file
func cancelUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
// - batchKey is the batch key of the operation
func cancelUpload(batchKey string) error {
opCancelLock.Lock()
defer opCancelLock.Unlock()
if cancel, ok := opCancelContextMap[batchKey]; ok {
cancel(sdk.ErrCancelUpload)
} else {
return errors.New("invalid batch key")
}
return nil
}

// pauseUpload pause the upload operation of the file
// - batchKey is the batch key of the operation
func pauseUpload(batchKey string) error {
opCancelLock.Lock()
defer opCancelLock.Unlock()
if cancel, ok := opCancelContextMap[batchKey]; ok {
cancel(sdk.ErrPauseUpload)
} else {
return errors.New("invalid batch key")
}
return nil
}

func cancelRepair(allocationID string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return err
}
return allocationObj.CancelUpload(remotePath)
return alloc.CancelRepair()
}

// pauseUpload pause the upload operation of the file
// - allocationID is the allocation id
// - remotePath is the remote path of the file
func pauseUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
func cancelDownload(allocationID, remotePath string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return err
}
return allocationObj.PauseUpload(remotePath)
return alloc.CancelDownload(remotePath)
}

// createDir create a directory on blobbers
Expand Down Expand Up @@ -599,10 +618,11 @@ type MultiDownloadOption struct {
// ## Inputs
// - allocationID
// - jsonMultiUploadOptions: Json Array of MultiOperationOption. eg: "[{"operationType":"move","remotePath":"/README.md","destPath":"/folder1/"},{"operationType":"delete","remotePath":"/t3.txt"}]"
// - batchKey: batch key for the operation
//
// ## Outputs
// - error
func MultiOperation(allocationID string, jsonMultiUploadOptions string) error {
func MultiOperation(allocationID, jsonMultiUploadOptions string) error {
if allocationID == "" {
return errors.New("AllocationID is required")
}
Expand Down Expand Up @@ -631,6 +651,9 @@ func MultiOperation(allocationID string, jsonMultiUploadOptions string) error {
if err != nil {
return err
}
if allocationObj.IsUnderRepair() {
return ErrUnderRepair
}
return allocationObj.DoMultiOperation(operations)
}

Expand Down Expand Up @@ -696,7 +719,7 @@ func setUploadMode(mode int) {

// multiUpload upload multiple files in parallel
// - jsonBulkUploadOptions is the json array of BulkUploadOption. Follows the BulkUploadOption struct
func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
func multiUpload(jsonBulkUploadOptions, batchKey string) (MultiUploadResult, error) {
defer func() {
if r := recover(); r != nil {
PrintError("Recovered in multiupload Error", r)
Expand Down Expand Up @@ -729,6 +752,11 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
result.Success = false
return result, err
}
if allocationObj.IsUnderRepair() {
result.Error = ErrUnderRepair.Error()
result.Success = false
return result, ErrUnderRepair
}

operationRequests := make([]sdk.OperationRequest, n)
for idx, option := range options {
Expand Down Expand Up @@ -811,14 +839,34 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
}

}
err = allocationObj.DoMultiOperation(operationRequests)
if err != nil {
result.Error = err.Error()
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
opCancelLock.Lock()
opCancelContextMap[batchKey] = cancel
opCancelLock.Unlock()
defer func() {
opCancelLock.Lock()
delete(opCancelContextMap, batchKey)
opCancelLock.Unlock()
}()
errChan := make(chan error, 1)
go func() {
errChan <- allocationObj.DoMultiOperation(operationRequests, sdk.WithContext(ctx))
}()
select {
case <-ctx.Done():
result.Error = ctx.Err().Error()
result.Success = false
return result, err
return result, ctx.Err()
case err := <-errChan:
if err != nil {
result.Error = err.Error()
result.Success = false
return result, err
}
result.Success = true
return result, nil
}
result.Success = true
return result, nil
}

func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string, fileSize int64, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int, callbackFuncName string) (bool, error) {
Expand Down Expand Up @@ -1223,9 +1271,9 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st
go func() {
errChan <- alloc.DownloadDirectory(ctx, remotePath, "", authticket, statusBar)
}()
downloadDirLock.Lock()
downloadDirContextMap[remotePath] = cancel
downloadDirLock.Unlock()
opCancelLock.Lock()
opCancelContextMap[remotePath] = cancel
opCancelLock.Unlock()
select {
case err = <-errChan:
if err != nil {
Expand All @@ -1240,12 +1288,12 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st
// cancelDownloadDirectory cancel the download directory operation
// - remotePath : remote path of the directory
func cancelDownloadDirectory(remotePath string) {
downloadDirLock.Lock()
cancel, ok := downloadDirContextMap[remotePath]
opCancelLock.Lock()
cancel, ok := opCancelContextMap[remotePath]
if ok {
cancel(errors.New("download directory canceled by user"))
}
downloadDirLock.Unlock()
opCancelLock.Unlock()
}

func startListener(respChan chan string) error {
Expand Down
2 changes: 2 additions & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func main() {
"getFileMetaByName": getFileMetaByName,
"downloadDirectory": downloadDirectory,
"cancelDownloadDirectory": cancelDownloadDirectory,
"cancelRepair": cancelRepair,
"cancelDownload": cancelDownload,

// player
"play": play,
Expand Down
30 changes: 22 additions & 8 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2905,18 +2905,21 @@ func (a *Allocation) StartRepair(localRootPath, pathToRepair string, statusCB St
return err
}
}

repairCtx, repairCtxCancel := context.WithCancel(a.ctx)
repairReq := &RepairRequest{
listDir: listDir,
localRootPath: localRootPath,
statusCB: statusCB,
repairPath: pathToRepair,
listDir: listDir,
localRootPath: localRootPath,
statusCB: statusCB,
repairPath: pathToRepair,
repairCtx: repairCtx,
repairCtxCancel: repairCtxCancel,
}

repairReq.completedCallback = func() {
a.mutex.Lock()
defer a.mutex.Unlock()
a.repairRequestInProgress = nil
repairCtxCancel()
}

go func() {
Expand Down Expand Up @@ -2956,9 +2959,11 @@ func (a *Allocation) RepairSize(remotePath string) (RepairSize, error) {
if err != nil {
return RepairSize{}, err
}

repairCtx, repairCtxCancel := context.WithCancel(a.ctx)
repairReq := RepairRequest{
allocation: a,
allocation: a,
repairCtx: repairCtx,
repairCtxCancel: repairCtxCancel,
}
return repairReq.Size(context.Background(), dir)
}
Expand All @@ -2973,7 +2978,7 @@ func (a *Allocation) CancelUpload(remotePath string) error {
if !ok {
return errors.New("remote_path_not_found", "Invalid path. No upload in progress for the path "+remotePath)
} else {
cancelFunc(fmt.Errorf("upload canceled by user"))
cancelFunc(ErrCancelUpload)
}
return nil
}
Expand All @@ -2998,13 +3003,22 @@ func (a *Allocation) PauseUpload(remotePath string) error {
// CancelRepair cancels the repair operation for the allocation.
// It cancels the repair operation and returns an error if no repair is in progress for the allocation.
func (a *Allocation) CancelRepair() error {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.repairRequestInProgress != nil {
a.repairRequestInProgress.isRepairCanceled = true
a.repairRequestInProgress.repairCtxCancel()
return nil
}
return errors.New("invalid_cancel_repair_request", "No repair in progress for the allocation")
}

func (a *Allocation) IsUnderRepair() bool {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.repairRequestInProgress != nil
}

func (a *Allocation) GetMaxWriteReadFromBlobbers(blobbers []*BlobberAllocation) (maxW float64, maxR float64, err error) {
if !a.isInitialized() {
return 0, 0, notInitialized
Expand Down
9 changes: 7 additions & 2 deletions zboxcore/sdk/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func TestAllocation_dispatchWork(t *testing.T) {
})
t.Run("Test_Cover_Repair_Request", func(t *testing.T) {
go a.dispatchWork(context.Background())
a.repairChan <- &RepairRequest{listDir: &ListResult{}}
repairCtx, repairCtxCancel := context.WithCancel(context.Background())
a.repairChan <- &RepairRequest{listDir: &ListResult{}, repairCtx: repairCtx, repairCtxCancel: repairCtxCancel}
})
}

Expand Down Expand Up @@ -2297,7 +2298,11 @@ func TestAllocation_CancelRepair(t *testing.T) {
{
name: "Test_Success",
setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) {
a.repairRequestInProgress = &RepairRequest{}
ctx, cancel := context.WithCancel(context.Background())
a.repairRequestInProgress = &RepairRequest{
repairCtx: ctx,
repairCtxCancel: cancel,
}
return nil
},
},
Expand Down
13 changes: 12 additions & 1 deletion zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ type MultiOperationOption func(mo *MultiOperation)

func WithRepair() MultiOperationOption {
return func(mo *MultiOperation) {
mo.Consensus.consensusThresh = 0
mo.Consensus.consensusThresh = 1
mo.isRepair = true
}
}

func WithContext(ctx context.Context) MultiOperationOption {
return func(mo *MultiOperation) {
mo.ctx, mo.ctxCncl = context.WithCancelCause(ctx)
}
}

type Operationer interface {
Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error)
buildChange(refs []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange
Expand Down Expand Up @@ -321,6 +327,11 @@ func (mo *MultiOperation) Process() error {
l.Logger.Error("consensus not met", activeBlobbers, mo.consensusThresh)
return errors.New("consensus_not_met", fmt.Sprintf("Active blobbers %d is less than consensus threshold %d", activeBlobbers, mo.consensusThresh))
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if mo.allocationObj.StorageVersion == StorageV2 {
return mo.commitV2()
}
Expand Down
Loading

0 comments on commit b5002b6

Please sign in to comment.