Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasm: clean up the code #36556

Merged
merged 11 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 4 additions & 27 deletions source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ namespace Extensions {
namespace AccessLoggers {
namespace Wasm {

using Common::Wasm::PluginHandleSharedPtrThreadLocal;

AccessLog::InstanceSharedPtr
WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_config,
AccessLog::FilterPtr&& filter,
Expand All @@ -24,31 +22,10 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con
const envoy::extensions::access_loggers::wasm::v3::WasmAccessLog&>(
proto_config, context.messageValidationVisitor());

auto plugin = std::make_shared<Common::Wasm::Plugin>(
config.config(), envoy::config::core::v3::TrafficDirection::UNSPECIFIED,
context.serverFactoryContext().localInfo(), nullptr /* listener_metadata */);

auto access_log = std::make_shared<WasmAccessLog>(plugin, nullptr, std::move(filter));

auto callback = [access_log, &context, plugin](Common::Wasm::WasmHandleSharedPtr base_wasm) {
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
auto tls_slot = ThreadLocal::TypedSlot<PluginHandleSharedPtrThreadLocal>::makeUnique(
context.serverFactoryContext().threadLocal());
tls_slot->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
return std::make_shared<PluginHandleSharedPtrThreadLocal>(
Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
access_log->setTlsSlot(std::move(tls_slot));
};

if (!Common::Wasm::createWasm(
plugin, context.scope().createScope(""), context.serverFactoryContext().clusterManager(),
context.initManager(), context.serverFactoryContext().mainThreadDispatcher(),
context.serverFactoryContext().api(), context.serverFactoryContext().lifecycleNotifier(),
remote_data_provider_, std::move(callback))) {
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm access log {}", plugin->name_));
}
auto plugin_config = std::make_unique<Common::Wasm::PluginConfig>(
config.config(), context.serverFactoryContext(), context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr, false);
auto access_log = std::make_shared<WasmAccessLog>(std::move(plugin_config), std::move(filter));

context.serverFactoryContext().api().customStatNamespaces().registerStatNamespace(
Extensions::Common::Wasm::CustomStatNamespace);
Expand Down
1 change: 0 additions & 1 deletion source/extensions/access_loggers/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class WasmAccessLogFactory : public AccessLog::AccessLogInstanceFactory,

private:
absl::flat_hash_map<std::string, std::string> convertJsonFormatToMap(ProtobufWkt::Struct config);
RemoteAsyncDataProviderPtr remote_data_provider_;
};

} // namespace Wasm
Expand Down
21 changes: 5 additions & 16 deletions source/extensions/access_loggers/wasm/wasm_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ using Envoy::Extensions::Common::Wasm::PluginSharedPtr;

class WasmAccessLog : public AccessLog::Instance {
public:
WasmAccessLog(const PluginSharedPtr& plugin,
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot,
WasmAccessLog(Extensions::Common::Wasm::PluginConfigPtr plugin_config,
AccessLog::FilterPtr filter)
: plugin_(plugin), tls_slot_(std::move(tls_slot)), filter_(std::move(filter)) {}
: plugin_config_(std::move(plugin_config)), filter_(std::move(filter)) {}

void log(const Formatter::HttpFormatterContext& log_context,
const StreamInfo::StreamInfo& stream_info) override {
Expand All @@ -28,23 +27,13 @@ class WasmAccessLog : public AccessLog::Instance {
}
}

if (tls_slot_ != nullptr) {
if (auto handle = tls_slot_->get()->handle(); handle != nullptr) {
if (handle->wasmHandle()) {
handle->wasmHandle()->wasm()->log(plugin_, log_context, stream_info);
}
}
if (Common::Wasm::Wasm* wasm = plugin_config_->wasm(); wasm != nullptr) {
wasm->log(plugin_config_->plugin(), log_context, stream_info);
}
}

void setTlsSlot(ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot) {
ASSERT(tls_slot_ == nullptr);
tls_slot_ = std::move(tls_slot);
}

private:
PluginSharedPtr plugin_;
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal> tls_slot_;
Common::Wasm::PluginConfigPtr plugin_config_;
AccessLog::FilterPtr filter_;
};

Expand Down
47 changes: 5 additions & 42 deletions source/extensions/bootstrap/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,16 @@ namespace Wasm {
void WasmServiceExtension::onServerInitialized() { createWasm(context_); }

void WasmServiceExtension::createWasm(Server::Configuration::ServerFactoryContext& context) {
auto plugin = std::make_shared<Common::Wasm::Plugin>(
config_.config(), envoy::config::core::v3::TrafficDirection::UNSPECIFIED, context.localInfo(),
nullptr);

auto callback = [this, &context, plugin](Common::Wasm::WasmHandleSharedPtr base_wasm) {
if (!base_wasm) {
if (plugin->fail_open_) {
ENVOY_LOG(error, "Unable to create Wasm service {}", plugin->name_);
} else {
ENVOY_LOG(critical, "Unable to create Wasm service {}", plugin->name_);
}
return;
}
if (config_.singleton()) {
// Return a Wasm VM which will be stored as a singleton by the Server.
wasm_service_ = std::make_unique<WasmService>(
plugin, Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin,
context.mainThreadDispatcher()));
return;
}
// Per-thread WASM VM.
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
auto tls_slot =
ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
context.threadLocal());
tls_slot->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
return std::make_shared<Common::Wasm::PluginHandleSharedPtrThreadLocal>(
Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
wasm_service_ = std::make_unique<WasmService>(plugin, std::move(tls_slot));
};

if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), context.clusterManager(),
context.initManager(), context.mainThreadDispatcher(),
context.api(), context.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
// NB: throw if we get a synchronous configuration failures as this is how such failures are
// reported to xDS.
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm service {}", plugin->name_));
}
plugin_config_ = std::make_unique<Common::Wasm::PluginConfig>(
config_.config(), context, context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr,
config_.singleton());
}

Server::BootstrapExtensionPtr
WasmFactory::createBootstrapExtension(const Protobuf::Message& config,
Server::Configuration::ServerFactoryContext& context) {
auto typed_config =
const auto& typed_config =
MessageUtil::downcastAndValidate<const envoy::extensions::wasm::v3::WasmService&>(
config, context.messageValidationContext().staticValidationVisitor());
context.api().customStatNamespaces().registerStatNamespace(
Expand Down
27 changes: 6 additions & 21 deletions source/extensions/bootstrap/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,12 @@ namespace Extensions {
namespace Bootstrap {
namespace Wasm {

using Common::Wasm::PluginConfig;
using Common::Wasm::PluginConfigPtr;
using Common::Wasm::PluginHandleSharedPtrThreadLocal;
using Envoy::Extensions::Common::Wasm::PluginHandleSharedPtr;
using Envoy::Extensions::Common::Wasm::PluginSharedPtr;

class WasmService {
public:
WasmService(PluginSharedPtr plugin, PluginHandleSharedPtr singleton)
: plugin_(plugin), singleton_(std::move(singleton)) {}
WasmService(PluginSharedPtr plugin,
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>&& tls_slot)
: plugin_(plugin), tls_slot_(std::move(tls_slot)) {}

private:
PluginSharedPtr plugin_;
PluginHandleSharedPtr singleton_;
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal> tls_slot_;
};

using WasmServicePtr = std::unique_ptr<WasmService>;

class WasmFactory : public Server::Configuration::BootstrapExtensionFactory {
public:
~WasmFactory() override = default;
Expand All @@ -53,9 +39,9 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable
WasmServiceExtension(const envoy::extensions::wasm::v3::WasmService& config,
Server::Configuration::ServerFactoryContext& context)
: config_(config), context_(context) {}
WasmService& wasmService() {
ASSERT(wasm_service_ != nullptr);
return *wasm_service_;
PluginConfig& wasmService() {
ASSERT(plugin_config_ != nullptr);
return *plugin_config_;
}
void onServerInitialized() override;

Expand All @@ -64,8 +50,7 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable

envoy::extensions::wasm::v3::WasmService config_;
Server::Configuration::ServerFactoryContext& context_;
WasmServicePtr wasm_service_;
RemoteAsyncDataProviderPtr remote_data_provider_;
PluginConfigPtr plugin_config_;
};

} // namespace Wasm
Expand Down
6 changes: 0 additions & 6 deletions source/extensions/common/wasm/remote_async_datasource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ static constexpr uint32_t RetryInitialDelayMilliseconds = 1000;
static constexpr uint32_t RetryMaxDelayMilliseconds = 10 * 1000;
static constexpr uint32_t RetryCount = 1;

absl::optional<std::string> getPath(const envoy::config::core::v3::DataSource& source) {
return source.specifier_case() == envoy::config::core::v3::DataSource::SpecifierCase::kFilename
? absl::make_optional(source.filename())
: absl::nullopt;
}

RemoteAsyncDataProvider::RemoteAsyncDataProvider(
Upstream::ClusterManager& cm, Init::Manager& manager,
const envoy::config::core::v3::RemoteDataSource& source, Event::Dispatcher& dispatcher,
Expand Down
115 changes: 115 additions & 0 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,121 @@ getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginS
getPluginHandleFactory()));
}

PluginConfig::PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
Server::Configuration::ServerFactoryContext& context,
Stats::Scope& scope, Init::Manager& init_manager,
envoy::config::core::v3::TrafficDirection direction,
const envoy::config::core::v3::Metadata* metadata, bool singleton)
: is_singleton_handle_(singleton) {

ASSERT_IS_MAIN_OR_TEST_THREAD();

plugin_ = std::make_shared<Plugin>(config, direction, context.localInfo(), metadata);

auto callback = [this, &context](WasmHandleSharedPtr base_wasm) {
wbpcode marked this conversation as resolved.
Show resolved Hide resolved
if (base_wasm == nullptr) {
ENVOY_LOG(critical, "Plugin {} failed to load", plugin_->name_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some cases of the old logic that this refactoring consolidates used different log levels here depending on whether or not plugin->fail_open_ was true. Can that be preserved?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also (optionally) you might consider having the PluginConfig constructor take an additional string param for the role of the plugin (e.g. "stat sink" vs. "access logger" vs. "HTTP filter" etc.), so that can be retained in the log message here. While it isn't strictly necessary functionality-wise, I think it is useful information in the log message, since plugin failure could occur in a number of contexts.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some cases of the old logic that this refactoring consolidates used different log levels here depending on whether or not plugin->fail_open_ was true. Can that be preserved?

The fail_open self will be deprecated in favor of new FailurePolicy in the Envoy. And I think it acutally make no big sense to keep that logic.

Copy link
Member Author

@wbpcode wbpcode Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also (optionally) you might consider having the PluginConfig constructor take an additional string param for the role of the plugin (e.g. "stat sink" vs. "access logger" vs. "HTTP filter" etc.), so that can be retained in the log message here. While it isn't strictly necessary functionality-wise, I think it is useful information in the log message, since plugin failure could occur in a number of contexts.

I personally think the plugin name should be enough to distinguish different extension and their type. But I don't oppose it. I will add a TODO first until some acutally user ask it. (By the way, we created wasm bootstrap extension, wasm network extension, http extension, logger extension, etc, but we don't know how much of them is what the users actually want. At least from recently problems/bug, I think even the http wasm extension is not be accepted/used widely. We do too much things before get actual practising feedbacks and requirements.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fail_open self will be deprecated in favor of new FailurePolicy in the Envoy. And I think it acutally make no big sense to keep that logic.

SG--didn't know about that!

I personally think the plugin name should be enough to distinguish different extension and their type. But I don't oppose it. I will add a TODO first until some acutally user ask it.

Fair enough--SGTM.

}

if (is_singleton_handle_) {
plugin_handle_ =
getOrCreateThreadLocalPlugin(base_wasm, plugin_, context.mainThreadDispatcher());
return;
}
auto thread_local_handle =
ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
context.threadLocal());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
thread_local_handle->set([base_wasm, plugin = this->plugin_](Event::Dispatcher& dispatcher) {
return std::make_shared<PluginHandleSharedPtrThreadLocal>(
getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
});
plugin_handle_ = std::move(thread_local_handle);
};

if (!Common::Wasm::createWasm(plugin_, scope.createScope(""), context.clusterManager(),
init_manager, context.mainThreadDispatcher(), context.api(),
context.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
// TODO(wbpcode): use absl::Status to return error rather than throw.
throw Common::Wasm::WasmException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we restructure this to avoid exceptions?

Copy link
Member Author

@wbpcode wbpcode Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion.

Copy link
Member Author

@wbpcode wbpcode Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I finally add a TODO first. Because this is in the constructor and we need to use creation_status to handle the error. That means we need to change code of lots of different positions at where this PluginConfig is used.

We can put it to separated PR.

fmt::format("Unable to create Wasm plugin {}", plugin_->name_));
}
}

// Simple helper function to get the Wasm* from a WasmHandle.
Wasm* wasmHandleWasm(WasmHandleSharedPtr& h) { return h != nullptr ? h->wasm().get() : nullptr; }

std::shared_ptr<Context> PluginConfig::createContext() {
if (absl::holds_alternative<absl::monostate>(plugin_handle_)) {
return nullptr;
}

if (is_singleton_handle_) {
// Use critical log because this error should not happen in production.
ENVOY_LOG(critical, "CreateContext() only works for thread local plugins.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not release assert if this never should be called by the implementation?

Copy link
Member Author

@wbpcode wbpcode Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is finally removed at #36456. Finally, we use the same way to treat the singleton handle and thread local handle.
So, I think we can leave it there for now?

return nullptr;
}

ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
auto thread_local_handle = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();

if (!thread_local_handle->currentThreadRegistered()) {
return nullptr;
}
auto plugin_holder = thread_local_handle->get();
if (!plugin_holder.has_value()) {
return nullptr;
}

if (plugin_holder->handle == nullptr) {
return nullptr;
}

Wasm* wasm = wasmHandleWasm(plugin_holder->handle->wasmHandle());

if (!wasm || wasm->isFailed()) {
if (plugin_->fail_open_) {
return nullptr; // Fail open skips adding this filter to callbacks.
} else {
return std::make_shared<Context>(
nullptr, 0,
plugin_holder->handle); // Fail closed is handled by an empty Context.
}
}
return std::make_shared<Context>(wasm, plugin_holder->handle->rootContextId(),
plugin_holder->handle);
}

Wasm* PluginConfig::wasm() {
if (absl::holds_alternative<absl::monostate>(plugin_handle_)) {
return nullptr;
}

if (is_singleton_handle_) {
ASSERT(absl::holds_alternative<SinglePluginHandle>(plugin_handle_));
PluginHandleSharedPtr singleton_handle = absl::get<SinglePluginHandle>(plugin_handle_);
return singleton_handle != nullptr ? wasmHandleWasm(singleton_handle->wasmHandle()) : nullptr;
}

ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
auto thread_local_handle = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();

if (!thread_local_handle->currentThreadRegistered()) {
return nullptr;
}
auto plugin_holder = thread_local_handle->get();
if (!plugin_holder.has_value()) {
return nullptr;
}

if (plugin_holder->handle == nullptr) {
return nullptr;
}

return wasmHandleWasm(plugin_holder->handle->wasmHandle());
}

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
36 changes: 31 additions & 5 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <memory>

#include "envoy/common/exception.h"
#include "envoy/extensions/wasm/v3/wasm.pb.h"
#include "envoy/extensions/wasm/v3/wasm.pb.validate.h"
#include "envoy/http/filter.h"
#include "envoy/server/lifecycle_notifier.h"
Expand Down Expand Up @@ -154,11 +155,8 @@ using PluginHandleSharedPtr = std::shared_ptr<PluginHandle>;

class PluginHandleSharedPtrThreadLocal : public ThreadLocal::ThreadLocalObject {
public:
PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr handle) : handle_(handle){};
PluginHandleSharedPtr& handle() { return handle_; }

private:
PluginHandleSharedPtr handle_;
PluginHandleSharedPtrThreadLocal(PluginHandleSharedPtr handle) : handle(std::move(handle)) {}
PluginHandleSharedPtr handle;
};

using CreateWasmCallback = std::function<void(WasmHandleSharedPtr)>;
Expand All @@ -183,6 +181,34 @@ void clearCodeCacheForTesting();
void setTimeOffsetForCodeCacheForTesting(MonotonicTime::duration d);
WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm);

class PluginConfig : Logger::Loggable<Logger::Id::wasm> {
public:
// TODO(wbpcode): the code of PluginConfig will be shared cross all Wasm extensions (loggers,
// http filters, etc.), we may extend the constructor to takes a static string view to tell
// the type of the plugin if needed.
PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
Server::Configuration::ServerFactoryContext& context, Stats::Scope& scope,
Init::Manager& init_manager, envoy::config::core::v3::TrafficDirection direction,
const envoy::config::core::v3::Metadata* metadata, bool singleton);

std::shared_ptr<Context> createContext();
Wasm* wasm();
const PluginSharedPtr& plugin() { return plugin_; }

private:
using SinglePluginHandle = PluginHandleSharedPtr;
using ThreadLocalPluginHandle = ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal>;

PluginSharedPtr plugin_;
RemoteAsyncDataProviderPtr remote_data_provider_;
const bool is_singleton_handle_{};

absl::variant<absl::monostate, SinglePluginHandle, ThreadLocalPluginHandle> plugin_handle_;
};

using PluginConfigPtr = std::unique_ptr<PluginConfig>;
using PluginConfigSharedPtr = std::shared_ptr<PluginConfig>;

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
Loading