Skip to content

Commit

Permalink
Merge pull request #64 from cocaine/kobolog/multiversion-cluster
Browse files Browse the repository at this point in the history
[Bugfix] Compatibility with cocaine/cocaine-core#191.
  • Loading branch information
Andrey Sibiryov committed Apr 7, 2015
2 parents 73b6887 + 312e19d commit 5097b6b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 60 deletions.
99 changes: 46 additions & 53 deletions ipvs/gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ class ipvs_t::remote_t {
COCAINE_DECLARE_NONCOPYABLE(remote_t)

struct info_t {
const std::string name;
metadata_t meta;
std::string service;
std::size_t version;
std::vector<tcp::endpoint> endpoints;
};

ipvs_t *const parent;
Expand All @@ -96,42 +97,40 @@ class ipvs_t::remote_t {
std::multimap<std::string, ipvs_dest_t> backends;

public:
remote_t(ipvs_t* parent, const std::string& name, unsigned int version, const graph_root_t& graph);
remote_t(ipvs_t* parent, const partition_t& name);
~remote_t();

// Observers

auto
reduce() const -> metadata_t;
reduce() const -> std::vector<tcp::endpoint>;

// Modifiers

void
size_t
insert(const std::string& uuid, const std::vector<tcp::endpoint>& endpoints);

void
size_t
remove(const std::string& uuid);

private:
void
format_address(union nf_inet_addr& target, const tcp::endpoint& endpoint);
};

ipvs_t::remote_t::remote_t(ipvs_t *const parent_, const std::string& name_, unsigned int version,
const graph_root_t& graph)
:
ipvs_t::remote_t::remote_t(ipvs_t *const parent_, const partition_t& name_):
parent(parent_),
info({name_, metadata_t({}, version, graph)})
info({std::get<0>(name_), std::get<1>(name_), {}})
{
ipvs_service_t handle;

scoped_attributes_t attributes(*parent->m_log, {
attribute::make("service", info.name)
attribute::make("service", info.service),
attribute::make("version", info.version)
});

auto port = parent->m_context.mapper.assign(info.name + ":virtual");
auto& endpoints = std::get<0>(info.meta);

auto port = parent->m_context.mapper.assign(cocaine::format("%s@%d:virtual", info.service,
info.version));
COCAINE_LOG_DEBUG(parent->m_log, "publishing virtual service");

for(auto it = parent->m_endpoints.begin(); it != parent->m_endpoints.end(); ++it) {
Expand Down Expand Up @@ -167,7 +166,7 @@ ipvs_t::remote_t::remote_t(ipvs_t *const parent_, const std::string& name_, unsi
}

// Store the endpoint.
endpoints.push_back(endpoint);
info.endpoints.push_back(endpoint);

// Store the kernel service handle.
services[handle.af] = handle;
Expand All @@ -177,17 +176,18 @@ ipvs_t::remote_t::remote_t(ipvs_t *const parent_, const std::string& name_, unsi
COCAINE_LOG_ERROR(parent->m_log, "no valid endpoints found for virtual service");

// Force disconnect the remote node.
throw std::system_error(EADDRNOTAVAIL, std::generic_category());
throw std::system_error(EADDRNOTAVAIL, std::system_category());
}

COCAINE_LOG_INFO(parent->m_log, "virtual service published on port %d with %d endpoint(s)",
port, endpoints.size()
port, info.endpoints.size()
);
}

ipvs_t::remote_t::~remote_t() {
COCAINE_LOG_DEBUG(parent->m_log, "cleaning up virtual service")(
"service", info.name
"service", info.service,
"version", info.version
);

for(auto it = backends.begin(); it != backends.end(); ++it) {
Expand Down Expand Up @@ -225,28 +225,28 @@ struct is_serving {
} // namespace

auto
ipvs_t::remote_t::reduce() const -> metadata_t {
ipvs_t::remote_t::reduce() const -> std::vector<tcp::endpoint> {
if(backends.empty()) {
throw std::system_error(error::service_not_available);
}

auto endpoints = std::vector<tcp::endpoint>();
auto builder = std::back_inserter(endpoints);
std::vector<tcp::endpoint> endpoints;

std::copy_if(std::get<0>(info.meta).begin(), std::get<0>(info.meta).end(), builder,
std::copy_if(info.endpoints.begin(), info.endpoints.end(), std::back_inserter(endpoints),
is_serving{backends}
);

return metadata_t { endpoints, std::get<1>(info.meta), std::get<2>(info.meta) };
return endpoints;
}

void
size_t
ipvs_t::remote_t::insert(const std::string& uuid, const std::vector<tcp::endpoint>& endpoints) {
ipvs_dest_t handle;

scoped_attributes_t attributes(*parent->m_log, {
attribute::make("service", info.name),
attribute::make("uuid", uuid)
attribute::make("service", info.service),
attribute::make("uuid", uuid),
attribute::make("version", info.version)
});

std::map<int, ipvs_dest_t> handles;
Expand Down Expand Up @@ -289,25 +289,24 @@ ipvs_t::remote_t::insert(const std::string& uuid, const std::vector<tcp::endpoin
COCAINE_LOG_ERROR(parent->m_log, "no valid endpoints found for destination");

// Force disconnect the remote node.
throw std::system_error(EHOSTUNREACH, std::generic_category());
throw std::system_error(EHOSTUNREACH, std::system_category());
}

for(auto it = handles.begin(); it != handles.end(); ++it) {
backends.insert({uuid, it->second});
}

COCAINE_LOG_INFO(parent->m_log, "destination registered with %d endpoint(s)", handles.size());

return backends.size();
}

void
size_t
ipvs_t::remote_t::remove(const std::string& uuid) {
if(!backends.count(uuid)) {
return;
}

COCAINE_LOG_INFO(parent->m_log, "removing destination with %d endpoint(s)", backends.count(uuid))(
"service", info.name,
"uuid", uuid
"service", info.service,
"uuid", uuid,
"version", info.version
);

for(auto it = backends.lower_bound(uuid); it != backends.upper_bound(uuid); ++it) {
Expand All @@ -319,7 +318,7 @@ ipvs_t::remote_t::remove(const std::string& uuid) {
}
}

backends.erase(uuid);
backends.erase(uuid); return backends.size();
}

void
Expand Down Expand Up @@ -419,44 +418,38 @@ ipvs_t::~ipvs_t() {
}

auto
ipvs_t::resolve(const std::string& name) const -> metadata_t {
ipvs_t::resolve(const partition_t& name) const -> std::vector<tcp::endpoint> {
auto ptr = m_remotes.synchronize();

if(!ptr->count(name)) {
throw std::system_error(error::service_not_available);
}

COCAINE_LOG_DEBUG(m_log, "providing service using virtual node")(
"service", name
);
COCAINE_LOG_DEBUG(m_log, "providing service using virtual node");

return ptr->at(name)->reduce();
}

void
ipvs_t::consume(const std::string& uuid, const std::string& name, const metadata_t& info) {
auto endpoints = std::vector<tcp::endpoint>();
auto graph = graph_root_t();
auto version = 0;

std::tie(endpoints, version, graph) = info;

size_t
ipvs_t::consume(const std::string& uuid,
const partition_t& name, const std::vector<tcp::endpoint>& endpoints)
{
auto ptr = m_remotes.synchronize();

if(!ptr->count(name)) {
(*ptr)[name] = std::make_unique<remote_t>(this, name, version, graph);
(*ptr)[name] = std::make_unique<remote_t>(this, name);
}

ptr->at(name)->insert(uuid, endpoints);
return ptr->at(name)->insert(uuid, endpoints);
}

void
ipvs_t::cleanup(const std::string& uuid, const std::string& name) {
size_t
ipvs_t::cleanup(const std::string& uuid, const partition_t& name) {
auto ptr = m_remotes.synchronize();

if(!ptr->count(name)) {
return;
return 0;
}

ptr->at(name)->remove(uuid);
return ptr->at(name)->remove(uuid);
}
18 changes: 11 additions & 7 deletions ipvs/gateway.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

namespace cocaine { namespace gateway {

class ipvs_config_t {
class ipvs_config_t
{
public:
std::string scheduler;
unsigned int weight;
Expand All @@ -36,6 +37,8 @@ class ipvs_t:
{
class remote_t;

typedef std::map<partition_t, std::unique_ptr<remote_t>> remote_map_t;

context_t& m_context;

const std::unique_ptr<logging::log_t> m_log;
Expand All @@ -45,7 +48,7 @@ class ipvs_t:
std::vector<asio::ip::address> m_endpoints;

// Keeps track of IPVS configuration.
synchronized<std::map<std::string, std::unique_ptr<remote_t>>> m_remotes;
synchronized<remote_map_t> m_remotes;

public:
ipvs_t(context_t& context, const std::string& name, const dynamic_t& args);
Expand All @@ -55,15 +58,16 @@ class ipvs_t:

virtual
auto
resolve(const std::string& name) const -> metadata_t;
resolve(const partition_t& name) const -> std::vector<asio::ip::tcp::endpoint>;

virtual
void
consume(const std::string& uuid, const std::string& name, const metadata_t& info);
size_t
consume(const std::string& uuid,
const partition_t& name, const std::vector<asio::ip::tcp::endpoint>& endpoints);

virtual
void
cleanup(const std::string& uuid, const std::string& name);
size_t
cleanup(const std::string& uuid, const partition_t& name);
};

}} // namespace cocaine::gateway
Expand Down

0 comments on commit 5097b6b

Please sign in to comment.