From a3605b84485f8989ebc8a0c1fc0843c3a5751746 Mon Sep 17 00:00:00 2001 From: suncanghuai Date: Wed, 16 Nov 2022 16:29:12 +0800 Subject: [PATCH] introduce WeightBalancedLibra algorithm(vertex-cut graph partition) * implement WeightBalancedLibra algorithm based on paper pseudocode * add weight-related apis for class CoordinatedPartitionState * adjust set operations of class CoordinatedRecord * append a testcase in PartitionTest.cpp for WB-Libra --- include/Graph/Graph.hpp | 4 +- .../CoordinatedPartitionState.hpp | 80 ++++++- include/Partitioning/CoordinatedRecord.hpp | 36 ++- include/Partitioning/PartitionAlgorithm.hpp | 3 +- include/Partitioning/PartitionState.hpp | 6 +- include/Partitioning/Partitioner.hpp | 52 ++++- include/Partitioning/Utility/Globals.hpp | 7 +- include/Partitioning/WeightBalancedLibra.hpp | 205 ++++++++++++++++++ include/Utility/ConstValue.hpp | 1 + test/PartitionTest.cpp | 22 ++ 10 files changed, 400 insertions(+), 16 deletions(-) create mode 100644 include/Partitioning/WeightBalancedLibra.hpp diff --git a/include/Graph/Graph.hpp b/include/Graph/Graph.hpp index d3870eb3b..13dc61260 100644 --- a/include/Graph/Graph.hpp +++ b/include/Graph/Graph.hpp @@ -2486,10 +2486,10 @@ namespace CXXGRAPH } template - PartitionMap Graph::partitionGraph(PARTITIONING::PartitionAlgorithm algorithm, unsigned int numberOfPartitions, double param1, double param2, double param3, unsigned int numberOfthreads) const + PartitionMap Graph::partitionGraph(PARTITIONING::PartitionAlgorithm algorithm, unsigned int numberOfPartitions, double param1, double param2, double param3, unsigned int numberOfThreads) const { PartitionMap partitionMap; - PARTITIONING::Globals globals(numberOfPartitions, algorithm, param1, param2, param3, numberOfthreads); + PARTITIONING::Globals globals(numberOfPartitions, algorithm, param1, param2, param3, numberOfThreads); const T_EdgeSet & edgeSet = getEdgeSet(); globals.edgeCardinality = edgeSet.size(); globals.vertexCardinality = this->getNodeSet().size(); diff --git a/include/Partitioning/CoordinatedPartitionState.hpp b/include/Partitioning/CoordinatedPartitionState.hpp index 562f19496..9f9b1471f 100644 --- a/include/Partitioning/CoordinatedPartitionState.hpp +++ b/include/Partitioning/CoordinatedPartitionState.hpp @@ -42,12 +42,14 @@ namespace CXXGRAPH private: std::map>> record_map; std::vector machines_load_edges; + std::vector machines_weight_edges; std::vector machines_load_vertices; PartitionMap partition_map; Globals GLOBALS; int MAX_LOAD; std::shared_ptr machines_load_edges_mutex = nullptr; std::shared_ptr machines_load_vertices_mutex = nullptr; + std::shared_ptr machines_weight_edges_mutex = nullptr; std::shared_ptr record_map_mutex = nullptr; //DatWriter out; //to print the final partition of each edge public: @@ -56,10 +58,14 @@ namespace CXXGRAPH std::shared_ptr> getRecord(int x); int getMachineLoad(int m); + int getMachineWeight(int m); int getMachineLoadVertices(int m); void incrementMachineLoad(int m, const Edge *e); + void incrementMachineWeight(int m, const Edge *e); int getMinLoad(); int getMaxLoad(); + int getMachineWithMinWeight(); + int getMachineWithMinWeight(const std::set &partitions); std::vector getMachines_load(); int getTotalReplicas(); int getNumVertices(); @@ -70,12 +76,22 @@ namespace CXXGRAPH const PartitionMap &getPartitionMap(); }; template - CoordinatedPartitionState::CoordinatedPartitionState(Globals &G) : record_map(), GLOBALS(G), machines_load_edges_mutex(std::make_shared()), machines_load_vertices_mutex(std::make_shared()), record_map_mutex(std::make_shared()) + CoordinatedPartitionState::CoordinatedPartitionState(Globals &G) + : record_map(), + GLOBALS(G), + machines_load_edges_mutex(std::make_shared()), + machines_load_vertices_mutex(std::make_shared()), + machines_weight_edges_mutex(std::make_shared()), + record_map_mutex(std::make_shared()) { + machines_load_edges.reserve(GLOBALS.numberOfPartition); + machines_load_vertices.reserve(GLOBALS.numberOfPartition); + machines_weight_edges.reserve(GLOBALS.numberOfPartition); for (int i = 0; i < GLOBALS.numberOfPartition; ++i) { machines_load_edges.push_back(0); machines_load_vertices.push_back(0); + machines_weight_edges.push_back(0); partition_map[i] = std::make_shared>(i); } MAX_LOAD = 0; @@ -102,6 +118,13 @@ namespace CXXGRAPH return machines_load_edges.at(m); } + template + int CoordinatedPartitionState::getMachineWeight(int m) + { + std::lock_guard lock(*machines_weight_edges_mutex); + return machines_weight_edges.at(m); + } + template int CoordinatedPartitionState::getMachineLoadVertices(int m) { @@ -121,6 +144,23 @@ namespace CXXGRAPH partition_map[m]->addEdge(e); } template + void CoordinatedPartitionState::incrementMachineWeight(int m, const Edge *e) + { + std::lock_guard lock(*machines_weight_edges_mutex); + double edge_weight = CXXGRAPH::NEGLIGIBLE_WEIGHT; + if (e->isWeighted().has_value() && e->isWeighted().value()) + { + edge_weight = (dynamic_cast(e))->getWeight(); + } + machines_weight_edges[m] = machines_weight_edges[m] + edge_weight; + //double new_value = machines_weight_edges[m]; + //if (new_value > MAX_LOAD) + //{ + // MAX_LOAD = new_value; + //} + partition_map[m]->addEdge(e); + } + template int CoordinatedPartitionState::getMinLoad() { std::lock_guard lock(*machines_load_edges_mutex); @@ -141,6 +181,44 @@ namespace CXXGRAPH return MAX_LOAD; } template + int CoordinatedPartitionState::getMachineWithMinWeight() + { + std::lock_guard lock(*machines_weight_edges_mutex); + + double MIN_LOAD = std::numeric_limits::max(); + int machine_id = 0; + for (int i = 0; i < machines_weight_edges.size(); ++i) + { + double loadi = machines_weight_edges[i]; + if (loadi < MIN_LOAD) + { + MIN_LOAD = loadi; + machine_id = i; + } + } + + return machine_id; + } + template + int CoordinatedPartitionState::getMachineWithMinWeight(const std::set &partitions) + { + std::lock_guard lock(*machines_weight_edges_mutex); + + double MIN_LOAD = std::numeric_limits::max(); + int machine_id = 0; + for (const auto &partition_id : partitions) + { + double loadi = machines_weight_edges.at(partition_id); + if (loadi < MIN_LOAD) + { + MIN_LOAD = loadi; + machine_id = partition_id; + } + } + + return machine_id; + } + template std::vector CoordinatedPartitionState::getMachines_load() { std::lock_guard lock(*machines_load_edges_mutex); diff --git a/include/Partitioning/CoordinatedRecord.hpp b/include/Partitioning/CoordinatedRecord.hpp index 5798241e9..fe0ff1e7d 100644 --- a/include/Partitioning/CoordinatedRecord.hpp +++ b/include/Partitioning/CoordinatedRecord.hpp @@ -52,9 +52,35 @@ namespace CXXGRAPH void incrementDegree(); void addAll(std::set &set); - std::set intersection(CoordinatedRecord &x, CoordinatedRecord &y); + std::set partition_intersection(std::shared_ptr other); + std::set partition_union(std::shared_ptr other); + std::set partition_difference(std::shared_ptr other); }; template + std::set CoordinatedRecord::partition_intersection(std::shared_ptr other) + { + std::set result; + set_intersection(this->partitions.begin(), this->partitions.end(), other->partitions.begin(), other->partitions.end(), + std::inserter(result, result.begin())); + return result; + } + template + std::set CoordinatedRecord::partition_union(std::shared_ptr other) + { + std::set result; + set_union(this->partitions.begin(), this->partitions.end(), other->partitions.begin(), other->partitions.end(), + std::inserter(result, result.begin())); + return result; + } + template + std::set CoordinatedRecord::partition_difference(std::shared_ptr other) + { + std::set result; + set_difference(this->partitions.begin(), this->partitions.end(), other->partitions.begin(), other->partitions.end(), + std::inserter(result, result.begin())); + return result; + } + template CoordinatedRecord::CoordinatedRecord() : partitions() { lock = new std::mutex(); @@ -120,14 +146,6 @@ namespace CXXGRAPH { partitions.insert(set.begin(), set.end()); } - template - std::set CoordinatedRecord::intersection(CoordinatedRecord &x, CoordinatedRecord &y) - { - std::set result; - set_intersection(x.partitions.begin(), x.partitions.end(), y.partitions.begin(), y.partitions.end(), - std::inserter(result, result.begin())); - return result; - } } } diff --git a/include/Partitioning/PartitionAlgorithm.hpp b/include/Partitioning/PartitionAlgorithm.hpp index 4e20c2daf..99e6085c7 100644 --- a/include/Partitioning/PartitionAlgorithm.hpp +++ b/include/Partitioning/PartitionAlgorithm.hpp @@ -33,7 +33,8 @@ namespace CXXGRAPH GREEDY_VC_ALG, ///< A Greedy Algorithm HDRF_ALG, ///< High-Degree (are) Replicated First (HDRF) Algorithm (Stream-Based Vertex-Cut Partitioning) EBV_ALG, ///< Edge-Balanced Vertex-Cut Offline Algorithm (EBV) - ALG_2 + ALG_2, + WB_LIBRA, ///< Weighted Balanced Libra }; typedef E_PartitionAlgorithm PartitionAlgorithm; } diff --git a/include/Partitioning/PartitionState.hpp b/include/Partitioning/PartitionState.hpp index 07edffe2f..3e4557e7d 100644 --- a/include/Partitioning/PartitionState.hpp +++ b/include/Partitioning/PartitionState.hpp @@ -34,10 +34,14 @@ namespace CXXGRAPH public: virtual std::shared_ptr> getRecord(int x) = 0; virtual int getMachineLoad(int m) = 0; + virtual int getMachineWeight(int m) = 0; virtual int getMachineLoadVertices(int m) = 0; - virtual void incrementMachineLoad(int m,const Edge* e) = 0; + virtual void incrementMachineLoad(int m, const Edge* e) = 0; + virtual void incrementMachineWeight(int m, const Edge* e) = 0; virtual int getMinLoad() = 0; virtual int getMaxLoad() = 0; + virtual int getMachineWithMinWeight() = 0; + virtual int getMachineWithMinWeight(const std::set &partitions) = 0; virtual std::vector getMachines_load() = 0; virtual int getTotalReplicas() = 0; virtual int getNumVertices() = 0; diff --git a/include/Partitioning/Partitioner.hpp b/include/Partitioning/Partitioner.hpp index c7a629fc1..e585f5e5a 100644 --- a/include/Partitioning/Partitioner.hpp +++ b/include/Partitioning/Partitioner.hpp @@ -33,6 +33,7 @@ #include "EdgeBalancedVertexCut.hpp" #include "GreedyVertexCut.hpp" #include "EBV.hpp" +#include "WeightBalancedLibra.hpp" namespace CXXGRAPH { @@ -73,8 +74,32 @@ namespace CXXGRAPH } else if (GLOBALS.partitionStategy == PartitionAlgorithm::EBV_ALG) { algorithm = new EBV(GLOBALS); - } + } else if (GLOBALS.partitionStategy == PartitionAlgorithm::WB_LIBRA) + { + // precompute weight sum + double weight_sum = 0.0; + for (const auto &edge_it : *(this->dataset)) + { + weight_sum += (edge_it->isWeighted().has_value() && edge_it->isWeighted().value()) ? dynamic_cast(edge_it)->getWeight() : CXXGRAPH::NEGLIGIBLE_WEIGHT; + } + double lambda = std::max(1.0, GLOBALS.param1); + double P = static_cast(GLOBALS.numberOfPartition); + // avoid divide by zero when some parameters are invalid + double weight_sum_bound = (GLOBALS.numberOfPartition == 0) ? 0.0 : lambda * weight_sum / P; + // precompute degrees of vertices + std::unordered_map vertices_degrees; + for (const auto &edge_it : *(this->dataset)) + { + auto nodePair = edge_it->getNodePair(); + std::size_t u = nodePair.first->getId(); + std::size_t v = nodePair.second->getId(); + vertices_degrees[u]++; + vertices_degrees[v]++; + } + + algorithm = new WeightBalancedLibra(GLOBALS, weight_sum_bound, move(vertices_degrees)); + } } template @@ -93,6 +118,31 @@ namespace CXXGRAPH } else if (GLOBALS.partitionStategy == PartitionAlgorithm::EBV_ALG) { algorithm = new EBV(GLOBALS); + } else if (GLOBALS.partitionStategy == PartitionAlgorithm::WB_LIBRA) + { + // precompute weight sum + double weight_sum = 0.0; + for (const auto &edge_it : *(this->dataset)) + { + weight_sum += (edge_it->isWeighted().has_value() && edge_it->isWeighted().value()) ? dynamic_cast(edge_it)->getWeight() : CXXGRAPH::NEGLIGIBLE_WEIGHT; + } + double lambda = GLOBALS.param1; + double P = static_cast(GLOBALS.numberOfPartition); + // avoid divide by zero when some parameters are invalid + double weight_sum_bound = (GLOBALS.numberOfPartition == 0) ? 0.0 : lambda * weight_sum / P; + + // precompute degrees of vertices + std::unordered_map vertices_degrees; + for (const auto &edge_it : *(this->dataset)) + { + auto nodePair = edge_it->getNodePair(); + std::size_t u = nodePair.first->getId(); + std::size_t v = nodePair.second->getId(); + vertices_degrees[u]++; + vertices_degrees[v]++; + } + + algorithm = new WeightBalancedLibra(GLOBALS, weight_sum_bound, move(vertices_degrees)); } } diff --git a/include/Partitioning/Utility/Globals.hpp b/include/Partitioning/Utility/Globals.hpp index a13a5cea0..9929da692 100644 --- a/include/Partitioning/Utility/Globals.hpp +++ b/include/Partitioning/Utility/Globals.hpp @@ -54,13 +54,18 @@ namespace CXXGRAPH { }; inline Globals::Globals(int numberOfPartiton, PartitionAlgorithm algorithm,double param1, double param2, double param3, unsigned int threads) - { + { this->numberOfPartition = numberOfPartiton; this->partitionStategy = algorithm; this->threads = threads; this->param1 = param1; this->param2 = param2; this->param3 = param3; + if (this->numberOfPartition <= 0) + { + std::cout << "ERROR: numberOfPartition " << numberOfPartition << std::endl; + exit(-1); + } } inline Globals::~Globals() diff --git a/include/Partitioning/WeightBalancedLibra.hpp b/include/Partitioning/WeightBalancedLibra.hpp new file mode 100644 index 000000000..b38f715d0 --- /dev/null +++ b/include/Partitioning/WeightBalancedLibra.hpp @@ -0,0 +1,205 @@ +/***********************************************************/ +/*** ______ ____ ______ _ ***/ +/*** / ___\ \/ /\ \/ / ___|_ __ __ _ _ __ | |__ ***/ +/*** | | \ / \ / | _| '__/ _` | '_ \| '_ \ ***/ +/*** | |___ / \ / \ |_| | | | (_| | |_) | | | | ***/ +/*** \____/_/\_\/_/\_\____|_| \__,_| .__/|_| |_| ***/ +/*** |_| ***/ +/***********************************************************/ +/*** Header-Only C++ Library for Graph ***/ +/*** Representation and Algorithms ***/ +/***********************************************************/ +/*** Author: ZigRazor ***/ +/*** E-Mail: zigrazor@gmail.com ***/ +/***********************************************************/ +/*** Collaboration: ----------- ***/ +/***********************************************************/ +/*** License: AGPL v3.0 ***/ +/***********************************************************/ + +#ifndef __CXXGRAPH_PARTITIONING_WEIGHTBALANCEDLIBRA_H__ +#define __CXXGRAPH_PARTITIONING_WEIGHTBALANCEDLIBRA_H__ + +#pragma once + +#include "Partitioning/Utility/Globals.hpp" +#include "Edge/Edge.hpp" +#include "PartitionStrategy.hpp" +#include + +namespace CXXGRAPH +{ + namespace PARTITIONING + { + /** + * @brief A Vertex Cut Partioning Algorithm ( as described by this paper https://arxiv.org/pdf/2010.04414.pdf ) + * @details This algorithm is a greedy algorithm that partitions the graph into n sets of vertices ( as described by this paper https://arxiv.org/pdf/2010.04414.pdf ). + */ + template + class WeightBalancedLibra : public PartitionStrategy + { + private: + Globals GLOBALS; + double weight_sum_bound; + std::unordered_map vertices_degrees; + + public: + explicit WeightBalancedLibra(Globals &G, double _weight_sum_bound, std::unordered_map &&_vertices_degrees); + ~WeightBalancedLibra(); + + void performStep(const Edge &e, PartitionState &Sstate); + }; + template + WeightBalancedLibra::WeightBalancedLibra(Globals &G, double _weight_sum_bound, std::unordered_map &&_vertices_degrees) + : GLOBALS(G), + weight_sum_bound(_weight_sum_bound), + vertices_degrees(_vertices_degrees) + { + } + template + WeightBalancedLibra::~WeightBalancedLibra() + { + } + template + void WeightBalancedLibra::performStep(const Edge &e, PartitionState &state) + { + int P = GLOBALS.numberOfPartition; + auto nodePair = e.getNodePair(); + size_t u = nodePair.first->getId(); + size_t v = nodePair.second->getId(); + std::shared_ptr> u_record = state.getRecord(u); + std::shared_ptr> v_record = state.getRecord(v); + + //*** ASK FOR LOCK + bool locks_taken = false; + while (!locks_taken) + { + srand((unsigned)time(NULL)); + int usleep_time = 2; + while (!u_record->getLock()) + { + std::this_thread::sleep_for(std::chrono::microseconds(usleep_time)); + usleep_time = (int)pow(usleep_time, 2); + } + usleep_time = 2; + if (u != v) + { + while ((locks_taken = v_record->getLock()) == false) + { + std::this_thread::sleep_for(std::chrono::microseconds(usleep_time)); + usleep_time = (int)pow(usleep_time, 2); + + if (usleep_time > GLOBALS.SLEEP_LIMIT) + { + u_record->releaseLock(); + //performStep(e, state); + break; + } //TO AVOID DEADLOCK + } + } + else + { + locks_taken = true; + } + } + //*** LOCK TAKEN + int machine_id = -1; + std::set &u_partition = u_record->getPartitions(); + std::set &v_partition = v_record->getPartitions(); + + //Case 1: no edges of two nodes have been assigned + if (u_partition.empty() && v_partition.empty()) + { + machine_id = state.getMachineWithMinWeight(); + } + //Case 2: one or more edges of node v have been assigned but that of node u haven't + else if (u_partition.empty() && !v_partition.empty()) + { + machine_id = state.getMachineWithMinWeight(v_partition); + if (state.getMachineWeight(machine_id) >= weight_sum_bound) + { + machine_id = state.getMachineWithMinWeight(); + } + } + //Case 3: one or more edges of node u have been assigned but that of node v haven't + else if (!u_partition.empty() && v_partition.empty()) + { + machine_id = state.getMachineWithMinWeight(u_partition); + if (state.getMachineWeight(machine_id) >= weight_sum_bound) + { + machine_id = state.getMachineWithMinWeight(); + } + } + //Case 4: one or more edges of both nodes have been assigned + else + { + std::shared_ptr> u_coord_record = std::dynamic_pointer_cast>(u_record); + std::shared_ptr> v_coord_record = std::dynamic_pointer_cast>(v_record); + const auto &uv_intersection = u_coord_record->partition_intersection(v_coord_record); + + // Case 4.1: no common partitions for both nodes + if (uv_intersection.empty()) + { + size_t u_degree = vertices_degrees[u]; + size_t v_degree = vertices_degrees[v]; + + // according to paper, s refers to node with lower degree, t = {u, v} - {s} + size_t s_node = (u_degree > v_degree) ? v : u; + size_t t_node = (u_degree > v_degree) ? u : v; + std::set &s_partition = (u_degree > v_degree) ? v_partition : u_partition; + std::set &t_partition = (u_degree > v_degree) ? u_partition : v_partition; + + machine_id = state.getMachineWithMinWeight(s_partition); + if (state.getMachineWeight(machine_id) >= weight_sum_bound) + { + machine_id = state.getMachineWithMinWeight(t_partition); + if (state.getMachineWeight(machine_id) >= weight_sum_bound) + { + machine_id = state.getMachineWithMinWeight(); + } + } + } + // Case 4.2: there are some common partitions for both nodes + else + { + machine_id = state.getMachineWithMinWeight(uv_intersection); + if (state.getMachineWeight(machine_id) >= weight_sum_bound) + { + const auto &uv_union = u_coord_record->partition_union(v_coord_record); + machine_id = state.getMachineWithMinWeight(uv_union); + if (machine_id >= weight_sum_bound) + { + machine_id = state.getMachineWithMinWeight(); + } + } + } + } + + if (machine_id < 0 || machine_id >= P) + { + std::cout << "ERROR: wrong partition id" << std::endl; + exit(-1); + } + + //1-UPDATE RECORDS + if (!u_record->hasReplicaInPartition(machine_id)) + { + u_record->addPartition(machine_id); + } + if (!v_record->hasReplicaInPartition(machine_id)) + { + v_record->addPartition(machine_id); + } + + //2-UPDATE EDGES + state.incrementMachineWeight(machine_id, &e); + + //*** RELEASE LOCK + u_record->releaseLock(); + v_record->releaseLock(); + return; + } + } +} + +#endif // __CXXGRAPH_PARTITIONING_WEIGHTBALANCEDLIBRA_H__ \ No newline at end of file diff --git a/include/Utility/ConstValue.hpp b/include/Utility/ConstValue.hpp index dca5fdbcd..121004c8a 100644 --- a/include/Utility/ConstValue.hpp +++ b/include/Utility/ConstValue.hpp @@ -27,6 +27,7 @@ namespace CXXGRAPH { constexpr double INF_DOUBLE = std::numeric_limits::max(); + constexpr double NEGLIGIBLE_WEIGHT = 1e-7; } #endif // __CXXGRAPH_CONSTVALUE_H__ \ No newline at end of file diff --git a/test/PartitionTest.cpp b/test/PartitionTest.cpp index 20679ed97..6f4520f50 100644 --- a/test/PartitionTest.cpp +++ b/test/PartitionTest.cpp @@ -303,4 +303,26 @@ TEST(PartitionTest, test_8) //std::cout << *partitionMap.at(i) << std::endl; ASSERT_EQ(partitionMap.at(i)->getPartitionId(), i); } +} + +TEST(PartitionTest, test_9) +{ + CXXGRAPH::Graph graph; + for (auto e : edges) + { + graph.addEdge(&(*e.second)); + } + auto partitionMap = graph.partitionGraph(CXXGRAPH::PARTITIONING::PartitionAlgorithm::WB_LIBRA, 4, 1.0, 0.0, 0.0, 4); + unsigned int totalEdgeInPartition = 0; + for (const auto &elem : partitionMap) + { + totalEdgeInPartition += elem.second->getEdgeSet().size(); + //std::cout << elem.second->getEdgeSet().size() << std::endl; + } + //std::cout << "Total Edge in Partition: " << totalEdgeInPartition << std::endl; + ASSERT_EQ(totalEdgeInPartition, 10000); + for (int i = 0; i < 4; ++i) + { + ASSERT_EQ(partitionMap.at(i)->getPartitionId(), i); + } } \ No newline at end of file