From 13445c25cda02bca1dd73b947a1a348c32407758 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Mon, 31 Jan 2022 10:12:32 -0400 Subject: [PATCH] support consumer counter in StreamHandlers --- src/HandlerManager.h | 14 ++++++++++++ src/StreamHandler.h | 1 + src/handlers/dhcp/DhcpStreamHandler.h | 6 +++++ src/handlers/dns/DnsStreamHandler.h | 6 +++++ src/handlers/mock/MockStreamHandler.h | 6 +++++ src/handlers/net/NetStreamHandler.h | 6 +++++ src/handlers/pcap/PcapStreamHandler.h | 6 +++++ src/tests/test_policies.cpp | 33 +++++++++++++++++++++++++++ 8 files changed, 78 insertions(+) diff --git a/src/HandlerManager.h b/src/HandlerManager.h index 6bce028b0..007974fa2 100644 --- a/src/HandlerManager.h +++ b/src/HandlerManager.h @@ -24,6 +24,20 @@ class HandlerManager : public AbstractManager virtual ~HandlerManager() { } + + // override to atomically ensure we don't remove if there are active consumers + void module_remove(const std::string &name) override + { + std::unique_lock lock(_map_mutex); + if (_map.count(name) == 0) { + throw std::runtime_error("module name does not exist"); + } + if (_map[name]->consumer_count()) { + throw std::runtime_error("unable to remove, handler has consumers"); + } + _map[name]->stop(); + _map.erase(name); + } }; } diff --git a/src/StreamHandler.h b/src/StreamHandler.h index 992ea40b2..992f18789 100644 --- a/src/StreamHandler.h +++ b/src/StreamHandler.h @@ -33,6 +33,7 @@ class StreamHandler : public AbstractRunnableModule virtual ~StreamHandler(){}; + virtual size_t consumer_count() const = 0; virtual void window_json(json &j, uint64_t period, bool merged) = 0; virtual void window_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) = 0; }; diff --git a/src/handlers/dhcp/DhcpStreamHandler.h b/src/handlers/dhcp/DhcpStreamHandler.h index 20c01da6a..960283d65 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.h +++ b/src/handlers/dhcp/DhcpStreamHandler.h @@ -104,6 +104,12 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandlerremove_policy("default_view")); } + SECTION("Good Config, test remove sequence handler policy and add again") + { + CoreRegistry registry; + registry.start(nullptr); + YAML::Node config_file = YAML::Load(policies_config_hseq); + + CHECK(config_file["visor"]["policies"]); + CHECK(config_file["visor"]["policies"].IsMap()); + + REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); + REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"])); + + REQUIRE(registry.policy_manager()->module_exists("default_view")); + auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view"); + CHECK(policy->name() == "default_view"); + CHECK(policy->input_stream()->running()); + CHECK(policy->modules()[0]->running()); + CHECK(policy->modules()[1]->running()); + lock.unlock(); + + REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view")); + + REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"])); + REQUIRE(registry.policy_manager()->module_exists("default_view")); + auto [new_policy, new_lock] = registry.policy_manager()->module_get_locked("default_view"); + CHECK(new_policy->name() == "default_view"); + CHECK(new_policy->input_stream()->running()); + CHECK(new_policy->modules()[0]->running()); + CHECK(new_policy->modules()[1]->running()); + new_lock.unlock(); + REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view")); + } + SECTION("Good Config, policies with same tap and input") { CoreRegistry registry;