Skip to content

Commit

Permalink
Merge pull request envoyproxy#142 from kyessenov/filter_state
Browse files Browse the repository at this point in the history
Cherry-pick filter state PRs needed for TCP telemetry
  • Loading branch information
kyessenov authored Feb 7, 2020
2 parents cde27a5 + 4cae37c commit 892453e
Show file tree
Hide file tree
Showing 22 changed files with 295 additions and 231 deletions.
2 changes: 2 additions & 0 deletions include/envoy/stream_info/filter_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,7 @@ class FilterState {
virtual Object* getDataMutableGeneric(absl::string_view data_name) PURE;
};

using FilterStateSharedPtr = std::shared_ptr<FilterState>;

} // namespace StreamInfo
} // namespace Envoy
10 changes: 9 additions & 1 deletion include/envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,17 @@ class StreamInfo {
* filters (append only). Both object types can be consumed by multiple filters.
* @return the filter state associated with this request.
*/
virtual FilterState& filterState() PURE;
virtual const FilterStateSharedPtr& filterState() PURE;
virtual const FilterState& filterState() const PURE;

/**
* Filter State object to be shared between upstream and downstream filters.
* @param pointer to upstream connections filter state.
* @return pointer to filter state to be used by upstream connections.
*/
virtual const FilterStateSharedPtr& upstreamFilterState() const PURE;
virtual void setUpstreamFilterState(const FilterStateSharedPtr& filter_state) PURE;

/**
* @param SNI value requested.
*/
Expand Down
14 changes: 7 additions & 7 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e
// Extract debug configuration from filter state. This is used further along to determine whether
// we should append cluster and host headers to the response, and whether to forward the request
// upstream.
const StreamInfo::FilterState& filter_state = callbacks_->streamInfo().filterState();
const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
const DebugConfig* debug_config =
filter_state.hasData<DebugConfig>(DebugConfig::key())
? &(filter_state.getDataReadOnly<DebugConfig>(DebugConfig::key()))
filter_state->hasData<DebugConfig>(DebugConfig::key())
? &(filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key()))
: nullptr;

// TODO: Maybe add a filter API for this.
Expand Down Expand Up @@ -531,7 +531,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e
// TODO: Add SAN verification here and use it from dynamic_forward_proxy
// Update filter state with the host/authority to use for setting SNI in the transport
// socket options. This is referenced during the getConnPool() call below.
callbacks_->streamInfo().filterState().setData(
callbacks_->streamInfo().filterState()->setData(
Network::UpstreamServerName::key(),
std::make_unique<Network::UpstreamServerName>(host_str),
StreamInfo::FilterState::StateType::Mutable);
Expand Down Expand Up @@ -631,7 +631,7 @@ Http::ConnectionPool::Instance* Filter::getConnPool() {
// Note: Cluster may downgrade HTTP2 to HTTP1 based on runtime configuration.
Http::Protocol protocol = cluster_->upstreamHttpProtocol(callbacks_->streamInfo().protocol());
transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
callbacks_->streamInfo().filterState());
*callbacks_->streamInfo().filterState());

return config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(),
protocol, this);
Expand Down Expand Up @@ -1376,13 +1376,13 @@ bool Filter::setupRedirect(const Http::HeaderMap& headers, UpstreamRequest& upst
attempting_internal_redirect_with_complete_stream_ =
upstream_request.upstream_timing_.last_upstream_rx_byte_received_ && downstream_end_stream_;

StreamInfo::FilterState& filter_state = callbacks_->streamInfo().filterState();
const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();

// As with setupRetry, redirects are not supported for streaming requests yet.
if (downstream_end_stream_ &&
!callbacks_->decodingBuffer() && // Redirects with body not yet supported.
location != nullptr &&
convertRequestHeadersForInternalRedirect(*downstream_headers_, filter_state,
convertRequestHeadersForInternalRedirect(*downstream_headers_, *filter_state,
route_entry_->maxInternalRedirects(), *location,
*callbacks_->connection()) &&
callbacks_->recreateStream()) {
Expand Down
12 changes: 10 additions & 2 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,16 @@ struct StreamInfoImpl : public StreamInfo {
(*metadata_.mutable_filter_metadata())[name].MergeFrom(value);
};

FilterState& filterState() override { return *filter_state_; }
const FilterStateSharedPtr& filterState() override { return filter_state_; }
const FilterState& filterState() const override { return *filter_state_; }

const FilterStateSharedPtr& upstreamFilterState() const override {
return upstream_filter_state_;
}
void setUpstreamFilterState(const FilterStateSharedPtr& filter_state) override {
upstream_filter_state_ = filter_state;
}

void setRequestedServerName(absl::string_view requested_server_name) override {
requested_server_name_ = std::string(requested_server_name);
}
Expand Down Expand Up @@ -261,7 +268,8 @@ struct StreamInfoImpl : public StreamInfo {
bool health_check_request_{};
const Router::RouteEntry* route_entry_{};
envoy::config::core::v3::Metadata metadata_{};
std::shared_ptr<FilterStateImpl> filter_state_;
FilterStateSharedPtr filter_state_;
FilterStateSharedPtr upstream_filter_state_;
std::string route_name_;

private:
Expand Down
6 changes: 4 additions & 2 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProx

RouteConstSharedPtr Config::getRegularRouteFromEntries(Network::Connection& connection) {
// First check if the per-connection state to see if we need to route to a pre-selected cluster
if (connection.streamInfo().filterState().hasData<PerConnectionCluster>(
if (connection.streamInfo().filterState()->hasData<PerConnectionCluster>(
PerConnectionCluster::key())) {
const PerConnectionCluster& per_connection_cluster =
connection.streamInfo().filterState().getDataReadOnly<PerConnectionCluster>(
connection.streamInfo().filterState()->getDataReadOnly<PerConnectionCluster>(
PerConnectionCluster::key());

envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy::DeprecatedV1::TCPRoute
Expand Down Expand Up @@ -467,6 +467,8 @@ void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
getStreamInfo().onUpstreamHostSelected(host);
getStreamInfo().setUpstreamLocalAddress(connection.localAddress());
getStreamInfo().setUpstreamSslConnection(connection.streamInfo().downstreamSslConnection());
read_callbacks_->connection().streamInfo().setUpstreamFilterState(
connection.streamInfo().filterState());

// Simulate the event that onPoolReady represents.
upstream_callbacks_->onEvent(Network::ConnectionEvent::Connected);
Expand Down
48 changes: 22 additions & 26 deletions source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,32 +348,29 @@ WasmResult serializeValue(Filters::Common::Expr::CelValue value, std::string* re
class WasmStateWrapper : public google::api::expr::runtime::CelMap {
public:
WasmStateWrapper(const StreamInfo::FilterState& filter_state,
const StreamInfo::FilterState* connection_filter_state)
: filter_state_(filter_state), connection_filter_state_(connection_filter_state) {}
WasmStateWrapper(const StreamInfo::FilterState& filter_state)
: filter_state_(filter_state), connection_filter_state_(nullptr) {}
const StreamInfo::FilterState* upstream_connection_filter_state)
: filter_state_(filter_state),
upstream_connection_filter_state_(upstream_connection_filter_state) {}
absl::optional<google::api::expr::runtime::CelValue>
operator[](google::api::expr::runtime::CelValue key) const override {
if (!key.IsString()) {
return {};
}
auto value = key.StringOrDie().value();
try {
if (filter_state_.hasData<WasmState>(value)) {
const WasmState& result = filter_state_.getDataReadOnly<WasmState>(value);
return google::api::expr::runtime::CelValue::CreateBytes(&result.value());
} catch (const EnvoyException& e) {
// If doesn't exist in request filter state, try looking up in connection filter state.
try {
if (connection_filter_state_) {
const WasmState& result = connection_filter_state_->getDataReadOnly<WasmState>(value);
return google::api::expr::runtime::CelValue::CreateBytes(&result.value());
}
} catch (const EnvoyException& e) {
return {};
}
return {};
}

if (upstream_connection_filter_state_ &&
upstream_connection_filter_state_->hasData<WasmState>(value)) {
const WasmState& result =
upstream_connection_filter_state_->getDataReadOnly<WasmState>(value);
return google::api::expr::runtime::CelValue::CreateBytes(&result.value());
}
return {};
}

int size() const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
bool empty() const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const google::api::expr::runtime::CelList* ListKeys() const override {
Expand All @@ -382,7 +379,7 @@ class WasmStateWrapper : public google::api::expr::runtime::CelMap {

private:
const StreamInfo::FilterState& filter_state_;
const StreamInfo::FilterState* connection_filter_state_;
const StreamInfo::FilterState* upstream_connection_filter_state_;
};

#define PROPERTY_TOKENS(_f) \
Expand Down Expand Up @@ -423,14 +420,9 @@ Context::FindValue(absl::string_view name, Protobuf::Arena* arena) const {
break;
case PropertyToken::FILTER_STATE:
if (info) {
const Envoy::Network::Connection* connection = getConnection();
if (connection) {
return CelValue::CreateMap(Protobuf::Arena::Create<WasmStateWrapper>(
arena, info->filterState(), &connection->streamInfo().filterState()));
} else {
return CelValue::CreateMap(
Protobuf::Arena::Create<WasmStateWrapper>(arena, info->filterState()));
}

return CelValue::CreateMap(Protobuf::Arena::Create<WasmStateWrapper>(
arena, info->filterState(), info->upstreamFilterState().get()));
}
break;
case PropertyToken::REQUEST:
Expand Down Expand Up @@ -1004,6 +996,10 @@ const Network::Connection* Context::getConnection() const {
return encoder_callbacks_->connection();
} else if (decoder_callbacks_) {
return decoder_callbacks_->connection();
} else if (network_read_filter_callbacks_) {
return &network_read_filter_callbacks_->connection();
} else if (network_write_filter_callbacks_) {
return &network_write_filter_callbacks_->connection();
}
return nullptr;
}
Expand All @@ -1013,7 +1009,7 @@ WasmResult Context::setProperty(absl::string_view key, absl::string_view seriali
if (!stream_info) {
return WasmResult::NotFound;
}
stream_info->filterState().setData(key, std::make_unique<WasmState>(serialized_value),
stream_info->filterState()->setData(key, std::make_unique<WasmState>(serialized_value),
StreamInfo::FilterState::StateType::Mutable);
return WasmResult::Ok;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class GrpcStatsFilter : public Http::PassThroughFilter {
if (filter_object_ == nullptr) {
auto state = std::make_unique<GrpcStatsObject>();
filter_object_ = state.get();
decoder_callbacks_->streamInfo().filterState().setData(
decoder_callbacks_->streamInfo().filterState()->setData(
HttpFilterNames::get().GrpcStats, std::move(state),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::FilterChain);
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/jwt_authn/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool)

// Verify the JWT token, onComplete() will be called when completed.
const auto* verifier =
config_->findVerifier(headers, decoder_callbacks_->streamInfo().filterState());
config_->findVerifier(headers, *decoder_callbacks_->streamInfo().filterState());
if (!verifier) {
onComplete(Status::Ok);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Network::FilterStatus SniClusterFilter::onNewConnection() {
if (!sni.empty()) {
// Set the tcp_proxy cluster to the same value as SNI. The data is mutable to allow
// other filters to change it.
read_callbacks_->connection().streamInfo().filterState().setData(
read_callbacks_->connection().streamInfo().filterState()->setData(
TcpProxy::PerConnectionCluster::key(),
std::make_unique<TcpProxy::PerConnectionCluster>(sni),
StreamInfo::FilterState::StateType::Mutable,
Expand Down
64 changes: 33 additions & 31 deletions test/common/access_log/access_log_formatter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,16 +1027,16 @@ TEST(AccessLogFormatterTest, DynamicMetadataFormatter) {
TEST(AccessLogFormatterTest, FilterStateFormatter) {
Http::TestHeaderMapImpl header;
StreamInfo::MockStreamInfo stream_info;
stream_info.filter_state_.setData("key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_.setData("key-struct",
std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_.setData("key-no-serialization",
std::make_unique<StreamInfo::FilterState::Object>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_.setData(
stream_info.filter_state_->setData("key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("key-struct",
std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("key-no-serialization",
std::make_unique<StreamInfo::FilterState::Object>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData(
"key-serialization-error",
std::make_unique<TestSerializedStructFilterState>(std::chrono::seconds(-281474976710656)),
StreamInfo::FilterState::StateType::ReadOnly);
Expand Down Expand Up @@ -1299,11 +1299,12 @@ TEST(AccessLogFormatterTest, JsonFormatterTypedDynamicMetadataTest) {
TEST(AccessLogFormatterTets, JsonFormatterFilterStateTest) {
Http::TestHeaderMapImpl header;
StreamInfo::MockStreamInfo stream_info;
stream_info.filter_state_.setData("test_key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_.setData("test_obj", std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("test_key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("test_obj",
std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
EXPECT_CALL(Const(stream_info), filterState()).Times(testing::AtLeast(1));

std::unordered_map<std::string, std::string> expected_json_map = {
Expand All @@ -1320,11 +1321,12 @@ TEST(AccessLogFormatterTets, JsonFormatterFilterStateTest) {
TEST(AccessLogFormatterTets, JsonFormatterTypedFilterStateTest) {
Http::TestHeaderMapImpl header;
StreamInfo::MockStreamInfo stream_info;
stream_info.filter_state_.setData("test_key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_.setData("test_obj", std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("test_key",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("test_obj",
std::make_unique<TestSerializedStructFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly);
EXPECT_CALL(Const(stream_info), filterState()).Times(testing::AtLeast(1));

std::unordered_map<std::string, std::string> key_mapping = {
Expand Down Expand Up @@ -1414,9 +1416,9 @@ TEST(AccessLogFormatterTest, JsonFormatterTypedTest) {
ProtobufWkt::Struct s;
(*s.mutable_fields())["list"] = list;

stream_info.filter_state_.setData("test_obj",
std::make_unique<TestSerializedStructFilterState>(s),
StreamInfo::FilterState::StateType::ReadOnly);
stream_info.filter_state_->setData("test_obj",
std::make_unique<TestSerializedStructFilterState>(s),
StreamInfo::FilterState::StateType::ReadOnly);
EXPECT_CALL(Const(stream_info), filterState()).Times(testing::AtLeast(1));

std::unordered_map<std::string, std::string> key_mapping = {
Expand Down Expand Up @@ -1491,14 +1493,14 @@ TEST(AccessLogFormatterTest, CompositeFormatterSuccess) {

{
EXPECT_CALL(Const(stream_info), filterState()).Times(testing::AtLeast(1));
stream_info.filter_state_.setData("testing",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::FilterChain);
stream_info.filter_state_.setData("serialized",
std::make_unique<TestSerializedUnknownFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::FilterChain);
stream_info.filter_state_->setData("testing",
std::make_unique<Router::StringAccessorImpl>("test_value"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::FilterChain);
stream_info.filter_state_->setData("serialized",
std::make_unique<TestSerializedUnknownFilterState>(),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::FilterChain);
const std::string format = "%FILTER_STATE(testing)%|%FILTER_STATE(serialized)%|"
"%FILTER_STATE(testing):8%|%FILTER_STATE(nonexisting)%";
FormatterImpl formatter(format);
Expand Down
Loading

0 comments on commit 892453e

Please sign in to comment.