Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/jfrog/jfrog-cli-core into re…
Browse files Browse the repository at this point in the history
…po-env
  • Loading branch information
RobiNino committed Jul 5, 2023
2 parents 7f07bd5 + 45dd4f9 commit edb634c
Show file tree
Hide file tree
Showing 41 changed files with 1,643 additions and 400 deletions.
31 changes: 2 additions & 29 deletions artifactory/commands/transferconfig/transferconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"bytes"
"context"
"fmt"
"github.com/jfrog/gofrog/version"
"net/http"
"os"
"strings"
"time"

"github.com/jfrog/gofrog/datastructures"
"github.com/jfrog/gofrog/version"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/generic"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferconfig/configxmlutils"
commandsUtils "github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/utils"
Expand Down Expand Up @@ -152,9 +152,6 @@ func (tcc *TransferConfigCommand) Run() (err error) {
}

tcc.LogTitle("Phase 5/5 - Import repositories to the target Artifactory")
if err = tcc.deleteConflictingRepositories(selectedRepos); err != nil {
return
}
if err = tcc.TransferRepositoriesToTarget(selectedRepos, remoteRepos); err != nil {
return
}
Expand Down Expand Up @@ -528,30 +525,6 @@ func (tcc *TransferConfigCommand) getWorkingDirParam() string {
return ""
}

func (tcc *TransferConfigCommand) deleteConflictingRepositories(selectedRepos map[utils.RepoType][]string) error {
log.Info("Deleting conflicting repositories in the target Artifactory server, if any exist...")
targetRepos, err := tcc.TargetArtifactoryManager.GetAllRepositories()
if err != nil {
return err
}
allSourceRepos := datastructures.MakeSet[string]()
for _, selectedReposWithType := range selectedRepos {
for _, selectedRepo := range selectedReposWithType {
allSourceRepos.Add(selectedRepo)
}
}

for _, targetRepo := range *targetRepos {
if allSourceRepos.Exists(targetRepo.Key) {
if err = tcc.TargetArtifactoryManager.DeleteRepository(targetRepo.Key); err != nil {
return err
}
}
}
log.Info("Done deleting conflicting repositories")
return nil
}

// Make sure that the source Artifactory version is sufficient.
// Returns the source Artifactory version.
func (tcc *TransferConfigCommand) validateMinVersion() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ func (w *SplitContentWriter) closeCurrentFile() error {
return err
}
if w.writer.GetFilePath() != "" {
fullPath := filepath.Join(w.dirPath, fmt.Sprintf("%s-%d.json", w.filePrefix, w.fileIndex))
fullPath, err := getUniqueErrorOrDelayFilePath(w.dirPath, func() string {
return w.filePrefix
})
if err != nil {
return err
}
log.Debug(fmt.Sprintf("Saving split content JSON file to: %s.", fullPath))
if err := fileutils.MoveFile(w.writer.GetFilePath(), fullPath); err != nil {
return fmt.Errorf("saving file failed! failed moving %s to %s: %w", w.writer.GetFilePath(), fullPath, err)
Expand Down
32 changes: 16 additions & 16 deletions artifactory/commands/transferfiles/errorshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/jfrog/jfrog-client-go/utils/log"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -42,9 +41,7 @@ type TransferErrorsMng struct {
type errorWriter struct {
writer *content.ContentWriter
errorCount int
// In case we have multiple errors files - we index them
fileIndex int
filePath string
filePath string
}

type errorWriterMng struct {
Expand Down Expand Up @@ -116,7 +113,7 @@ func (mng *TransferErrorsMng) start() (err error) {
if err != nil {
return err
}
writerRetry, retryFilePath, err := mng.newContentWriter(retryablePath, 0)
writerRetry, retryFilePath, err := mng.newUniqueContentWriter(retryablePath)
if err != nil {
return err
}
Expand All @@ -126,14 +123,14 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.retryable = errorWriter{writer: writerRetry, fileIndex: 0, filePath: retryFilePath}
writerMng.retryable = errorWriter{writer: writerRetry, filePath: retryFilePath}
// Init the content writer which is responsible for writing 'skipped errors' into files.
// In the next run we won't retry and upload those files.
skippedPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
writerSkip, skipFilePath, err := mng.newContentWriter(skippedPath, 0)
writerSkip, skipFilePath, err := mng.newUniqueContentWriter(skippedPath)
if err != nil {
return err
}
Expand All @@ -143,7 +140,7 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.skipped = errorWriter{writer: writerSkip, fileIndex: 0, filePath: skipFilePath}
writerMng.skipped = errorWriter{writer: writerSkip, filePath: skipFilePath}
mng.errorWriterMng = writerMng

// Read errors from channel and write them to files.
Expand All @@ -156,17 +153,22 @@ func (mng *TransferErrorsMng) start() (err error) {
return
}

func (mng *TransferErrorsMng) newContentWriter(dirPath string, index int) (*content.ContentWriter, string, error) {
func (mng *TransferErrorsMng) newUniqueContentWriter(dirPath string) (*content.ContentWriter, string, error) {
writer, err := content.NewContentWriter("errors", true, false)
if err != nil {
return nil, "", err
}
errorsFilePath := filepath.Join(dirPath, getErrorsFileName(mng.repoKey, mng.phaseId, mng.phaseStartTime, index))
errorsFilePath, err := getUniqueErrorOrDelayFilePath(dirPath, func() string {
return getErrorsFileNamePrefix(mng.repoKey, mng.phaseId, mng.phaseStartTime)
})
if err != nil {
return nil, "", err
}
return writer, errorsFilePath, nil
}

func getErrorsFileName(repoKey string, phaseId int, phaseStartTime string, index int) string {
return fmt.Sprintf("%s-%d-%s-%d.json", repoKey, phaseId, phaseStartTime, index)
func getErrorsFileNamePrefix(repoKey string, phaseId int, phaseStartTime string) string {
return fmt.Sprintf("%s-%d-%s", repoKey, phaseId, phaseStartTime)
}

func (mng *TransferErrorsMng) writeErrorContent(e ExtendedFileUploadStatusResponse) error {
Expand Down Expand Up @@ -197,12 +199,11 @@ func (mng *TransferErrorsMng) writeSkippedErrorContent(e ExtendedFileUploadStatu
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.skipped.fileIndex++
dirPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.skipped.fileIndex)
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
Expand All @@ -222,12 +223,11 @@ func (mng *TransferErrorsMng) writeRetryableErrorContent(e ExtendedFileUploadSta
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.retryable.fileIndex++
dirPath, err := getJfrogTransferRepoRetryableDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.retryable.fileIndex)
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/errorshandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ func writeEmptyErrorsFile(t *testing.T, repoKey string, retryable bool, phase, c
assert.NoError(t, err)
assert.NoError(t, fileutils.CreateDirIfNotExist(errorsDirPath))

fileName := getErrorsFileName(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now()), counter)
fileName := fmt.Sprintf("%s-%d.json", getErrorsFileNamePrefix(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now())), counter)
assert.NoError(t, os.WriteFile(filepath.Join(errorsDirPath, fileName), nil, 0644))
}
34 changes: 34 additions & 0 deletions artifactory/commands/transferfiles/filediff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package transferfiles

import (
"testing"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
servicesUtils "github.com/jfrog/jfrog-client-go/artifactory/services/utils"
"github.com/stretchr/testify/assert"
)

var convertResultsToFileRepresentationTestCases = []struct {
input servicesUtils.ResultItem
expectedOutput api.FileRepresentation
}{
{
servicesUtils.ResultItem{Repo: repo1Key, Path: "path-in-repo", Name: "file-name", Type: "file", Size: 100},
api.FileRepresentation{Repo: repo1Key, Path: "path-in-repo", Name: "file-name", Size: 100},
},
{
servicesUtils.ResultItem{Repo: repo1Key, Path: "path-in-repo", Name: "folder-name", Type: "folder"},
api.FileRepresentation{Repo: repo1Key, Path: "path-in-repo/folder-name"},
},
{
servicesUtils.ResultItem{Repo: repo1Key, Path: ".", Name: "folder-name", Type: "folder"},
api.FileRepresentation{Repo: repo1Key, Path: "folder-name"},
},
}

func TestConvertResultsToFileRepresentation(t *testing.T) {
for _, testCase := range convertResultsToFileRepresentationTestCases {
files := convertResultsToFileRepresentation([]servicesUtils.ResultItem{testCase.input})
assert.Equal(t, []api.FileRepresentation{testCase.expectedOutput}, files)
}
}
47 changes: 34 additions & 13 deletions artifactory/commands/transferfiles/filesdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transferfiles

import (
"fmt"
"path"
"time"

"github.com/jfrog/gofrog/parallel"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra

paginationI := 0
for {
result, err := f.getTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationI)
result, lastPage, err := f.getTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationI)
if err != nil {
return err
}
Expand Down Expand Up @@ -145,7 +146,7 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra
return err
}

if len(result) < AqlPaginationLimit {
if lastPage {
break
}
paginationI++
Expand All @@ -163,12 +164,26 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra

func convertResultsToFileRepresentation(results []servicesUtils.ResultItem) (files []api.FileRepresentation) {
for _, result := range results {
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: result.Path,
Name: result.Name,
Size: result.Size,
})
switch result.Type {
case "folder":
var pathInRepo string
if result.Path == "." {
pathInRepo = result.Name
} else {
pathInRepo = path.Join(result.Path, result.Name)
}
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: pathInRepo,
})
default:
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: result.Path,
Name: result.Name,
Size: result.Size,
})
}
}
return
}
Expand All @@ -177,7 +192,11 @@ func convertResultsToFileRepresentation(results []servicesUtils.ResultItem) (fil
// fromTimestamp - Time in RFC3339 represents the start time
// toTimestamp - Time in RFC3339 represents the end time
// paginationOffset - Requested page
func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (result []servicesUtils.ResultItem, err error) {
// Return values:
// result - The list of changed files and folders between the input timestamps
// lastPage - True if we are in the last AQL page and it is not needed to run another AQL requests
// err - The error, if any occurred
func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (result []servicesUtils.ResultItem, lastPage bool, err error) {
var timeFrameFilesDiff *servicesUtils.AqlSearchResult
if f.packageType == docker {
// Handle Docker repositories.
Expand All @@ -187,9 +206,11 @@ func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string
timeFrameFilesDiff, err = f.getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationOffset)
}
if err != nil {
return []servicesUtils.ResultItem{}, err
return []servicesUtils.ResultItem{}, true, err
}
return f.locallyGeneratedFilter.FilterLocallyGenerated(timeFrameFilesDiff.Results)
lastPage = len(timeFrameFilesDiff.Results) < AqlPaginationLimit
result, err = f.locallyGeneratedFilter.FilterLocallyGenerated(timeFrameFilesDiff.Results)
return
}

func (f *filesDiffPhase) getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (aqlResult *servicesUtils.AqlSearchResult, err error) {
Expand Down Expand Up @@ -242,7 +263,7 @@ func (f *filesDiffPhase) getDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp

func generateDiffAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
query := fmt.Sprintf(`items.find({"$and":[{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"repo":"%s","type":"any"}]})`, fromTimestamp, toTimestamp, repoKey)
query += `.include("repo","path","name","modified","size")`
query += `.include("repo","path","name","type","modified","size")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
}
Expand All @@ -265,7 +286,7 @@ func generateGetDirContentAqlQuery(repoKey string, paths []string) string {
func generateDockerManifestAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
query := `items.find({"$and":`
query += fmt.Sprintf(`[{"repo":"%s"},{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"$or":[{"name":"manifest.json"},{"name":"list.manifest.json"}]}`, repoKey, fromTimestamp, toTimestamp)
query += `]}).include("repo","path","name","modified")`
query += `]}).include("repo","path","name","type","modified")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
}
4 changes: 3 additions & 1 deletion artifactory/commands/transferfiles/fulltransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, p
}

// Add the folder as a candidate to transfer. The reason is that we'd like to transfer only folders with properties or empty folders.
curUploadChunk.AppendUploadCandidateIfNeeded(api.FileRepresentation{Repo: m.repoKey, Path: params.relativePath, NonEmptyDir: len(result) > 0}, m.buildInfoRepo)
if params.relativePath != "." {
curUploadChunk.AppendUploadCandidateIfNeeded(api.FileRepresentation{Repo: m.repoKey, Path: params.relativePath, NonEmptyDir: len(result) > 0}, m.buildInfoRepo)
}

// Empty folder
if paginationI == 0 && len(result) == 0 {
Expand Down
16 changes: 11 additions & 5 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
if phaseBase != nil {
timeEstMng = &phaseBase.stateManager.TimeEstimationManager
}
for {
for i := 0; ; i++ {
if ShouldStop(phaseBase, nil, errorsChannelMng) {
return
}
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
// Run once per 3 minutes
if i%60 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
}

// Each uploading thread receive a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
// Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil {
log.Error("Couldn't store the stale chunks:", err.Error())
}
// When totalChunks size is zero, it means that all the tokens are uploaded,
// we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory.
// If during the polling some chunks data were lost due to network issues, either on the client or on the source,
Expand Down
13 changes: 13 additions & 0 deletions artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}

// This structure contains a collection of chunks that have been undergoing processing for over 30 minutes
type StaleChunks struct {
NodeID string `json:"node_id,omitempty"`
Chunks []StaleChunk `json:"stale_node_chunks,omitempty"`
}

type StaleChunk struct {
ChunkID string `json:"chunk_id,omitempty"`
Files []string `json:"files,omitempty"`
Sent int64 `json:"sent,omitempty"`
}

func (ts *TransferRunStatus) action(action ActionOnStatusFunc) error {
Expand Down
Loading

0 comments on commit edb634c

Please sign in to comment.