Skip to content

Commit

Permalink
Feature: Upload parallel (#15360)
Browse files Browse the repository at this point in the history
* wip

* review
  • Loading branch information
memsharded authored Jan 10, 2024
1 parent ef19f22 commit 9fb047f
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 73 deletions.
12 changes: 12 additions & 0 deletions conan/api/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ class PackagesList:
def __init__(self):
self.recipes = {}

def split(self):
"""
Returns a list of PackageList, splitted one per reference.
This can be useful to parallelize things like upload, parallelizing per-reference
"""
result = []
for r, content in self.recipes.items():
subpkglist = PackagesList()
subpkglist.recipes[r] = content
result.append(subpkglist)
return result

def only_recipes(self):
result = {}
for ref, ref_dict in self.recipes.items():
Expand Down
14 changes: 10 additions & 4 deletions conan/api/output.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fnmatch
import sys
import time
from threading import Lock

from colorama import Fore, Style

Expand Down Expand Up @@ -54,6 +55,7 @@ class ConanOutput:
_conan_output_level = LEVEL_STATUS
_silent_warn_tags = []
_warnings_as_errors = []
lock = Lock()

def __init__(self, scope=""):
self.stream = sys.stderr
Expand Down Expand Up @@ -127,8 +129,11 @@ def write(self, data, fg=None, bg=None, newline=False):

if newline:
data = "%s\n" % data
self.stream.write(data)
self.stream.flush()

with self.lock:
self.stream.write(data)
self.stream.flush()

return self

def rewrite_line(self, line):
Expand Down Expand Up @@ -162,8 +167,9 @@ def _write_message(self, msg, fg=None, bg=None):
else:
ret += "{}".format(msg)

self.stream.write("{}\n".format(ret))
self.stream.flush()
with self.lock:
self.stream.write("{}\n".format(ret))
self.stream.flush()

def trace(self, msg):
if self._conan_output_level <= LEVEL_TRACE:
Expand Down
44 changes: 44 additions & 0 deletions conan/api/subapi/upload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import time
from multiprocessing.pool import ThreadPool

from conan.api.output import ConanOutput
from conan.internal.cache.home_paths import HomePaths
Expand Down Expand Up @@ -57,6 +59,48 @@ def upload(self, package_list, remote):
executor = UploadExecutor(app)
executor.upload(package_list, remote)

def upload_full(self, package_list, remote, enabled_remotes, check_integrity=False, force=False,
metadata=None, dry_run=False):
""" Does the whole process of uploading, including the possibility of parallelizing
per recipe:
- calls check_integrity
- checks which revision already exist in the server (not necessary to upoad)
- prepare the artifacts to upload (compress .tgz)
- execute the actual upload
- upload potential sources backups
"""

def _upload_pkglist(pkglist, subtitle=lambda _: None):
if check_integrity:
subtitle("Checking integrity of cache packages")
self.conan_api.cache.check_integrity(pkglist)
# Check if the recipes/packages are in the remote
subtitle("Checking server existing packages")
self.check_upstream(pkglist, remote, enabled_remotes, force)
subtitle("Preparing artifacts for upload")
self.prepare(pkglist, enabled_remotes, metadata)

if not dry_run:
subtitle("Uploading artifacts")
self.upload(pkglist, remote)
backup_files = self.get_backup_sources(pkglist)
self.upload_backup_sources(backup_files)

t = time.time()
ConanOutput().title(f"Uploading to remote {remote.name}")
parallel = self.conan_api.config.get("core.upload:parallel", default=1, check_type=int)
thread_pool = ThreadPool(parallel) if parallel > 1 else None
if not thread_pool or len(package_list.recipes) <= 1:
_upload_pkglist(package_list, subtitle=ConanOutput().subtitle)
else:
ConanOutput().subtitle(f"Uploading with {parallel} parallel threads")
thread_pool.map(_upload_pkglist, package_list.split())
if thread_pool:
thread_pool.close()
thread_pool.join()
elapsed = time.time() - t
ConanOutput().success(f"Upload complete in {int(elapsed)}s\n")

def get_backup_sources(self, package_list=None):
"""Get list of backup source files currently present in the cache,
either all of them if no argument, else filter by those belonging to the references in the package_list"""
Expand Down
3 changes: 2 additions & 1 deletion conan/cli/commands/cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from conan.api.conan_api import ConanAPI
from conan.api.model import ListPattern, MultiPackagesList
from conan.api.output import cli_out_write
from conan.api.output import cli_out_write, ConanOutput
from conan.cli import make_abs_path
from conan.cli.command import conan_command, conan_subcommand, OnceArgument
from conan.cli.commands.list import print_list_text, print_list_json
Expand Down Expand Up @@ -101,6 +101,7 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args):
ref_pattern = ListPattern(args.pattern, rrev="*", package_id="*", prev="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)
conan_api.cache.check_integrity(package_list)
ConanOutput().success("Integrity check: ok")


@conan_subcommand(formatters={"text": print_list_text,
Expand Down
12 changes: 2 additions & 10 deletions conan/cli/commands/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,8 @@ def upload(conan_api: ConanAPI, parser, *args):
if not args.list and not args.confirm and "*" in args.pattern:
_ask_confirm_upload(conan_api, package_list)

if args.check:
conan_api.cache.check_integrity(package_list)
# Check if the recipes/packages are in the remote
conan_api.upload.check_upstream(package_list, remote, enabled_remotes, args.force)
conan_api.upload.prepare(package_list, enabled_remotes, args.metadata)

if not args.dry_run:
conan_api.upload.upload(package_list, remote)
backup_files = conan_api.upload.get_backup_sources(package_list)
conan_api.upload.upload_backup_sources(backup_files)
conan_api.upload.upload_full(package_list, remote, enabled_remotes, args.check,
args.force, args.metadata, args.dry_run)
elif args.list:
# Don't error on no recipes for automated workflows using list,
# but warn to tell the user that no packages were uploaded
Expand Down
15 changes: 6 additions & 9 deletions conans/client/cmd/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def __init__(self, app: ConanApp):
self._app = app

def check(self, upload_bundle, remote, force):
ConanOutput().subtitle("Checking server existing packages")
for ref, recipe_bundle in upload_bundle.refs().items():
self._check_upstream_recipe(ref, recipe_bundle, remote, force)
for pref, prev_bundle in upload_bundle.prefs(ref, recipe_bundle).items():
Expand Down Expand Up @@ -81,7 +80,6 @@ def __init__(self, app: ConanApp, global_conf):
self._global_conf = global_conf

def prepare(self, upload_bundle, enabled_remotes):
ConanOutput().subtitle("Preparing artifacts for upload")
for ref, bundle in upload_bundle.refs().items():
layout = self._app.cache.recipe_layout(ref)
conanfile_path = layout.conanfile()
Expand Down Expand Up @@ -211,39 +209,38 @@ class UploadExecutor:
"""
def __init__(self, app: ConanApp):
self._app = app
self._output = ConanOutput()

def upload(self, upload_data, remote):
ConanOutput().subtitle("Uploading artifacts")
for ref, bundle in upload_data.refs().items():
if bundle.get("upload"):
self.upload_recipe(ref, bundle, remote)
for pref, prev_bundle in upload_data.prefs(ref, bundle).items():
if prev_bundle.get("upload"):
self.upload_package(pref, prev_bundle, remote)
ConanOutput().success("Upload complete\n")

def upload_recipe(self, ref, bundle, remote):
self._output.info(f"Uploading recipe '{ref.repr_notime()}'")
output = ConanOutput(scope=str(ref))
output.info(f"Uploading recipe '{ref.repr_notime()}'")
t1 = time.time()
cache_files = bundle["files"]

self._app.remote_manager.upload_recipe(ref, cache_files, remote)

duration = time.time() - t1
self._output.debug(f"Upload {ref} in {duration} time")
output.debug(f"Upload {ref} in {duration} time")
return ref

def upload_package(self, pref, prev_bundle, remote):
self._output.info(f"Uploading package '{pref.repr_notime()}'")
output = ConanOutput(scope=str(pref.ref))
output.info(f"Uploading package '{pref.repr_notime()}'")
cache_files = prev_bundle["files"]
assert (pref.revision is not None), "Cannot upload a package without PREV"
assert (pref.ref.revision is not None), "Cannot upload a package without RREV"

t1 = time.time()
self._app.remote_manager.upload_package(pref, cache_files, remote)
duration = time.time() - t1
self._output.debug(f"Upload {pref} in {duration} time")
output.debug(f"Upload {pref} in {duration} time")


def compress_files(files, name, dest_dir, compresslevel=None, ref=None):
Expand Down
21 changes: 17 additions & 4 deletions conans/client/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import types
import uuid
from threading import Lock

import yaml

Expand Down Expand Up @@ -52,7 +53,7 @@ def load_basic_module(self, conanfile_path, graph_lock=None, display="", remotes
return conanfile, cached[1]

try:
module, conanfile = parse_conanfile(conanfile_path)
module, conanfile = _parse_conanfile(conanfile_path)
if tested_python_requires:
conanfile.python_requires = tested_python_requires

Expand Down Expand Up @@ -288,10 +289,14 @@ class defining the Recipe, but also process possible existing generators
return result


def parse_conanfile(conanfile_path):
module, filename = load_python_file(conanfile_path)
_load_python_lock = Lock() # Loading our Python files is not thread-safe (modifies sys)


def _parse_conanfile(conanfile_path):
with _load_python_lock:
module, module_id = _load_python_file(conanfile_path)
try:
conanfile = _parse_module(module, filename)
conanfile = _parse_module(module, module_id)
return module, conanfile
except Exception as e: # re-raise with file name
raise ConanException("%s: %s" % (conanfile_path, str(e)))
Expand All @@ -300,6 +305,14 @@ def parse_conanfile(conanfile_path):
def load_python_file(conan_file_path):
""" From a given path, obtain the in memory python import module
"""
with _load_python_lock:
module, module_id = _load_python_file(conan_file_path)
return module, module_id


def _load_python_file(conan_file_path):
""" From a given path, obtain the in memory python import module
"""

if not os.path.exists(conan_file_path):
raise NotFoundException("%s not found!" % conan_file_path)
Expand Down
1 change: 1 addition & 0 deletions conans/model/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"core.version_ranges:resolve_prereleases": "Whether version ranges can resolve to pre-releases or not",
"core.upload:retry": "Number of retries in case of failure when uploading to Conan server",
"core.upload:retry_wait": "Seconds to wait between upload attempts to Conan server",
"core.upload:parallel": "Number of concurrent threads to upload packages",
"core.download:parallel": "Number of concurrent threads to download packages",
"core.download:retry": "Number of retries in case of failure when downloading from Conan server",
"core.download:retry_wait": "Seconds to wait between download attempts from Conan server",
Expand Down
58 changes: 13 additions & 45 deletions conans/test/integration/command/upload/test_upload_parallel.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import textwrap

import pytest
from mock import patch
from requests import ConnectionError

from conans.test.assets.genconanfile import GenConanfile
from conans.test.utils.tools import TestClient, NO_SETTINGS_PACKAGE_ID, TestRequester
from conans.test.utils.tools import TestClient, TestRequester


@pytest.mark.xfail(reason="Upload parallel not migrated yet")
def test_upload_parallel_error():
"""Cause an error in the parallel transfer and see some message"""

Expand All @@ -22,72 +17,45 @@ def put(self, *args, **kwargs):
return super(FailOnReferencesUploader, self).put(*args, **kwargs)

client = TestClient(requester_class=FailOnReferencesUploader, default_server_user=True)
client.save({"global.conf": f"core.upload:parallel=2\ncore.upload:retry_wait=0"},
path=client.cache.cache_folder)
client.save({"conanfile.py": GenConanfile()})
client.run('remote login default admin -p password')
for index in range(4):
client.run('create . --name=lib{} --version=1.0 --user=user --channel=channel'.format(index))
client.run('upload lib* --parallel -c -r default --retry-wait=0', assert_error=True)
client.run('upload lib* -c -r default', assert_error=True)
assert "Connection fails with lib2 and lib4 references!" in client.out
assert "Execute upload again to retry upload the failed files" in client.out


@pytest.mark.xfail(reason="Upload parallel not migrated yet")
def test_upload_parallel_success():
"""Upload 2 packages in parallel with success"""

client = TestClient(default_server_user=True)
client.save({"global.conf": f"core.upload:parallel=2"}, path=client.cache.cache_folder)
client.save({"conanfile.py": GenConanfile()})
client.run('create . --name=lib0 --version=1.0 --user=user --channel=channel')
assert "lib0/1.0@user/channel: Package '{}' created".format(NO_SETTINGS_PACKAGE_ID) in client.out
client.run('create . --name=lib1 --version=1.0 --user=user --channel=channel')
assert "lib1/1.0@user/channel: Package '{}' created".format(NO_SETTINGS_PACKAGE_ID) in client.out
client.run('remote login default admin -p password')
client.run('upload lib* --parallel -c -r default')
assert "Uploading lib0/1.0@user/channel to remote 'default'" in client.out
assert "Uploading lib1/1.0@user/channel to remote 'default'" in client.out
client.run('upload lib* -c -r default')
assert "Uploading recipe 'lib0/1.0@user/channel#4d670581ccb765839f2239cc8dff8fbd'" in client.out
assert "Uploading recipe 'lib1/1.0@user/channel#4d670581ccb765839f2239cc8dff8fbd'" in client.out
client.run('search lib0/1.0@user/channel -r default')
assert "lib0/1.0@user/channel" in client.out
client.run('search lib1/1.0@user/channel -r default')
assert "lib1/1.0@user/channel" in client.out


@pytest.mark.xfail(reason="Upload parallel not migrated yet")
def test_upload_parallel_fail_on_interaction():
"""Upload 2 packages in parallel and fail because non_interactive forced"""

client = TestClient(default_server_user=True)
client.save({"global.conf": f"core.upload:parallel=2\ncore:non_interactive=True"},
path=client.cache.cache_folder)
client.save({"conanfile.py": GenConanfile()})
num_references = 2
for index in range(num_references):
client.run('create . --name=lib{} --version=1.0 --user=user --channel=channel'.format(index))
assert "lib{}/1.0@user/channel: Package '{}' created".format(
index,
NO_SETTINGS_PACKAGE_ID) in client.out
client.run('remote logout default')
client.run('upload lib* --parallel -c -r default', assert_error=True)
assert "ERROR: lib0/1.0@user/channel: Upload recipe to 'default' failed: " \
"Conan interactive mode disabled. [Remote: default]" in client.out


@pytest.mark.xfail(reason="Upload parallel not migrated yet")
def test_beat_character_long_upload():
client = TestClient(default_server_user=True)
slow_conanfile = textwrap.dedent("""
from conan import ConanFile
from conan.tools.files import copy
class MyPkg(ConanFile):
exports = "*"
def package(self):
copy(self, "*", self.source_folder, self.package_folder)
""")
client.save({"conanfile.py": slow_conanfile,
"hello.cpp": ""})
client.run("create . --name=pkg --version=0.1 --user=user --channel=stable")
client.run("remote login default admin --password=password")
with patch("conans.util.progress_bar.TIMEOUT_BEAT_SECONDS", -1):
with patch("conans.util.progress_bar.TIMEOUT_BEAT_CHARACTER", "%&$"):
client.run("upload pkg/0.1@user/stable -r default")
out = "".join(str(client.out).splitlines())
assert "Compressing package...%&$Uploading conan_package.tgz -> pkg/0.1@user/stable" in out
assert "%&$Uploading conan_export.tgz" in out
assert "%&$Uploading conaninfo.txt" in out
client.run('remote logout default')
client.run('upload lib* -c -r default', assert_error=True)
assert "ERROR: Conan interactive mode disabled. [Remote: default]" in client.out

0 comments on commit 9fb047f

Please sign in to comment.