From 062e91993c6bc24ed32f476dcbac7ac4140db616 Mon Sep 17 00:00:00 2001 From: Eugene-Mark Date: Mon, 2 Aug 2021 11:59:50 +0800 Subject: [PATCH 1/2] [PMEM-SHUFFLE-39] Fix the bug that pmem-shuffle without RPMP fails to pass Terasort benchmark --- .../org/apache/spark/shuffle/pmof/BaseShuffleReader.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala index 8399abbf..5e367bfa 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala @@ -87,6 +87,11 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C], fetchContinuousBlocksInBatch ).toCompletionIterator + /** + * Force iterator to traverse itself and update internal counter + **/ + wrappedStreams.size + val serializerInstance = dep.serializer.newInstance() // Create a key/value iterator for each stream From c0b224b26277c86e84e52e7b2f2b72fe665c4753 Mon Sep 17 00:00:00 2001 From: Eugene-Mark Date: Mon, 2 Aug 2021 16:36:27 +0800 Subject: [PATCH 2/2] [PMEM-SHUFFLE-41] Remove RocksDB from RPMP's metastore --- .gitignore | 1 - rpmp/CMakeLists.txt | 10 ++- rpmp/pmpool/CMakeLists.txt | 2 +- .../proxy/metastore/ConnectionFacade.cc | 63 ++----------------- .../pmpool/proxy/metastore/ConnectionFacade.h | 12 +--- .../pmpool/proxy/metastore/MetastoreFacade.cc | 8 +-- rpmp/pmpool/proxy/metastore/MetastoreFacade.h | 3 +- rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc | 35 ----------- rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h | 28 --------- .../metastore/rocksdb/RocksConnection.cc | 60 ------------------ .../proxy/metastore/rocksdb/RocksConnection.h | 27 -------- rpmp/test/rocksdb_test.cc | 44 ------------- 12 files changed, 14 insertions(+), 279 deletions(-) delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h delete mode 100644 rpmp/test/rocksdb_test.cc diff --git a/.gitignore b/.gitignore index d9de765f..58c6c9d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ **/target/** *.class *.iml -*.sh # logs *.log diff --git a/rpmp/CMakeLists.txt b/rpmp/CMakeLists.txt index 6dbd049c..1784b913 100644 --- a/rpmp/CMakeLists.txt +++ b/rpmp/CMakeLists.txt @@ -48,13 +48,11 @@ file(COPY bin/stop-rpmp.sh DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) add_executable(data-server main.cc) add_executable(chashtest test/chash_test.cc) -add_executable(rocksdbtest test/rocksdb_test.cc) add_executable(proxy-server ProxyMain.cc) add_executable(client Client.cc) -target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(data-server pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(client pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(rocksdbtest spdlog hiredis jsoncpp rocksdb redis++) +target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(data-server pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(client pmpool spdlog hiredis jsoncpp redis++) file(COPY ${PROJECT_SOURCE_DIR}/config DESTINATION .) diff --git a/rpmp/pmpool/CMakeLists.txt b/rpmp/pmpool/CMakeLists.txt index ce8c8194..23cd94ec 100644 --- a/rpmp/pmpool/CMakeLists.txt +++ b/rpmp/pmpool/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(pmpool_client_jni SHARED Event.cc ProxyEvent.cc client/PmPoolClient. target_link_libraries(pmpool_client_jni LINK_PUBLIC ${Boost_LIBRARIES} hpnl) set_target_properties(pmpool_client_jni PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") -add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc proxy/metastore/rocksdb/Rocks.cc proxy/metastore/rocksdb/RocksConnection.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc) +add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc) target_link_libraries(pmpool LINK_PUBLIC ${Boost_LIBRARIES} hpnl pmemobj) set_target_properties(pmpool PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") diff --git a/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc b/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc index 537ddeaf..503cad9b 100644 --- a/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc +++ b/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc @@ -4,14 +4,9 @@ #include #include -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - #include "redis/Redis.h" using namespace std; -using namespace ROCKSDB_NAMESPACE; ConnectionFacade::ConnectionFacade(std::shared_ptr config, std::shared_ptr log,string type){ config_ = config; @@ -19,72 +14,24 @@ ConnectionFacade::ConnectionFacade(std::shared_ptr config, std::shared_p type_ = type; } -//RocksDB -int ConnectionFacade::connect(string DBPath){ - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, DBPath, &db_); - - if (s.ok() == true){ - setConnected(true); - return 0; - } - setConnected(false); - return -1; -} - string ConnectionFacade::put(string key, string value){ - if(type_ == ROCKS){ - Status s = db_->Put(WriteOptions(), key, value); - return s.ToString(); - }else{ - return redis_->set(key, value); - } + return redis_->set(key, value); } string ConnectionFacade::get(string key){ - if(type_ == ROCKS){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - return value; - }else{ - return redis_->get(key); - } + return redis_->get(key); } int ConnectionFacade::exists(string key){ - if(type_ == ROCKS){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - if(s.ok()){ - return 1; - } - return 0; - }else{ - return redis_->exists(key); - } + return redis_->exists(key); } std::unordered_set ConnectionFacade::scanAll(){ - if(type_ == ROCKS){ - //Do nothing - }else{ - return redis_->scanAll(); - } + return redis_->scanAll(); } std::unordered_set ConnectionFacade::scan(std::string pattern){ - if(type_ == ROCKS){ - //Do nothing - }else{ - return redis_->scan(pattern); - } + return redis_->scan(pattern); } //Redis diff --git a/rpmp/pmpool/proxy/metastore/ConnectionFacade.h b/rpmp/pmpool/proxy/metastore/ConnectionFacade.h index 7da576f8..9d68f3b0 100644 --- a/rpmp/pmpool/proxy/metastore/ConnectionFacade.h +++ b/rpmp/pmpool/proxy/metastore/ConnectionFacade.h @@ -4,25 +4,20 @@ #include #include -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" #include "redis/Redis.h" #include "pmpool/Config.h" #include "pmpool/RLog.h" using namespace std; -using namespace ROCKSDB_NAMESPACE; /** - * Facade for connection to Redis and RocksDB + * Facade for connection to Redis * **/ class ConnectionFacade: public std::enable_shared_from_this{ public: - // RocksDB - ConnectionFacade(std::shared_ptr config, std::shared_ptr log, string type); + ConnectionFacade(std::shared_ptr config, std::shared_ptr log,string type); // Redis int connect(); // Common @@ -40,10 +35,7 @@ class ConnectionFacade: public std::enable_shared_from_this{ bool connected_; int setConnected(bool connected); string type_; - string ROCKS = "ROCKS"; string REDIS = "REDIS"; - // RocksDB - DB *db_; // Redis shared_ptr redis_; }; diff --git a/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc b/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc index 01884835..354cc548 100644 --- a/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc +++ b/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc @@ -2,7 +2,6 @@ #include "../../Config.h" #include "MetastoreFacade.h" -#include "rocksdb/Rocks.h" #include "redis/Redis.h" #include "json/json.h" @@ -17,12 +16,7 @@ MetastoreFacade::MetastoreFacade(std::shared_ptr config, std::shared_ptr bool MetastoreFacade::connect() { int res = 0; - if(type_ == ROCKS){ - string DBPath = "/tmp/rocksdb_simple_example"; - res = connection_->connect(DBPath); - }else if(type_ == REDIS){ - res = connection_->connect(); - } + res = connection_->connect(); if (res == 0) { log_->get_console_log()->info("Successfully connected to metastore database"); return true; diff --git a/rpmp/pmpool/proxy/metastore/MetastoreFacade.h b/rpmp/pmpool/proxy/metastore/MetastoreFacade.h index 3bcb1388..dae5ad0f 100644 --- a/rpmp/pmpool/proxy/metastore/MetastoreFacade.h +++ b/rpmp/pmpool/proxy/metastore/MetastoreFacade.h @@ -10,7 +10,7 @@ class Config; /** - * Facade for metastore, either Redis or RocksDB + * Facade for metastore * **/ class MetastoreFacade: public std::enable_shared_from_this{ @@ -31,7 +31,6 @@ class MetastoreFacade: public std::enable_shared_from_this{ std::string port_; std::string type_; std::string REDIS = "REDIS"; - std::string ROCKS = "ROCKS"; }; #endif \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc b/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc deleted file mode 100644 index 247c5c79..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc +++ /dev/null @@ -1,35 +0,0 @@ -#include -#include "../../../Config.h" - -#include "Rocks.h" -#include "json/json.h" - -Rocks::Rocks(std::shared_ptr config, std::shared_ptr log){ - log_ = log; - config_ = config; - rocksConnection_ = std::make_shared(); -} - -bool Rocks::connect(string DBPath) { - int res = rocksConnection_->connect(DBPath); - if (res == 0) { - log_->get_console_log()->info("Successfully connected to rocksdb database"); - return true; - } - log_->get_console_log()->error("Failed to connect to rocksdb database"); - return false; -} - -string Rocks::set(string key, string value){ - rocksConnection_->put(key, value); - return ""; -} - -string Rocks::get(string key){ - string value = rocksConnection_->get(key); - return value; -}; - -int Rocks::exists(string key){ - return rocksConnection_->exists(key); -} \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h b/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h deleted file mode 100644 index 98512f02..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SPARK_PMOF_ROCKS_H -#define SPARK_PMOF_ROCKS_H - -#include - -#include "pmpool/Config.h" -#include "pmpool/RLog.h" -#include "RocksConnection.h" - -class Config; - -class Rocks: public std::enable_shared_from_this{ -public: - Rocks(std::shared_ptr configs, std::shared_ptr log); - bool connect(string DBPath); - string set(string key, string value); - string get(string key); - int exists(string key); - -private: - std::shared_ptr rocksConnection_; - std::shared_ptr config_; - std::shared_ptr log_; - std::string address_; - std::string port_; -}; - -#endif \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc b/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc deleted file mode 100644 index 17bad80e..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc +++ /dev/null @@ -1,60 +0,0 @@ -#include "RocksConnection.h" - -#include -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace std; -using namespace ROCKSDB_NAMESPACE; - -int RocksConnection::connect(string DBPath){ - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, DBPath, &db_); - - if (s.ok() == true){ - setConnected(true); - return 0; - } - setConnected(false); - return -1; -} - -bool RocksConnection::isConnected(){ - return connected_; -} - -int RocksConnection::setConnected(bool connected){ - connected_ = connected; - return 0; -} - -Status RocksConnection::put(string key, string value){ - Status s = db_->Put(WriteOptions(), key, value); - return s; -} - -string RocksConnection::get(string key){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - return value; -} - -int RocksConnection::exists(string key){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - if(s.ok()){ - return 1; - } - return 0; -} diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h b/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h deleted file mode 100644 index 4cf3061f..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef SPARK_PMOF_ROCKSCONNECTION_H -#define SPARK_PMOF_ROCKSCONNECTION_H - -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace std; -using namespace ROCKSDB_NAMESPACE; - -class RocksConnection: public std::enable_shared_from_this{ -public: - int connect(string DBPath); - bool isConnected(); - Status put(string key, string value); - string get(string key); - int exists(string key); -private: - bool connected_; - int setConnected(bool connected); - DB *db_; -}; - -#endif \ No newline at end of file diff --git a/rpmp/test/rocksdb_test.cc b/rpmp/test/rocksdb_test.cc deleted file mode 100644 index 3c56a564..00000000 --- a/rpmp/test/rocksdb_test.cc +++ /dev/null @@ -1,44 +0,0 @@ -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace ROCKSDB_NAMESPACE; -using namespace std; - -#if defined(OS_WIN) -std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example"; -#else -std::string kDBPath = "/tmp/rocksdb_simple_example"; -#endif - -int put_and_get(){ - DB* db; - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - // open DB - Status s = DB::Open(options, kDBPath, &db); - assert(s.ok()); - cout<Put(WriteOptions(), "key1", "value1"); - assert(s.ok()); - std::string value; - // get value - s = db->Get(ReadOptions(), "key1", &value); - assert(s.ok()); - assert(value == "value"); - cout<<"value: "<