Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #40 from Eugene-Mark/oap-1.2
Browse files Browse the repository at this point in the history
[PMEM-SHUFFLE-39] Fix the bug that pmem-shuffle without RPMP fails to pass Terasort benchmark due to latest patch.
  • Loading branch information
Jian-Zhang authored Aug 2, 2021
2 parents 7991afd + c0b224b commit c68a0ef
Show file tree
Hide file tree
Showing 13 changed files with 19 additions and 279 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
**/target/**
*.class
*.iml
*.sh

# logs
*.log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
fetchContinuousBlocksInBatch
).toCompletionIterator

/**
* Force iterator to traverse itself and update internal counter
**/
wrappedStreams.size

val serializerInstance = dep.serializer.newInstance()

// Create a key/value iterator for each stream
Expand Down
10 changes: 4 additions & 6 deletions rpmp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ file(COPY bin/stop-rpmp.sh DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})

add_executable(data-server main.cc)
add_executable(chashtest test/chash_test.cc)
add_executable(rocksdbtest test/rocksdb_test.cc)
add_executable(proxy-server ProxyMain.cc)
add_executable(client Client.cc)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(rocksdbtest spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp redis++)

file(COPY ${PROJECT_SOURCE_DIR}/config DESTINATION .)
2 changes: 1 addition & 1 deletion rpmp/pmpool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ add_library(pmpool_client_jni SHARED Event.cc ProxyEvent.cc client/PmPoolClient.
target_link_libraries(pmpool_client_jni LINK_PUBLIC ${Boost_LIBRARIES} hpnl)
set_target_properties(pmpool_client_jni PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")

add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc proxy/metastore/rocksdb/Rocks.cc proxy/metastore/rocksdb/RocksConnection.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)
add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)

target_link_libraries(pmpool LINK_PUBLIC ${Boost_LIBRARIES} hpnl pmemobj)
set_target_properties(pmpool PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
Expand Down
63 changes: 5 additions & 58 deletions rpmp/pmpool/proxy/metastore/ConnectionFacade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,34 @@
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"

#include "redis/Redis.h"

using namespace std;
using namespace ROCKSDB_NAMESPACE;

ConnectionFacade::ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log,string type){
config_ = config;
log_ = log;
type_ = type;
}

//RocksDB
int ConnectionFacade::connect(string DBPath){
Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;

// open DB
Status s = DB::Open(options, DBPath, &db_);

if (s.ok() == true){
setConnected(true);
return 0;
}
setConnected(false);
return -1;
}

string ConnectionFacade::put(string key, string value){
if(type_ == ROCKS){
Status s = db_->Put(WriteOptions(), key, value);
return s.ToString();
}else{
return redis_->set(key, value);
}
return redis_->set(key, value);
}

string ConnectionFacade::get(string key){
if(type_ == ROCKS){
string value;
Status s = db_->Get(ReadOptions(), key, &value);
return value;
}else{
return redis_->get(key);
}
return redis_->get(key);
}

int ConnectionFacade::exists(string key){
if(type_ == ROCKS){
string value;
Status s = db_->Get(ReadOptions(), key, &value);
if(s.ok()){
return 1;
}
return 0;
}else{
return redis_->exists(key);
}
return redis_->exists(key);
}

std::unordered_set<std::string> ConnectionFacade::scanAll(){
if(type_ == ROCKS){
//Do nothing
}else{
return redis_->scanAll();
}
return redis_->scanAll();
}

std::unordered_set<std::string> ConnectionFacade::scan(std::string pattern){
if(type_ == ROCKS){
//Do nothing
}else{
return redis_->scan(pattern);
}
return redis_->scan(pattern);
}

//Redis
Expand Down
12 changes: 2 additions & 10 deletions rpmp/pmpool/proxy/metastore/ConnectionFacade.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "redis/Redis.h"

#include "pmpool/Config.h"
#include "pmpool/RLog.h"

using namespace std;
using namespace ROCKSDB_NAMESPACE;

/**
* Facade for connection to Redis and RocksDB
* Facade for connection to Redis
*
**/
class ConnectionFacade: public std::enable_shared_from_this<ConnectionFacade>{
public:
// RocksDB
ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log, string type);
ConnectionFacade(std::shared_ptr<Config> config, std::shared_ptr<RLog> log,string type);
// Redis
int connect();
// Common
Expand All @@ -40,10 +35,7 @@ class ConnectionFacade: public std::enable_shared_from_this<ConnectionFacade>{
bool connected_;
int setConnected(bool connected);
string type_;
string ROCKS = "ROCKS";
string REDIS = "REDIS";
// RocksDB
DB *db_;
// Redis
shared_ptr<Redis> redis_;
};
Expand Down
8 changes: 1 addition & 7 deletions rpmp/pmpool/proxy/metastore/MetastoreFacade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "../../Config.h"

#include "MetastoreFacade.h"
#include "rocksdb/Rocks.h"
#include "redis/Redis.h"
#include "json/json.h"

Expand All @@ -17,12 +16,7 @@ MetastoreFacade::MetastoreFacade(std::shared_ptr<Config> config, std::shared_ptr

bool MetastoreFacade::connect() {
int res = 0;
if(type_ == ROCKS){
string DBPath = "/tmp/rocksdb_simple_example";
res = connection_->connect(DBPath);
}else if(type_ == REDIS){
res = connection_->connect();
}
res = connection_->connect();
if (res == 0) {
log_->get_console_log()->info("Successfully connected to metastore database");
return true;
Expand Down
3 changes: 1 addition & 2 deletions rpmp/pmpool/proxy/metastore/MetastoreFacade.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Config;

/**
* Facade for metastore, either Redis or RocksDB
* Facade for metastore
*
**/
class MetastoreFacade: public std::enable_shared_from_this<MetastoreFacade>{
Expand All @@ -31,7 +31,6 @@ class MetastoreFacade: public std::enable_shared_from_this<MetastoreFacade>{
std::string port_;
std::string type_;
std::string REDIS = "REDIS";
std::string ROCKS = "ROCKS";
};

#endif
35 changes: 0 additions & 35 deletions rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc

This file was deleted.

28 changes: 0 additions & 28 deletions rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h

This file was deleted.

60 changes: 0 additions & 60 deletions rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc

This file was deleted.

27 changes: 0 additions & 27 deletions rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h

This file was deleted.

Loading

0 comments on commit c68a0ef

Please sign in to comment.