Skip to content

Commit

Permalink
Add Fabric8 Kubernetes client integration (#5167)
Browse files Browse the repository at this point in the history
Motivation:

Fabric8 is one of the popular Kubernetes client implementations. It has
an abstract layer for HTTP and WebSocket protocols.

If Armeria provides a Kubernetes client to better support the cloud
infrastructure, other useful functions such as #4497 will be able to be
implemented based on it.
 
Modifications:

- Add `kubernetes` module to provide Fabric `StandardHttpClient`.
  - `ArmeriaHttpClientFactory` is automatically activated via Java SPI.
- Both HTTP and WebSocket clients have been implemented in compliance
with Reactive Streams specification.
- WebSocket is working over HTTP/1. WebSocket over HTTP/2 is disabled
for compatibility.
  - Forked test suites from the upstream repo.
- Miscellaneous) 
  - Allow `ProxyConfig` to configure proxy headers.
- Allow `WebSocketClient` to configure `HttpHeaders` and
`RequestOptions` when starting a WebSocket session.
 
Result:

You can use the Fabric Kubernetes client on top of the Armeria client.
  • Loading branch information
ikhoon authored Jan 29, 2024
1 parent 1201845 commit d2afcd9
Show file tree
Hide file tree
Showing 44 changed files with 2,685 additions and 28 deletions.
76 changes: 76 additions & 0 deletions .github/workflows/e2e-chaos-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: E2E Tests

on:
push:
branches:
- main
tags-ignore:
# The release versions will be verified by 'publish-release.yml'
- armeria-*
pull_request:

concurrency:
group: ci-e2e-chaos-tests-${{ github.event.pull_request.number || github.sha }}
cancel-in-progress: true

env:
CHAOS_MESH_VERSION: 2.6.2
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GRADLE_ENTERPRISE_ACCESS_KEY }}

jobs:
chaos-tests:
name: Kubernetes Chaos test
runs-on: ubuntu-latest
timeout-minutes: 120
if: github.repository == 'line/armeria'
steps:
- uses: actions/checkout@v4

- id: setup-jdk-19
name: Setup Java 19
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: 19

- name: Setup Minikube
id: minikube
uses: medyagh/setup-minikube@latest

- name: Install Chaos Mesh
run: |
curl -sSL https://mirrors.chaos-mesh.org/v${CHAOS_MESH_VERSION}/install.sh | bash
kubectl wait --for=condition=Ready pods --all-namespaces --all --timeout=600s
shell: bash

- name: Setup Gradle
uses: gradle/gradle-build-action@v2

- name: Build Chaos test images
run: |
# The images should be built in the minikube docker environment
eval $(minikube -p minikube docker-env)
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:k8sBuild
shell: bash

- name: Run Chaos Tests - network-delay.yaml
env:
CHAOS_TEST: network-delay.yaml
run: |
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test
shell: bash

- name: Run Chaos Tests - network-loss.yaml
env:
CHAOS_TEST: network-loss.yaml
run: |
# --rerun-tasks is required to run the tests because only the environment variable is changed
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test --rerun-tasks
shell: bash

- name: Run Chaos Tests - network-duplicate.yaml
env:
CHAOS_TEST: network-duplicate.yaml
run: |
./gradlew --no-daemon --stacktrace :it:kubernetes-chaos-tests:test --rerun-tasks
shell: bash
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.linecorp.armeria.client;

import static com.linecorp.armeria.common.SessionProtocol.httpAndHttpsValues;
import static com.linecorp.armeria.internal.common.ArmeriaHttpUtil.toNettyHttp1ClientHeaders;

import java.lang.reflect.Array;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -64,6 +65,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.ProxyConnectException;
import io.netty.handler.proxy.ProxyHandler;
Expand Down Expand Up @@ -131,10 +133,11 @@ private void configureProxy(Channel ch, ProxyConfig proxyConfig, SessionProtocol
final ConnectProxyConfig connectProxyConfig = (ConnectProxyConfig) proxyConfig;
final String username = connectProxyConfig.username();
final String password = connectProxyConfig.password();
final HttpHeaders proxyHeaders = toNettyHttp1ClientHeaders(connectProxyConfig.headers());
if (username == null || password == null) {
proxyHandler = new HttpProxyHandler(proxyAddress);
proxyHandler = new HttpProxyHandler(proxyAddress, proxyHeaders);
} else {
proxyHandler = new HttpProxyHandler(proxyAddress, username, password);
proxyHandler = new HttpProxyHandler(proxyAddress, username, password, proxyHeaders);
}
break;
case HAPROXY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.annotation.Nullable;

/**
Expand All @@ -36,13 +37,16 @@ public final class ConnectProxyConfig extends ProxyConfig {
@Nullable
private final String password;

private final HttpHeaders headers;

private final boolean useTls;

ConnectProxyConfig(InetSocketAddress proxyAddress, @Nullable String username,
@Nullable String password, boolean useTls) {
@Nullable String password, HttpHeaders headers, boolean useTls) {
this.proxyAddress = proxyAddress;
this.username = username;
this.password = password;
this.headers = headers;
this.useTls = useTls;
}

Expand All @@ -67,6 +71,13 @@ public String password() {
return password;
}

/**
* Returns the configured {@link HttpHeaders}.
*/
public HttpHeaders headers() {
return headers;
}

/**
* Returns whether ssl is enabled.
*/
Expand All @@ -91,12 +102,13 @@ public boolean equals(@Nullable Object o) {
return useTls == that.useTls &&
proxyAddress.equals(that.proxyAddress) &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password);
Objects.equals(password, that.password) &&
headers.equals(that.headers);
}

@Override
public int hashCode() {
return Objects.hash(proxyAddress, username, password, useTls);
return Objects.hash(proxyAddress, username, password, headers, useTls);
}

@Override
Expand All @@ -106,6 +118,8 @@ public String toString() {
.add("proxyAddress", proxyAddress())
.add("username", username())
.add("password", maskPassword(username(), password()))
// Headers are omitted since they may contain sensitive information such as
// (Proxy-)Authorization.
.add("useTls", useTls())
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.net.InetSocketAddress;

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.server.ServiceRequestContext;

/**
Expand Down Expand Up @@ -88,7 +90,7 @@ public static Socks5ProxyConfig socks5(
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, false);
return new ConnectProxyConfig(proxyAddress, null, null, HttpHeaders.of(), false);
}

/**
Expand All @@ -100,7 +102,7 @@ public static ConnectProxyConfig connect(InetSocketAddress proxyAddress) {
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, useTls);
return new ConnectProxyConfig(proxyAddress, null, null, HttpHeaders.of(), useTls);
}

/**
Expand All @@ -113,10 +115,42 @@ public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, boolean
*/
public static ConnectProxyConfig connect(
InetSocketAddress proxyAddress, String username, String password, boolean useTls) {
return connect(proxyAddress, username, password, HttpHeaders.of(), useTls);
}

/**
* Creates a {@code ProxyConfig} configuration for CONNECT protocol.
*
* @param proxyAddress the proxy address
* @param headers the {@link HttpHeaders} to send to the proxy
* @param useTls whether to use TLS to connect to the proxy
*/
@UnstableApi
public static ConnectProxyConfig connect(
InetSocketAddress proxyAddress, HttpHeaders headers, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, null, null, headers, useTls);
}

/**
* Creates a {@code ProxyConfig} configuration for CONNECT protocol.
*
* @param proxyAddress the proxy address
* @param username the username
* @param password the password
* @param headers the {@link HttpHeaders} to send to the proxy
* @param useTls whether to use TLS to connect to the proxy
*/
@UnstableApi
public static ConnectProxyConfig connect(InetSocketAddress proxyAddress, String username, String password,
HttpHeaders headers, boolean useTls) {
requireNonNull(proxyAddress, "proxyAddress");
checkArgument(!proxyAddress.isUnresolved(), "proxyAddress must be resolved");
return new ConnectProxyConfig(proxyAddress, requireNonNull(username, "username"),
requireNonNull(password, "password"), useTls);
requireNonNull(username, "username");
requireNonNull(password, "password");
requireNonNull(headers, "headers");
return new ConnectProxyConfig(proxyAddress, username, password, headers, useTls);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.RequestOptions;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand Down Expand Up @@ -83,16 +85,17 @@ final class DefaultWebSocketClient implements WebSocketClient {
}

@Override
public CompletableFuture<WebSocketSession> connect(String path) {
public CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers,
RequestOptions requestOptions) {
requireNonNull(path, "path");
final RequestHeaders requestHeaders = webSocketHeaders(path);
final RequestHeaders requestHeaders = webSocketHeaders(path, headers);

final CompletableFuture<StreamMessage<HttpData>> outboundFuture = new CompletableFuture<>();
final HttpRequest request = HttpRequest.of(requestHeaders, StreamMessage.of(outboundFuture));
final HttpResponse response;
final ClientRequestContext ctx;
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
response = webClient.execute(request);
response = webClient.execute(request, requestOptions);
ctx = captor.get();
}
final SplitHttpResponse split =
Expand Down Expand Up @@ -127,21 +130,27 @@ public CompletableFuture<WebSocketSession> connect(String path) {
return result;
}

private RequestHeaders webSocketHeaders(String path) {
final RequestHeadersBuilder builder;
private RequestHeaders webSocketHeaders(String path, HttpHeaders headers) {
final RequestHeadersBuilder builder = RequestHeaders.builder();
if (!headers.isEmpty()) {
headers.forEach((k, v) -> builder.add(k, v));
}

if (scheme().sessionProtocol().isExplicitHttp2()) {
builder = RequestHeaders.builder(HttpMethod.CONNECT, path)
.set(HttpHeaderNames.PROTOCOL, HttpHeaderValues.WEBSOCKET.toString());
builder.method(HttpMethod.CONNECT)
.path(path)
.set(HttpHeaderNames.PROTOCOL, HttpHeaderValues.WEBSOCKET.toString());
} else {
builder = RequestHeaders.builder(HttpMethod.GET, path)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE.toString())
.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET.toString());
final String secWebSocketKey = generateSecWebSocketKey();
builder.set(HttpHeaderNames.SEC_WEBSOCKET_KEY, secWebSocketKey);
builder.method(HttpMethod.GET)
.path(path)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE.toString())
.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET.toString())
.set(HttpHeaderNames.SEC_WEBSOCKET_KEY, secWebSocketKey);
}

builder.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "13");
if (!subprotocols.isEmpty()) {
if (!builder.contains(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL) && !subprotocols.isEmpty()) {
builder.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, joinedSubprotocols);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package com.linecorp.armeria.client.websocket;

import static com.linecorp.armeria.internal.client.ClientUtil.UNDEFINED_URI;
import static java.util.Objects.requireNonNull;

import java.net.URI;
import java.util.concurrent.CompletableFuture;

import com.linecorp.armeria.client.ClientBuilderParams;
import com.linecorp.armeria.client.ClientOptions;
import com.linecorp.armeria.client.RequestOptions;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
Expand Down Expand Up @@ -78,11 +81,11 @@ static WebSocketClient of() {
return DefaultWebSocketClient.DEFAULT;
}

/**
* Returns a new {@link WebSocketClient} that connects to the specified {@code uri} using the
* default options.
*/
static WebSocketClient of(String uri) {
/**
* Returns a new {@link WebSocketClient} that connects to the specified {@code uri} using the
* default options.
*/
static WebSocketClient of(String uri) {
return builder(uri).build();
}

Expand Down Expand Up @@ -142,6 +145,13 @@ static WebSocketClient of(SessionProtocol protocol, EndpointGroup endpointGroup,
return builder(protocol, endpointGroup, path).build();
}

/**
* Returns a new {@link WebSocketClientBuilder} without a base URI.
*/
static WebSocketClientBuilder builder() {
return builder(UNDEFINED_URI);
}

/**
* Returns a new {@link WebSocketClientBuilder} created with the specified base {@code uri}.
*/
Expand Down Expand Up @@ -214,8 +224,31 @@ static WebSocketClientBuilder builder(SessionProtocol protocol, EndpointGroup en

/**
* Connects to the specified {@code path}.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
default CompletableFuture<WebSocketSession> connect(String path) {
return connect(path, HttpHeaders.of());
}

/**
* Connects to the specified {@code path} with the specified headers.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
default CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers) {
return connect(path, headers, RequestOptions.of());
}

/**
* Connects to the specified {@code path} with the specified {@link HttpHeaders} and {@link RequestOptions}.
*
* <p>Note that the returned {@link CompletableFuture} is exceptionally completes with
* {@link WebSocketClientHandshakeException} if the handshake failed.
*/
CompletableFuture<WebSocketSession> connect(String path);
CompletableFuture<WebSocketSession> connect(String path, HttpHeaders headers, RequestOptions options);

@Override
WebClient unwrap();
Expand Down
Loading

0 comments on commit d2afcd9

Please sign in to comment.