Skip to content

Commit

Permalink
update based on the envoyproxy#36556
Browse files Browse the repository at this point in the history
Signed-off-by: wangbaiping <wangbaiping@bytedance.com>
  • Loading branch information
wbpcode committed Oct 16, 2024
2 parents 8230003 + 0bbd11d commit eb2f707
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 61 deletions.
2 changes: 1 addition & 1 deletion source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con

auto plugin_config = std::make_unique<Common::Wasm::PluginConfig>(
config.config(), context.serverFactoryContext(), context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, nullptr /* metadata */, false);
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr, false);
auto access_log = std::make_shared<WasmAccessLog>(std::move(plugin_config), std::move(filter));

context.serverFactoryContext().api().customStatNamespaces().registerStatNamespace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class WasmAccessLog : public AccessLog::Instance {
}
}

if (Common::Wasm::Wasm* wasm = plugin_config_->wasmOfHandle(); wasm != nullptr) {
if (Common::Wasm::Wasm* wasm = plugin_config_->wasm(); wasm != nullptr) {
wasm->log(plugin_config_->plugin(), log_context, stream_info);
}
}
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/bootstrap/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ namespace Wasm {
void WasmServiceExtension::onServerInitialized() { createWasm(context_); }

void WasmServiceExtension::createWasm(Server::Configuration::ServerFactoryContext& context) {
wasm_service_ = std::make_unique<Common::Wasm::PluginConfig>(
plugin_config_ = std::make_unique<Common::Wasm::PluginConfig>(
config_.config(), context, context.scope(), context.initManager(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, nullptr, config_.singleton());
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, /*metadata=*/nullptr,
config_.singleton());
}

Server::BootstrapExtensionPtr
Expand Down
12 changes: 6 additions & 6 deletions source/extensions/bootstrap/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ namespace Extensions {
namespace Bootstrap {
namespace Wasm {

using Common::Wasm::PluginConfig;
using Common::Wasm::PluginConfigPtr;
using Common::Wasm::PluginHandleSharedPtrThreadLocal;
using Envoy::Extensions::Common::Wasm::PluginHandleSharedPtr;
using Envoy::Extensions::Common::Wasm::PluginSharedPtr;
using WasmService = Common::Wasm::PluginConfig;
using WasmServicePtr = std::unique_ptr<WasmService>;

class WasmFactory : public Server::Configuration::BootstrapExtensionFactory {
public:
Expand All @@ -39,9 +39,9 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable
WasmServiceExtension(const envoy::extensions::wasm::v3::WasmService& config,
Server::Configuration::ServerFactoryContext& context)
: config_(config), context_(context) {}
WasmService& wasmService() {
ASSERT(wasm_service_ != nullptr);
return *wasm_service_;
PluginConfig& wasmService() {
ASSERT(plugin_config_ != nullptr);
return *plugin_config_;
}
void onServerInitialized() override;

Expand All @@ -50,7 +50,7 @@ class WasmServiceExtension : public Server::BootstrapExtension, Logger::Loggable

envoy::extensions::wasm::v3::WasmService config_;
Server::Configuration::ServerFactoryContext& context_;
WasmServicePtr wasm_service_;
PluginConfigPtr plugin_config_;
};

} // namespace Wasm
Expand Down
54 changes: 30 additions & 24 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "source/extensions/common/wasm/wasm.h"
#include "wasm.h"

#include <algorithm>
#include <chrono>
Expand Down Expand Up @@ -488,7 +489,10 @@ getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginS
getPluginHandleFactory()));
}

Wasm* PluginConfig::mayReloadHandleIfNeeded(PluginHandleSharedPtrThreadLocal& handle_wrapper) {
// Simple helper function to get the Wasm* from a WasmHandle.
Wasm* wasmHandleWasm(WasmHandleSharedPtr& h) { return h != nullptr ? h->wasm().get() : nullptr; }

Wasm* PluginConfig::mayReloadHandleIfNeeded(SinglePluginHandle& handle_wrapper) {
// base_wasm_ is null means the plugin is not loaded successfully. Return anyway.
if (base_wasm_ == nullptr) {
return nullptr;
Expand All @@ -499,7 +503,7 @@ Wasm* PluginConfig::mayReloadHandleIfNeeded(PluginHandleSharedPtrThreadLocal& ha
return nullptr;
}

Wasm* wasm = handle_wrapper.handle->wasmOfHandle();
Wasm* wasm = wasmHandleWasm(handle_wrapper.handle->wasmHandle());

// Only runtime failure will be handled by reloading logic. If the wasm is not failed or
// failed with other errors, return it directly.
Expand Down Expand Up @@ -531,7 +535,7 @@ Wasm* PluginConfig::mayReloadHandleIfNeeded(PluginHandleSharedPtrThreadLocal& ha
handle_wrapper.last_load = now;
PluginHandleSharedPtr new_load = getOrCreateThreadLocalPlugin(base_wasm_, plugin_, dispatcher);
if (new_load != nullptr) {
Wasm* new_wasm = new_load->wasmOfHandle();
Wasm* new_wasm = wasmHandleWasm(new_load->wasmHandle());
if (new_wasm == nullptr || new_wasm->isFailed()) {
stats_handler_->onEvent(WasmEvent::VmReloadFailure);
} else {
Expand All @@ -541,25 +545,28 @@ Wasm* PluginConfig::mayReloadHandleIfNeeded(PluginHandleSharedPtrThreadLocal& ha
}

ASSERT(handle_wrapper.handle != nullptr);
return handle_wrapper.handle->wasmOfHandle();
return wasmHandleWasm(handle_wrapper.handle->wasmHandle());
}

std::pair<OptRef<PluginHandleSharedPtrThreadLocal>, Wasm*> PluginConfig::getPluginHandleAndWasm() {
if (!plugin_handle_initialized_) {
return {OptRef<PluginHandleSharedPtrThreadLocal>{}, nullptr};
std::pair<OptRef<PluginConfig::SinglePluginHandle>, Wasm*> PluginConfig::getPluginHandleAndWasm() {
if (absl::holds_alternative<std::monostate>(plugin_handle_)) {
return {OptRef<SinglePluginHandle>{}, nullptr};
}

if (is_singleton_handle_) {
return {singleton_handle_, mayReloadHandleIfNeeded(singleton_handle_)};
ASSERT(absl::holds_alternative<SinglePluginHandle>(plugin_handle_));
OptRef<SinglePluginHandle> singleton_handle = absl::get<SinglePluginHandle>(plugin_handle_);
return {singleton_handle, mayReloadHandleIfNeeded(singleton_handle.ref())};
}

ASSERT(thread_local_handle_ != nullptr);
if (!thread_local_handle_->currentThreadRegistered()) {
return {OptRef<PluginHandleSharedPtrThreadLocal>{}, nullptr};
ASSERT(absl::holds_alternative<ThreadLocalPluginHandle>(plugin_handle_));
auto* thread_local_handle = absl::get<ThreadLocalPluginHandle>(plugin_handle_).get();
if (!thread_local_handle->currentThreadRegistered()) {
return {OptRef<SinglePluginHandle>{}, nullptr};
}
auto plugin_holder = thread_local_handle_->get();
auto plugin_holder = thread_local_handle->get();
if (!plugin_holder.has_value()) {
return {OptRef<PluginHandleSharedPtrThreadLocal>{}, nullptr};
return {OptRef<SinglePluginHandle>{}, nullptr};
}

return {plugin_holder, mayReloadHandleIfNeeded(*plugin_holder)};
Expand Down Expand Up @@ -614,39 +621,38 @@ PluginConfig::PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& conf
}

stats_handler_ = std::make_shared<StatsHandler>(scope, absl::StrCat("wasm.", config.name(), "."));

plugin_ = std::make_shared<Plugin>(config, direction, context.localInfo(), metadata);

auto callback = [this, &context](WasmHandleSharedPtr base_wasm) {
plugin_handle_initialized_ = true;
base_wasm_ = base_wasm;

if (base_wasm == nullptr) {
ENVOY_LOG(critical, "Plugin {} failed to load", plugin_->name_);
}

if (is_singleton_handle_) {
singleton_handle_.handle =
getOrCreateThreadLocalPlugin(base_wasm, plugin_, context.mainThreadDispatcher());
singleton_handle_.last_load = context.mainThreadDispatcher().timeSource().monotonicTime();
plugin_handle_ = SinglePluginHandle(
getOrCreateThreadLocalPlugin(base_wasm, plugin_, context.mainThreadDispatcher()),
context.mainThreadDispatcher().timeSource().monotonicTime());
return;
}

thread_local_handle_ =
ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
context.threadLocal());
auto thread_local_handle =
ThreadLocal::TypedSlot<SinglePluginHandle>::makeUnique(context.threadLocal());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
thread_local_handle_->set([base_wasm, plugin = this->plugin_](Event::Dispatcher& dispatcher) {
return std::make_shared<PluginHandleSharedPtrThreadLocal>(
thread_local_handle->set([base_wasm, plugin = this->plugin_](Event::Dispatcher& dispatcher) {
return std::make_shared<SinglePluginHandle>(
getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher),
dispatcher.timeSource().monotonicTime());
});
plugin_handle_ = std::move(thread_local_handle);
};

if (!Common::Wasm::createWasm(plugin_, scope.createScope(""), context.clusterManager(),
init_manager, context.mainThreadDispatcher(), context.api(),
context.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
// TODO(wbpcode): use absl::Status to return error rather than throw.
throw Common::Wasm::WasmException(
fmt::format("Unable to create Wasm plugin {}", plugin_->name_));
}
Expand All @@ -673,7 +679,7 @@ std::shared_ptr<Context> PluginConfig::createContext() {
plugin_holder->handle);
}

Wasm* PluginConfig::wasmOfHandle() { return getPluginHandleAndWasm().second; }
Wasm* PluginConfig::wasm() { return getPluginHandleAndWasm().second; }

} // namespace Wasm
} // namespace Common
Expand Down
23 changes: 10 additions & 13 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ class PluginHandle : public PluginHandleBase {
std::static_pointer_cast<PluginBase>(plugin)),
plugin_(plugin), wasm_handle_(wasm_handle) {}

Wasm* wasmOfHandle() { return wasm_handle_ != nullptr ? wasm_handle_->wasm().get() : nullptr; }
WasmHandleSharedPtr& wasmHandle() { return wasm_handle_; }
uint32_t rootContextId() { return wasm_handle_->wasm()->getRootContext(plugin_, false)->id(); }

Expand Down Expand Up @@ -189,29 +188,35 @@ WasmEvent toWasmEvent(const std::shared_ptr<WasmHandleBase>& wasm);

class PluginConfig : Logger::Loggable<Logger::Id::wasm> {
public:
// TODO(wbpcode): the code of PluginConfig will be shared cross all Wasm extensions (loggers,
// http filters, etc.), we may extend the constructor to takes a static string view to tell
// the type of the plugin if needed.
PluginConfig(const envoy::extensions::wasm::v3::PluginConfig& config,
Server::Configuration::ServerFactoryContext& context, Stats::Scope& scope,
Init::Manager& init_manager, envoy::config::core::v3::TrafficDirection direction,
const envoy::config::core::v3::Metadata* metadata, bool singleton);

std::shared_ptr<Context> createContext();
Wasm* wasmOfHandle();
Wasm* wasm();
const PluginSharedPtr& plugin() { return plugin_; }
WasmStats& wasmStats() { return stats_handler_->wasmStats(); }

using SinglePluginHandle = PluginHandleSharedPtrThreadLocal;
using ThreadLocalPluginHandle = ThreadLocal::TypedSlotPtr<SinglePluginHandle>;

private:
/**
* Get the latest wasm and plugin handle wrapper. The plugin handle may be reloaded if
* the wasm is failed and the policy allows it.
*/
std::pair<OptRef<PluginHandleSharedPtrThreadLocal>, Wasm*> getPluginHandleAndWasm();
std::pair<OptRef<SinglePluginHandle>, Wasm*> getPluginHandleAndWasm();

/**
* May reload the handle if the wasm if failed. The input handle will be updated if the
* handle is reloaded.
* @return the wasm pointer of the latest handle.
*/
Wasm* mayReloadHandleIfNeeded(PluginHandleSharedPtrThreadLocal& handle_wrapper);
Wasm* mayReloadHandleIfNeeded(SinglePluginHandle& handle_wrapper);

StatsHandlerSharedPtr stats_handler_;
FailurePolicy failure_policy_;
Expand All @@ -221,16 +226,8 @@ class PluginConfig : Logger::Loggable<Logger::Id::wasm> {
PluginSharedPtr plugin_;
RemoteAsyncDataProviderPtr remote_data_provider_;
const bool is_singleton_handle_{};

bool plugin_handle_initialized_{};
WasmHandleSharedPtr base_wasm_{};

// Plugin handle that works for all threads. Only one of thread_local_handle_ or
// singleton_handle_ will be set.
ThreadLocal::TypedSlotPtr<PluginHandleSharedPtrThreadLocal> thread_local_handle_;
// Plugin handle that works for the main. Only one of thread_local_handle_ or
// singleton_handle_ will be set.
PluginHandleSharedPtrThreadLocal singleton_handle_;
absl::variant<absl::monostate, SinglePluginHandle, ThreadLocalPluginHandle> plugin_handle_;
};

using PluginConfigPtr = std::unique_ptr<PluginConfig>;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/stat_sinks/wasm/wasm_stat_sink_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class WasmStatSink : public Stats::Sink {
: plugin_config_(std::move(plugin_config)) {}

void flush(Stats::MetricSnapshot& snapshot) override {
if (Common::Wasm::Wasm* wasm = plugin_config_->wasmOfHandle(); wasm != nullptr) {
if (Common::Wasm::Wasm* wasm = plugin_config_->wasm(); wasm != nullptr) {
wasm->onStatsUpdate(plugin_config_->plugin(), snapshot);
}
}
Expand Down
2 changes: 0 additions & 2 deletions test/extensions/bootstrap/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ namespace Envoy {
namespace Extensions {
namespace Wasm {

using Extensions::Bootstrap::Wasm::WasmServicePtr;

class WasmFactoryTest : public testing::TestWithParam<std::tuple<std::string, std::string>> {
protected:
WasmFactoryTest() {
Expand Down
14 changes: 7 additions & 7 deletions test/extensions/common/wasm/wasm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ root_id: "panic after sending local reply"
setUp(plugin_config_yaml, singleton);
Envoy::Buffer::OwnedImpl request_body;

Wasm* initial_wasm = plugin_config_->wasmOfHandle();
Wasm* initial_wasm = plugin_config_->wasm();
EXPECT_NE(nullptr, initial_wasm);
EXPECT_EQ(initial_wasm->fail_state(), proxy_wasm::FailState::Ok);

Expand All @@ -1589,7 +1589,7 @@ root_id: "panic after sending local reply"
// Create second context and reload the wasm automatically.
createContext();

Wasm* new_wasm = plugin_config_->wasmOfHandle();
Wasm* new_wasm = plugin_config_->wasm();
EXPECT_NE(nullptr, new_wasm);
EXPECT_NE(initial_wasm, new_wasm);
EXPECT_EQ(new_wasm->fail_state(), proxy_wasm::FailState::Ok);
Expand All @@ -1608,9 +1608,9 @@ root_id: "panic after sending local reply"
// The wasm should be in runtime error state again.
EXPECT_EQ(new_wasm->fail_state(), proxy_wasm::FailState::RuntimeError);

// The wasm failed again and the wasmOfHandle() will try to reload again but will backoff.
// The wasm failed again and the wasm() will try to reload again but will backoff.
// The previous wasm will be returned.
Wasm* new_wasm_2 = plugin_config_->wasmOfHandle();
Wasm* new_wasm_2 = plugin_config_->wasm();
EXPECT_NE(nullptr, new_wasm_2);
EXPECT_EQ(new_wasm_2, new_wasm);
EXPECT_EQ(new_wasm->fail_state(), proxy_wasm::FailState::RuntimeError);
Expand All @@ -1622,7 +1622,7 @@ root_id: "panic after sending local reply"
server_.dispatcher_.globalTimeSystem().advanceTimeWait(std::chrono::seconds(3));

// Now the wasm should be reloaded again.
Wasm* new_wasm_3 = plugin_config_->wasmOfHandle();
Wasm* new_wasm_3 = plugin_config_->wasm();
EXPECT_NE(nullptr, new_wasm_3);
EXPECT_NE(new_wasm_3, new_wasm);
EXPECT_EQ(new_wasm_3->fail_state(), proxy_wasm::FailState::Ok);
Expand Down Expand Up @@ -1667,7 +1667,7 @@ root_id: "panic after sending local reply"
setUp(plugin_config_yaml, singleton);
Envoy::Buffer::OwnedImpl request_body;

Wasm* initial_wasm = plugin_config_->wasmOfHandle();
Wasm* initial_wasm = plugin_config_->wasm();
EXPECT_NE(nullptr, initial_wasm);
EXPECT_EQ(initial_wasm->fail_state(), proxy_wasm::FailState::Ok);

Expand All @@ -1693,7 +1693,7 @@ root_id: "panic after sending local reply"
// Create second context but the wasm is not reloaded.
createContext();

Wasm* new_wasm = plugin_config_->wasmOfHandle();
Wasm* new_wasm = plugin_config_->wasm();
EXPECT_NE(nullptr, new_wasm);
EXPECT_EQ(new_wasm, initial_wasm);
EXPECT_EQ(new_wasm->fail_state(), proxy_wasm::FailState::RuntimeError);
Expand Down
11 changes: 11 additions & 0 deletions test/extensions/common/wasm/wasm_vm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ namespace Common {
namespace Wasm {
namespace {

TEST(EnvoyWasmVmIntegrationTest, EnvoyWasmVmIntegrationTest) {
{
EnvoyWasmVmIntegration wasm_vm_integration;
for (const auto l : {spdlog::level::trace, spdlog::level::debug, spdlog::level::info,
spdlog::level::warn, spdlog::level::err, spdlog::level::critical}) {
Logger::Registry::getLog(Logger::Id::wasm).set_level(l);
EXPECT_EQ(wasm_vm_integration.getLogLevel(), static_cast<proxy_wasm::LogLevel>(l));
}
}
}

class TestNullVmPlugin : public proxy_wasm::NullVmPlugin {
public:
TestNullVmPlugin() = default;
Expand Down
8 changes: 4 additions & 4 deletions test/extensions/filters/network/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ TEST_P(WasmNetworkFilterConfigTest, FilterConfigFailClosed) {
envoy::extensions::filters::network::wasm::v3::Wasm proto_config;
TestUtility::loadFromYaml(yaml, proto_config);
NetworkFilters::Wasm::FilterConfig filter_config(proto_config, context_);
filter_config.wasmOfHandle()->fail(proxy_wasm::FailState::RuntimeError, "");
filter_config.wasm()->fail(proxy_wasm::FailState::RuntimeError, "");
auto context = filter_config.createContext();
EXPECT_EQ(context->wasm(), nullptr);
EXPECT_TRUE(context->isFailed());
Expand All @@ -220,7 +220,7 @@ TEST_P(WasmNetworkFilterConfigTest, FilterConfigFailOpen) {
envoy::extensions::filters::network::wasm::v3::Wasm proto_config;
TestUtility::loadFromYaml(yaml, proto_config);
NetworkFilters::Wasm::FilterConfig filter_config(proto_config, context_);
filter_config.wasmOfHandle()->fail(proxy_wasm::FailState::RuntimeError, "");
filter_config.wasm()->fail(proxy_wasm::FailState::RuntimeError, "");
EXPECT_EQ(filter_config.createContext(), nullptr);
}

Expand All @@ -243,7 +243,7 @@ TEST_P(WasmNetworkFilterConfigTest, FilterConfigCapabilitiesUnrestrictedByDefaul
envoy::extensions::filters::network::wasm::v3::Wasm proto_config;
TestUtility::loadFromYaml(yaml, proto_config);
NetworkFilters::Wasm::FilterConfig filter_config(proto_config, context_);
auto wasm = filter_config.wasmOfHandle();
auto wasm = filter_config.wasm();
EXPECT_TRUE(wasm->capabilityAllowed("proxy_log"));
EXPECT_TRUE(wasm->capabilityAllowed("proxy_on_vm_start"));
EXPECT_TRUE(wasm->capabilityAllowed("proxy_http_call"));
Expand Down Expand Up @@ -272,7 +272,7 @@ TEST_P(WasmNetworkFilterConfigTest, FilterConfigCapabilityRestriction) {
envoy::extensions::filters::network::wasm::v3::Wasm proto_config;
TestUtility::loadFromYaml(yaml, proto_config);
NetworkFilters::Wasm::FilterConfig filter_config(proto_config, context_);
auto wasm = filter_config.wasmOfHandle();
auto wasm = filter_config.wasm();
EXPECT_TRUE(wasm->capabilityAllowed("proxy_log"));
EXPECT_TRUE(wasm->capabilityAllowed("proxy_on_new_connection"));
EXPECT_FALSE(wasm->capabilityAllowed("proxy_http_call"));
Expand Down

0 comments on commit eb2f707

Please sign in to comment.