Skip to content

Commit

Permalink
update storage client for plato
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Nov 19, 2021
1 parent 18e5344 commit 6e749b1
Show file tree
Hide file tree
Showing 24 changed files with 1,111 additions and 6 deletions.
115 changes: 115 additions & 0 deletions include/nebula/mclient/MetaClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "nebula/sclient/ScanEdgeIter.h"

struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2> &pair) const {
return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
}
};

namespace folly {

class IOThreadPoolExecutor;
template <class T>
class Promise;

} // namespace folly

namespace nebula {

namespace thrift {

template <class ClientType>
class ThriftClientManager;

} // namespace thrift

namespace meta {
namespace cpp2 {

enum class ListHostType;
class HostItem;
class MetaServiceAsyncClient;
class ListSpacesReq;
class ListSpacesResp;
class IdName;
class EdgeItem;
class ListEdgesReq;
class ListEdgesResp;

} // namespace cpp2
} // namespace meta

using SpaceIdName = std::pair<GraphSpaceID, std::string>;
using SpaceNameIdMap = std::unordered_map<std::string, GraphSpaceID>;
using SpaceEdgeNameTypeMap =
std::unordered_map<std::pair<GraphSpaceID, std::string>, EdgeType, pair_hash>;

class MetaClient {
public:
explicit MetaClient(const std::vector<std::string> &metaAddrs);

~MetaClient();

std::pair<bool, GraphSpaceID> getSpaceIdByNameFromCache(const std::string &name);

std::pair<bool, EdgeType> getEdgeTypeByNameFromCache(GraphSpaceID spaceId,
const std::string &name);

std::pair<bool, std::vector<PartitionID>> getPartsFromCache(GraphSpaceID spaceId);

std::pair<bool, HostAddr> getPartLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);

private:
bool loadData();

std::pair<bool, std::vector<SpaceIdName>> listSpaces();

std::pair<bool, std::vector<meta::cpp2::HostItem>> listHosts(meta::cpp2::ListHostType tp);

std::pair<bool, std::vector<meta::cpp2::EdgeItem>> listEdgeSchemas(GraphSpaceID spaceId);

void loadLeader(const std::vector<nebula::meta::cpp2::HostItem> &hostItems,
const SpaceNameIdMap &spaceIndexByName);

std::vector<SpaceIdName> toSpaceIdName(const std::vector<meta::cpp2::IdName> &tIdNames);

template <class Request,
class RemoteFunc,
class RespGenerator,
class RpcResponse = typename std::result_of<RemoteFunc(
std::shared_ptr<meta::cpp2::MetaServiceAsyncClient>, Request)>::type::value_type,
class Response = typename std::result_of<RespGenerator(RpcResponse)>::type>
void getResponse(Request req,
RemoteFunc remoteFunc,
RespGenerator respGen,
folly::Promise<std::pair<bool, Response>> pro);

private:
std::vector<HostAddr> metaAddrs_;
SpaceNameIdMap spaceIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr, pair_hash> spacePartLeaderMap_;
std::unordered_map<GraphSpaceID, std::vector<PartitionID>> spacePartsMap_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<thrift::ThriftClientManager<meta::cpp2::MetaServiceAsyncClient>> clientsMan_;
};

} // namespace nebula
13 changes: 13 additions & 0 deletions include/nebula/sclient/Init.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#pragma once

namespace nebula {

void init(int *argc, char **argv[]);

} // namespace nebula
40 changes: 40 additions & 0 deletions include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#pragma once

#include <string>
#include <vector>

#include "common/datatypes/DataSet.h"

namespace nebula {
class StorageClient;

namespace storage {
namespace cpp2 {
class ScanEdgeRequest;
} // namespace cpp2
} // namespace storage

struct ScanEdgeIter {
ScanEdgeIter(StorageClient* client,
storage::cpp2::ScanEdgeRequest* req,
bool hasNext = true);

~ScanEdgeIter();

bool hasNext();

DataSet next();

StorageClient* client_;
storage::cpp2::ScanEdgeRequest* req_;
bool hasNext_;
std::string nextCursor_;
};

} // namespace nebula
94 changes: 94 additions & 0 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "ScanEdgeIter.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "nebula/mclient/MetaClient.h"
#include "nebula/sclient/ScanEdgeIter.h"

namespace folly {
class IOThreadPoolExecutor;
template <class T>
class Promise;
} // namespace folly

namespace nebula {

namespace thrift {

template <class ClientType>
class ThriftClientManager;

} // namespace thrift

namespace meta {
namespace cpp2 {

class MetaServiceAsyncClient;

} // namespace cpp2
} // namespace meta

namespace storage {
namespace cpp2 {

class GraphStorageServiceAsyncClient;
class ScanCursor;
class ScanEdgeRequest;
class ScanEdgeResponse;

} // namespace cpp2
} // namespace storage

#define DEFAULT_LIMIT 1000
#define DEFAULT_START_TIME 0
#define DEFAULT_END_TIME std::numeric_limits<int64_t>::max()

class StorageClient {
friend class ScanEdgeIter;
public:
explicit StorageClient(const std::vector<std::string>& metaAddrs);

~StorageClient();

std::vector<PartitionID> getParts(const std::string& spaceName); // plato needed

ScanEdgeIter scanEdgeWithPart(std::string spaceName,
int32_t partID,
std::string edgeName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true); // plato needed

private:
std::pair<bool, storage::cpp2::ScanEdgeResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro);

private:
std::unique_ptr<MetaClient> mClient_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<thrift::ThriftClientManager<storage::cpp2::GraphStorageServiceAsyncClient>>
clientsMan_;
};

} // namespace nebula
49 changes: 49 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ set(NEBULA_CLIENT_OBJS
$<TARGET_OBJECTS:graph_thrift_obj>
)

set(NEBULA_SCLIENT_OBJS
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
)

set(NEBULA_CLIENT_SOURCES
client/Connection.cpp
client/Init.cpp
Expand All @@ -30,6 +36,26 @@ set(NEBULA_CLIENT_SOURCES
geo/io/wkb/ByteOrderDataIOStream.cpp
)

set(NEBULA_SCLIENT_SOURCES
mclient/MetaClient.cpp
sclient/StorageClient.cpp
sclient/ScanEdgeIter.cpp
sclient/Init.cpp
datatypes/Date.cpp
datatypes/Edge.cpp
datatypes/Geography.cpp
datatypes/HostAddr.cpp
datatypes/List.cpp
datatypes/Map.cpp
datatypes/Path.cpp
datatypes/Set.cpp
datatypes/Value.cpp
datatypes/Vertex.cpp
geo/io/wkt/WKTWriter.cpp
geo/io/wkb/WKBWriter.cpp
geo/io/wkb/ByteOrderDataIOStream.cpp
)

set(NEBULA_CLIENT_LIBRARIES
${OPENSSL_SSL_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
Expand Down Expand Up @@ -70,19 +96,42 @@ nebula_add_library(
${NEBULA_CLIENT_SOURCES}
)

nebula_add_library(
nebula_storage_client SHARED
${NEBULA_SCLIENT_OBJS}
${NEBULA_SCLIENT_SOURCES}
)

target_link_libraries(
nebula_graph_client
${NEBULA_CLIENT_LIBRARIES}
)

target_link_libraries(
nebula_storage_client
${NEBULA_CLIENT_LIBRARIES}
)

install(
TARGETS nebula_graph_client
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
)

install(
TARGETS nebula_storage_client
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
)

nebula_add_library(
nebula_graph_client_obj OBJECT
${NEBULA_CLIENT_SOURCES}
)

nebula_add_library(
nebula_storage_client_obj OBJECT
${NEBULA_SCLIENT_SOURCES}
)

nebula_add_subdirectory(client)
nebula_add_subdirectory(sclient)
nebula_add_subdirectory(mclient)
3 changes: 3 additions & 0 deletions src/client/tests/ClientTest.h → src/ClientTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#pragma once

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

#include <gtest/gtest.h>
class ClientTest : public ::testing::Test {
protected:
Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/AddressTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <nebula/client/Init.h>
#include <nebula/client/Session.h>

#include "./ClientTest.h"
#include "../../ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/ConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <nebula/client/Init.h>
#include <nebula/client/Session.h>

#include "./ClientTest.h"
#include "../../ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/ConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <nebula/client/Connection.h>
#include <nebula/client/Init.h>

#include "./ClientTest.h"
#include "../../ClientTest.h"

// Require a nebula server could access

Expand Down
2 changes: 1 addition & 1 deletion src/client/tests/SessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <chrono>
#include <thread>

#include "./ClientTest.h"
#include "../../ClientTest.h"

// Require a nebula server could access

Expand Down
Loading

0 comments on commit 6e749b1

Please sign in to comment.