Skip to content

Commit

Permalink
revert consul naming service (apache#2129)
Browse files Browse the repository at this point in the history
* revert consul naming service

* revert consul naming service

* revert consul naming service
  • Loading branch information
chenBright authored and Yang Liming committed Jun 25, 2023
1 parent 04ae594 commit 79203fc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
6 changes: 6 additions & 0 deletions src/brpc/periodic_naming_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ int PeriodicNamingService::RunNamingService(
actions->ResetServers(servers);
}

// If `bthread_stop' is called to stop the ns bthread when `brpc::Join‘ is called
// in `GetServers' to wait for a rpc to complete. The bthread will be woken up,
// reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc is complete,
// `bthread_usleep' will not sense the interrupt signal and sleep successfully.
// Finally, the ns bthread will never exit. So need to check the stop status of
// the bthread here and exit the bthread in time.
if (bthread_stopped(bthread_self())) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
Expand Down
63 changes: 49 additions & 14 deletions src/brpc/policy/consul_naming_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ DEFINE_int32(consul_blocking_query_wait_secs, 60,
DEFINE_bool(consul_enable_degrade_to_file_naming_service, false,
"Use local backup file when consul cannot connect");
DEFINE_string(consul_file_naming_service_dir, "",
"When it degraded to file naming service, the file with name of the "
"service name will be searched in this dir to use.");
"When it degraded to file naming service, the file with name of the "
"service name will be searched in this dir to use.");
DEFINE_int32(consul_retry_interval_ms, 500,
"Wait so many milliseconds before retry when error happens");

Expand All @@ -62,14 +62,6 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value
return buffer.GetString();
}

ConsulNamingService::ConsulNamingService()
: _backup_file_loaded(false), _consul_connected(false) {}

int ConsulNamingService::GetNamingServiceAccessIntervalMs() const {
return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms :
PeriodicNamingService::GetNamingServiceAccessIntervalMs();
}

int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
Expand Down Expand Up @@ -122,8 +114,8 @@ int ConsulNamingService::GetServers(const char* service_name,
if (index != nullptr) {
if (*index == _consul_index) {
LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
<< service_name
<< ", consul_index: " << _consul_index;
<< service_name
<< ", consul_index: " << _consul_index;
return -1;
}
} else {
Expand Down Expand Up @@ -208,7 +200,7 @@ int ConsulNamingService::GetServers(const char* service_name,
if (servers->empty() && !services.Empty()) {
LOG(ERROR) << "All service about " << service_name
<< " from consul is invalid, refuse to update servers";
return -1;
return -1;
}

RPC_VLOG << "Got " << servers->size()
Expand All @@ -217,6 +209,49 @@ int ConsulNamingService::GetServers(const char* service_name,
return 0;
}

int ConsulNamingService::RunNamingService(const char* service_name,
NamingServiceActions* actions) {
std::vector<ServerNode> servers;
bool ever_reset = false;
for (;;) {
servers.clear();
const int rc = GetServers(service_name, &servers);
// If `bthread_stop' is called to stop the ns bthread when `brpc::Join‘ is called
// in `GetServers' to wait for a rpc to complete. The bthread will be woken up,
// reset `TaskMeta::interrupted' and continue to join the rpc. After the rpc is complete,
// `bthread_usleep' will not sense the interrupt signal and sleep successfully.
// Finally, the ns bthread will never exit. So need to check the stop status of
// the bthread here and exit the bthread in time.
if (bthread_stopped(bthread_self())) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
}
if (rc == 0) {
ever_reset = true;
actions->ResetServers(servers);
} else {
if (!ever_reset) {
// ResetServers must be called at first time even if GetServers
// failed, to wake up callers to `WaitForFirstBatchOfServers'
ever_reset = true;
servers.clear();
actions->ResetServers(servers);
}
if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) {
if (errno == ESTOP) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
}
PLOG(FATAL) << "Fail to sleep";
return -1;
}
}
}
CHECK(false);
return -1;
}


void ConsulNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "consul";
Expand All @@ -231,4 +266,4 @@ void ConsulNamingService::Destroy() {
}

} // namespace policy
} // namespace brpc
} // namespace brpc
20 changes: 9 additions & 11 deletions src/brpc/policy/consul_naming_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@
#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE
#define BRPC_POLICY_CONSUL_NAMING_SERVICE

#include "brpc/periodic_naming_service.h"
#include "brpc/naming_service.h"
#include "brpc/channel.h"


namespace brpc {
class Channel;
namespace policy {

class ConsulNamingService : public PeriodicNamingService {
public:
ConsulNamingService();

class ConsulNamingService : public NamingService {
private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers) override;
int RunNamingService(const char* service_name,
NamingServiceActions* actions) override;

int GetNamingServiceAccessIntervalMs() const override;
int GetServers(const char* service_name,
std::vector<ServerNode>* servers);

void Describe(std::ostream& os, const DescribeOptions&) const override;

Expand All @@ -50,12 +48,12 @@ class ConsulNamingService : public PeriodicNamingService {
Channel _channel;
std::string _consul_index;
std::string _consul_url;
bool _backup_file_loaded;
bool _consul_connected;
bool _backup_file_loaded = false;
bool _consul_connected = false;
};

} // namespace policy
} // namespace brpc


#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE
#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE

0 comments on commit 79203fc

Please sign in to comment.