Skip to content

Commit

Permalink
factory context simplification: update code in contrib to access serv…
Browse files Browse the repository at this point in the history
…er-wide resource only by server context (envoyproxy#30867)

* factory context simplification: update code in contrib to access server-wide resource only by server context

Signed-off-by: wbpcode <wbphub@live.com>

* fix test

Signed-off-by: wbpcode <wbphub@live.com>

---------

Signed-off-by: wbpcode <wbphub@live.com>
  • Loading branch information
code authored Nov 16, 2023
1 parent 699d454 commit 2b7aaf0
Show file tree
Hide file tree
Showing 25 changed files with 104 additions and 71 deletions.
7 changes: 5 additions & 2 deletions contrib/client_ssl_auth/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ Network::FilterFactoryCb ClientSslAuthConfigFactory::createFilterFactoryFromProt
ASSERT(!proto_config.auth_api_cluster().empty());
ASSERT(!proto_config.stat_prefix().empty());

auto& server_context = context.getServerFactoryContext();

ClientSslAuthConfigSharedPtr filter_config(ClientSslAuthConfig::create(
proto_config, context.threadLocal(), context.clusterManager(), context.mainThreadDispatcher(),
context.scope(), context.api().randomGenerator()));
proto_config, server_context.threadLocal(), server_context.clusterManager(),
server_context.mainThreadDispatcher(), context.scope(),
server_context.api().randomGenerator()));
return [filter_config](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<ClientSslAuthFilter>(filter_config));
};
Expand Down
12 changes: 6 additions & 6 deletions contrib/client_ssl_auth/filters/network/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ auth_api_cluster: fake_cluster
envoy::extensions::filters::network::client_ssl_auth::v3::ClientSSLAuth proto_config;
TestUtility::loadFromYamlAndValidate(yaml, proto_config);
NiceMock<Server::Configuration::MockFactoryContext> context;
context.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
context.server_factory_context_.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.server_factory_context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
ClientSslAuthConfigFactory factory;
Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context);
Network::MockConnection connection;
Expand All @@ -62,8 +62,8 @@ auth_api_cluster: fake_cluster
envoy::extensions::filters::network::client_ssl_auth::v3::ClientSSLAuth proto_config;
TestUtility::loadFromYamlAndValidate(yaml, proto_config);
NiceMock<Server::Configuration::MockFactoryContext> context;
context.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
context.server_factory_context_.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.server_factory_context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
ClientSslAuthConfigFactory factory;
Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context);
Network::MockConnection connection;
Expand All @@ -79,8 +79,8 @@ auth_api_cluster: fake_cluster
)EOF" + GetParam();

NiceMock<Server::Configuration::MockFactoryContext> context;
context.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
context.server_factory_context_.cluster_manager_.initializeClusters({"fake_cluster"}, {});
context.server_factory_context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
ClientSslAuthConfigFactory factory;
envoy::extensions::filters::network::client_ssl_auth::v3::ClientSSLAuth proto_config =
*dynamic_cast<envoy::extensions::filters::network::client_ssl_auth::v3::ClientSSLAuth*>(
Expand Down
2 changes: 1 addition & 1 deletion contrib/dlb/source/connection_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ DlbConnectionBalanceFactory::createConnectionBalancerFromProto(

fallback_policy = dlb_config.fallback_policy();

const uint32_t worker_num = context.options().concurrency();
const uint32_t worker_num = context.getServerFactoryContext().options().concurrency();

if (worker_num > 32) {
return fallback("Dlb connection balanncer only supports up to 32 worker threads, "
Expand Down
2 changes: 1 addition & 1 deletion contrib/dlb/source/connection_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class DlbConnectionBalanceFactory : public Envoy::Network::ConnectionBalanceFact

Envoy::Network::ConnectionBalancerSharedPtr
createConnectionBalancerFromProto(const Protobuf::Message& config,
Server::Configuration::FactoryContext& context) override;
Server::Configuration::FactoryContext&) override;

std::string name() const override { return "envoy.network.connection_balance.dlb"; }

Expand Down
2 changes: 1 addition & 1 deletion contrib/dlb/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ TEST_F(DlbConnectionBalanceFactoryTest, TooManyThreads) {
envoy::config::core::v3::TypedExtensionConfig typed_config;
DlbConnectionBalanceFactory factory;
NiceMock<Server::Configuration::MockFactoryContext> context;
context.options_.concurrency_ = 33;
context.server_factory_context_.options_.concurrency_ = 33;

envoy::extensions::network::connection_balance::dlb::v3alpha::Dlb dlb;
makeDlbConnectionBalanceConfig(typed_config, dlb);
Expand Down
5 changes: 3 additions & 2 deletions contrib/dynamo/filters/http/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ Http::FilterFactoryCb DynamoFilterConfig::createFilterFactoryFromProtoTyped(
Server::Configuration::FactoryContext& context) {
auto stats = std::make_shared<DynamoStats>(context.scope(), stats_prefix);
return [&context, stats](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<Dynamo::DynamoFilter>(
context.runtime(), stats, context.mainThreadDispatcher().timeSource()));
callbacks.addStreamFilter(
std::make_shared<Dynamo::DynamoFilter>(context.getServerFactoryContext().runtime(), stats,
context.getServerFactoryContext().timeSource()));
};
}

Expand Down
12 changes: 7 additions & 5 deletions contrib/generic_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ Factory::routeConfigProviderFromProto(const ProxyConfig& config,
std::vector<NamedFilterFactoryCb> Factory::filtersFactoryFromProto(
const ProtobufWkt::RepeatedPtrField<envoy::config::core::v3::TypedExtensionConfig>& filters,
const std::string stats_prefix, Envoy::Server::Configuration::FactoryContext& context) {

std::vector<NamedFilterFactoryCb> factories;
bool has_terminal_filter = false;
std::string terminal_filter_name;
Expand Down Expand Up @@ -87,11 +86,14 @@ std::vector<NamedFilterFactoryCb> Factory::filtersFactoryFromProto(
Envoy::Network::FilterFactoryCb
Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config,
Envoy::Server::Configuration::FactoryContext& context) {
auto& server_context = context.getServerFactoryContext();

std::shared_ptr<RouteConfigProviderManager> route_config_provider_manager =
context.singletonManager().getTyped<RouteConfigProviderManager>(
server_context.singletonManager().getTyped<RouteConfigProviderManager>(
SINGLETON_MANAGER_REGISTERED_NAME(generic_route_config_provider_manager),
[&context] { return std::make_shared<RouteConfigProviderManagerImpl>(context.admin()); });
[&server_context] {
return std::make_shared<RouteConfigProviderManagerImpl>(server_context.admin());
});

auto tracer_manager = Tracing::TracerManagerImpl::singleton(context);

Expand Down Expand Up @@ -122,7 +124,7 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config,
filtersFactoryFromProto(proto_config.filters(), proto_config.stat_prefix(), context),
std::move(tracer), std::move(tracing_config), std::move(access_logs), context);

return [route_config_provider_manager, tracer_manager, config, &context,
return [route_config_provider_manager, tracer_manager, config, &server_context,
custom_proxy_factory](Envoy::Network::FilterManager& filter_manager) -> void {
// Create filter by the custom filter factory if the custom filter factory is not null.
if (custom_proxy_factory != nullptr) {
Expand All @@ -131,7 +133,7 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config,
}

filter_manager.addReadFilter(std::make_shared<Filter>(
config, context.mainThreadDispatcher().timeSource(), context.runtime()));
config, server_context.mainThreadDispatcher().timeSource(), server_context.runtime()));
};
}

Expand Down
3 changes: 2 additions & 1 deletion contrib/generic_proxy/filters/network/source/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class FilterConfigImpl : public FilterConfig {
codec_factory_(std::move(codec)), route_config_provider_(std::move(route_config_provider)),
factories_(std::move(factories)), drain_decision_(context.drainDecision()),
tracer_(std::move(tracer)), tracing_config_(std::move(tracing_config)),
access_logs_(std::move(access_logs)), time_source_(context.timeSource()) {}
access_logs_(std::move(access_logs)),
time_source_(context.getServerFactoryContext().timeSource()) {}

// FilterConfig
RouteEntryConstSharedPtr routeEntry(const Request& request) const override {
Expand Down
5 changes: 2 additions & 3 deletions contrib/generic_proxy/filters/network/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ version_info: "1"
factory_context.server_factory_context_.cluster_manager_.subscription_factory_.callbacks_
->onConfigUpdate(decoded_resources.refvec_, response.version_info())
.ok());
auto message_ptr =
factory_context.admin_.config_tracker_.config_tracker_callbacks_["genericrds_routes"](
universal_name_matcher);
auto message_ptr = factory_context.server_factory_context_.admin_.config_tracker_
.config_tracker_callbacks_["genericrds_routes"](universal_name_matcher);
const auto& dump =
TestUtility::downcastAndValidate<const envoy::admin::v3::RoutesConfigDump&>(*message_ptr);
EXPECT_EQ(1, dump.dynamic_route_configs().size());
Expand Down
2 changes: 1 addition & 1 deletion contrib/golang/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Network::FilterFactoryCb GolangConfigFactory::createFilterFactoryFromProtoTyped(
Server::Configuration::FactoryContext& context) {
is_terminal_filter_ = proto_config.is_terminal_filter();

UpstreamConn::initThreadLocalStorage(context, context.threadLocal());
UpstreamConn::initThreadLocalStorage(context, context.getServerFactoryContext().threadLocal());

FilterConfigSharedPtr config = std::make_shared<FilterConfig>(proto_config);
std::string config_str;
Expand Down
4 changes: 3 additions & 1 deletion contrib/golang/filters/network/source/golang.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ class Filter : public Network::Filter,
void close(Network::ConnectionCloseType close_type);

Event::Dispatcher* dispatcher() { return dispatcher_; }
Upstream::ClusterManager& clusterManager() { return context_.clusterManager(); }
Upstream::ClusterManager& clusterManager() {
return context_.getServerFactoryContext().clusterManager();
}

std::string getLocalAddrStr() const { return local_addr_; }
std::string getRemoteAddrStr() const { return addr_; };
Expand Down
9 changes: 6 additions & 3 deletions contrib/golang/filters/network/source/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ void UpstreamConn::initThreadLocalStorage(Server::Configuration::FactoryContext&
std::call_once(store.init_once_, [&context, &tls, &store]() {
// should be the singleton for use by the entire server.
ClusterManagerContainer& cluster_manager_container = clusterManagerContainer();
cluster_manager_container.cluster_manager_ = &context.clusterManager();
cluster_manager_container.cluster_manager_ =
&context.getServerFactoryContext().clusterManager();

SlotPtrContainer& slot_ptr_container = slotPtrContainer();
slot_ptr_container.slot_ = tls.allocateSlot();

Thread::ThreadId main_thread_id = context.api().threadFactory().currentThreadId();
Thread::ThreadId main_thread_id =
context.getServerFactoryContext().api().threadFactory().currentThreadId();
slot_ptr_container.slot_->set(
[&context, main_thread_id,
&store](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
if (context.api().threadFactory().currentThreadId() == main_thread_id) {
if (context.getServerFactoryContext().api().threadFactory().currentThreadId() ==
main_thread_id) {
return nullptr;
}

Expand Down
6 changes: 4 additions & 2 deletions contrib/golang/filters/network/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class GolangFilterConfigTestBase {
void testConfig(envoy::extensions::filters::network::golang::v3alpha::Config& config) {
EXPECT_CALL(slot_allocator_, allocateSlot())
.WillRepeatedly(Invoke(&slot_allocator_, &ThreadLocal::MockInstance::allocateSlotMock));
ON_CALL(context_, threadLocal()).WillByDefault(ReturnRef(slot_allocator_));
ON_CALL(context_.api_, threadFactory()).WillByDefault(ReturnRef(thread_factory_));
ON_CALL(context_.server_factory_context_, threadLocal())
.WillByDefault(ReturnRef(slot_allocator_));
ON_CALL(context_.server_factory_context_.api_, threadFactory())
.WillByDefault(ReturnRef(thread_factory_));

Network::FilterFactoryCb cb;
EXPECT_NO_THROW({ cb = factory_.createFilterFactoryFromProto(config, context_); });
Expand Down
29 changes: 18 additions & 11 deletions contrib/golang/filters/network/test/upstream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class UpstreamConnTest : public testing::Test {
void initialize() {
EXPECT_CALL(slot_allocator_, allocateSlot())
.WillRepeatedly(Invoke(&slot_allocator_, &ThreadLocal::MockInstance::allocateSlotMock));
context_.cluster_manager_.initializeClusters({"plainText"}, {});
context_.cluster_manager_.initializeThreadLocalClusters({"plainText"});
ON_CALL(context_.api_, threadFactory()).WillByDefault(ReturnRef(thread_factory_));
context_.server_factory_context_.cluster_manager_.initializeClusters({"plainText"}, {});
context_.server_factory_context_.cluster_manager_.initializeThreadLocalClusters({"plainText"});
ON_CALL(context_.server_factory_context_.api_, threadFactory())
.WillByDefault(ReturnRef(thread_factory_));
UpstreamConn::initThreadLocalStorage(context_, slot_allocator_);
dso_ = std::make_shared<Dso::MockNetworkFilterDsoImpl>();
upConn_ = std::make_shared<UpstreamConn>(addr_, dso_, 0, &dispatcher_);
Expand All @@ -64,23 +65,29 @@ TEST_F(UpstreamConnTest, ConnectUpstream) {
"envoy.network.transport_socket.original_dst_address");
EXPECT_EQ(dst_addr->address()->asString(), addr_);

EXPECT_CALL(context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_, newConnection(_))
EXPECT_CALL(
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_,
newConnection(_))
.WillOnce(
Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* {
context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.newConnectionImpl(cb);
context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolReady(
upstream_connection_);
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_
.newConnectionImpl(cb);
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_
.poolReady(upstream_connection_);
return nullptr;
}));
EXPECT_CALL(*dso_.get(), envoyGoFilterOnUpstreamConnectionReady(_, _));
upConn_->connect();

EXPECT_CALL(context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_, newConnection(_))
EXPECT_CALL(
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_,
newConnection(_))
.WillOnce(
Invoke([&](Tcp::ConnectionPool::Callbacks& cb) -> Tcp::ConnectionPool::Cancellable* {
context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.newConnectionImpl(cb);
context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_.poolFailure(
ConnectionPool::PoolFailureReason::RemoteConnectionFailure, true);
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_
.newConnectionImpl(cb);
context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_
.poolFailure(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, true);
return nullptr;
}));
EXPECT_CALL(*dso_.get(),
Expand Down
4 changes: 2 additions & 2 deletions contrib/kafka/filters/network/source/broker/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ Network::FilterFactoryCb KafkaConfigFactory::createFilterFactoryFromProtoTyped(
const BrokerFilterConfigSharedPtr filter_config =
std::make_shared<BrokerFilterConfig>(proto_config);
return [&context, filter_config](Network::FilterManager& filter_manager) -> void {
Network::FilterSharedPtr filter =
std::make_shared<KafkaBrokerFilter>(context.scope(), context.timeSource(), *filter_config);
Network::FilterSharedPtr filter = std::make_shared<KafkaBrokerFilter>(
context.scope(), context.getServerFactoryContext().timeSource(), *filter_config);
filter_manager.addFilter(filter);
};
}
Expand Down
9 changes: 6 additions & 3 deletions contrib/kafka/filters/network/source/mesh/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ Network::FilterFactoryCb KafkaMeshConfigFactory::createFilterFactoryFromProtoTyp
const UpstreamKafkaConfigurationSharedPtr configuration =
std::make_shared<UpstreamKafkaConfigurationImpl>(config);

auto& server_context = context.getServerFactoryContext();

// Shared upstream facade (connects us to upstream Kafka clusters).
const UpstreamKafkaFacadeSharedPtr upstream_kafka_facade =
std::make_shared<UpstreamKafkaFacadeImpl>(*configuration, context.threadLocal(),
context.api().threadFactory());
std::make_shared<UpstreamKafkaFacadeImpl>(*configuration, server_context.threadLocal(),
server_context.api().threadFactory());

// Manager for consumers shared across downstream connections
// (connects us to upstream Kafka clusters).
const RecordCallbackProcessorSharedPtr shared_consumer_manager =
std::make_shared<SharedConsumerManagerImpl>(*configuration, context.api().threadFactory());
std::make_shared<SharedConsumerManagerImpl>(*configuration,
server_context.api().threadFactory());

return [configuration, upstream_kafka_facade,
shared_consumer_manager](Network::FilterManager& filter_manager) -> void {
Expand Down
3 changes: 2 additions & 1 deletion contrib/kafka/filters/network/test/mesh/config_unit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ advertised_port: 19092

testing::NiceMock<Server::Configuration::MockFactoryContext> context;
testing::NiceMock<MockThreadFactory> thread_factory;
ON_CALL(context.api_, threadFactory()).WillByDefault(ReturnRef(thread_factory));
ON_CALL(context.server_factory_context_.api_, threadFactory())
.WillByDefault(ReturnRef(thread_factory));
KafkaMeshConfigFactory factory;

Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context);
Expand Down
2 changes: 1 addition & 1 deletion contrib/rocketmq_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Network::FilterFactoryCb RocketmqProxyFilterConfigFactory::createFilterFactoryFr
std::shared_ptr<ConfigImpl> filter_config = std::make_shared<ConfigImpl>(proto_config, context);
return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.mainThreadDispatcher().timeSource()));
*filter_config, context.getServerFactoryContext().mainThreadDispatcher().timeSource()));
};
}

Expand Down
7 changes: 4 additions & 3 deletions contrib/sip_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Network::FilterFactoryCb SipProxyFilterConfigFactory::createFilterFactoryFromPro
Stats::ScopeSharedPtr stats_scope =
context.scope().createScope(fmt::format("cluster.{}.sip_cluster", cluster));
auto transaction_info_ptr = std::make_shared<Router::TransactionInfo>(
cluster, context.threadLocal(),
cluster, context.getServerFactoryContext().threadLocal(),
static_cast<std::chrono::milliseconds>(
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.settings(), transaction_timeout, 32000)));
transaction_info_ptr->init();
Expand All @@ -64,8 +64,9 @@ Network::FilterFactoryCb SipProxyFilterConfigFactory::createFilterFactoryFromPro
return
[filter_config, &context, transaction_infos](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.api().randomGenerator(),
context.mainThreadDispatcher().timeSource(), context, transaction_infos));
*filter_config, context.getServerFactoryContext().api().randomGenerator(),
context.getServerFactoryContext().mainThreadDispatcher().timeSource(), context,
transaction_infos));
};
}

Expand Down
9 changes: 5 additions & 4 deletions contrib/sip_proxy/filters/network/source/router/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ SipFilters::FilterFactoryCb RouterFilterConfigFactory::createFilterFactoryFromPr
std::shared_ptr<RouterFilterConfig> config(
new RouterFilterConfigImpl(proto_config, stat_prefix, context));

return [config, &context,
stat_prefix](SipFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Router>(config, context.clusterManager(), context));
};
return
[config, &context, stat_prefix](SipFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Router>(
config, context.getServerFactoryContext().clusterManager(), context));
};
}

/**
Expand Down
Loading

0 comments on commit 2b7aaf0

Please sign in to comment.