Skip to content

Commit

Permalink
simplify graph signal handler (vesoft-inc#449)
Browse files Browse the repository at this point in the history
#### What type of PR is this?
- [x] bug
- [ ] feature
- [ ] enhancement

#### What does this PR do?
closes vesoft-inc#3441 
simplify graphd signal handler:Set stop condition instead of waiting all workers stop

#### Which issue(s)/PR(s) this PR relates to?
In vesoft-inc#3437 , it simplify storage SIGINT signal handler
  
#### Special notes for your reviewer, ex. impact of this fix, etc:


#### Additional context/ Design document:


#### Checklist:
- [ ] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatibility (If it breaks the compatibility, please describe it and add the corresponding label.)
- [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).)
- [ ] Performance impacted: Consumes more CPU/Memory

#### Release notes:

Please confirm whether to be reflected in release notes and how to describe:
>                                                                 `


Migrated from vesoft-inc#3542

Co-authored-by: endy.li <25311962+heroicNeZha@users.noreply.github.com>
  • Loading branch information
nebula-bots and heroicNeZha authored Jan 7, 2022
1 parent 57cf00b commit b1bea3d
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 46 deletions.
56 changes: 10 additions & 46 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/process/ProcessUtils.h"
#include "common/ssl/SSLConfig.h"
#include "common/time/TimezoneInfo.h"
#include "daemons/SetupLogging.h"
#include "graph/service/GraphFlags.h"
#include "graph/service/GraphServer.h"
#include "graph/service/GraphService.h"
#include "graph/stats/GraphStats.h"
#include "version/Version.h"
Expand All @@ -30,12 +30,9 @@ using nebula::fs::FileUtils;
using nebula::graph::GraphService;
using nebula::network::NetworkUtils;

static std::unique_ptr<apache::thrift::ThriftServer> gServer;

static void signalHandler(int sig);
static Status setupSignalHandler();
static void printHelp(const char *prog);
static void setupThreadManager();
extern Status setupAuditLog();
#if defined(__x86_64__)
extern Status setupBreakpad();
Expand All @@ -44,6 +41,8 @@ extern Status setupBreakpad();
DECLARE_string(flagfile);
DECLARE_bool(containerized);

std::unique_ptr<nebula::graph::GraphServer> gServer;

int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
if (argc == 1) {
Expand Down Expand Up @@ -164,47 +163,22 @@ int main(int argc, char *argv[]) {
}
LOG(INFO) << "Number of worker threads: " << FLAGS_num_worker_threads;

auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads,
std::move(threadFactory));
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setIOThreadPool(ioThreadPool);

auto interface = std::make_shared<GraphService>();
status = interface->init(ioThreadPool, localhost);
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

gServer->setPort(localhost.port);
gServer->setInterface(std::move(interface));
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) {
gServer->setSSLConfig(nebula::sslContextConfig());
}
setupThreadManager();

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

FLOG_INFO("Starting nebula-graphd on %s:%d\n", localhost.host.c_str(), localhost.port);
try {
gServer->serve(); // Blocking wait until shut down via gServer->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
gServer = std::make_unique<nebula::graph::GraphServer>(localhost);

if (!gServer->start()) {
LOG(ERROR) << "The graph server start failed";
return EXIT_FAILURE;
}

FLOG_INFO("nebula-graphd on %s:%d has been stopped", localhost.host.c_str(), localhost.port);

gServer->waitUntilStop();
LOG(INFO) << "The graph Daemon stopped";
return EXIT_SUCCESS;
}

Expand All @@ -219,7 +193,7 @@ void signalHandler(int sig) {
case SIGINT:
case SIGTERM:
FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig));
gServer->stop();
gServer->notifyStop();
break;
default:
FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig));
Expand All @@ -229,13 +203,3 @@ void signalHandler(int sig) {
void printHelp(const char *prog) {
fprintf(stderr, "%s --flagfile <config_file>\n", prog);
}

void setupThreadManager() {
int numThreads =
FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads();
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(numThreads));
threadManager->setNamePrefix("executor");
threadManager->start();
gServer->setThreadManager(threadManager);
}
1 change: 1 addition & 0 deletions src/graph/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nebula_add_library(
nebula_add_library(
service_obj OBJECT
GraphService.cpp
GraphServer.cpp
)

nebula_add_library(
Expand Down
105 changes: 105 additions & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "GraphServer.h"

#include <memory>
#include <utility>

#include "graph/service/GraphFlags.h"
#include "graph/service/GraphService.h"
namespace nebula {
namespace graph {

GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {}

GraphServer::~GraphServer() {
stop();
}

bool GraphServer::start() {
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads,
std::move(threadFactory));
int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads
: thriftServer_->getNumIOWorkerThreads();
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
PriorityThreadManager::newPriorityThreadManager(numThreads));
threadManager->setNamePrefix("executor");
threadManager->start();

thriftServer_ = std::make_unique<apache::thrift::ThriftServer>();
thriftServer_->setIOThreadPool(ioThreadPool);

auto interface = std::make_shared<GraphService>();
auto status = interface->init(ioThreadPool, localHost_);
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

graphThread_ = std::make_unique<std::thread>([&] {
thriftServer_->setPort(localHost_.port);
thriftServer_->setInterface(std::move(interface));
thriftServer_->setReusePort(FLAGS_reuse_port);
thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads);
thriftServer_->setListenBacklog(FLAGS_listen_backlog);
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) {
thriftServer_->setSSLConfig(nebula::sslContextConfig());
}
thriftServer_->setThreadManager(threadManager);

serverStatus_.store(STATUS_RUNNING);
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port);
try {
thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop()
} catch (const std::exception &e) {
FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what());
}
serverStatus_.store(STATUS_STOPPED);
FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port);
});

while (serverStatus_ == STATUS_UNINITIALIZED) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
return true;
}

void GraphServer::waitUntilStop() {
{
std::unique_lock<std::mutex> lkStop(muStop_);
cvStop_.wait(lkStop, [&] { return serverStatus_ != STATUS_RUNNING; });
}

thriftServer_->stop();

graphThread_->join();
}

void GraphServer::notifyStop() {
std::unique_lock<std::mutex> lkStop(muStop_);
if (serverStatus_ == STATUS_RUNNING) {
serverStatus_ = STATUS_STOPPED;
cvStop_.notify_one();
}
}

void GraphServer::stop() {
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) {
LOG(INFO) << "The graph server has been stopped";
return;
}

ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING;
serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED);

if (thriftServer_) {
thriftServer_->stop();
}
}

} // namespace graph
} // namespace nebula
46 changes: 46 additions & 0 deletions src/graph/service/GraphServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include <cstdint>
#include <mutex>
#include <thread>

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/network/NetworkUtils.h"
namespace nebula {
namespace graph {
class GraphServer {
public:
explicit GraphServer(HostAddr localHost);

~GraphServer();

// Return false if failed.
bool start();

void stop();

// used for signal handler to set an internal stop flag
void notifyStop();

void waitUntilStop();

private:
HostAddr localHost_;

std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers_;
std::unique_ptr<apache::thrift::ThriftServer> thriftServer_;
std::unique_ptr<std::thread> graphThread_;

enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 };
std::atomic<ServiceStatus> serverStatus_{STATUS_UNINITIALIZED};
std::mutex muStop_;
std::condition_variable cvStop_;
};
} // namespace graph
} // namespace nebula

0 comments on commit b1bea3d

Please sign in to comment.