Skip to content

Commit

Permalink
Last pass according to review
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanMabille committed Oct 19, 2023
1 parent 9b3bdfc commit be1fea3
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 55 deletions.
9 changes: 2 additions & 7 deletions libmamba/include/mamba/core/package_fetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,15 @@ namespace mamba

void set_progress_callback(progress_callback_t cb);

void run();
void run(std::size_t downloaded_size);

Result get_result() const;
void clear_cache() const;
Result run();
Result run(std::size_t downloaded_size);

private:

progress_callback_t* get_progress_callback();

PackageFetcher* p_fetcher;
ExtractOptions m_options;
bool m_is_valid;
bool m_is_extracted;
std::optional<progress_callback_t> m_progress_callback = std::nullopt;
};

Expand Down
10 changes: 6 additions & 4 deletions libmamba/src/core/download_progress_bar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ namespace mamba
}
else
{
total_str = to_human_readable_filesize(static_cast<double>(r.progress_bar().total()), 1);
total_str = to_human_readable_filesize(
static_cast<double>(r.progress_bar().total()),
1
);
}
r.total.set_value(fmt::format("{:>7}", total_str));

Expand Down Expand Up @@ -257,6 +260,8 @@ namespace mamba
)
{
assert(extract_tasks.size() >= dl_requests.size());
auto& pbar_manager = Console::instance().init_progress_bar_manager(ProgressBarMode::aggregated
);
m_extract_bar.reserve(extract_tasks.size());
m_throttle_time.resize(dl_requests.size(), std::chrono::steady_clock::now());
m_download_bar.reserve(dl_requests.size());
Expand All @@ -281,9 +286,6 @@ namespace mamba
init_aggregated_download();
init_aggregated_extract();

auto& pbar_manager = static_cast<AggregatedBarManager&>(
Console::instance().progress_bar_manager()
);
pbar_manager.start();
pbar_manager.watch_print();

Expand Down
35 changes: 12 additions & 23 deletions libmamba/src/core/package_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,24 @@ namespace mamba
m_progress_callback = std::move(cb);
}

void PackageExtractTask::run()
auto PackageExtractTask::run() -> Result
{
m_is_valid = true;
m_is_extracted = p_fetcher->extract(m_options);
bool is_valid = true;
bool is_extracted = p_fetcher->extract(m_options);
return { is_valid, is_extracted };
}

void PackageExtractTask::run(std::size_t downloaded_size)
auto PackageExtractTask::run(std::size_t downloaded_size) -> Result
{
using ValidationResult = PackageFetcher::ValidationResult;
ValidationResult validation_res = p_fetcher->validate(downloaded_size, get_progress_callback());
m_is_valid = validation_res == ValidationResult::VALID;
if (!m_is_valid)
const bool is_valid = validation_res == ValidationResult::VALID;
bool is_extracted = false;
if (is_valid)
{
m_is_extracted = false;
is_extracted = p_fetcher->extract(m_options, get_progress_callback());
}
else
{
m_is_extracted = p_fetcher->extract(m_options, get_progress_callback());
}
}

auto PackageExtractTask::get_result() const -> Result
{
return { m_is_valid, m_is_extracted };
}

void PackageExtractTask::clear_cache() const
{
p_fetcher->clear_cache();
return { is_valid, is_extracted };
}

auto PackageExtractTask::get_progress_callback() -> progress_callback_t*
Expand Down Expand Up @@ -277,7 +266,7 @@ namespace mamba
LOG_DEBUG << "Decompressing '" << m_tarball_path.string() << "'";
try
{
fs::u8path extract_path = get_extract_path(filename(), m_cache_path);
const fs::u8path extract_path = get_extract_path(filename(), m_cache_path);
// Be sure the first writable cache doesn't contain invalid extracted package
clear_extract_path(extract_path);
extract_impl(m_tarball_path, extract_path, options);
Expand Down Expand Up @@ -308,7 +297,7 @@ namespace mamba
void PackageFetcher::clear_cache() const
{
fs::remove_all(m_tarball_path);
fs::u8path dest_dir = strip_package_extension(m_tarball_path.string());
const fs::u8path dest_dir = strip_package_extension(m_tarball_path.string());
fs::remove_all(dest_dir);
}

Expand Down
40 changes: 19 additions & 21 deletions libmamba/src/core/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ namespace mamba
return extract_tasks;
}

using ExtractTrackerList = std::vector<std::future<void>>;
using ExtractTrackerList = std::vector<std::future<PackageExtractTask::Result>>;

MultiDownloadRequest build_download_requests(
FetcherList& fetchers,
Expand All @@ -910,8 +910,8 @@ namespace mamba
++fit, ++eit)
{
auto ceit = eit; // Apple Clang cannot capture eit
auto task = std::make_shared<std::packaged_task<void(std::size_t)>>(
[ceit](std::size_t downloaded_size) { ceit->run(downloaded_size); }
auto task = std::make_shared<std::packaged_task<PackageExtractTask::Result(std::size_t)>>(
[ceit](std::size_t downloaded_size) { return ceit->run(downloaded_size); }
);
extract_trackers.push_back(task->get_future());
download_requests.push_back(fit->build_download_request(
Expand All @@ -927,17 +927,19 @@ namespace mamba
return download_requests;
}

void schedule_extractions(
void schedule_remaining_extractions(
ExtractTaskList& extract_tasks,
ExtractTrackerList& extract_trackers,
std::size_t download_size
)
{
// We schedule extractions for packages that don't need to be downloaded,
// because downloading a package already triggers its extraction.
for (auto it = extract_tasks.begin() + static_cast<std::ptrdiff_t>(download_size);
it != extract_tasks.end();
++it)
{
std::packaged_task task{ [=] { it->run(); } };
std::packaged_task task{ [=] { return it->run(); } };
extract_trackers.push_back(task.get_future());
MainExecutor::instance().schedule(std::move(task));
}
Expand All @@ -951,24 +953,25 @@ namespace mamba
)
{
auto result = download(std::move(requests), context, options, monitor);
bool all_downloaded = std::accumulate(
bool all_downloaded = std::all_of(
result.begin(),
result.end(),
true,
[](bool acc, const auto& r) { return acc && r; }
[](const auto& r) { return r; }
);
return all_downloaded;
}

bool clear_invalid_caches(const ExtractTaskList& extract_tasks)
bool clear_invalid_caches(const FetcherList& fetchers, ExtractTrackerList& trackers)
{
bool all_valid = true;
for (auto eit = extract_tasks.begin(); eit != extract_tasks.end(); ++eit)
for (auto [fit, eit] = std::tuple{ fetchers.begin(), trackers.begin() };
eit != trackers.end();
++fit, ++eit)
{
PackageExtractTask::Result res = eit->get_result();
PackageExtractTask::Result res = eit->get();
if (!res.valid || !res.extracted)
{
eit->clear_cache();
fit->clear_cache();
all_valid = false;
}
}
Expand All @@ -978,12 +981,7 @@ namespace mamba

bool MTransaction::fetch_extract_packages()
{
// TODO: move this to the PackageDownloadMonitor
auto& pbar_manager = Console::instance().init_progress_bar_manager(ProgressBarMode::aggregated
);

auto& channel_context = m_pool.channel_context();
auto& ctx = channel_context.context();
auto& ctx = m_pool.channel_context().context();
PackageFetcherSemaphore::set_max(ctx.threads_params.extract_threads);

FetcherList fetchers = build_fetchers(m_pool, m_solution, m_multi_cache);
Expand Down Expand Up @@ -1026,7 +1024,7 @@ namespace mamba
monitor->observe(download_requests, extract_tasks, download_options);
}

schedule_extractions(extract_tasks, extract_trackers, download_size);
schedule_remaining_extractions(extract_tasks, extract_trackers, download_size);
bool all_downloaded = trigger_download(
std::move(download_requests),
ctx,
Expand All @@ -1045,8 +1043,8 @@ namespace mamba
task.wait();
}

const bool all_valid = clear_invalid_caches(extract_tasks);
// TODO: see if we can remove this in the caller
const bool all_valid = clear_invalid_caches(fetchers, extract_trackers);
// TODO: see if we can move this into the caller
if (!all_valid)
{
throw std::runtime_error(std::string("Found incorrect downloads. Aborting"));
Expand Down

0 comments on commit be1fea3

Please sign in to comment.