Skip to content

Commit

Permalink
Merge pull request #197 from ns1labs/feature/handler-count-consumers
Browse files Browse the repository at this point in the history
support consumer counter in StreamHandlers
  • Loading branch information
leoparente authored Feb 2, 2022
2 parents 1498e3a + 13445c2 commit c5a22c3
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/HandlerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ class HandlerManager : public AbstractManager<StreamHandler>
virtual ~HandlerManager()
{
}

// override to atomically ensure we don't remove if there are active consumers
void module_remove(const std::string &name) override
{
std::unique_lock lock(_map_mutex);
if (_map.count(name) == 0) {
throw std::runtime_error("module name does not exist");
}
if (_map[name]->consumer_count()) {
throw std::runtime_error("unable to remove, handler has consumers");
}
_map[name]->stop();
_map.erase(name);
}
};

}
Expand Down
1 change: 1 addition & 0 deletions src/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class StreamHandler : public AbstractRunnableModule

virtual ~StreamHandler(){};

virtual size_t consumer_count() const = 0;
virtual void window_json(json &j, uint64_t period, bool merged) = 0;
virtual void window_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) = 0;
};
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/dhcp/DhcpStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandler<DhcpMetricsMa
{
return "dhcp";
}

size_t consumer_count() const override
{
return 0;
}

void start() override;
void stop() override;
};
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/dns/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
{
return "dns";
}

size_t consumer_count() const override
{
return udp_signal.slot_count();
}

void start() override;
void stop() override;
void info_json(json &j) const override;
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/mock/MockStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class MockStreamHandler final : public visor::StreamMetricsHandler<MockMetricsMa
{
return "mock";
}

size_t consumer_count() const override
{
return 0;
}

void start() override;
void stop() override;
};
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/net/NetStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ class NetStreamHandler final : public visor::StreamMetricsHandler<NetworkMetrics
{
return "packets";
}

size_t consumer_count() const override
{
return 0;
}

void start() override;
void stop() override;
};
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/pcap/PcapStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class PcapStreamHandler final : public visor::StreamMetricsHandler<PcapMetricsMa
{
return "pcap";
}

size_t consumer_count() const override
{
return 0;
}

void start() override;
void stop() override;
};
Expand Down
33 changes: 33 additions & 0 deletions src/tests/test_policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,39 @@ TEST_CASE("Policies", "[policies]")
REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view"));
}

SECTION("Good Config, test remove sequence handler policy and add again")
{
CoreRegistry registry;
registry.start(nullptr);
YAML::Node config_file = YAML::Load(policies_config_hseq);

CHECK(config_file["visor"]["policies"]);
CHECK(config_file["visor"]["policies"].IsMap());

REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true));
REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"]));

REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->running());
CHECK(policy->modules()[0]->running());
CHECK(policy->modules()[1]->running());
lock.unlock();

REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view"));

REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"]));
REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [new_policy, new_lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(new_policy->name() == "default_view");
CHECK(new_policy->input_stream()->running());
CHECK(new_policy->modules()[0]->running());
CHECK(new_policy->modules()[1]->running());
new_lock.unlock();
REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view"));
}

SECTION("Good Config, policies with same tap and input")
{
CoreRegistry registry;
Expand Down

0 comments on commit c5a22c3

Please sign in to comment.