Skip to content

Commit

Permalink
Implement tap selector (#330)
Browse files Browse the repository at this point in the history
* Implement tap selector

* Avoid editing original policy when interacting over handlers

* break out create policy method and add TapException

* add error 422 when tap select does not match any existent tap
  • Loading branch information
Leonardo Parente authored Aug 1, 2022
1 parent c201aa6 commit f2e5d8f
Show file tree
Hide file tree
Showing 11 changed files with 794 additions and 309 deletions.
6 changes: 6 additions & 0 deletions RFCs/2021-04-16-75-taps.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ visor:
input_type: pcap
config:
iface: eth0
tags:
virtual: true
vhost: 1
# an sflow tap which listens on the given IP and port, referenced by the identifier "pop_switch"
pop_switch:
input_type: flow
config:
flow_type: sflow
port: 6343
bind: 192.168.1.1
tags:
virtual: false
vhost: 2
# a dnstap tap which gets its stream from the given socket, named "trex_tap"
trex_tap:
input_type: dnstap
Expand Down
8 changes: 6 additions & 2 deletions RFCs/2021-04-16-76-collection-policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ visor:
description: "base chaning NET to DNS policy"
# input stream to create based on the given tap and optional filter config
input:
# this must reference a tap name, or application of the policy will fail
tap: anycast
# this must reference valid tags existent on applied taps, or application of the policy will fail
tap_selector:
# It can be either "any" or "all"
any:
virtual: true
vhost: 1
# this must match the input_type of the matching tap name, or application of the policy will fail
input_type: pcap
filter:
Expand Down
10 changes: 9 additions & 1 deletion src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class AbstractMetricsManager
// this protects changes to the bucket container, _not_ changes to the bucket itself
mutable std::shared_mutex _bucket_mutex;
std::deque<std::unique_ptr<MetricsBucketClass>> _metric_buckets;

std::string _tap_name;
mutable std::shared_mutex _base_mutex;

/**
Expand Down Expand Up @@ -358,6 +358,10 @@ class AbstractMetricsManager
_deep_sample_rate = 1;
}

if (window_config->config_exists("_internal_tap_name")) {
_tap_name = window_config->config_get<std::string>("_internal_tap_name");
}

if (window_config->config_exists("num_periods")) {
_num_periods = window_config->config_get<uint64_t>("num_periods");
}
Expand Down Expand Up @@ -504,6 +508,10 @@ class AbstractMetricsManager
throw PeriodException(err.str());
}

if (!_tap_name.empty() && add_labels.find("tap") == add_labels.end()) {
add_labels["tap"] = _tap_name;
}

_metric_buckets.at(period)->to_prometheus(out, add_labels);
}

Expand Down
4 changes: 4 additions & 0 deletions src/CoreServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config)
mod->info_json(j[mod->name()]);
}
res.set_content(j.dump(), "text/json");
} catch (const std::invalid_argument &e) {
res.status = 422;
j["error"] = e.what();
res.set_content(j.dump(), "text/json");
} catch (const std::exception &e) {
res.status = 500;
j["error"] = e.what();
Expand Down
630 changes: 362 additions & 268 deletions src/Policies.cpp

Large diffs are not rendered by default.

38 changes: 28 additions & 10 deletions src/Policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ class Policy : public AbstractRunnableModule
{
static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1;

Tap *_tap;
std::vector<Tap *> _taps;
std::vector<InputStream *> _input_streams;
bool _modules_sequence;
InputStream *_input_stream;
std::vector<AbstractRunnableModule *> _modules;

public:
Policy(const std::string &name, Tap *tap, bool modules_sequence)
Policy(const std::string &name)
: AbstractRunnableModule(name)
, _tap(tap)
, _modules_sequence(modules_sequence)
, _input_stream(nullptr)
, _modules_sequence(false)
{
}

Expand All @@ -50,14 +48,24 @@ class Policy : public AbstractRunnableModule
return "policy";
}

void set_input_stream(InputStream *input_stream)
void set_modules_sequence(bool sequence)
{
_input_stream = input_stream;
_modules_sequence = sequence;
}

const InputStream *input_stream() const
void add_tap(Tap *tap)
{
return _input_stream;
_taps.push_back(tap);
}

void add_input_stream(InputStream *input_stream)
{
_input_streams.push_back(input_stream);
}

const std::vector<InputStream *> &input_stream() const
{
return _input_streams;
}

void add_module(AbstractRunnableModule *m)
Expand Down Expand Up @@ -99,6 +107,16 @@ class PolicyManager : public AbstractManager<Policy>
uint32_t _default_deep_sample_rate{100};
std::map<std::string, std::unique_ptr<Configurable>> _global_handler_config;

struct HandlerData {
std::string name;
std::string type;
Config config;
Config filter;
};

void _validate_policy(const YAML::Node &policy_yaml, const std::string &policy_name, Policy *policy_ptr, Tap *tap = nullptr);
HandlerData _validate_handler(const YAML::const_iterator &hander_iterator, const std::string &policy_name, Config &window_config, bool sequence);

public:
PolicyManager(CoreRegistry *registry)
: _registry(registry)
Expand Down
61 changes: 56 additions & 5 deletions src/Taps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)

for (YAML::const_iterator it = tap_yaml.begin(); it != tap_yaml.end(); ++it) {
if (!it->first.IsScalar()) {
throw ConfigException("expecting tap identifier");
throw TapException("expecting tap identifier");
}
auto tap_name = it->first.as<std::string>();
spdlog::get("visor")->info("tap [{}]: parsing", tap_name);
if (!it->second.IsMap()) {
throw ConfigException("expecting tap configuration map");
throw TapException("expecting tap configuration map");
}
if (!it->second["input_type"] || !it->second["input_type"].IsScalar()) {
throw ConfigException("missing or invalid tap type key 'input_type'");
throw TapException("missing or invalid tap type key 'input_type'");
}
auto input_type = it->second["input_type"].as<std::string>();

auto input_plugin = _registry->input_plugins().find(input_type);
if (input_plugin == _registry->input_plugins().end()) {
if (strict) {
throw ConfigException(fmt::format("Tap '{}' requires input stream type '{}' which is not available", tap_name, input_type));
throw TapException(fmt::format("Tap '{}' requires input stream type '{}' which is not available", tap_name, input_type));
} else {
spdlog::get("visor")->warn("Tap '{}' requires input stream type '{}' which is not available; skipping", tap_name, input_type);
continue;
Expand All @@ -46,11 +46,18 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)

if (it->second["config"]) {
if (!it->second["config"].IsMap()) {
throw ConfigException("tap configuration is not a map");
throw TapException("tap configuration is not a map");
}
tap_module->config_set_yaml(it->second["config"]);
}

if (it->second["tags"]) {
if (!it->second["tags"].IsMap()) {
throw TapException("tap tags is not a map");
}
tap_module->tags_set_yaml(it->second["tags"]);
}

// will throw if it already exists. nothing else to clean up
module_add(std::move(tap_module));

Expand All @@ -76,4 +83,48 @@ std::unique_ptr<InputStream> Tap::instantiate(const Configurable *config, const
return module;
}

bool Tap::tags_match_selector_yaml(const YAML::Node &tag_yaml, bool all)
{
bool any_match = false;
for (YAML::const_iterator it = tag_yaml.begin(); it != tag_yaml.end(); ++it) {
if (!it->second.IsScalar()) {
throw TapException(fmt::format("tag key '{}' must have scalar value", it->first));
}

auto key = it->first.as<std::string>();
if (!_tags->config_exists(key)) {
if (all) {
return false;
} else {
continue;
}
}

// the yaml library doesn't discriminate between scalar types, so we have to do that ourselves
auto value = it->second.as<std::string>();
if (std::regex_match(value, std::regex("[0-9]+"))) {
if (_tags->config_get<uint64_t>(key) == it->second.as<uint64_t>()) {
any_match = true;
} else if (all) {
return false;
}

} else if (std::regex_match(value, std::regex("true|false", std::regex_constants::icase))) {
if (_tags->config_get<bool>(key) == it->second.as<bool>()) {
any_match = true;
} else if (all) {
return false;
}
} else {
if (_tags->config_get<std::string>(key) == value) {
any_match = true;
} else if (all) {
return false;
}
}
}

return any_match;
}

}
23 changes: 23 additions & 0 deletions src/Taps.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,30 @@ namespace visor {
class InputStream;
class Policy;

class TapException : public std::runtime_error
{
public:
TapException(const char *msg)
: std::runtime_error(msg)
{
}
TapException(const std::string &msg)
: std::runtime_error(msg)
{
}
};

class Tap : public AbstractModule
{

InputModulePlugin *_input_plugin;
std::unique_ptr<Configurable> _tags;

public:
Tap(const std::string &name, InputModulePlugin *input_plugin)
: AbstractModule(name)
, _input_plugin(input_plugin)
, _tags(std::make_unique<Configurable>())
{
assert(input_plugin);
}
Expand All @@ -32,6 +47,8 @@ class Tap : public AbstractModule

std::unique_ptr<InputStream> instantiate(const Configurable *config, const Configurable *filter, std::string input_name);

bool tags_match_selector_yaml(const YAML::Node &tag_yaml, bool all);

const InputModulePlugin *input_plugin() const
{
return _input_plugin;
Expand All @@ -42,6 +59,12 @@ class Tap : public AbstractModule
j["input_type"] = _input_plugin->plugin();
j["interface"] = _input_plugin->pluginInterface();
config_json(j["config"]);
_tags->config_json(j["tags"]);
}

void tags_set_yaml(const YAML::Node &tag_yaml)
{
_tags->config_set_yaml(tag_yaml);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]")
resources_handler.start();
stream.start();
//add and remove policy
auto policy = std::make_unique<visor::Policy>("policy-test", nullptr, false);
auto policy = std::make_unique<visor::Policy>("policy-test");
stream.add_policy(policy.get());
stream.remove_policy(policy.get());
resources_handler.stop();
Expand Down
Loading

0 comments on commit f2e5d8f

Please sign in to comment.