Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer - Limit max AQL threads to 16 #1032

Merged
merged 3 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ type producerConsumerWrapper struct {
}

func newProducerConsumerWrapper() producerConsumerWrapper {
chunkUploaderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer := parallel.NewRunner(GetChunkUploaderThreads(), tasksMaxCapacity, false)
chunkBuilderProducerConsumer := parallel.NewRunner(GetChunkBuilderThreads(), tasksMaxCapacity, false)
chunkUploaderProducerConsumer.SetFinishedNotification(true)
chunkBuilderProducerConsumer.SetFinishedNotification(true)
errorsQueue := clientUtils.NewErrorsQueue(1)
Expand Down Expand Up @@ -310,7 +310,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa

// Fill chunk data batch till full. Return if no new chunk data is available.
func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) {
for chunksLifeCycleManager.totalChunks < GetThreads() {
for chunksLifeCycleManager.totalChunks < GetChunkUploaderThreads() {
select {
case data := <-uploadChunkChan:
currentNodeId := nodeId(data.NodeId)
Expand Down
9 changes: 5 additions & 4 deletions artifactory/commands/transferfiles/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,19 +588,20 @@ func (tdc *TransferFilesCommand) getAllLocalRepos(serverDetails *config.ServerDe

func (tdc *TransferFilesCommand) initCurThreads(buildInfoRepo bool) error {
// Use default threads if settings file doesn't exist or an error occurred.
curThreads = utils.DefaultThreads
curChunkUploaderThreads = utils.DefaultThreads
curChunkBuilderThreads = utils.DefaultThreads
settings, err := utils.LoadTransferSettings()
if err != nil {
return err
}
if settings != nil {
curThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curThreads < settings.ThreadsNumber {
curChunkBuilderThreads, curChunkUploaderThreads = settings.CalcNumberOfThreads(buildInfoRepo)
if buildInfoRepo && curChunkUploaderThreads < settings.ThreadsNumber {
log.Info("Build info transferring - using reduced number of threads")
}
}

log.Info("Running with maximum", strconv.Itoa(curThreads), "working threads...")
log.Info("Running with maximum", strconv.Itoa(curChunkUploaderThreads), "working threads...")
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ func TestUploadChunkAndPollUploads(t *testing.T) {

// Sends chunk to upload, polls on chunk three times - once when it is still in progress, once after done received and once to notify back to the source.
func uploadChunkAndPollTwice(t *testing.T, phaseBase *phaseBase, fileSample api.FileRepresentation) {
curThreads = 8
curChunkUploaderThreads = coreUtils.DefaultThreads
curChunkBuilderThreads = coreUtils.DefaultThreads
uploadChunksChan := make(chan UploadedChunk, 3)
doneChan := make(chan bool, 1)
var runWaitGroup sync.WaitGroup
Expand Down
31 changes: 20 additions & 11 deletions artifactory/commands/transferfiles/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type (
)

var AqlPaginationLimit = DefaultAqlPaginationLimit
var curThreads int
var curChunkBuilderThreads int
var curChunkUploaderThreads int

type UploadedChunk struct {
api.UploadChunkResponse
Expand Down Expand Up @@ -190,7 +191,7 @@ var processedUploadChunksMutex sync.Mutex
func incrCurProcessedChunksWhenPossible() bool {
processedUploadChunksMutex.Lock()
defer processedUploadChunksMutex.Unlock()
if curProcessedUploadChunks < GetThreads() {
if curProcessedUploadChunks < GetChunkUploaderThreads() {
curProcessedUploadChunks++
return true
}
Expand Down Expand Up @@ -318,8 +319,12 @@ func newUploadedChunkStruct(uploadChunkResponse api.UploadChunkResponse, chunk a
}
}

func GetThreads() int {
return curThreads
func GetChunkBuilderThreads() int {
return curChunkBuilderThreads
}

func GetChunkUploaderThreads() int {
return curChunkUploaderThreads
}

// Periodically reads settings file and updates the number of threads.
Expand Down Expand Up @@ -349,16 +354,20 @@ func updateThreads(pcWrapper *producerConsumerWrapper, buildInfoRepo bool) error
if err != nil || settings == nil {
return err
}
calculatedNumberOfThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curThreads != calculatedNumberOfThreads {
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads := settings.CalcNumberOfThreads(buildInfoRepo)
if curChunkUploaderThreads != calculatedChunkUploaderThreads {
if pcWrapper != nil {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedNumberOfThreads)
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedNumberOfThreads)
if curChunkBuilderThreads != calculatedChunkBuilderThreads {
updateProducerConsumerMaxParallel(pcWrapper.chunkBuilderProducerConsumer, calculatedChunkBuilderThreads)
}
updateProducerConsumerMaxParallel(pcWrapper.chunkUploaderProducerConsumer, calculatedChunkUploaderThreads)
}
log.Info(fmt.Sprintf("Number of threads have been updated to %s (was %s).", strconv.Itoa(calculatedNumberOfThreads), strconv.Itoa(curThreads)))
curThreads = calculatedNumberOfThreads
log.Info(fmt.Sprintf("Number of threads has been updated to %s (was %s).", strconv.Itoa(calculatedChunkUploaderThreads), strconv.Itoa(curChunkUploaderThreads)))
curChunkBuilderThreads = calculatedChunkBuilderThreads
curChunkUploaderThreads = calculatedChunkUploaderThreads
} else {
log.Debug("No change to the number of threads have been detected.")
log.Debug(fmt.Sprintf("No change to the number of threads has been detected. Max chunks builder threads: %d. Max chunks uploader threads: %d.",
calculatedChunkBuilderThreads, calculatedChunkUploaderThreads))
}
return nil
}
Expand Down
42 changes: 42 additions & 0 deletions artifactory/commands/transferfiles/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
artifactoryutils "github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"
"github.com/jfrog/jfrog-cli-core/v2/utils/config"
"github.com/jfrog/jfrog-cli-core/v2/utils/tests"
"github.com/jfrog/jfrog-client-go/artifactory/services"
"github.com/jfrog/jfrog-client-go/artifactory/services/utils"
"github.com/jfrog/jfrog-client-go/utils/log"
clientutilstests "github.com/jfrog/jfrog-client-go/utils/tests"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -391,3 +394,42 @@ func createUniqueFileAndAssertCounter(t *testing.T, tmpDir, prefix string, expec
assert.NoError(t, os.WriteFile(filePath, nil, 0644))
assert.True(t, strings.HasSuffix(filePath, strconv.Itoa(expectedCounter)+".json"))
}

var updateThreadsProvider = []struct {
threadsNumber int
expectedChunkBuilderThreads int
expectedChunkUploaderThreads int
buildInfo bool
}{
{artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, false},
{artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, false},
{artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads + 1, false},
{artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxChunkBuilderThreads, artifactoryutils.MaxChunkBuilderThreads + 1, false},

{artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, artifactoryutils.DefaultThreads - 1, true},
{artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, artifactoryutils.DefaultThreads, true},
{artifactoryutils.MaxBuildInfoThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true},
{artifactoryutils.MaxChunkBuilderThreads + 1, artifactoryutils.MaxBuildInfoThreads, artifactoryutils.MaxBuildInfoThreads, true},
}

func TestUpdateThreads(t *testing.T) {
cleanUpJfrogHome, err := tests.SetJfrogHome()
assert.NoError(t, err)
defer cleanUpJfrogHome()

previousLog := clientutilstests.RedirectLogOutputToNil()
defer func() {
log.SetLogger(previousLog)
}()

for _, testCase := range updateThreadsProvider {
t.Run(strconv.Itoa(testCase.threadsNumber)+" Build Info: "+strconv.FormatBool(testCase.buildInfo), func(t *testing.T) {
transferSettings := &artifactoryutils.TransferSettings{ThreadsNumber: testCase.threadsNumber}
assert.NoError(t, artifactoryutils.SaveTransferSettings(transferSettings))

assert.NoError(t, updateThreads(nil, testCase.buildInfo))
assert.Equal(t, testCase.expectedChunkBuilderThreads, curChunkBuilderThreads)
assert.Equal(t, testCase.expectedChunkUploaderThreads, curChunkUploaderThreads)
})
}
}
17 changes: 13 additions & 4 deletions artifactory/utils/transfersettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (

const (
// DefaultThreads is the default number of threads working while transferring Artifactory's data
DefaultThreads = 8
DefaultThreads = 8
// Maximum working threads allowed to execute the AQL queries and upload chunks for build-info repositories
MaxBuildInfoThreads = 8
// Maximum working threads allowed to execute the AQL queries
MaxChunkBuilderThreads = 16

transferSettingsFile = "transfer.conf"
transferSettingsLockFile = "transfer-settings"
Expand All @@ -25,11 +28,17 @@ type TransferSettings struct {
ThreadsNumber int `json:"threadsNumber,omitempty"`
}

func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) int {
func (ts *TransferSettings) CalcNumberOfThreads(buildInfoRepo bool) (chunkBuilderThreads, chunkUploaderThreads int) {
chunkBuilderThreads = ts.ThreadsNumber
chunkUploaderThreads = ts.ThreadsNumber
if buildInfoRepo && MaxBuildInfoThreads < ts.ThreadsNumber {
return MaxBuildInfoThreads
chunkBuilderThreads = MaxBuildInfoThreads
chunkUploaderThreads = MaxBuildInfoThreads
}
return ts.ThreadsNumber
if MaxChunkBuilderThreads < chunkBuilderThreads {
chunkBuilderThreads = MaxChunkBuilderThreads
}
return
}

func LoadTransferSettings() (settings *TransferSettings, err error) {
Expand Down
Loading