Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tap selector #330

Merged
merged 13 commits into from
Aug 1, 2022
6 changes: 6 additions & 0 deletions RFCs/2021-04-16-75-taps.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ visor:
input_type: pcap
config:
iface: eth0
tags:
virtual: true
vhost: 1
# an sflow tap which listens on the given IP and port, referenced by the identifier "pop_switch"
pop_switch:
input_type: flow
config:
flow_type: sflow
port: 6343
bind: 192.168.1.1
tags:
virtual: false
vhost: 2
# a dnstap tap which gets its stream from the given socket, named "trex_tap"
trex_tap:
input_type: dnstap
Expand Down
8 changes: 6 additions & 2 deletions RFCs/2021-04-16-76-collection-policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ visor:
description: "base chaning NET to DNS policy"
# input stream to create based on the given tap and optional filter config
input:
# this must reference a tap name, or application of the policy will fail
tap: anycast
# this must reference valid tags existent on applied taps, or application of the policy will fail
tap_selector:
# It can be either "any" or "all"
leoparente marked this conversation as resolved.
Show resolved Hide resolved
any:
virtual: true
vhost: 1
# this must match the input_type of the matching tap name, or application of the policy will fail
input_type: pcap
filter:
Expand Down
10 changes: 9 additions & 1 deletion src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class AbstractMetricsManager
// this protects changes to the bucket container, _not_ changes to the bucket itself
mutable std::shared_mutex _bucket_mutex;
std::deque<std::unique_ptr<MetricsBucketClass>> _metric_buckets;

std::string _tap_name;
mutable std::shared_mutex _base_mutex;

/**
Expand Down Expand Up @@ -358,6 +358,10 @@ class AbstractMetricsManager
_deep_sample_rate = 1;
}

if (window_config->config_exists("_internal_tap_name")) {
_tap_name = window_config->config_get<std::string>("_internal_tap_name");
}

if (window_config->config_exists("num_periods")) {
_num_periods = window_config->config_get<uint64_t>("num_periods");
}
Expand Down Expand Up @@ -504,6 +508,10 @@ class AbstractMetricsManager
throw PeriodException(err.str());
}

if (!_tap_name.empty() && add_labels.find("tap") == add_labels.end()) {
add_labels["tap"] = _tap_name;
}

_metric_buckets.at(period)->to_prometheus(out, add_labels);
}

Expand Down
4 changes: 4 additions & 0 deletions src/CoreServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config)
mod->info_json(j[mod->name()]);
}
res.set_content(j.dump(), "text/json");
} catch (const std::invalid_argument &e) {
res.status = 422;
j["error"] = e.what();
res.set_content(j.dump(), "text/json");
} catch (const std::exception &e) {
res.status = 500;
j["error"] = e.what();
Expand Down
630 changes: 362 additions & 268 deletions src/Policies.cpp

Large diffs are not rendered by default.

34 changes: 26 additions & 8 deletions src/Policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ class Policy : public AbstractRunnableModule
{
static constexpr size_t HANDLERS_SEQUENCE_SIZE = 1;

Tap *_tap;
std::vector<Tap *> _tap;
leoparente marked this conversation as resolved.
Show resolved Hide resolved
std::vector<InputStream *> _input_stream;
bool _modules_sequence;
InputStream *_input_stream;
std::vector<AbstractRunnableModule *> _modules;

public:
Policy(const std::string &name, Tap *tap, bool modules_sequence)
Policy(const std::string &name)
: AbstractRunnableModule(name)
, _tap(tap)
, _modules_sequence(modules_sequence)
, _input_stream(nullptr)
, _modules_sequence(false)
{
}

Expand All @@ -50,12 +48,22 @@ class Policy : public AbstractRunnableModule
return "policy";
}

void set_modules_sequence(bool sequence)
{
_modules_sequence = sequence;
}

void set_tap(Tap *tap)
leoparente marked this conversation as resolved.
Show resolved Hide resolved
{
_tap.push_back(tap);
}

void set_input_stream(InputStream *input_stream)
{
_input_stream = input_stream;
_input_stream.push_back(input_stream);
leoparente marked this conversation as resolved.
Show resolved Hide resolved
}

const InputStream *input_stream() const
const std::vector<InputStream *> &input_stream()
leoparente marked this conversation as resolved.
Show resolved Hide resolved
{
return _input_stream;
}
Expand Down Expand Up @@ -99,6 +107,16 @@ class PolicyManager : public AbstractManager<Policy>
uint32_t _default_deep_sample_rate{100};
std::map<std::string, std::unique_ptr<Configurable>> _global_handler_config;

struct HandlerData {
std::string name;
std::string type;
Config config;
Config filter;
};

void _validate_policy(const YAML::Node &policy_yaml, const std::string &policy_name, Policy *policy_ptr, Tap *tap = nullptr);
HandlerData _validate_handler(const YAML::const_iterator &hander_iterator, const std::string &policy_name, Config &window_config, bool sequence);

public:
PolicyManager(CoreRegistry *registry)
: _registry(registry)
Expand Down
61 changes: 56 additions & 5 deletions src/Taps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)

for (YAML::const_iterator it = tap_yaml.begin(); it != tap_yaml.end(); ++it) {
if (!it->first.IsScalar()) {
throw ConfigException("expecting tap identifier");
throw TapException("expecting tap identifier");
}
auto tap_name = it->first.as<std::string>();
spdlog::get("visor")->info("tap [{}]: parsing", tap_name);
if (!it->second.IsMap()) {
throw ConfigException("expecting tap configuration map");
throw TapException("expecting tap configuration map");
}
if (!it->second["input_type"] || !it->second["input_type"].IsScalar()) {
throw ConfigException("missing or invalid tap type key 'input_type'");
throw TapException("missing or invalid tap type key 'input_type'");
}
auto input_type = it->second["input_type"].as<std::string>();

auto input_plugin = _registry->input_plugins().find(input_type);
if (input_plugin == _registry->input_plugins().end()) {
if (strict) {
throw ConfigException(fmt::format("Tap '{}' requires input stream type '{}' which is not available", tap_name, input_type));
throw TapException(fmt::format("Tap '{}' requires input stream type '{}' which is not available", tap_name, input_type));
} else {
spdlog::get("visor")->warn("Tap '{}' requires input stream type '{}' which is not available; skipping", tap_name, input_type);
continue;
Expand All @@ -46,11 +46,18 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)

if (it->second["config"]) {
if (!it->second["config"].IsMap()) {
throw ConfigException("tap configuration is not a map");
throw TapException("tap configuration is not a map");
}
tap_module->config_set_yaml(it->second["config"]);
}

if (it->second["tags"]) {
if (!it->second["tags"].IsMap()) {
throw TapException("tap tags is not a map");
}
tap_module->tags_set_yaml(it->second["tags"]);
}

// will throw if it already exists. nothing else to clean up
module_add(std::move(tap_module));

Expand All @@ -76,4 +83,48 @@ std::unique_ptr<InputStream> Tap::instantiate(const Configurable *config, const
return module;
}

bool Tap::tags_match_selector_yaml(const YAML::Node &tag_yaml, bool all)
{
bool any_match = false;
for (YAML::const_iterator it = tag_yaml.begin(); it != tag_yaml.end(); ++it) {
if (!it->second.IsScalar()) {
throw TapException(fmt::format("tag key '{}' must have scalar value", it->first));
}

auto key = it->first.as<std::string>();
if (!_tags->config_exists(key)) {
if (all) {
return false;
} else {
continue;
}
}

// the yaml library doesn't discriminate between scalar types, so we have to do that ourselves
auto value = it->second.as<std::string>();
if (std::regex_match(value, std::regex("[0-9]+"))) {
if (_tags->config_get<uint64_t>(key) == it->second.as<uint64_t>()) {
any_match = true;
} else if (all) {
return false;
}

} else if (std::regex_match(value, std::regex("true|false", std::regex_constants::icase))) {
if (_tags->config_get<bool>(key) == it->second.as<bool>()) {
any_match = true;
} else if (all) {
return false;
}
} else {
if (_tags->config_get<std::string>(key) == value) {
any_match = true;
} else if (all) {
return false;
}
}
}

return any_match;
}

}
23 changes: 23 additions & 0 deletions src/Taps.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,30 @@ namespace visor {
class InputStream;
class Policy;

class TapException : public std::runtime_error
{
public:
TapException(const char *msg)
: std::runtime_error(msg)
{
}
TapException(const std::string &msg)
: std::runtime_error(msg)
{
}
};

class Tap : public AbstractModule
{

InputModulePlugin *_input_plugin;
std::unique_ptr<Configurable> _tags;

public:
Tap(const std::string &name, InputModulePlugin *input_plugin)
: AbstractModule(name)
, _input_plugin(input_plugin)
, _tags(std::make_unique<Configurable>())
{
assert(input_plugin);
}
Expand All @@ -32,6 +47,8 @@ class Tap : public AbstractModule

std::unique_ptr<InputStream> instantiate(const Configurable *config, const Configurable *filter, std::string input_name);

bool tags_match_selector_yaml(const YAML::Node &tag_yaml, bool all);

const InputModulePlugin *input_plugin() const
{
return _input_plugin;
Expand All @@ -42,6 +59,12 @@ class Tap : public AbstractModule
j["input_type"] = _input_plugin->plugin();
j["interface"] = _input_plugin->pluginInterface();
config_json(j["config"]);
_tags->config_json(j["tags"]);
}

void tags_set_yaml(const YAML::Node &tag_yaml)
{
_tags->config_set_yaml(tag_yaml);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ TEST_CASE("Check resources for pcap input", "[pcap][resources]")
resources_handler.start();
stream.start();
//add and remove policy
auto policy = std::make_unique<visor::Policy>("policy-test", nullptr, false);
auto policy = std::make_unique<visor::Policy>("policy-test");
stream.add_policy(policy.get());
stream.remove_policy(policy.get());
resources_handler.stop();
Expand Down
Loading