diff --git a/artifactory/commands/transferfiles/manager.go b/artifactory/commands/transferfiles/manager.go index b6777ca3b..57922e11a 100644 --- a/artifactory/commands/transferfiles/manager.go +++ b/artifactory/commands/transferfiles/manager.go @@ -205,8 +205,8 @@ type producerConsumerWrapper struct { } func newProducerConsumerWrapper() producerConsumerWrapper { - chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) - chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false) + chunkUploaderProducerConsumer := parallel.NewRunner(GetChunkUploaderThreads(), tasksMaxCapacity, false) + chunkBuilderProducerConsumer := parallel.NewRunner(GetChunkBuilderThreads(), tasksMaxCapacity, false) chunkUploaderProducerConsumer.SetFinishedNotification(true) chunkBuilderProducerConsumer.SetFinishedNotification(true) errorsQueue := clientUtils.NewErrorsQueue(1) @@ -310,7 +310,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa // Fill chunk data batch till full. Return if no new chunk data is available. func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) { - for chunksLifeCycleManager.totalChunks < GetThreads() { + for chunksLifeCycleManager.totalChunks < GetChunkUploaderThreads() { select { case data := <-uploadChunkChan: currentNodeId := nodeId(data.NodeId) diff --git a/artifactory/commands/transferfiles/transfer.go b/artifactory/commands/transferfiles/transfer.go index 71482d62a..d2ab16c6a 100644 --- a/artifactory/commands/transferfiles/transfer.go +++ b/artifactory/commands/transferfiles/transfer.go @@ -588,19 +588,20 @@ func (tdc *TransferFilesCommand) getAllLocalRepos(serverDetails *config.ServerDe func (tdc *TransferFilesCommand) initCurThreads(buildInfoRepo bool) error { // Use default threads if settings file doesn't exist or an error occurred. - curThreads = utils.DefaultThreads + curChunkUploaderThreads = utils.DefaultThreads + curChunkBuilderThreads = utils.DefaultThreads settings, err := utils.LoadTransferSettings() if err != nil { return err } if settings != nil { - curThreads = settings.CalcNumberOfThreads(buildInfoRepo) - if buildInfoRepo && curThreads < settings.ThreadsNumber { + curChunkBuilderThreads, curChunkUploaderThreads = settings.CalcNumberOfThreads(buildInfoRepo) + if buildInfoRepo && curChunkUploaderThreads < settings.ThreadsNumber { log.Info("Build info transferring - using reduced number of threads") } } - log.Info("Running with maximum", strconv.Itoa(curThreads), "working threads...") + log.Info("Running with maximum", strconv.Itoa(curChunkUploaderThreads), "working threads...") return nil } diff --git a/artifactory/commands/transferfiles/transfer_test.go b/artifactory/commands/transferfiles/transfer_test.go index 2b745159b..2cfbf380b 100644 --- a/artifactory/commands/transferfiles/transfer_test.go +++ b/artifactory/commands/transferfiles/transfer_test.go @@ -203,7 +203,8 @@ func TestUploadChunkAndPollUploads(t *testing.T) { // Sends chunk to upload, polls on chunk three times - once when it is still in progress, once after done received and once to notify back to the source. func uploadChunkAndPollTwice(t *testing.T, phaseBase *phaseBase, fileSample api.FileRepresentation) { - curThreads = 8 + curChunkUploaderThreads = coreUtils.DefaultThreads + curChunkBuilderThreads = coreUtils.DefaultThreads uploadChunksChan := make(chan UploadedChunk, 3) doneChan := make(chan bool, 1) var runWaitGroup sync.WaitGroup diff --git a/artifactory/commands/transferfiles/utils.go b/artifactory/commands/transferfiles/utils.go index e41f50d3a..99ad5f3f9 100644 --- a/artifactory/commands/transferfiles/utils.go +++ b/artifactory/commands/transferfiles/utils.go @@ -48,7 +48,8 @@ type ( ) var AqlPaginationLimit = DefaultAqlPaginationLimit -var curThreads int +var curChunkBuilderThreads int +var curChunkUploaderThreads int type UploadedChunk struct { api.UploadChunkResponse @@ -190,7 +191,7 @@ var processedUploadChunksMutex sync.Mutex func incrCurProcessedChunksWhenPossible() bool { processedUploadChunksMutex.Lock() defer processedUploadChunksMutex.Unlock() - if curProcessedUploadChunks < GetThreads() { + if curProcessedUploadChunks < GetChunkUploaderThreads() { curProcessedUploadChunks++ return true } @@ -318,8 +319,12 @@ func newUploadedChunkStruct(uploadChunkResponse api.UploadChunkResponse, chunk a } } -func GetThreads() int { - return curThreads +func GetChunkBuilderThreads() int { + return curChunkBuilderThreads +} + +func GetChunkUploaderThreads() int { + return curChunkUploaderThreads } // Periodically reads settings file and updates the number of threads. @@ -349,16 +354,20 @@ func updateThreads(pcWrapper *producerConsumerWrapper, buildInfoRepo bool) error if err != nil || settings == nil { return err } - calculatedNumberOfThreads := settings.CalcNumberOfThreads(buildInfoRepo) - if curThreads != calculatedNumberOfThreads { + calculatedChunkBuilderThreads, calculatedChunkUploaderThreads := settings.CalcNumberOfThreads(buildInfoRepo) + if curChunkUploaderThreads != calculatedChunkUploaderThreads { if pcWrapper != nil { - updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedNumberOfThreads) - updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedNumberOfThreads) + if curChunkBuilderThreads != calculatedChunkBuilderThreads { + updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedChunkBuilderThreads) + } + updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedChunkUploaderThreads) } - log.Info(fmt.Sprintf("Number of threads have been updated to %s (was %s).", strconv.Itoa(calculatedNumberOfThreads), strconv.Itoa(curThreads))) - curThreads = calculatedNumberOfThreads + log.Info(fmt.Sprintf("Number of threads has been updated to %s (was %s).", strconv.Itoa(calculatedChunkUploaderThreads), strconv.Itoa(curChunkUploaderThreads))) + curChunkBuilderThreads = calculatedChunkBuilderThreads + curChunkUploaderThreads = calculatedChunkUploaderThreads } else { - log.Debug("No change to the number of threads have been detected.") + log.Debug(fmt.Sprintf("No change to the number of threads has been detected. Max chunks builder threads: %d. Max chunks uploader threads: %d.", + calculatedChunkBuilderThreads, calculatedChunkUploaderThreads)) } return nil } diff --git a/artifactory/commands/transferfiles/utils_test.go b/artifactory/commands/transferfiles/utils_test.go index 0f38b2100..80a30000c 100644 --- a/artifactory/commands/transferfiles/utils_test.go +++ b/artifactory/commands/transferfiles/utils_test.go @@ -15,10 +15,13 @@ import ( "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api" "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state" + artifactoryutils "github.com/jfrog/jfrog-cli-core/v2/artifactory/utils" "github.com/jfrog/jfrog-cli-core/v2/utils/config" "github.com/jfrog/jfrog-cli-core/v2/utils/tests" "github.com/jfrog/jfrog-client-go/artifactory/services" "github.com/jfrog/jfrog-client-go/artifactory/services/utils" + "github.com/jfrog/jfrog-client-go/utils/log" + clientutilstests "github.com/jfrog/jfrog-client-go/utils/tests" "github.com/stretchr/testify/assert" ) @@ -391,3 +394,42 @@ func createUniqueFileAndAssertCounter(t *testing.T, tmpDir, prefix string, expec assert.NoError(t, os.WriteFile(filePath, nil, 0644)) assert.True(t, strings.HasSuffix(filePath, strconv.Itoa(expectedCounter)+".json")) } + +var updateThreadsProvider = []struct { + threadsNumber int + expectedChunkBuilderThreads int + expectedChunkUploaderThreads int + buildInfo bool +}{ + {artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, false}, + {artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, false}, + {artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, false}, + {artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxChunkBuilderThreads, artifactoryutils.MaxChunkBuilderThreads + 1, false}, + + {artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, true}, + {artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, true}, + {artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true}, + {artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true}, +} + +func TestUpdateThreads(t *testing.T) { + cleanUpJfrogHome, err := tests.SetJfrogHome() + assert.NoError(t, err) + defer cleanUpJfrogHome() + + previousLog := clientutilstests.RedirectLogOutputToNil() + defer func() { + log.SetLogger(previousLog) + }() + + for _, testCase := range updateThreadsProvider { + t.Run(strconv.Itoa(testCase.threadsNumber)+" Build Info: "+strconv.FormatBool(testCase.buildInfo), func(t *testing.T) { + transferSettings := &artifactoryutils.TransferSettings{ThreadsNumber: testCase.threadsNumber} + assert.NoError(t, artifactoryutils.SaveTransferSettings(transferSettings)) + + assert.NoError(t, updateThreads(nil, testCase.buildInfo)) + assert.Equal(t, testCase.expectedChunkBuilderThreads, curChunkBuilderThreads) + assert.Equal(t, testCase.expectedChunkUploaderThreads, curChunkUploaderThreads) + }) + } +} diff --git a/artifactory/utils/transfersettings.go b/artifactory/utils/transfersettings.go index cf3bc198c..1a3d36fd8 100644 --- a/artifactory/utils/transfersettings.go +++ b/artifactory/utils/transfersettings.go @@ -14,8 +14,11 @@ import ( const ( // DefaultThreads is the default number of threads working while transferring Artifactory's data - DefaultThreads = 8 + DefaultThreads = 8 + // Maximum working threads allowed to execute the AQL queries and upload chunks for build-info repositories MaxBuildInfoThreads = 8 + // Maximum working threads allowed to execute the AQL queries + MaxChunkBuilderThreads = 16 transferSettingsFile = "transfer.conf" transferSettingsLockFile = "transfer-settings" @@ -25,11 +28,17 @@ type TransferSettings struct { ThreadsNumber int `json:"threadsNumber,omitempty"` } -func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) int { +func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) (chunkBuilderThreads, chunkUploaderThreads int) { + chunkBuilderThreads = ts.ThreadsNumber + chunkUploaderThreads = ts.ThreadsNumber if buildInfoRepo && MaxBuildInfoThreads < ts.ThreadsNumber { - return MaxBuildInfoThreads + chunkBuilderThreads = MaxBuildInfoThreads + chunkUploaderThreads = MaxBuildInfoThreads } - return ts.ThreadsNumber + if MaxChunkBuilderThreads < chunkBuilderThreads { + chunkBuilderThreads = MaxChunkBuilderThreads + } + return } func LoadTransferSettings() (settings *TransferSettings, err error) {