Skip to content

Commit

Permalink
wasm: clean up the code (envoyproxy#36556)
Browse files Browse the repository at this point in the history
Commit Message: wasm: clean up the code
Additional Description:

When I doing the envoyproxy#36456, I found
there are lots of redundant code in the wasm extensions. And the wasm
loading and creations are spread out in multiple different positions.
This redundancy and fragmentation make envoyproxy#36456 become more and more
complex.

Finally, I split the code clean up out as an independent PR. 

This PR doesn't change any logic but only merge duplicated logic.

Risk Level: n/a.
Testing: n/a.
Docs Changes: n/a.
Release Notes: n/a.

---------

Signed-off-by: wangbaiping <wangbaiping@bytedance.com>
  • Loading branch information
wbpcode authored Oct 17, 2024
1 parent 0b551a8 commit f63b61a
Show file tree
Hide file tree
Showing 23 changed files with 248 additions and 333 deletions.
31 changes: 4 additions & 27 deletions source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ namespace Extensions {
namespace AccessLoggers {
namespace Wasm {

using Common::Wasm::PluginHandleSharedPtrThreadLocal;

AccessLog::InstanceSharedPtr
WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_config,
AccessLog::FilterPtr&& filter,
Expand All @@ -24,31 +22,10 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con
const envoy::extensions::access_loggers::wasm::v3::WasmAccessLog&>(
proto_config, context.messageValidationVisitor());

auto plugin = std::make_shared<Common::Wasm::Plugin>(
config.config(), envoy::config::core::v3::TrafficDirection::UNSPECIFIED,
context.serverFactoryContext().localInfo(), nullptr /* listener_metadata */);

auto access_log = std::make_shared<WasmAccessLog>(plugin, nullptr, std::move(filter));

auto callback = [access_log, &context, plugin](Common::Wasm::WasmHandleSharedPtr base_wasm) {
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
auto tls_slot = ThreadLocal::TypedSlot<PluginHandleSharedPtrThreadLocal>::makeUnique(
context.serverFactoryContext().threadLocal());
tls_slot->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
return std::make_shared<PluginHandleSharedPtrThreadLocal>(
Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
access_log->setTlsSlot(std::move(tls_slot));
};

if (!Common::Wasm::createWasm(
plugin, context.scope().createScope(""), context.serverFactoryContext().clusterManager(),
context.initManager(), context.serverFactoryContext().mainThreadDispatcher(),
context.serverFactoryContext().api(), context.serverFactoryContext().lifecycleNotifier(),
remote_data_provider_, std::move(callback))) {
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm access log {}", plugin->name_));
}
auto plugin_config = std::make_unique<Common::Wasm::PluginConfig>(
config.config(), context.serverFactoryContext(), context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr, false);
auto access_log = std::make_shared<WasmAccessLog>(std::move(plugin_config), std::move(filter));

context.serverFactoryContext().api().customStatNamespaces().registerStatNamespace(
Extensions::Common::Wasm::CustomStatNamespace);
Expand Down
1 change: 0 additions & 1 deletion source/extensions/access_loggers/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class WasmAccessLogFactory : public AccessLog::AccessLogInstanceFactory,

private:
absl::flat_hash_map<std::string, std::string> convertJsonFormatToMap(ProtobufWkt::Struct config);
RemoteAsyncDataProviderPtr remote_data_provider_;
};

} // namespace Wasm
Expand Down
21 changes: 5 additions & 16 deletions source/extensions/access_loggers/wasm/wasm_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ using Envoy::Extensions::Common::Wasm::PluginSharedPtr;

class WasmAccessLog : public AccessLog::Instance {
public:
WasmAccessLog(const PluginSharedPtr& plugin,
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot,
WasmAccessLog(Extensions::Common::Wasm::PluginConfigPtr plugin_config,
AccessLog::FilterPtr filter)
: plugin_(plugin), tls_slot_(std::move(tls_slot)), filter_(std::move(filter)) {}
: plugin_config_(std::move(plugin_config)), filter_(std::move(filter)) {}

void log(const Formatter::HttpFormatterContext& log_context,
const StreamInfo::StreamInfo& stream_info) override {
Expand All @@ -28,23 +27,13 @@ class WasmAccessLog : public AccessLog::Instance {
}
}

if (tls_slot_ != nullptr) {
if (auto handle = tls_slot_->get()->handle(); handle != nullptr) {
if (handle->wasmHandle()) {
handle->wasmHandle()->wasm()->log(plugin_, log_context, stream_info);
}
}
if (Common::Wasm::Wasm* wasm = plugin_config_->wasm(); wasm != nullptr) {
wasm->log(plugin_config_->plugin(), log_context, stream_info);
}
}

void setTlsSlot(ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot) {
ASSERT(tls_slot_ == nullptr);
tls_slot_ = std::move(tls_slot);
}

private:
PluginSharedPtr plugin_;
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal> tls_slot_;
Common::Wasm::PluginConfigPtr plugin_config_;
AccessLog::FilterPtr filter_;
};

Expand Down
47 changes: 5 additions & 42 deletions source/extensions/bootstrap/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,16 @@ namespace Wasm {
void WasmServiceExtension::onServerInitialized() { createWasm(context_); }

void WasmServiceExtension::createWasm(Server::Configuration::ServerFactoryContext& context) {
auto plugin = std::make_shared<Common::Wasm::Plugin>(
config_.config(), envoy::config::core::v3::TrafficDirection::UNSPECIFIED, context.localInfo(),
nullptr);

auto callback = [this, &context, plugin](Common::Wasm::WasmHandleSharedPtr base_wasm) {
if (!base_wasm) {
if (plugin->fail_open_) {
ENVOY_LOG(error, "Unable to create Wasm service {}", plugin->name_);
} else {
ENVOY_LOG(critical, "Unable to create Wasm service {}", plugin->name_);
}
return;
}
if (config_.singleton()) {
// Return a Wasm VM which will be stored as a singleton by the Server.
wasm_service_ = std::make_unique<WasmService>(
plugin, Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin,
context.mainThreadDispatcher()));
return;
}
// Per-thread WASM VM.
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
auto tls_slot =
ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
context.threadLocal());
tls_slot->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
return std::make_shared<Common::Wasm::PluginHandleSharedPtrThreadLocal>(
Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
wasm_service_ = std::make_unique<WasmService>(plugin, std::move(tls_slot));
};

if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), context.clusterManager(),
context.initManager(), context.mainThreadDispatcher(),
context.api(), context.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
// NB: throw if we get a synchronous configuration failures as this is how such failures are
// reported to xDS.
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm service {}", plugin->name_));
}
plugin_config_ = std::make_unique<Common::Wasm::PluginConfig>(
config_.config(), context, context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr,
config_.singleton());
}

Server::BootstrapExtensionPtr
WasmFactory::createBootstrapExtension(const Protobuf::Message& config,
Server::Configuration::ServerFactoryContext& context) {
auto typed_config =
const auto& typed_config =
MessageUtil::downcastAndValidate<const envoy::extensions::wasm::v3::WasmService&>(
config, context.messageValidationContext().staticValidationVisitor());
context.api().customStatNamespaces().registerStatNamespace(
Expand Down
27 changes: 6 additions & 21 deletions source/extensions/bootstrap/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,12 @@ namespace Extensions {
namespace Bootstrap {
namespace Wasm {

using Common::Wasm::PluginConfig;
using Common::Wasm::PluginConfigPtr;
using Common::Wasm::PluginHandleSharedPtrThreadLocal;
using Envoy::Extensions::Common::Wasm::PluginHandleSharedPtr;
using Envoy::Extensions::Common::Wasm::PluginSharedPtr;

class WasmService {
public:
WasmService(PluginSharedPtr plugin, PluginHandleSharedPtr singleton)
: plugin_(plugin), singleton_(std::move(singleton)) {}
WasmService(PluginSharedPtr plugin,
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot)
: plugin_(plugin), tls_slot_(std::move(tls_slot)) {}

private:
PluginSharedPtr plugin_;
PluginHandleSharedPtr singleton_;
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal> tls_slot_;
};

using WasmServicePtr = std::unique_ptr<WasmService>;

class WasmFactory : public Server::Configuration::BootstrapExtensionFactory {
public:
~WasmFactory() override = default;
Expand All @@ -53,9 +39,9 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable
WasmServiceExtension(const envoy::extensions::wasm::v3::WasmService& config,
Server::Configuration::ServerFactoryContext& context)
: config_(config), context_(context) {}
WasmService& wasmService() {
ASSERT(wasm_service_ != nullptr);
return *wasm_service_;
PluginConfig& wasmService() {
ASSERT(plugin_config_ != nullptr);
return *plugin_config_;
}
void onServerInitialized() override;

Expand All @@ -64,8 +50,7 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable

envoy::extensions::wasm::v3::WasmService config_;
Server::Configuration::ServerFactoryContext& context_;
WasmServicePtr wasm_service_;
RemoteAsyncDataProviderPtr remote_data_provider_;
PluginConfigPtr plugin_config_;
};

} // namespace Wasm
Expand Down
6 changes: 0 additions & 6 deletions source/extensions/common/wasm/remote_async_datasource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ static constexpr uint32_t RetryInitialDelayMilliseconds = 1000;
static constexpr uint32_t RetryMaxDelayMilliseconds = 10 * 1000;
static constexpr uint32_t RetryCount = 1;

absl::optional<std::string> getPath(const envoy::config::core::v3::DataSource& source) {
return source.specifier_case() == envoy::config::core::v3::DataSource::SpecifierCase::kFilename
? absl::make_optional(source.filename())
: absl::nullopt;
}

RemoteAsyncDataProvider::RemoteAsyncDataProvider(
Upstream::ClusterManager& cm, Init::Manager& manager,
const envoy::config::core::v3::RemoteDataSource& source, Event::Dispatcher& dispatcher,
Expand Down
115 changes: 115 additions & 0 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,121 @@ getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginS
getPluginHandleFactory()));
}

PluginConfig::PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
Server::Configuration::ServerFactoryContext& context,
Stats::Scope& scope, Init::Manager& init_manager,
envoy::config::core::v3::TrafficDirection direction,
const envoy::config::core::v3::Metadata* metadata, bool singleton)
: is_singleton_handle_(singleton) {

ASSERT_IS_MAIN_OR_TEST_THREAD();

plugin_ = std::make_shared<Plugin>(config, direction, context.localInfo(), metadata);

auto callback = [this, &context](WasmHandleSharedPtr base_wasm) {
if (base_wasm == nullptr) {
ENVOY_LOG(critical, "Plugin {} failed to load", plugin_->name_);
}

if (is_singleton_handle_) {
plugin_handle_ =
getOrCreateThreadLocalPlugin(base_wasm, plugin_, context.mainThreadDispatcher());
return;
}
auto thread_local_handle =
ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
context.threadLocal());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
thread_local_handle->set([base_wasm, plugin = this->plugin_](Event::Dispatcher& dispatcher) {
return std::make_shared<PluginHandleSharedPtrThreadLocal>(
getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
plugin_handle_ = std::move(thread_local_handle);
};

if (!Common::Wasm::createWasm(plugin_, scope.createScope(""), context.clusterManager(),
init_manager, context.mainThreadDispatcher(), context.api(),
context.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
// TODO(wbpcode): use absl::Status to return error rather than throw.
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm plugin {}", plugin_->name_));
}
}

// Simple helper function to get the Wasm* from a WasmHandle.
Wasm* wasmHandleWasm(WasmHandleSharedPtr& h) { return h != nullptr ? h->wasm().get() : nullptr; }

std::shared_ptr<Context> PluginConfig::createContext() {
if (absl::holds_alternative<absl::monostate>(plugin_handle_)) {
return nullptr;
}

if (is_singleton_handle_) {
// Use critical log because this error should not happen in production.
ENVOY_LOG(critical, "CreateContext() only works for thread local plugins.");
return nullptr;
}

ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
auto thread_local_handle = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();

if (!thread_local_handle->currentThreadRegistered()) {
return nullptr;
}
auto plugin_holder = thread_local_handle->get();
if (!plugin_holder.has_value()) {
return nullptr;
}

if (plugin_holder->handle == nullptr) {
return nullptr;
}

Wasm* wasm = wasmHandleWasm(plugin_holder->handle->wasmHandle());

if (!wasm || wasm->isFailed()) {
if (plugin_->fail_open_) {
return nullptr; // Fail open skips adding this filter to callbacks.
} else {
return std::make_shared<Context>(
nullptr, 0,
plugin_holder->handle); // Fail closed is handled by an empty Context.
}
}
return std::make_shared<Context>(wasm, plugin_holder->handle->rootContextId(),
plugin_holder->handle);
}

Wasm* PluginConfig::wasm() {
if (absl::holds_alternative<absl::monostate>(plugin_handle_)) {
return nullptr;
}

if (is_singleton_handle_) {
ASSERT(absl::holds_alternative<SinglePluginHandle>(plugin_handle_));
PluginHandleSharedPtr singleton_handle = absl::get<SinglePluginHandle>(plugin_handle_);
return singleton_handle != nullptr ? wasmHandleWasm(singleton_handle->wasmHandle()) : nullptr;
}

ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
auto thread_local_handle = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();

if (!thread_local_handle->currentThreadRegistered()) {
return nullptr;
}
auto plugin_holder = thread_local_handle->get();
if (!plugin_holder.has_value()) {
return nullptr;
}

if (plugin_holder->handle == nullptr) {
return nullptr;
}

return wasmHandleWasm(plugin_holder->handle->wasmHandle());
}

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
36 changes: 31 additions & 5 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <memory>

#include "envoy/common/exception.h"
#include "envoy/extensions/wasm/v3/wasm.pb.h"
#include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
#include "envoy/http/filter.h"
#include "envoy/server/lifecycle_notifier.h"
Expand Down Expand Up @@ -154,11 +155,8 @@ using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;

class PluginHandleSharedPtrThreadLocal : public ThreadLocal::ThreadLocalObject {
public:
PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr handle) : handle_(handle){};
PluginHandleSharedPtr& handle() { return handle_; }

private:
PluginHandleSharedPtr handle_;
PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr handle) : handle(std::move(handle)) {}
PluginHandleSharedPtr handle;
};

using CreateWasmCallback = std::function<void(WasmHandleSharedPtr)>;
Expand All @@ -183,6 +181,34 @@ void clearCodeCacheForTesting();
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d);
WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm);

class PluginConfig : Logger::Loggable<Logger::Id::wasm> {
public:
// TODO(wbpcode): the code of PluginConfig will be shared cross all Wasm extensions (loggers,
// http filters, etc.), we may extend the constructor to takes a static string view to tell
// the type of the plugin if needed.
PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
Server::Configuration::ServerFactoryContext& context, Stats::Scope& scope,
Init::Manager& init_manager, envoy::config::core::v3::TrafficDirection direction,
const envoy::config::core::v3::Metadata* metadata, bool singleton);

std::shared_ptr<Context> createContext();
Wasm* wasm();
const PluginSharedPtr& plugin() { return plugin_; }

private:
using SinglePluginHandle = PluginHandleSharedPtr;
using ThreadLocalPluginHandle = ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>;

PluginSharedPtr plugin_;
RemoteAsyncDataProviderPtr remote_data_provider_;
const bool is_singleton_handle_{};

absl::variant<absl::monostate, SinglePluginHandle, ThreadLocalPluginHandle> plugin_handle_;
};

using PluginConfigPtr = std::unique_ptr<PluginConfig>;
using PluginConfigSharedPtr = std::shared_ptr<PluginConfig>;

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
Loading

0 comments on commit f63b61a

Please sign in to comment.