Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: integration with Envoy's API listener #616

Merged
merged 41 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4e4ace3
wip: headers, data, trailers working and unit tested
Dec 17, 2019
20d3e1e
wire up reset, and track stream state for cleanup
Dec 18, 2019
8418ee9
terminal states updated. Missing tests.
Dec 18, 2019
56914a0
cleanup
Dec 19, 2019
62ebd14
update types and config
Dec 23, 2019
fcb0870
wip integration with envoy and dynamic forward proxy
Dec 24, 2019
76aee26
use named listener
Dec 26, 2019
f069c82
api handle
Dec 27, 2019
f38e77c
fmt
Dec 27, 2019
cec541f
updated interfaces to keep up with envoy changes
Jan 7, 2020
9b8b5e2
first pass at closed state pushed down to the core
Jan 13, 2020
1ab5cd6
Merge branch 'master' into hcm-integration
Jan 13, 2020
cf69083
always zero-init
Jan 14, 2020
85d1acf
v3 types
Jan 14, 2020
3cd3a1a
Merge branch 'master' into hcm-integration
Jan 17, 2020
c9721d9
added fixme before merge
Jan 17, 2020
e131965
update after api listener PR merged upstream
Jan 21, 2020
f97b8b1
fmt
Jan 21, 2020
a98cfbf
update test
Jan 22, 2020
5e740ae
swift mock
Jan 22, 2020
e6d0d91
update cleanup and dispatchable
Jan 23, 2020
0f2ba0f
preferred network and cluster updates
Jan 24, 2020
0ef3ef3
buffering
Jan 24, 2020
65b7ba9
moved cancellation to synchronous code, and added test suite
Jan 25, 2020
58b60d1
fix
Jan 27, 2020
96ca3ca
clean up platform level cancel code
Jan 27, 2020
4ef47cb
fmt
Jan 27, 2020
ef68ba6
mutex on streams_
Jan 27, 2020
e0ac9cb
cross-thread unit test
Jan 28, 2020
644d338
missing BUILD file
Jan 28, 2020
024f15b
swift test
Jan 28, 2020
ddf302b
test output
Jan 28, 2020
413d813
full test suite for races
Jan 28, 2020
c718e49
fix memory leak
Jan 29, 2020
3ee492b
preferred network test
Jan 29, 2020
a3c2d8a
basic integration test
Jan 29, 2020
9fcbf08
fmt
Jan 29, 2020
191105a
comment clarity
Jan 29, 2020
f3d8ff2
dispatch lock for race between cancellation and non-terminal encodings
Jan 30, 2020
ed88f67
definition
Jan 30, 2020
8af70cf
increase timeout for perf build
Jan 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions library/common/config_template.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,38 @@
*/
const char* config_template = R"(
static_resources:
listeners:
- name: base_api_listener
address:
socket_address:
protocol: TCP
address: 0.0.0.0
port_value: 10000
api_listener:
api_listener:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: hcm
route_config:
name: api_router
virtual_hosts:
- name: api
domains:
- "*"
routes:
- match:
prefix: "/"
route:
cluster: dynamic_forward_proxy_cluster
http_filters:
- name: envoy.filters.http.dynamic_forward_proxy
junr03 marked this conversation as resolved.
Show resolved Hide resolved
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig
dns_cache_config:
name: dynamic_forward_proxy_cache_config
dns_lookup_family: V4_ONLY
- name: envoy.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: base # Note: the direct API depends on the existence of a cluster with this name.
connect_timeout: {{ connect_timeout_seconds }}s
Expand Down Expand Up @@ -54,7 +86,19 @@ const char* config_template = R"(
cluster_name: base_wwan
endpoints: *base_endpoints
transport_socket: *base_transport_socket
upstream_connection_options: *upstream_opts
type: LOGICAL_DNS
- name: dynamic_forward_proxy_cluster
junr03 marked this conversation as resolved.
Show resolved Hide resolved
connect_timeout: 1s
lb_policy: CLUSTER_PROVIDED
cluster_type:
name: envoy.clusters.dynamic_forward_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig
dns_cache_config:
name: dynamic_forward_proxy_cache_config
dns_lookup_family: V4_ONLY
transport_socket: *base_transport_socket
upstream_connection_options: *upstream_opts
- name: stats
connect_timeout: {{ connect_timeout_seconds }}s
Expand Down
4 changes: 3 additions & 1 deletion library/common/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ envoy_status_t Engine::run(std::string config, std::string log_level) {
postinit_callback_handler_ = main_common_->server()->lifecycleNotifier().registerCallback(
Envoy::Server::ServerLifecycleNotifier::Stage::PostInit, [this]() -> void {
Server::Instance* server = TS_UNCHECKED_READ(main_common_)->server();
http_dispatcher_->ready(server->dispatcher(), server->clusterManager());
auto api_listener = server->listenerManager().apiListener()->get().http();
ASSERT(api_listener.has_value());
http_dispatcher_->ready(server->dispatcher(), api_listener.value());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this additional delay (beyond construction) still necessary? We're no longer waiting on an initial round of DNS resolution and the filter dynamic forwarding filter contains built in buffering logic while we do wait. If we're unsure, maybe insert a TODO to follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still suggest adding a TODO and/or issue for this so we still have visibility to follow up.

});
} // mutex_

Expand Down
4 changes: 3 additions & 1 deletion library/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ envoy_cc_library(
"//library/common/buffer:bridge_fragment_lib",
"//library/common/buffer:utility_lib",
"//library/common/http:header_utility_lib",
"//library/common/network:synthetic_address_lib",
"//library/common/types:c_types_lib",
"@envoy//include/envoy/buffer:buffer_interface",
"@envoy//include/envoy/event:dispatcher_interface",
"@envoy//include/envoy/http:api_listener_interface",
"@envoy//include/envoy/http:async_client_interface",
"@envoy//include/envoy/http:header_map_interface",
"@envoy//include/envoy/upstream:cluster_manager_interface",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/common:lock_guard_lib",
"@envoy//source/common/common:minimal_logger_lib",
"@envoy//source/common/common:thread_lib",
"@envoy//source/common/http:codec_helper_lib",
"@envoy//source/common/http:headers_lib",
"@envoy//source/common/http:utility_lib",
],
Expand Down
316 changes: 198 additions & 118 deletions library/common/http/dispatcher.cc

Large diffs are not rendered by default.

93 changes: 68 additions & 25 deletions library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
#include <unordered_map>

#include "envoy/buffer/buffer.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/api_listener.h"
#include "envoy/http/async_client.h"
#include "envoy/http/codec.h"
#include "envoy/http/header_map.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/logger.h"
#include "common/common/thread.h"
#include "common/http/codec_helper.h"

#include "absl/types/optional.h"
#include "library/common/types/c_types.h"
Expand All @@ -25,7 +28,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
public:
Dispatcher(std::atomic<envoy_network_t>& preferred_network);

void ready(Event::Dispatcher& event_dispatcher, Upstream::ClusterManager& cluster_manager);
void ready(Event::Dispatcher& event_dispatcher, ApiListener& api_listener);

/**
* Attempts to open a new stream to the remote. Note that this function is asynchronous and
Expand Down Expand Up @@ -84,28 +87,35 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
envoy_status_t resetStream(envoy_stream_t stream);

private:
class DirectStream;

/**
* Notifies caller of async HTTP stream status.
* Note the HTTP stream is full-duplex, even if the local to remote stream has been ended
* by sendHeaders/sendData with end_stream=true, sendTrailers, or locallyCloseStream
* DirectStreamCallbacks can continue to receive events until the remote to local stream is
* closed, or resetStream is called.
*/
class DirectStreamCallbacks : public AsyncClient::StreamCallbacks,
public Logger::Loggable<Logger::Id::http> {
class DirectStreamCallbacks : public StreamEncoder, public Logger::Loggable<Logger::Id::http> {
public:
DirectStreamCallbacks(envoy_stream_t stream_handle, envoy_http_callbacks bridge_callbacks,
DirectStreamCallbacks(DirectStream& direct_stream, envoy_http_callbacks bridge_callbacks,
Dispatcher& http_dispatcher);

// AsyncClient::StreamCallbacks
void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onComplete() override;
void onReset() override;
void onReset();
void onCancel();
void closeRemote(bool end_stream);

// StreamEncoder
void encodeHeaders(const HeaderMap& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(const HeaderMap& trailers) override;
Stream& getStream() override;
// TODO: implement
void encode100ContinueHeaders(const HeaderMap&) override {}
void encodeMetadata(const MetadataMapVector&) override {}

private:
const envoy_stream_t stream_handle_;
DirectStream& direct_stream_;
const envoy_http_callbacks bridge_callbacks_;
absl::optional<envoy_error_code_t> error_code_;
absl::optional<envoy_data> error_message_;
Expand All @@ -118,25 +128,59 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
* Contains state about an HTTP stream; both in the outgoing direction via an underlying
* AsyncClient::Stream and in the incoming direction via DirectStreamCallbacks.
*/
class DirectStream {
class DirectStream : public Stream,
public StreamCallbackHelper,
public Event::DeferredDeletable,
public Logger::Loggable<Logger::Id::http> {
public:
DirectStream(envoy_stream_t stream_handle, AsyncClient::Stream& underlying_stream,
DirectStreamCallbacksPtr&& callbacks);

static AsyncClient::StreamOptions toNativeStreamOptions(envoy_stream_options stream_options);
DirectStream(envoy_stream_t stream_handle, Dispatcher& http_dispatcher);

// Stream
void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); }
void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); }
void resetStream(StreamResetReason) override;
const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override {
return parent_.address_;
}
// FIXME: before merge. implement
void readDisable(bool) override {}
uint32_t bufferLimit() override { return 65000; }

void closeLocal(bool end_stream);
void closeRemote(bool end_stream);
bool complete();
/**
* Return whether a callback should be allowed to continue with execution.
* This ensures at most one 'terminal' callback is issued for any given stream.
* FIXME: before merge. should this be in the DirectStreamCallbacks?
*
* @param close, whether the DirectStream should close if it has not closed before.
* @return bool, whether callbacks on this stream are dispatchable or not.
*/
bool dispatchable(bool close);
junr03 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Return whether a callback should be allowed to continue with execution.
* This ensures at most one 'terminal' callback is issued for any given stream.
* FIXME: before merge. should this be in the DirectStreamCallbacks?
*
* @return bool, whether callbacks on this stream are dispatchable or not.
*/
bool dispatchable();

const envoy_stream_t stream_handle_;
std::atomic<bool> closed_{};
bool local_closed_{};
bool remote_closed_{};
// Used to issue outgoing HTTP stream operations.
AsyncClient::Stream& underlying_stream_;
StreamDecoder* stream_decoder_;
// Used to receive incoming HTTP stream operations.
const DirectStreamCallbacksPtr callbacks_;
DirectStreamCallbacksPtr callbacks_;
Dispatcher& parent_;

HeaderMapPtr headers_;
// TODO: because the client may send infinite metadata frames we need some ongoing way to
// free metadata ahead of object destruction.
// An implementation option would be to have drainable header maps, or done callbacks.
std::vector<HeaderMapPtr> metadata_;
HeaderMapPtr trailers_;
};

using DirectStreamPtr = std::unique_ptr<DirectStream>;
Expand All @@ -146,9 +190,6 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
* @param callback, the functor to post.
*/
void post(Event::PostCb callback);
// Everything in the below interface must only be accessed from the event_dispatcher's thread.
// This allows us to generally avoid synchronization.
AsyncClient& getClient();
DirectStream* getStream(envoy_stream_t stream_handle);
void cleanup(envoy_stream_t stream_handle);

Expand All @@ -157,9 +198,11 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
Thread::MutexBasicLockable dispatch_lock_;
std::list<Event::PostCb> init_queue_ GUARDED_BY(dispatch_lock_);
Event::Dispatcher* event_dispatcher_ GUARDED_BY(dispatch_lock_){};
Upstream::ClusterManager* cluster_manager_ GUARDED_BY(dispatch_lock_){};
ApiListener* api_listener_ GUARDED_BY(dispatch_lock_){};
std::unordered_map<envoy_stream_t, DirectStreamPtr> streams_;
std::atomic<envoy_network_t>& preferred_network_;
// Shared synthetic address across DirectStreams.
Network::Address::InstanceConstSharedPtr address_;
};

} // namespace Http
Expand Down
10 changes: 7 additions & 3 deletions library/common/jni_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ static void jvm_on_complete(void* context) {
env->DeleteGlobalRef(j_context);
}

static void jvm_on_cancel(void* context) {
// FIXME: before merge. implement this.
}

// Utility functions
static void jni_delete_global_ref(void* context) {
JNIEnv* env = get_env();
Expand Down Expand Up @@ -294,9 +298,9 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra

// TODO: To be truly safe we may need stronger guarantees of operation ordering on this ref
jobject retained_context = env->NewGlobalRef(j_context);
envoy_http_callbacks native_callbacks = {jvm_on_headers, jvm_on_data, jvm_on_metadata,
jvm_on_trailers, jvm_on_error, jvm_on_complete,
retained_context};
envoy_http_callbacks native_callbacks = {jvm_on_headers, jvm_on_data, jvm_on_metadata,
jvm_on_trailers, jvm_on_error, jvm_on_complete,
jvm_on_cancel, retained_context};
envoy_stream_options stream_options = {buffer_for_retry == JNI_TRUE ? true : false};
envoy_status_t result =
start_stream(static_cast<envoy_stream_t>(stream_handle), native_callbacks, stream_options);
Expand Down
12 changes: 12 additions & 0 deletions library/common/network/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
licenses(["notice"]) # Apache 2

load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_library", "envoy_package")

envoy_package()

envoy_cc_library(
name = "synthetic_address_lib",
hdrs = ["synthetic_address_impl.h"],
repository = "@envoy",
deps = ["@envoy//include/envoy/network:address_interface"],
)
57 changes: 57 additions & 0 deletions library/common/network/synthetic_address_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <cerrno>

#include "envoy/network/address.h"

namespace Envoy {
namespace Network {
namespace Address {

// TODO(junr03): https://github.com/envoyproxy/envoy/pull/9362/ introduced API surface to the
// codec's Stream interface that made it necessary for Stream to be aware of its underlying
// connection. This class is created in order to stub out connections for Stream implementations
// that have no backing connection, e.g Envoy Mobile's DirectStream. It might be possible to
// eliminate this dependency.
class SyntheticAddressImpl : public Instance {
public:
SyntheticAddressImpl() {}

bool operator==(const Instance&) const {
// Every synthetic address is different from one another and other address types. In reality,
// whatever object owns a synthetic address can't rely on address equality for any logic as the
// address is just a stub.
return false;
}

const std::string& asString() const { return address_; }

absl::string_view asStringView() const { return address_; }

const std::string& logicalName() const { return address_; }

Api::SysCallIntResult bind(int) const {
// a socket should never be bound to a synthetic address.
return {-1, EADDRNOTAVAIL};
}

Api::SysCallIntResult connect(int) const {
// a socket should never connect to a synthetic address.
return {-1, EPROTOTYPE};
}

const Ip* ip() const { return nullptr; }

IoHandlePtr socket(SocketType) const { return nullptr; }

Type type() const {
// TODO(junr03): consider adding another type of address.
return Address::Type::Ip;
}

private:
const std::string address_{"synthetic"};
};
} // namespace Address
} // namespace Network
} // namespace Envoy
9 changes: 9 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ typedef void (*envoy_on_error_f)(envoy_error error, void* context);
*/
typedef void (*envoy_on_complete_f)(void* context);

/**
* Called when the async HTTP stream has been cancelled by the application (FIXME: before merge.
* word choice).
* @param context, contains the necessary state to carry out platform-specific dispatch and
* execution.
*/
typedef void (*envoy_on_cancel_f)(void* context);
junr03 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Called when the envoy engine is exiting.
*/
Expand All @@ -214,6 +222,7 @@ typedef struct {
envoy_on_trailers_f on_trailers;
envoy_on_error_f on_error;
envoy_on_complete_f on_complete;
envoy_on_cancel_f on_cancel;
void* context; // Will be passed through to callbacks to provide dispatch and execution state.
} envoy_http_callbacks;

Expand Down
4 changes: 1 addition & 3 deletions library/objective-c/EnvoyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ typedef NSDictionary<NSString *, NSArray<NSString *> *> EnvoyHeaders;
/**
Cancel the stream. This functions as an interrupt, and aborts further callbacks and handling of the
stream.

@return Success unless the stream has already been canceled.
*/
- (int)cancel;
- (void)cancel;

/**
Clean up the stream after it's closed (by completion, cancellation, or error).
Expand Down
Loading