Skip to content

Commit

Permalink
Fix SNI events (#513)
Browse files Browse the repository at this point in the history
* Fix events when using SNI routing

The address in status/topology events needs to be mapped to internal
SNI address so that nodes will be properly added and removed.

* Additional fixes

* Formatting fix

* Simplify logic

Co-authored-by: Bret McGuire <bret.mcguire@datastax.com>
  • Loading branch information
mpenick and absurdfarce authored Oct 21, 2021
1 parent cf594b7 commit 07f8ade
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 38 deletions.
21 changes: 19 additions & 2 deletions src/address_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

using namespace datastax::internal::core;

bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
bool AddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
Address connected_address = connected_host->address();
const Value* peer_value = peers_row->get_by_name("peer");
const Value* rpc_value = peers_row->get_by_name("rpc_address");
Expand Down Expand Up @@ -59,6 +59,12 @@ bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connec
return true;
}

bool AddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected) {
Address address;
return create(peers_row, connected_host, &address) && address == expected;
}

bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
Address* output) {
CassUuid host_id;
Expand All @@ -78,3 +84,14 @@ bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_
connected_host->address().port(), to_string(host_id));
return true;
}

bool SniAddressFactory::is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected) {
const Value* value = peers_row->get_by_name("rpc_address");
Address rpc_address;
if (!value ||
!value->decoder().as_inet(value->size(), connected_host->address().port(), &rpc_address)) {
return false;
}
return rpc_address == expected;
}
15 changes: 6 additions & 9 deletions src/address_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ namespace datastax { namespace internal { namespace core {
class Row;

/**
* An interface for constructing `Address` from `system.local`/`system.peers` row data.
* An address factory that creates `Address` using the `rpc_address` column.
*/
class AddressFactory : public RefCounted<AddressFactory> {
public:
typedef SharedRefPtr<AddressFactory> Ptr;
virtual ~AddressFactory() {}
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) = 0;
};

/**
* An address factory that creates `Address` using the `rpc_address` column.
*/
class DefaultAddressFactory : public AddressFactory {
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected);
};

/**
Expand All @@ -48,13 +43,15 @@ class DefaultAddressFactory : public AddressFactory {
*/
class SniAddressFactory : public AddressFactory {
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
virtual bool is_peer(const Row* peers_row, const Host::Ptr& connected_host,
const Address& expected);
};

inline AddressFactory* create_address_factory_from_config(const Config& config) {
if (config.cloud_secure_connection_config().is_loaded()) {
return new SniAddressFactory();
} else {
return new DefaultAddressFactory();
return new AddressFactory();
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,17 @@ LockedHostMap::LockedHostMap(const HostMap& hosts)
LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }

LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
return hosts_.find(address);
HostMap::const_iterator it = hosts_.find(address);
if (it == hosts_.end()) {
// If this is from an event (not SNI) and we're using SNI addresses then fallback to using the
// "rpc_address" to compare.
for (HostMap::const_iterator i = hosts_.begin(), end = hosts_.end(); i != end; ++i) {
if (i->second->rpc_address() == address) {
return i;
}
}
}
return it;
}

Host::Ptr LockedHostMap::get(const Address& address) const {
Expand Down Expand Up @@ -633,7 +643,7 @@ void Cluster::notify_host_remove(const Address& address) {
notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
}

hosts_.erase(address);
hosts_.erase(it->first);
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
end = load_balancing_policies_.end();
it != end; ++it) {
Expand Down
17 changes: 8 additions & 9 deletions src/control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ static NopControlConnectionListener nop_listener__;
ControlConnectionSettings::ControlConnectionSettings()
: use_schema(CASS_DEFAULT_USE_SCHEMA)
, use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING)
, address_factory(new DefaultAddressFactory()) {}
, address_factory(new AddressFactory()) {}

ControlConnectionSettings::ControlConnectionSettings(const Config& config)
: connection_settings(config)
Expand Down Expand Up @@ -395,17 +395,16 @@ void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) {
const Row* row = NULL;
ResultIterator rows(callback->result().get());

while (rows.next() && !found_host) {
row = rows.row();
if (callback->is_all_peers) {
Address address;
bool is_valid_address = settings_.address_factory->create(row, connection_->host(), &address);
if (is_valid_address && callback->address == address) {
if (callback->is_all_peers) {
while (!found_host && rows.next()) {
row = rows.row();
if (settings_.address_factory->is_peer(row, connection_->host(), callback->address)) {
found_host = true;
}
} else {
found_host = true;
}
} else if (rows.next()) {
row = rows.row();
found_host = true;
}

if (!found_host) {
Expand Down
41 changes: 26 additions & 15 deletions src/host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ void Host::set(const Row* row, bool use_tokens) {
if (dse_server_version_ < VersionNumber(6, 7, 0)) {
server_version_ = VersionNumber(3, 11, 0);
}
} else {
LOG_WARN("Invalid DSE version string \"%s\" on host %s", dse_version_str.c_str(),
address().to_string().c_str());
}
}

Expand Down Expand Up @@ -153,37 +150,51 @@ void Host::set(const Row* row, bool use_tokens) {
"If this is incorrect you should configure a specific interface for rpc_address on "
"the server.",
address_string_.c_str());
v = row->get_by_name("listen_address"); // Available in system.local
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
} else {
v = row->get_by_name("peer"); // Available in system.peers
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
}
}
if (!rpc_address_.is_valid()) {
LOG_WARN("Unable to set rpc_address from either listen_address or peer");
}
}
} else {
LOG_WARN("No rpc_address for host %s in system.local or system.peers.",
address_string_.c_str());
}
}

static CassInet to_inet(const Host::Ptr& host) {
CassInet address;
if (host->address().is_resolved()) {
address.address_length = host->address().to_inet(address.address);
} else {
address.address_length = host->rpc_address().to_inet(&address.address);
}
return address;
}

ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback, void* data)
: callback_(callback)
, data_(data) {}

void ExternalHostListener::on_host_up(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_UP, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_UP, to_inet(host), data_);
}

void ExternalHostListener::on_host_down(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_DOWN, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_DOWN, to_inet(host), data_);
}

void ExternalHostListener::on_host_added(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_ADD, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_ADD, to_inet(host), data_);
}

void ExternalHostListener::on_host_removed(const Host::Ptr& host) {
CassInet address;
address.address_length = host->address().to_inet(address.address);
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, address, data_);
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, to_inet(host), data_);
}
2 changes: 1 addition & 1 deletion src/request_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ RequestProcessorSettings::RequestProcessorSettings()
, max_tracing_wait_time_ms(CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS)
, retry_tracing_wait_time_ms(CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS)
, tracing_consistency(CASS_DEFAULT_TRACING_CONSISTENCY)
, address_factory(new DefaultAddressFactory()) {
, address_factory(new AddressFactory()) {
profiles.set_empty_key("");
}

Expand Down
1 change: 1 addition & 0 deletions src/wkt.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// clang-format off

#line 1 "wkt.rl"
/*
Expand Down

0 comments on commit 07f8ade

Please sign in to comment.