Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into stream_info_cleanups
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
mattklein123 committed Dec 19, 2020
2 parents d38f963 + 867b9e2 commit e157108
Show file tree
Hide file tree
Showing 18 changed files with 255 additions and 62 deletions.
4 changes: 2 additions & 2 deletions api/envoy/config/core/v3/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,10 @@ message DataSource {
string filename = 1 [(validate.rules).string = {min_len: 1}];

// Bytes inlined in the configuration.
bytes inline_bytes = 2 [(validate.rules).bytes = {min_len: 1}];
bytes inline_bytes = 2;

// String inlined in the configuration.
string inline_string = 3 [(validate.rules).string = {min_len: 1}];
string inline_string = 3;
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/envoy/config/core/v4alpha/base.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions generated_api_shadow/envoy/config/core/v3/base.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions generated_api_shadow/envoy/config/core/v4alpha/base.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ class ListenerManager {
/**
* Start all workers accepting new connections on all added listeners.
* @param guard_dog supplies the guard dog to use for thread watching.
* @param callback supplies the callback to complete server initialization.
*/
virtual void startWorkers(GuardDog& guard_dog) PURE;
virtual void startWorkers(GuardDog& guard_dog, std::function<void()> callback) PURE;

/**
* Stop all listeners from accepting new connections without actually removing any of them. This
Expand Down
15 changes: 11 additions & 4 deletions source/common/config/datasource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@ static constexpr uint32_t RetryCount = 1;

std::string read(const envoy::config::core::v3::DataSource& source, bool allow_empty,
Api::Api& api) {
std::string data;
switch (source.specifier_case()) {
case envoy::config::core::v3::DataSource::SpecifierCase::kFilename:
return api.fileSystem().fileReadToEnd(source.filename());
data = api.fileSystem().fileReadToEnd(source.filename());
break;
case envoy::config::core::v3::DataSource::SpecifierCase::kInlineBytes:
return source.inline_bytes();
data = source.inline_bytes();
break;
case envoy::config::core::v3::DataSource::SpecifierCase::kInlineString:
return source.inline_string();
data = source.inline_string();
break;
default:
if (!allow_empty) {
throw EnvoyException(
fmt::format("Unexpected DataSource::specifier_case(): {}", source.specifier_case()));
}
return "";
}
if (!allow_empty && data.empty()) {
throw EnvoyException("DataSource cannot be empty");
}
return data;
}

absl::optional<std::string> getPath(const envoy::config::core::v3::DataSource& source) {
Expand Down
6 changes: 6 additions & 0 deletions source/server/listener_hooks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class ListenerHooks {
*/
virtual void onWorkerListenerRemoved() PURE;

/**
* Called when all workers have started.
*/
virtual void onWorkersStarted() PURE;

/**
* Called when the Runtime::ScopedLoaderSingleton is created by the server.
*/
Expand All @@ -36,6 +41,7 @@ class DefaultListenerHooks : public ListenerHooks {
// ListenerHooks
void onWorkerListenerAdded() override {}
void onWorkerListenerRemoved() override {}
void onWorkersStarted() override {}
void onRuntimeCreated() override {}
};

Expand Down
15 changes: 9 additions & 6 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ bool ListenerManagerImpl::removeListenerInternal(const std::string& name,
return true;
}

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
ENVOY_LOG(info, "all dependencies initialized. starting workers");
ASSERT(!workers_started_);
workers_started_ = true;
Expand All @@ -899,11 +899,13 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
ENVOY_LOG(debug, "starting worker {}", i);
ASSERT(warming_listeners_.empty());
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, absl::nullopt, *listener, [this, listeners_pending_init]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
}
});
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
if (--(*listeners_pending_init) == 0) {
stats_.workers_started_.set(1);
callback();
}
});
}
worker->start(guard_dog);
if (enable_dispatcher_stats_) {
Expand All @@ -913,6 +915,7 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
}
if (active_listeners_.empty()) {
stats_.workers_started_.set(1);
callback();
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable<Logger::Id:
listeners(ListenerState state = ListenerState::ACTIVE) override;
uint64_t numConnections() const override;
bool removeListener(const std::string& listener_name) override;
void startWorkers(GuardDog& guard_dog) override;
void startWorkers(GuardDog& guard_dog, std::function<void()> callback) override;
void stopListeners(StopListenersType stop_listeners_type) override;
void stopWorkers() override;
void beginListenerUpdate() override { error_state_tracker_.clear(); }
Expand Down
27 changes: 17 additions & 10 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ InstanceImpl::InstanceImpl(
: nullptr),
grpc_context_(store.symbolTable()), http_context_(store.symbolTable()),
router_context_(store.symbolTable()), process_context_(std::move(process_context)),
main_thread_id_(std::this_thread::get_id()), server_contexts_(*this) {
main_thread_id_(std::this_thread::get_id()), hooks_(hooks), server_contexts_(*this) {
try {
if (!options.logPath().empty()) {
try {
Expand Down Expand Up @@ -609,15 +609,22 @@ void InstanceImpl::onRuntimeReady() {
}

void InstanceImpl::startWorkers() {
listener_manager_->startWorkers(*worker_guard_dog_);
initialization_timer_->complete();
// Update server stats as soon as initialization is done.
updateServerStats();
workers_started_ = true;
// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
// The callback will be called after workers are started.
listener_manager_->startWorkers(*worker_guard_dog_, [this]() {
if (isShutdown()) {
return;
}

initialization_timer_->complete();
// Update server stats as soon as initialization is done.
updateServerStats();
workers_started_ = true;
hooks_.onWorkersStarted();
// At this point we are ready to take traffic and all listening ports are up. Notify our
// parent if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
drain_manager_->startParentShutdownSequence();
});
}

Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Expand Down
1 change: 1 addition & 0 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
// initialization_time is a histogram for tracking the initialization time across hot restarts
// whenever we have support for histogram merge across hot restarts.
Stats::TimespanPtr initialization_timer_;
ListenerHooks& hooks_;

ServerFactoryContextImpl server_contexts_;

Expand Down
30 changes: 30 additions & 0 deletions test/common/config/datasource_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,36 @@ TEST_F(AsyncDataSourceTest, LoadLocalDataSource) {
EXPECT_EQ(async_data, "xxxxxx");
}

TEST_F(AsyncDataSourceTest, LoadLocalEmptyDataSource) {
AsyncDataSourcePb config;

std::string yaml = R"EOF(
local:
inline_string: ""
)EOF";
TestUtility::loadFromYamlAndValidate(yaml, config);
EXPECT_TRUE(config.has_local());

std::string async_data;

EXPECT_CALL(init_manager_, add(_)).WillOnce(Invoke([this](const Init::Target& target) {
init_target_handle_ = target.createHandle("test");
}));

local_data_provider_ = std::make_unique<Config::DataSource::LocalAsyncDataProvider>(
init_manager_, config.local(), true, *api_, [&](const std::string& data) {
EXPECT_EQ(init_manager_.state(), Init::Manager::State::Initializing);
EXPECT_EQ(data, "");
async_data = data;
});

EXPECT_CALL(init_manager_, state()).WillOnce(Return(Init::Manager::State::Initializing));
EXPECT_CALL(init_watcher_, ready());

init_target_handle_->initialize(init_watcher_);
EXPECT_EQ(async_data, "");
}

TEST_F(AsyncDataSourceTest, LoadRemoteDataSourceNoCluster) {
AsyncDataSourcePb config;

Expand Down
55 changes: 55 additions & 0 deletions test/integration/local_reply_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,4 +411,59 @@ TEST_P(LocalReplyIntegrationTest, ShouldFormatResponseToCustomString) {
EXPECT_EQ(response->body(), "513 - customized body text");
}

// Should return formatted text/plain response.
TEST_P(LocalReplyIntegrationTest, ShouldFormatResponseToEmptyBody) {
const std::string yaml = R"EOF(
mappers:
- filter:
status_code_filter:
comparison:
op: EQ
value:
default_value: 503
runtime_key: key_b
status_code: 513
body:
inline_string: ""
body_format:
text_format_source:
inline_string: ""
)EOF";
setLocalReplyConfig(yaml);
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

auto encoder_decoder = codec_client_->startRequest(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"},
{"test-header", "exact-match-value-2"}});
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));

ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());
ASSERT_TRUE(fake_upstream_connection_->close());
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
response->waitForEndStream();

if (downstream_protocol_ == Http::CodecClient::Type::HTTP1) {
ASSERT_TRUE(codec_client_->waitForDisconnect());
} else {
codec_client_->close();
}

EXPECT_FALSE(upstream_request_->complete());
EXPECT_EQ(0U, upstream_request_->bodyLength());

EXPECT_TRUE(response->complete());

EXPECT_EQ("513", response->headers().Status()->value().getStringView());

EXPECT_EQ(response->body(), "");
}

} // namespace Envoy
1 change: 1 addition & 0 deletions test/integration/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ class IntegrationTestServer : public Logger::Loggable<Logger::Id::testing>,
on_server_ready_cb_ = std::move(on_server_ready);
}
void onRuntimeCreated() override {}
void onWorkersStarted() override {}

void start(const Network::Address::IpVersion version,
std::function<void()> on_server_init_function, bool deterministic,
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MockListenerManager : public ListenerManager {
(ListenerState state));
MOCK_METHOD(uint64_t, numConnections, (), (const));
MOCK_METHOD(bool, removeListener, (const std::string& listener_name));
MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog));
MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog, std::function<void()> callback));
MOCK_METHOD(void, stopListeners, (StopListenersType listeners_type));
MOCK_METHOD(void, stopWorkers, ());
MOCK_METHOD(void, beginListenerUpdate, ());
Expand Down
Loading

0 comments on commit e157108

Please sign in to comment.