Skip to content

Commit

Permalink
Abstract ConsulNamingService into a PeriodicNamingService
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Feb 11, 2023
1 parent 5bda7a0 commit ef48e53
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/brpc/periodic_naming_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int PeriodicNamingService::RunNamingService(
const char* service_name, NamingServiceActions* actions) {
std::vector<ServerNode> servers;
bool ever_reset = false;
for (;;) {
while (true) {
servers.clear();
const int rc = GetServers(service_name, &servers);
if (bthread_stopped(bthread_self())) {
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/periodic_naming_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class PeriodicNamingService : public NamingService {
virtual int GetNamingServiceAccessIntervalMs() const;

int RunNamingService(const char* service_name,
NamingServiceActions* actions);
NamingServiceActions* actions) override;
};

} // namespace brpc
Expand Down
46 changes: 8 additions & 38 deletions src/brpc/policy/consul_naming_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
return buffer.GetString();
}

ConsulNamingService::ConsulNamingService()
: _backup_file_loaded(false), _consul_connected(false) {}

int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
PeriodicNamingService::GetNamingServiceAccessIntervalMs();
}

int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
Expand Down Expand Up @@ -209,47 +217,9 @@ int ConsulNamingService::GetServers(const char* service_name,
return 0;
}

int ConsulNamingService::RunNamingService(const char* service_name,
NamingServiceActions* actions) {
std::vector<ServerNode> servers;
bool ever_reset = false;
for (;;) {
servers.clear();
const int rc = GetServers(service_name, &servers);
if (bthread_stopped(bthread_self())) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
}
if (rc == 0) {
ever_reset = true;
actions->ResetServers(servers);
} else {
if (!ever_reset) {
// ResetServers must be called at first time even if GetServers
// failed, to wake up callers to `WaitForFirstBatchOfServers'
ever_reset = true;
servers.clear();
actions->ResetServers(servers);
}
if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
if (errno == ESTOP) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
}
PLOG(FATAL) << "Fail to sleep";
return -1;
}
}
}
CHECK(false);
return -1;
}


void ConsulNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "consul";
return;
}

NamingService* ConsulNamingService::New() const {
Expand Down
18 changes: 10 additions & 8 deletions src/brpc/policy/consul_naming_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@
#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE
#define BRPC_POLICY_CONSUL_NAMING_SERVICE

#include "brpc/naming_service.h"
#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"


namespace brpc {
class Channel;
namespace policy {

class ConsulNamingService : public NamingService {
private:
int RunNamingService(const char* service_name,
NamingServiceActions* actions) override;
class ConsulNamingService : public PeriodicNamingService {
public:
ConsulNamingService();

private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers);
std::vector<ServerNode>* servers) override;

int GetNamingServiceAccessIntervalMs() const override;

void Describe(std::ostream& os, const DescribeOptions&) const override;

Expand All @@ -48,8 +50,8 @@ class ConsulNamingService : public NamingService {
Channel _channel;
std::string _consul_index;
std::string _consul_url;
bool _backup_file_loaded = false;
bool _consul_connected = false;
bool _backup_file_loaded;
bool _consul_connected;
};

} // namespace policy
Expand Down

0 comments on commit ef48e53

Please sign in to comment.