From 1984ad331ef5fe04732ab78b0c113f615e5489b0 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Fri, 13 Dec 2024 18:31:11 +0700 Subject: [PATCH 1/5] break when enoughs shards for reconstruction --- zboxcore/sdk/downloadworker.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index ced23ac37..78f1e5f80 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -229,6 +229,7 @@ func (req *DownloadRequest) downloadBlock( fmt.Sprintf("Required downloads %d, remaining active blobber %d", req.consensusThresh, activeBlobbers)) } + actualRequiredDownloads := requiredDownloads if timeRequest { requiredDownloads = activeBlobbers } @@ -290,10 +291,16 @@ func (req *DownloadRequest) downloadBlock( c++ } - var failed int32 + var ( + failed int32 + success int32 + ) downloadErrors := make([]string, requiredDownloads) wg := &sync.WaitGroup{} for i := 0; i < requiredDownloads; i++ { + if atomic.LoadInt32(&success) >= int32(actualRequiredDownloads) { + break + } result := <-rspCh wg.Add(1) go func(i int) { @@ -314,6 +321,7 @@ func (req *DownloadRequest) downloadBlock( req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock)) } } else if timeRequest { + atomic.AddInt32(&success, 1) req.downloadQueue[result.maskIdx].timeTaken = result.timeTaken } wg.Done() @@ -729,7 +737,7 @@ func (req *DownloadRequest) processDownload() { if startBlock+int64(j)*numBlocks+numBlocks > endBlock { blocksToDownload = endBlock - (startBlock + int64(j)*numBlocks) } - data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0 && n > 1) + data, err := req.getBlocksData(startBlock+int64(j)*numBlocks, blocksToDownload, j == 0) if req.isDownloadCanceled { return errors.New("download_abort", "Download aborted by user") } From 7b3262dab7f5318d46bebe13bea52e7b0f207672 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 14 Dec 2024 14:41:56 +0700 Subject: [PATCH 2/5] add cancel download blocks and set buffers to null in worker --- wasmsdk/blobber.go | 8 ++++++++ wasmsdk/proxy.go | 1 + zboxcore/sdk/allocation.go | 20 +++++++++++++++++--- zboxcore/sdk/chunked_upload_process_js.go | 3 +++ 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 7da903394..737b0bbc1 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -1248,6 +1248,14 @@ func cancelDownloadDirectory(remotePath string) { downloadDirLock.Unlock() } +func cancelDownloadBlocks(allocationID, remotePath string, start, end int64) error { + alloc, err := getAllocation(allocationID) + if err != nil { + return err + } + return alloc.CancelDownloadBlocks(remotePath, start, end) +} + func startListener(respChan chan string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go index f68426263..9826ac461 100644 --- a/wasmsdk/proxy.go +++ b/wasmsdk/proxy.go @@ -249,6 +249,7 @@ func main() { "getFileMetaByName": getFileMetaByName, "downloadDirectory": downloadDirectory, "cancelDownloadDirectory": cancelDownloadDirectory, + "cancelDownloadBlocks": cancelDownloadBlocks, // player "play": play, diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 5528ed7b8..779930ebb 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -23,6 +23,7 @@ import ( "time" "github.com/0chain/gosdk/core/client" + "github.com/0chain/gosdk/core/encryption" "github.com/0chain/gosdk/core/transaction" "github.com/0chain/common/core/currency" @@ -1451,7 +1452,8 @@ func (a *Allocation) addAndGenerateDownloadRequest( opt(downloadReq) } downloadReq.workdir = filepath.Join(downloadReq.workdir, ".zcn") - a.downloadProgressMap[remotePath] = downloadReq + hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotePath, startBlock, endBlock)) + a.downloadProgressMap[hash] = downloadReq a.downloadRequests = append(a.downloadRequests, downloadReq) if isFinal { downloadOps := a.downloadRequests @@ -2465,7 +2467,18 @@ func (a *Allocation) UploadAuthTicketToBlobber(authTicket string, clientEncPubKe // It cancels the download operation and removes the download request from the download progress map. // - remotepath: The remote path of the file to cancel the download operation. func (a *Allocation) CancelDownload(remotepath string) error { - if downloadReq, ok := a.downloadProgressMap[remotepath]; ok { + hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotepath, 1, 0)) + if downloadReq, ok := a.downloadProgressMap[hash]; ok { + downloadReq.isDownloadCanceled = true + downloadReq.ctxCncl() + return nil + } + return errors.New("remote_path_not_found", "Invalid path. No download in progress for the path "+remotepath) +} + +func (a *Allocation) CancelDownloadBlocks(remotepath string, start, end int64) error { + hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remotepath, start, end)) + if downloadReq, ok := a.downloadProgressMap[hash]; ok { downloadReq.isDownloadCanceled = true downloadReq.ctxCncl() return nil @@ -2865,7 +2878,8 @@ func (a *Allocation) downloadFromAuthTicket(fileHandler sys.File, authTicket str opt(downloadReq) } a.mutex.Lock() - a.downloadProgressMap[remoteLookupHash] = downloadReq + hash := encryption.Hash(fmt.Sprintf("%s:%d:%d", remoteLookupHash, startBlock, endBlock)) + a.downloadProgressMap[hash] = downloadReq if len(a.downloadRequests) > 0 { downloadReq.connectionID = a.downloadRequests[0].connectionID } diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go index 5d2027160..782400775 100644 --- a/zboxcore/sdk/chunked_upload_process_js.go +++ b/zboxcore/sdk/chunked_upload_process_js.go @@ -602,6 +602,9 @@ func parseEventData(data safejs.Value) (*FileMeta, *ChunkedUploadFormInfo, [][]b buf := make([]byte, fileShardLen) safejs.CopyBytesToGo(buf, fileShardUint8) fileShards := splitData(buf, int(chunkSize)) + fileShardUint8.Set("buffer", js.Null()) + formInfoUint8.Set("buffer", js.Null()) + fileMetaUint8.Set("buffer", js.Null()) thumbnailChunkDataUint8, err := data.Get("thumbnailChunkData") if err != nil { From 665d17afdb4a081fdaf094e413260f3d8e3557b7 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 14 Dec 2024 17:15:20 +0700 Subject: [PATCH 3/5] fix ut --- core/conf/config_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/conf/config_test.go b/core/conf/config_test.go index 356cf9cc2..8762195bd 100644 --- a/core/conf/config_test.go +++ b/core/conf/config_test.go @@ -77,7 +77,7 @@ func TestLoadConfig(t *testing.T) { return mockDefaultReader() }, run: func(r *require.Assertions, cfg Config) { - r.Equal(10, cfg.MinSubmit) + r.Equal(20, cfg.MinSubmit) }, }, { @@ -147,7 +147,7 @@ func TestLoadConfig(t *testing.T) { return mockDefaultReader() }, run: func(r *require.Assertions, cfg Config) { - r.Equal(5, cfg.QuerySleepTime) + r.Equal(1, cfg.QuerySleepTime) }, }, { name: "Test_Config_Max_Txn_Query_Less_Than_1", @@ -157,7 +157,7 @@ func TestLoadConfig(t *testing.T) { return mockDefaultReader() }, run: func(r *require.Assertions, cfg Config) { - r.Equal(5, cfg.MaxTxnQuery) + r.Equal(10, cfg.MaxTxnQuery) }, }, { name: "Test_Config_Confirmation_Chain_Length_Less_Than_1", From 18be4b5e7c596cd37c0ceeb3cfc2a0927d96a2ea Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 14 Dec 2024 17:27:45 +0700 Subject: [PATCH 4/5] fix cancel download ut --- zboxcore/sdk/allocation_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zboxcore/sdk/allocation_test.go b/zboxcore/sdk/allocation_test.go index 26e8db381..3df813c00 100644 --- a/zboxcore/sdk/allocation_test.go +++ b/zboxcore/sdk/allocation_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "io" "io/fs" "log" @@ -1452,7 +1453,8 @@ func TestAllocation_CancelDownload(t *testing.T) { setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) { req := &DownloadRequest{} req.ctx, req.ctxCncl = context.WithCancel(context.TODO()) - a.downloadProgressMap[remotePath] = req + hash := fmt.Sprintf("%s:%d:%d", remotePath, 1, 0) + a.downloadProgressMap[hash] = req return nil }, }, From 0f3599cdccebc67e1157a33eb67c2cb92d590556 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 14 Dec 2024 17:44:58 +0700 Subject: [PATCH 5/5] add fix for hash --- zboxcore/sdk/allocation_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zboxcore/sdk/allocation_test.go b/zboxcore/sdk/allocation_test.go index 3df813c00..a2cf5b8c5 100644 --- a/zboxcore/sdk/allocation_test.go +++ b/zboxcore/sdk/allocation_test.go @@ -19,6 +19,7 @@ import ( "github.com/0chain/gosdk/zboxcore/mocks" + encrypt "github.com/0chain/gosdk/core/encryption" "github.com/0chain/gosdk/dev/blobber" "github.com/0chain/gosdk/dev/blobber/model" "github.com/0chain/gosdk/zboxcore/encryption" @@ -1453,7 +1454,7 @@ func TestAllocation_CancelDownload(t *testing.T) { setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) { req := &DownloadRequest{} req.ctx, req.ctxCncl = context.WithCancel(context.TODO()) - hash := fmt.Sprintf("%s:%d:%d", remotePath, 1, 0) + hash := encrypt.Hash(fmt.Sprintf("%s:%d:%d", remotePath, 1, 0)) a.downloadProgressMap[hash] = req return nil },