Skip to content

Commit

Permalink
Introduce guidance pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
oxidase committed Nov 16, 2017
1 parent 8d17596 commit e874f65
Showing 1 changed file with 34 additions and 24 deletions.
58 changes: 34 additions & 24 deletions src/extractor/edge_based_graph_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,6 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
const ConditionalRestrictionMap &conditional_restriction_map,
const WayRestrictionMap &way_restriction_map)
{

util::Log() << "Generating edge-expanded edges ";

restricted_turns_counter = 0;
skipped_uturns_counter = 0;
skipped_barrier_turns_counter = 0;
Expand Down Expand Up @@ -465,7 +462,6 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
util::UnbufferedLog log;

const NodeID node_count = m_node_based_graph.GetNumberOfNodes();
util::Percent progress(log, node_count);

// Because we write TurnIndexBlock data as we go, we'll
// buffer them into groups of 1000 to reduce the syscall
Expand Down Expand Up @@ -998,32 +994,23 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
return buffer_ptr;
});

std::vector<EdgeWithData> delayed_data;
std::vector<TurnData> delayed_turn_data;

// This counter is used to keep track of how far along we've made it
std::uint64_t nodes_completed = 0;

// Last part of the pipeline puts all the calculated data into the serial buffers
tbb::filter_t<PipelineBufferPtr, void> output_stage(
std::uint64_t ebg_nodes_completed = 0;
util::Percent ebg_progress(log, node_count);
std::vector<EdgeWithData> delayed_data;
tbb::filter_t<PipelineBufferPtr, void> output_ebg_stage(
tbb::filter::serial_in_order, [&](auto buffer_ptr) {

auto &buffer = *buffer_ptr;

progress.PrintStatus(nodes_completed += buffer.nodes_range.size());
ebg_progress.PrintStatus(ebg_nodes_completed += buffer.nodes_range.size());

// Copy data from local buffers into global
// EBG data
std::for_each(
buffer.continuous_data.begin(), buffer.continuous_data.end(), transfer_data);
conditionals.insert(
conditionals.end(), buffer.conditionals.begin(), buffer.conditionals.end());
// Guidance data
std::for_each(buffer.continuous_turn_data.begin(),
buffer.continuous_turn_data.end(),
[&turn_data_container](const auto &turn_data) {
turn_data_container.push_back(turn_data);
});

// NOTE: potential overflow here if we hit 2^32 routable edges
BOOST_ASSERT(m_edge_based_edge_list.size() <= std::numeric_limits<NodeID>::max());
Expand All @@ -1039,6 +1026,27 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
// Copy via-way restrictions delayed data
delayed_data.insert(
delayed_data.end(), buffer.delayed_data.begin(), buffer.delayed_data.end());
});

// Last part of the pipeline puts all the calculated data into the serial buffers
std::uint64_t guidance_nodes_completed = 0;
util::Percent guidance_progress(log, node_count);
std::vector<TurnData> delayed_turn_data;
tbb::filter_t<PipelineBufferPtr, void> output_guidance_stage(
tbb::filter::serial_in_order, [&](auto buffer_ptr) {

auto &buffer = *buffer_ptr;

guidance_progress.PrintStatus(guidance_nodes_completed +=
buffer.nodes_range.size());

// Copy data from local buffers into global
std::for_each(buffer.continuous_turn_data.begin(),
buffer.continuous_turn_data.end(),
[&turn_data_container](const auto &turn_data) {
turn_data_container.push_back(turn_data);
});

delayed_turn_data.insert(delayed_turn_data.end(),
buffer.delayed_turn_data.begin(),
buffer.delayed_turn_data.end());
Expand All @@ -1049,15 +1057,17 @@ void EdgeBasedGraphFactory::GenerateEdgeExpandedEdges(
// to be balanced with the GRAINSIZE above - ideally, the pipeline puts as much work
// as possible in the `intersection_handler` step so that those parallel workers don't
// get blocked too much by the slower (io-performing) `buffer_storage`
util::Log() << "Generating edge-expanded edges ";
current_node = 0;
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 5,
generator_stage & shape_analysis_stage & processor_stage &
guidance_stage & output_stage);
output_ebg_stage);

// TODO: remove sorting below
// std::sort(delayed_data.begin(), delayed_data.end(), [](auto const &lhs, auto const &rhs)
// {
// return lhs.edge.source < rhs.edge.source;
// });
util::Log() << "Generating guidance turns ";
current_node = 0;
tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 5,
generator_stage & shape_analysis_stage & guidance_stage &
output_guidance_stage);

// NOTE: buffer.delayed_data and buffer.delayed_turn_data have the same index
std::for_each(delayed_data.begin(), delayed_data.end(), transfer_data);
Expand Down

0 comments on commit e874f65

Please sign in to comment.