From dde4a05c570df3316fa53d98f91ff5b5265c158e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Thu, 29 Aug 2024 22:09:11 +0500 Subject: [PATCH 01/12] NatsWebSocketOpts improvements (#610) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Separate model NatsWebSocketOpts * Сalled NatsTlsOpts.AuthenticateAsClientAsync parameters before passing it to the ConfigureWebSocketOpts * RequestHeaders overwrite the header specified in ConfigureWebSocketOpts --- .../Internal/WebSocketConnection.cs | 25 ++++++--- src/NATS.Client.Core/NatsConnection.cs | 4 +- src/NATS.Client.Core/NatsOpts.cs | 22 +------- src/NATS.Client.Core/NatsWebSocketOpts.cs | 28 ++++++++++ .../WebSocketOptionsTest.cs | 52 +++++++++++-------- 5 files changed, 81 insertions(+), 50 deletions(-) create mode 100644 src/NATS.Client.Core/NatsWebSocketOpts.cs diff --git a/src/NATS.Client.Core/Internal/WebSocketConnection.cs b/src/NATS.Client.Core/Internal/WebSocketConnection.cs index 74085e21e..857071c79 100644 --- a/src/NATS.Client.Core/Internal/WebSocketConnection.cs +++ b/src/NATS.Client.Core/Internal/WebSocketConnection.cs @@ -1,3 +1,4 @@ +using System.Net.Security; using System.Net.Sockets; using System.Net.WebSockets; using System.Runtime.CompilerServices; @@ -39,13 +40,14 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) /// /// Connect with Timeout. When failed, Dispose this connection. /// - public async ValueTask ConnectAsync(Uri uri, NatsOpts opts) + public async ValueTask ConnectAsync(NatsUri uri, NatsOpts opts) { using var cts = new CancellationTokenSource(opts.ConnectTimeout); try { - await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri, _socket.Options, cts.Token).ConfigureAwait(false); - await _socket.ConnectAsync(uri, cts.Token).ConfigureAwait(false); + var sslClientAuthenticationOptions = await opts.TlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true); + await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri.Uri, _socket.Options, sslClientAuthenticationOptions, cts.Token).ConfigureAwait(false); + await _socket.ConnectAsync(uri.Uri, cts.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -133,11 +135,22 @@ public void SignalDisconnected(Exception exception) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions options, CancellationToken token) + private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions clientWebSocketOptions, SslClientAuthenticationOptions? sslClientAuthenticationOptions, CancellationToken token) { - if (opts.ConfigureWebSocketOpts != null) + if (opts.NatsWebSocketOpts.ConfigureWebSocketOpts != null) { - await opts.ConfigureWebSocketOpts(uri, options, token).ConfigureAwait(false); + var x509CertificateCollection = sslClientAuthenticationOptions?.ClientCertificates; + var remoteCertificateValidationCallback = sslClientAuthenticationOptions?.RemoteCertificateValidationCallback; + + await opts.NatsWebSocketOpts.ConfigureWebSocketOpts(uri, clientWebSocketOptions, token, x509CertificateCollection, remoteCertificateValidationCallback).ConfigureAwait(false); + } + + if (opts.NatsWebSocketOpts.RequestHeaders != null) + { + foreach (var (name, value) in opts.NatsWebSocketOpts.RequestHeaders) + { + clientWebSocketOptions.SetRequestHeader(name, value); + } } } } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 6e4967b97..8cb53df30 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -318,7 +318,7 @@ private async ValueTask InitialConnectAsync() if (uri.IsWebSocket) { var conn = new WebSocketConnection(); - await conn.ConnectAsync(uri.Uri, Opts).ConfigureAwait(false); + await conn.ConnectAsync(uri, Opts).ConfigureAwait(false); _socket = conn; } else @@ -606,7 +606,7 @@ private async void ReconnectLoop() { _logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount); var conn = new WebSocketConnection(); - await conn.ConnectAsync(url.Uri, Opts).ConfigureAwait(false); + await conn.ConnectAsync(url, Opts).ConfigureAwait(false); _socket = conn; } else diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index 49b1ef1d9..5a13f582e 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -115,27 +115,7 @@ public sealed record NatsOpts /// public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest; - /// - /// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections. - /// - /// - /// This can be used to set authorization header and other HTTP header values. - /// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket. - /// The callback's execution time contributes to the connection establishment subject to the . - /// Implementors should use the passed CancellationToken for async operations called by this handler. - /// - /// - /// await using var nats = new NatsConnection(new NatsOpts - /// { - /// Url = "ws://localhost:8080", - /// ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => - /// { - /// clientWsOpts.SetRequestHeader("authorization", $"Bearer MY_TOKEN"); - /// return ValueTask.CompletedTask; - /// }, - /// }); - /// - public Func? ConfigureWebSocketOpts { get; init; } = null; + public NatsWebSocketOpts NatsWebSocketOpts { get; init; } = NatsWebSocketOpts.Default; internal NatsUri[] GetSeedUris() { diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs new file mode 100644 index 000000000..cbdae6572 --- /dev/null +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -0,0 +1,28 @@ +using System.Net.Security; +using System.Net.WebSockets; +using System.Security.Cryptography.X509Certificates; +using Microsoft.Extensions.Primitives; + +namespace NATS.Client.Core; + +/// +/// Options for ClientWebSocketOptions +/// +public sealed record NatsWebSocketOpts +{ + public static readonly NatsWebSocketOpts Default = new(); + + /// + /// An optional, HTTP headers adding to clientWebSocketOptions + /// + /// + /// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket. + /// + public Dictionary? RequestHeaders { get; init; } + + /// + /// An optional, async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections. + /// Implementors should use the passed CancellationToken for async operations called by this handler. + /// + public Func? ConfigureWebSocketOpts { get; init; } = null; +} diff --git a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs index fce13df15..f31eb9641 100644 --- a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs +++ b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs @@ -16,7 +16,7 @@ public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbac List pubs = new(); await using var server = new MockServer( - handler: (client, cmd) => + (client, cmd) => { if (cmd.Name == "PUB") { @@ -32,12 +32,12 @@ public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbac return Task.CompletedTask; }, Log, - info: $"{{\"max_payload\":{1024 * 4}}}", - cancellationToken: cts.Token); + $"{{\"max_payload\":{1024 * 4}}}", + cts.Token); await using var wsServer = new WebSocketMockServer( server.Url, - connectHandler: (httpContext) => + httpContext => { return true; }, @@ -56,12 +56,15 @@ public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbac { Url = wsServer.WebSocketUrl, LoggerFactory = testLogger, - ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + NatsWebSocketOpts = new NatsWebSocketOpts { - tokenCount++; - Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); - clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); - return ValueTask.CompletedTask; + ConfigureWebSocketOpts = (serverUri, clientWsOpts, certificate, remoteCertificateValidationCallback, ct) => + { + tokenCount++; + Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); + clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); + return ValueTask.CompletedTask; + }, }, }); @@ -209,18 +212,23 @@ public async Task HttpErrorDuringReconnect_ShouldContinueToReconnect() Log($"[NC] {m.Message}"); }); - await using var nats = new NatsConnection(new NatsOpts + var natsOpts = new NatsOpts { Url = wsServer.WebSocketUrl, LoggerFactory = testLogger, - ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + NatsWebSocketOpts = new NatsWebSocketOpts() { - tokenCount++; - Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); - clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); - return ValueTask.CompletedTask; + ConfigureWebSocketOpts = (serverUri, clientWsOpts, certificateCollection, remoteCertificateValidationCallBack, ct) => + { + tokenCount++; + Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); + clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); + return ValueTask.CompletedTask; + }, }, - }); + }; + + await using var nats = new NatsConnection(natsOpts); Log($"[C] connect {server.Url}"); @@ -297,18 +305,20 @@ void Log(string m) Log($"[NC] {m.Message}"); }); - await using var nats = new NatsConnection(new NatsOpts + var natsOpts = new NatsOpts { Url = "ws://localhost:1234", LoggerFactory = testLogger, - ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) => + NatsWebSocketOpts = new NatsWebSocketOpts() { - throw new Exception("Error in callback"); + ConfigureWebSocketOpts = (_, _, _, _, _) + => throw new Exception("Error in callback"), }, - }); + }; + await using var nats = new NatsConnection(natsOpts); // expect: NATS.Client.Core.NatsException : can not connect uris: ws://localhost:1234 - var exception = await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); + await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); } private void Log(string m) From c39da392daf90dc615ff4f3bca4b0f0f839c63a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Fri, 30 Aug 2024 22:53:17 +0500 Subject: [PATCH 02/12] NatsWebSocketOpts improvements (#610) * Added test --- .../WebSocketOptionsTest.cs | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs index f31eb9641..470a1544e 100644 --- a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs +++ b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs @@ -1,5 +1,7 @@ using System.Net; +using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Primitives; using NATS.Client.TestUtilities; namespace NATS.Client.Core.Tests; @@ -321,6 +323,79 @@ void Log(string m) await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); } + [Fact] + public async Task HttpHeadersWebSocketServer_ShouldBeConsistsOfRequestHeadersHeaders() + { + var expectedHeaderValue = "HeaderFromDictionary"; + + void AssertAction(IHeaderDictionary requestHeaders) + { + var (_, value) = requestHeaders.Single(h => h.Key == "Header"); + Assert.Equal(expectedHeaderValue, value); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = new MockServer( + handler: (client, cmd) => + { + if (cmd is { Name: "PUB", Subject: "close" }) + { + client.Close(); + } + + return Task.CompletedTask; + }, + Log, + info: $"{{\"max_payload\":{1024 * 4}}}", + cancellationToken: cts.Token); + + await using var wsServer = new WebSocketMockServer( + server.Url, + connectHandler: (httpContext) => + { + AssertAction(httpContext.Request.Headers); + return true; + }, + Log, + cts.Token); + + var natsOpts = new NatsOpts + { + Url = wsServer.WebSocketUrl, + NatsWebSocketOpts = new NatsWebSocketOpts + { + RequestHeaders = new Dictionary { { "Header", expectedHeaderValue } }, + ConfigureWebSocketOpts = (_, clientWsOpts, _, _, _) => + { + clientWsOpts.SetRequestHeader("Header", "HeaderFromCallBack"); + return ValueTask.CompletedTask; + }, + }, + }; + await using var nats = new NatsConnection(natsOpts); + + // expect: NATS.Client.Core.NatsException : can not connect uris: ws://localhost:1234 + await nats.ConnectAsync(); + + await nats.PublishAsync("close", "x", cancellationToken: cts.Token); + + for (var i = 1; i <= 10; i++) + { + try + { + await nats.PingAsync(cts.Token); + break; + } + catch (OperationCanceledException) + { + if (i == 10) + throw; + await Task.Delay(100 * i, cts.Token); + } + } + } + private void Log(string m) { lock (_logs) From 863e41588fe34cf91030c98a83034139a2769e85 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 25 Aug 2024 21:29:48 +0100 Subject: [PATCH 03/12] Build fixes --- .github/workflows/perf.yml | 7 +++---- .github/workflows/test.yml | 10 ++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index 5ac95c646..e60b70b12 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -24,16 +24,15 @@ jobs: steps: - name: Install nats run: | - rel=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//) + rel=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//) wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip unzip nats-$rel-linux-amd64.zip sudo mv nats-$rel-linux-amd64/nats /usr/local/bin - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" branch="${{ matrix.config.branch }}" if [[ $branch == "v"* ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) elif [[ $branch == "latest" ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) fi for i in 1 2 3 do diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2fca6fd31..58087512b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,12 +24,11 @@ jobs: steps: - name: Install nats-server run: | - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" branch="${{ matrix.config.branch }}" if [[ $branch == "v"* ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) elif [[ $branch == "latest" ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) fi for i in 1 2 3 do @@ -119,12 +118,11 @@ jobs: shell: bash run: | mkdir tools-nats-server && cd tools-nats-server - gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases" branch="${{ matrix.config.branch }}" if [[ $branch == "v"* ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) elif [[ $branch == "latest" ]]; then - branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name) + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) fi for i in 1 2 3 do From a234b420d88a0ee786df320a9406510c71ccb864 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 26 Aug 2024 11:54:45 +0100 Subject: [PATCH 04/12] Build fixes --- .github/workflows/perf.yml | 7 +------ .github/workflows/test.yml | 14 ++------------ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index e60b70b12..85c4efd7f 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -28,12 +28,7 @@ jobs: wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip unzip nats-$rel-linux-amd64.zip sudo mv nats-$rel-linux-amd64/nats /usr/local/bin - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) - elif [[ $branch == "latest" ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) - fi + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 58087512b..ca5e1605f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,12 +24,7 @@ jobs: steps: - name: Install nats-server run: | - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) - elif [[ $branch == "latest" ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) - fi + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 @@ -118,12 +113,7 @@ jobs: shell: bash run: | mkdir tools-nats-server && cd tools-nats-server - branch="${{ matrix.config.branch }}" - if [[ $branch == "v"* ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/$branch) - elif [[ $branch == "latest" ]]; then - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/latest) - fi + branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 From 9fa2f180dccb41dd2318a9c8870dff7ca7fd89c1 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 26 Aug 2024 20:49:24 +0100 Subject: [PATCH 05/12] Build fixes --- .github/workflows/perf.yml | 4 ++-- .github/workflows/test.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index 85c4efd7f..c814e556a 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -24,11 +24,11 @@ jobs: steps: - name: Install nats run: | - rel=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//) + rel=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//) wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip unzip nats-$rel-linux-amd64.zip sudo mv nats-$rel-linux-amd64/nats /usr/local/bin - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ca5e1605f..3636ec585 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: steps: - name: Install nats-server run: | - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 @@ -113,7 +113,7 @@ jobs: shell: bash run: | mkdir tools-nats-server && cd tools-nats-server - branch=$(curl https://api.mtmk.net/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) + branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }}) for i in 1 2 3 do curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30 From 718eef7e4b7e91c0de063783f279b220ead30948 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 30 Aug 2024 20:43:53 +0100 Subject: [PATCH 06/12] dotnet format --- src/NATS.Client.Core/NatsWebSocketOpts.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index cbdae6572..019aa595a 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -1,4 +1,4 @@ -using System.Net.Security; +using System.Net.Security; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Primitives; From 4da55d4b018a28d84f5f0f84b9d59a0ecd876838 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Wed, 4 Sep 2024 17:23:23 -0400 Subject: [PATCH 07/12] #623 reccomendations Signed-off-by: Caleb Lloyd --- .../Internal/WebSocketConnection.cs | 23 +-------- src/NATS.Client.Core/NatsOpts.cs | 4 +- src/NATS.Client.Core/NatsTlsOpts.cs | 2 +- src/NATS.Client.Core/NatsWebSocketOpts.cs | 51 ++++++++++++++++--- .../WebSocketOptionsTest.cs | 16 +++--- 5 files changed, 56 insertions(+), 40 deletions(-) diff --git a/src/NATS.Client.Core/Internal/WebSocketConnection.cs b/src/NATS.Client.Core/Internal/WebSocketConnection.cs index 857071c79..60c4e4649 100644 --- a/src/NATS.Client.Core/Internal/WebSocketConnection.cs +++ b/src/NATS.Client.Core/Internal/WebSocketConnection.cs @@ -45,8 +45,7 @@ public async ValueTask ConnectAsync(NatsUri uri, NatsOpts opts) using var cts = new CancellationTokenSource(opts.ConnectTimeout); try { - var sslClientAuthenticationOptions = await opts.TlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true); - await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri.Uri, _socket.Options, sslClientAuthenticationOptions, cts.Token).ConfigureAwait(false); + await opts.WebSocketOpts.ApplyClientWebSocketOptionsAsync(_socket.Options, uri, opts.TlsOpts, cts.Token).ConfigureAwait(false); await _socket.ConnectAsync(uri.Uri, cts.Token).ConfigureAwait(false); } catch (Exception ex) @@ -133,24 +132,4 @@ public void SignalDisconnected(Exception exception) { _waitForClosedSource.TrySetResult(exception); } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions clientWebSocketOptions, SslClientAuthenticationOptions? sslClientAuthenticationOptions, CancellationToken token) - { - if (opts.NatsWebSocketOpts.ConfigureWebSocketOpts != null) - { - var x509CertificateCollection = sslClientAuthenticationOptions?.ClientCertificates; - var remoteCertificateValidationCallback = sslClientAuthenticationOptions?.RemoteCertificateValidationCallback; - - await opts.NatsWebSocketOpts.ConfigureWebSocketOpts(uri, clientWebSocketOptions, token, x509CertificateCollection, remoteCertificateValidationCallback).ConfigureAwait(false); - } - - if (opts.NatsWebSocketOpts.RequestHeaders != null) - { - foreach (var (name, value) in opts.NatsWebSocketOpts.RequestHeaders) - { - clientWebSocketOptions.SetRequestHeader(name, value); - } - } - } } diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index 5a13f582e..2c2245b2a 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -28,6 +28,8 @@ public sealed record NatsOpts public NatsTlsOpts TlsOpts { get; init; } = NatsTlsOpts.Default; + public NatsWebSocketOpts WebSocketOpts { get; init; } = NatsWebSocketOpts.Default; + public INatsSerializerRegistry SerializerRegistry { get; init; } = NatsDefaultSerializerRegistry.Default; public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; @@ -115,8 +117,6 @@ public sealed record NatsOpts /// public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest; - public NatsWebSocketOpts NatsWebSocketOpts { get; init; } = NatsWebSocketOpts.Default; - internal NatsUri[] GetSeedUris() { var urls = Url.Split(','); diff --git a/src/NATS.Client.Core/NatsTlsOpts.cs b/src/NATS.Client.Core/NatsTlsOpts.cs index 737184f56..4981d655b 100644 --- a/src/NATS.Client.Core/NatsTlsOpts.cs +++ b/src/NATS.Client.Core/NatsTlsOpts.cs @@ -107,7 +107,7 @@ internal bool HasTlsCerts internal TlsMode EffectiveMode(NatsUri uri) => Mode switch { - TlsMode.Auto => HasTlsCerts || uri.Uri.Scheme.ToLower() == "tls" ? TlsMode.Require : TlsMode.Prefer, + TlsMode.Auto => HasTlsCerts || uri.Uri.Scheme.ToLower() == "tls" || uri.Uri.Scheme.ToLower() == "wss" ? TlsMode.Require : TlsMode.Prefer, _ => Mode, }; diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index 019aa595a..66a3f1459 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -1,7 +1,6 @@ -using System.Net.Security; using System.Net.WebSockets; -using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Primitives; +using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -13,16 +12,54 @@ public sealed record NatsWebSocketOpts public static readonly NatsWebSocketOpts Default = new(); /// - /// An optional, HTTP headers adding to clientWebSocketOptions + /// An optional dictionary of HTTP request headers to be sent with the WebSocket request. /// /// - /// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket. + /// Note: this setting will be ignored when running in the Browser, such as when using Blazor WebAssembly, + /// as the underlying Browser implementation does not support adding headers to a WebSocket. /// - public Dictionary? RequestHeaders { get; init; } + public IDictionary? RequestHeaders { get; init; } /// - /// An optional, async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections. + /// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections. /// Implementors should use the passed CancellationToken for async operations called by this handler. /// - public Func? ConfigureWebSocketOpts { get; init; } = null; + public Func? ConfigureClientWebSocketOptions { get; init; } = null; + + internal async ValueTask ApplyClientWebSocketOptionsAsync( + ClientWebSocketOptions clientWebSocketOptions, + NatsUri uri, + NatsTlsOpts tlsOpts, + CancellationToken cancellationToken) + { + // todo: test that this doesn't throw in Blazor, otherwise we need a way to detect Blazor and skip + if (RequestHeaders != null) + { + foreach (var entry in RequestHeaders) + { + foreach (var value in entry.Value) + { + clientWebSocketOptions.SetRequestHeader(entry.Key, value); + } + } + } + + if (tlsOpts.TryTls(uri)) + { + var authenticateAsClientOptions = await tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(false); + if (authenticateAsClientOptions.ClientCertificates != null) + { + clientWebSocketOptions.ClientCertificates = authenticateAsClientOptions.ClientCertificates; + } + +#if !NETSTANDARD2_0 + clientWebSocketOptions.RemoteCertificateValidationCallback = authenticateAsClientOptions.RemoteCertificateValidationCallback; +#endif + } + + if (ConfigureClientWebSocketOptions != null) + { + await ConfigureClientWebSocketOptions(uri.Uri, clientWebSocketOptions, cancellationToken).ConfigureAwait(false); + } + } } diff --git a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs index 470a1544e..03b08dd19 100644 --- a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs +++ b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs @@ -58,9 +58,9 @@ public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbac { Url = wsServer.WebSocketUrl, LoggerFactory = testLogger, - NatsWebSocketOpts = new NatsWebSocketOpts + WebSocketOpts = new NatsWebSocketOpts { - ConfigureWebSocketOpts = (serverUri, clientWsOpts, certificate, remoteCertificateValidationCallback, ct) => + ConfigureClientWebSocketOptions = (serverUri, clientWsOpts, _) => { tokenCount++; Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); @@ -218,9 +218,9 @@ public async Task HttpErrorDuringReconnect_ShouldContinueToReconnect() { Url = wsServer.WebSocketUrl, LoggerFactory = testLogger, - NatsWebSocketOpts = new NatsWebSocketOpts() + WebSocketOpts = new NatsWebSocketOpts() { - ConfigureWebSocketOpts = (serverUri, clientWsOpts, certificateCollection, remoteCertificateValidationCallBack, ct) => + ConfigureClientWebSocketOptions = (serverUri, clientWsOpts, _) => { tokenCount++; Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); @@ -311,9 +311,9 @@ void Log(string m) { Url = "ws://localhost:1234", LoggerFactory = testLogger, - NatsWebSocketOpts = new NatsWebSocketOpts() + WebSocketOpts = new NatsWebSocketOpts() { - ConfigureWebSocketOpts = (_, _, _, _, _) + ConfigureClientWebSocketOptions = (_, _, _) => throw new Exception("Error in callback"), }, }; @@ -363,10 +363,10 @@ void AssertAction(IHeaderDictionary requestHeaders) var natsOpts = new NatsOpts { Url = wsServer.WebSocketUrl, - NatsWebSocketOpts = new NatsWebSocketOpts + WebSocketOpts = new NatsWebSocketOpts { RequestHeaders = new Dictionary { { "Header", expectedHeaderValue } }, - ConfigureWebSocketOpts = (_, clientWsOpts, _, _, _) => + ConfigureClientWebSocketOptions = (_, clientWsOpts, _) => { clientWsOpts.SetRequestHeader("Header", "HeaderFromCallBack"); return ValueTask.CompletedTask; From e6b9d1c0ee62ecd9897caba8b65b22768fb51ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B8=D0=B4=D0=BE=D0=B2=20=D0=98=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD?= Date: Sun, 8 Sep 2024 16:46:32 +0500 Subject: [PATCH 08/12] NatsWebSocketOpts improvements (#610) * Fence added, an occurs exceptions when add headers to blazor --- src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs | 8 ++++++++ src/NATS.Client.Core/NatsWebSocketOpts.cs | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs diff --git a/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs b/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs new file mode 100644 index 000000000..4b9432fa6 --- /dev/null +++ b/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs @@ -0,0 +1,8 @@ +using System.Runtime.InteropServices; + +namespace NATS.Client.Core.Internal; + +internal static class DotnetRuntimeConstants +{ + public static readonly OSPlatform BrowserPlatform = OSPlatform.Create("Browser"); +} diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index 66a3f1459..c652326c3 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -1,4 +1,5 @@ using System.Net.WebSockets; +using System.Runtime.InteropServices; using Microsoft.Extensions.Primitives; using NATS.Client.Core.Internal; @@ -32,8 +33,7 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( NatsTlsOpts tlsOpts, CancellationToken cancellationToken) { - // todo: test that this doesn't throw in Blazor, otherwise we need a way to detect Blazor and skip - if (RequestHeaders != null) + if (RequestHeaders != null && !RuntimeInformation.IsOSPlatform(DotnetRuntimeConstants.BrowserPlatform)) { foreach (var entry in RequestHeaders) { From 441112d0252b1245e8d99067159b73db1cd07974 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Sun, 8 Sep 2024 20:23:15 -0400 Subject: [PATCH 09/12] Revert "NatsWebSocketOpts improvements (#610)" This reverts commit e6b9d1c0ee62ecd9897caba8b65b22768fb51ccb. --- src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs | 8 -------- src/NATS.Client.Core/NatsWebSocketOpts.cs | 4 ++-- 2 files changed, 2 insertions(+), 10 deletions(-) delete mode 100644 src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs diff --git a/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs b/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs deleted file mode 100644 index 4b9432fa6..000000000 --- a/src/NATS.Client.Core/Internal/DotnetRuntimeConstants.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System.Runtime.InteropServices; - -namespace NATS.Client.Core.Internal; - -internal static class DotnetRuntimeConstants -{ - public static readonly OSPlatform BrowserPlatform = OSPlatform.Create("Browser"); -} diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index c652326c3..66a3f1459 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -1,5 +1,4 @@ using System.Net.WebSockets; -using System.Runtime.InteropServices; using Microsoft.Extensions.Primitives; using NATS.Client.Core.Internal; @@ -33,7 +32,8 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( NatsTlsOpts tlsOpts, CancellationToken cancellationToken) { - if (RequestHeaders != null && !RuntimeInformation.IsOSPlatform(DotnetRuntimeConstants.BrowserPlatform)) + // todo: test that this doesn't throw in Blazor, otherwise we need a way to detect Blazor and skip + if (RequestHeaders != null) { foreach (var entry in RequestHeaders) { From 011b680318c92fcd1157897d26ada063c4c53439 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Sun, 8 Sep 2024 22:16:05 -0400 Subject: [PATCH 10/12] WebSocketSecure tests Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/NatsTlsOpts.cs | 2 +- src/NATS.Client.Core/NatsWebSocketOpts.cs | 38 ++++++++++-- .../NatsConnectionTest.Transports.cs | 8 +++ tests/NATS.Client.Core.Tests/TlsClientTest.cs | 16 +++-- tests/NATS.Client.TestUtilities/NatsServer.cs | 3 +- .../NatsServerOpts.cs | 61 +++++++++++-------- 6 files changed, 89 insertions(+), 39 deletions(-) diff --git a/src/NATS.Client.Core/NatsTlsOpts.cs b/src/NATS.Client.Core/NatsTlsOpts.cs index 4981d655b..737184f56 100644 --- a/src/NATS.Client.Core/NatsTlsOpts.cs +++ b/src/NATS.Client.Core/NatsTlsOpts.cs @@ -107,7 +107,7 @@ internal bool HasTlsCerts internal TlsMode EffectiveMode(NatsUri uri) => Mode switch { - TlsMode.Auto => HasTlsCerts || uri.Uri.Scheme.ToLower() == "tls" || uri.Uri.Scheme.ToLower() == "wss" ? TlsMode.Require : TlsMode.Prefer, + TlsMode.Auto => HasTlsCerts || uri.Uri.Scheme.ToLower() == "tls" ? TlsMode.Require : TlsMode.Prefer, _ => Mode, }; diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index 66a3f1459..7ddc9f975 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -1,4 +1,5 @@ using System.Net.WebSockets; +using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Primitives; using NATS.Client.Core.Internal; @@ -15,7 +16,7 @@ public sealed record NatsWebSocketOpts /// An optional dictionary of HTTP request headers to be sent with the WebSocket request. /// /// - /// Note: this setting will be ignored when running in the Browser, such as when using Blazor WebAssembly, + /// Not supported when running in the Browser, such as when using Blazor WebAssembly, /// as the underlying Browser implementation does not support adding headers to a WebSocket. /// public IDictionary? RequestHeaders { get; init; } @@ -32,7 +33,6 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( NatsTlsOpts tlsOpts, CancellationToken cancellationToken) { - // todo: test that this doesn't throw in Blazor, otherwise we need a way to detect Blazor and skip if (RequestHeaders != null) { foreach (var entry in RequestHeaders) @@ -44,12 +44,42 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( } } - if (tlsOpts.TryTls(uri)) + if (tlsOpts.HasTlsCerts) { var authenticateAsClientOptions = await tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(false); + var collection = new X509CertificateCollection(); + + // must match LoadClientCertFromX509 method in SslClientAuthenticationOptions.cs +#if NET8_0_OR_GREATER + if (authenticateAsClientOptions.ClientCertificateContext != null) + { + collection.Add(authenticateAsClientOptions.ClientCertificateContext.TargetCertificate); + } +#else + +/* Unmerged change from project 'NATS.Client.Core(netstandard2.1)' +Before: + if (authenticateAsClientOptions.ClientCertificates != null) { +After: + if (authenticateAsClientOptions.ClientCertificates != null) + { +*/ + +/* Unmerged change from project 'NATS.Client.Core(net6.0)' +Before: + if (authenticateAsClientOptions.ClientCertificates != null) { +After: + if (authenticateAsClientOptions.ClientCertificates != null) + { +*/ if (authenticateAsClientOptions.ClientCertificates != null) { - clientWebSocketOptions.ClientCertificates = authenticateAsClientOptions.ClientCertificates; + collection.AddRange(authenticateAsClientOptions.ClientCertificates); + } +#endif + if (collection.Count > 0) + { + clientWebSocketOptions.ClientCertificates = collection; } #if !NETSTANDARD2_0 diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Transports.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Transports.cs index 41100bd55..abf2b7b0d 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Transports.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Transports.cs @@ -23,3 +23,11 @@ public NatsConnectionTestWs(ITestOutputHelper output) { } } + +public class NatsConnectionTestWss : NatsConnectionTest +{ + public NatsConnectionTestWss(ITestOutputHelper output) + : base(output, TransportType.WebSocketSecure) + { + } +} diff --git a/tests/NATS.Client.Core.Tests/TlsClientTest.cs b/tests/NATS.Client.Core.Tests/TlsClientTest.cs index 8a230a7c8..1547c006e 100644 --- a/tests/NATS.Client.Core.Tests/TlsClientTest.cs +++ b/tests/NATS.Client.Core.Tests/TlsClientTest.cs @@ -11,13 +11,15 @@ public class TlsClientTest public TlsClientTest(ITestOutputHelper output) => _output = output; - [Fact] - public async Task Client_connect_using_certificate() + [Theory] + [InlineData(TransportType.Tls)] + [InlineData(TransportType.WebSocketSecure)] + public async Task Client_connect_using_certificate(TransportType transportType) { await using var server = NatsServer.Start( new NullOutputHelper(), new NatsServerOptsBuilder() - .UseTransport(TransportType.Tls, tlsVerify: true) + .UseTransport(transportType, tlsVerify: true) .Build()); var clientOpts = server.ClientOpts(NatsOpts.Default with { Name = "tls-test-client" }); @@ -56,13 +58,15 @@ public async Task Client_connect_using_certificate_and_revocation_check() Assert.Contains("remote certificate was rejected", exception.InnerException!.InnerException!.Message); } - [Fact] - public async Task Client_cannot_connect_without_certificate() + [Theory] + [InlineData(TransportType.Tls)] + [InlineData(TransportType.WebSocketSecure)] + public async Task Client_cannot_connect_without_certificate(TransportType transportType) { await using var server = NatsServer.Start( new NullOutputHelper(), new NatsServerOptsBuilder() - .UseTransport(TransportType.Tls, tlsVerify: true) + .UseTransport(transportType, tlsVerify: true) .Build()); var clientOpts = server.ClientOpts(NatsOpts.Default); diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index 595d5b363..83cb90001 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -81,6 +81,7 @@ private NatsServer(ITestOutputHelper outputHelper, NatsServerOpts opts) TransportType.Tcp => $"nats://127.0.0.1:{Opts.ServerPort}", TransportType.Tls => $"tls://127.0.0.1:{Opts.ServerPort}", TransportType.WebSocket => $"ws://127.0.0.1:{Opts.WebSocketPort}", + TransportType.WebSocketSecure => $"wss://127.0.0.1:{Opts.WebSocketPort}", _ => throw new ArgumentOutOfRangeException(), }; @@ -88,7 +89,7 @@ public int ConnectionPort { get { - if (_transportType == TransportType.WebSocket && ServerVersions.V2_9_19 <= Version) + if (_transportType is TransportType.WebSocket or TransportType.WebSocketSecure && ServerVersions.V2_9_19 <= Version) { return Opts.WebSocketPort!.Value; } diff --git a/tests/NATS.Client.TestUtilities/NatsServerOpts.cs b/tests/NATS.Client.TestUtilities/NatsServerOpts.cs index ea1f4336a..43c69ba26 100644 --- a/tests/NATS.Client.TestUtilities/NatsServerOpts.cs +++ b/tests/NATS.Client.TestUtilities/NatsServerOpts.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Net.NetworkInformation; using System.Text; +using System.Text.RegularExpressions; namespace NATS.Client.Core.Tests; @@ -9,6 +10,7 @@ public enum TransportType Tcp, Tls, WebSocket, + WebSocketSecure, } public sealed class NatsServerOptsBuilder @@ -77,7 +79,7 @@ public NatsServerOptsBuilder UseTransport(TransportType transportType, bool tlsF throw new Exception("tlsFirst is only valid for TLS transport"); } - if (transportType == TransportType.Tls) + if (transportType is TransportType.Tls or TransportType.WebSocketSecure) { _enableTls = true; _tlsServerCertFile = "resources/certs/server-cert.pem"; @@ -93,7 +95,8 @@ public NatsServerOptsBuilder UseTransport(TransportType transportType, bool tlsF _tlsFirst = tlsFirst; _tlsVerify = tlsVerify; } - else if (transportType == TransportType.WebSocket) + + if (transportType is TransportType.WebSocket or TransportType.WebSocketSecure) { _enableWebSocket = true; } @@ -214,27 +217,11 @@ public string ConfigFileContents if (Trace) { - sb.AppendLine($"trace: true"); - sb.AppendLine($"debug: true"); - } - - if (EnableWebSocket) - { - sb.AppendLine("websocket {"); - sb.AppendLine($" port: {WebSocketPort}"); - sb.AppendLine(" no_tls: true"); - sb.AppendLine("}"); - } - - if (EnableClustering) - { - sb.AppendLine("cluster {"); - sb.AppendLine(" name: nats"); - sb.AppendLine($" listen: {ServerHost}:{ClusteringPort}"); - sb.AppendLine($" routes: [{_routes}]"); - sb.AppendLine("}"); + sb.AppendLine("trace: true"); + sb.AppendLine("debug: true"); } + string? tls = null; if (EnableTls) { if (TlsServerCertFile == default || TlsServerKeyFile == default) @@ -242,24 +229,44 @@ public string ConfigFileContents throw new Exception("TLS is enabled but cert or key missing"); } - sb.AppendLine("tls {"); - sb.AppendLine($" cert_file: {TlsServerCertFile}"); - sb.AppendLine($" key_file: {TlsServerKeyFile}"); + var tlsSb = new StringBuilder(); + tlsSb.AppendLine("tls {"); + tlsSb.AppendLine($" cert_file: {TlsServerCertFile}"); + tlsSb.AppendLine($" key_file: {TlsServerKeyFile}"); if (TlsCaFile != default) { - sb.AppendLine($" ca_file: {TlsCaFile}"); + tlsSb.AppendLine($" ca_file: {TlsCaFile}"); } if (TlsFirst) { - sb.AppendLine($" handshake_first: true"); + tlsSb.AppendLine(" handshake_first: true"); } if (TlsVerify) { - sb.AppendLine($" verify_and_map: true"); + tlsSb.AppendLine($" verify_and_map: true"); } + tlsSb.Append("}"); + tls = tlsSb.ToString(); + sb.AppendLine(tls); + } + + if (EnableWebSocket) + { + sb.AppendLine("websocket {"); + sb.AppendLine($" listen: {ServerHost}:{WebSocketPort}"); + sb.AppendLine(tls != null ? Regex.Replace(tls, "^", " ", RegexOptions.Multiline) : " no_tls: true"); + sb.AppendLine("}"); + } + + if (EnableClustering) + { + sb.AppendLine("cluster {"); + sb.AppendLine(" name: nats"); + sb.AppendLine($" listen: {ServerHost}:{ClusteringPort}"); + sb.AppendLine($" routes: [{_routes}]"); sb.AppendLine("}"); } From 6b303413f67d268cb90e9c56c0c8f1f06f46e4ce Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Sun, 8 Sep 2024 22:23:34 -0400 Subject: [PATCH 11/12] fix format Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/NatsWebSocketOpts.cs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index 7ddc9f975..4976d4a5e 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -56,22 +56,6 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( collection.Add(authenticateAsClientOptions.ClientCertificateContext.TargetCertificate); } #else - -/* Unmerged change from project 'NATS.Client.Core(netstandard2.1)' -Before: - if (authenticateAsClientOptions.ClientCertificates != null) { -After: - if (authenticateAsClientOptions.ClientCertificates != null) - { -*/ - -/* Unmerged change from project 'NATS.Client.Core(net6.0)' -Before: - if (authenticateAsClientOptions.ClientCertificates != null) { -After: - if (authenticateAsClientOptions.ClientCertificates != null) - { -*/ if (authenticateAsClientOptions.ClientCertificates != null) { collection.AddRange(authenticateAsClientOptions.ClientCertificates); From dccfd36ebe963f2dfebe7646f0bbcd6de6c6aaaa Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Sun, 8 Sep 2024 23:31:55 -0400 Subject: [PATCH 12/12] simplify WebSocketOptionsTest Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/NatsWebSocketOpts.cs | 8 +- .../WebSocketOptionsTest.cs | 406 ++---------------- 2 files changed, 44 insertions(+), 370 deletions(-) diff --git a/src/NATS.Client.Core/NatsWebSocketOpts.cs b/src/NATS.Client.Core/NatsWebSocketOpts.cs index 4976d4a5e..3ed2a1db9 100644 --- a/src/NATS.Client.Core/NatsWebSocketOpts.cs +++ b/src/NATS.Client.Core/NatsWebSocketOpts.cs @@ -37,10 +37,10 @@ internal async ValueTask ApplyClientWebSocketOptionsAsync( { foreach (var entry in RequestHeaders) { - foreach (var value in entry.Value) - { - clientWebSocketOptions.SetRequestHeader(entry.Key, value); - } + // SetRequestHeader overwrites if called multiple times; + // RFC7230 Section 3.2.2 allows for combining them with a comma + // https://www.rfc-editor.org/rfc/rfc7230#section-3.2.2 + clientWebSocketOptions.SetRequestHeader(entry.Key, string.Join(",", entry.Value)); } } diff --git a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs index 03b08dd19..bd34eb41d 100644 --- a/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs +++ b/tests/NATS.Client.Core.Tests/WebSocketOptionsTest.cs @@ -1,4 +1,6 @@ using System.Net; +using System.Net.Sockets; +using System.Text; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Primitives; @@ -8,403 +10,75 @@ namespace NATS.Client.Core.Tests; public class WebSocketOptionsTest { - private readonly List _logs = new(); - - // Modeled after similar test in SendBufferTest.cs which also uses the MockServer. [Fact] - public async Task MockWebsocketServer_PubSubWithCancelAndReconnect_ShouldCallbackTwice() + public async Task Exception_in_callback_throws_nats_exception() { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - - List pubs = new(); - await using var server = new MockServer( - (client, cmd) => - { - if (cmd.Name == "PUB") - { - lock (pubs) - pubs.Add($"PUB {cmd.Subject}"); - } - - if (cmd is { Name: "PUB", Subject: "close" }) - { - client.Close(); - } - - return Task.CompletedTask; - }, - Log, - $"{{\"max_payload\":{1024 * 4}}}", - cts.Token); - - await using var wsServer = new WebSocketMockServer( - server.Url, - httpContext => - { - return true; - }, - Log, - cts.Token); - - Log("__________________________________"); - - var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => - { - Log($"[NC] {m.Message}"); - }); - - var tokenCount = 0; - await using var nats = new NatsConnection(new NatsOpts + await using var server = NatsServer.Start( + new NullOutputHelper(), + new NatsServerOptsBuilder() + .UseTransport(TransportType.WebSocket) + .Build()); + + var clientOpts = server.ClientOpts(NatsOpts.Default with { Name = "ws-test-client" }); + clientOpts = clientOpts with { - Url = wsServer.WebSocketUrl, - LoggerFactory = testLogger, WebSocketOpts = new NatsWebSocketOpts - { - ConfigureClientWebSocketOptions = (serverUri, clientWsOpts, _) => - { - tokenCount++; - Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); - clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); - return ValueTask.CompletedTask; - }, - }, - }); - - Log($"[C] connect {server.Url}"); - await nats.ConnectAsync(); - - Log($"[C] ping"); - var rtt = await nats.PingAsync(cts.Token); - Log($"[C] ping rtt={rtt}"); - - Log($"[C] publishing x1..."); - await nats.PublishAsync("x1", "x", cancellationToken: cts.Token); - - // we will close the connection in mock server when we receive subject "close" - Log($"[C] publishing close (4KB)..."); - var pubTask = nats.PublishAsync("close", new byte[1024 * 4], cancellationToken: cts.Token).AsTask(); - - await pubTask.WaitAsync(cts.Token); - - for (var i = 1; i <= 10; i++) - { - try - { - await nats.PingAsync(cts.Token); - break; - } - catch (OperationCanceledException) - { - if (i == 10) - throw; - await Task.Delay(10 * i, cts.Token); - } - } - - Log($"[C] publishing x2..."); - await nats.PublishAsync("x2", "x", cancellationToken: cts.Token); - - Log($"[C] flush..."); - await nats.PingAsync(cts.Token); - - // Look for logs like the following: - // [WS] Received WebSocketRequest with authorization header: Bearer TOKEN_2 - var tokens = GetLogs().Where(l => l.Contains("Bearer")).ToList(); - Assert.Equal(2, tokens.Count); - var token = tokens.Where(t => t.Contains("TOKEN_1")); - Assert.Single(token); - token = tokens.Where(t => t.Contains("TOKEN_2")); - Assert.Single(token); - - lock (pubs) - { - Assert.Equal(3, pubs.Count); - Assert.Equal("PUB x1", pubs[0]); - Assert.Equal("PUB close", pubs[1]); - Assert.Equal("PUB x2", pubs[2]); - } - } - - [Fact] - public async Task WebSocketRespondsWithHttpError_ShouldThrowNatsException() - { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - - await using var server = new MockServer( - handler: (client, cmd) => - { - return Task.CompletedTask; - }, - Log, - info: $"{{\"max_payload\":{1024 * 4}}}", - cancellationToken: cts.Token); - - await using var wsServer = new WebSocketMockServer( - server.Url, - connectHandler: (httpContext) => - { - httpContext.Response.StatusCode = (int)HttpStatusCode.Forbidden; - return false; // reject connection - }, - Log, - cts.Token); - - Log("__________________________________"); - - var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => - { - Log($"[NC] {m.Message}"); - }); - - await using var nats = new NatsConnection(new NatsOpts - { - Url = wsServer.WebSocketUrl, - LoggerFactory = testLogger, - }); - - Log($"[C] connect {server.Url}"); - - // expect: NATS.Client.Core.NatsException : can not connect uris: ws://127.0.0.1:5004 - var exception = await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); - } - - [Fact] - public async Task HttpErrorDuringReconnect_ShouldContinueToReconnect() - { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - - await using var server = new MockServer( - handler: (client, cmd) => - { - if (cmd is { Name: "PUB", Subject: "close" }) - { - client.Close(); - } - - return Task.CompletedTask; - }, - Log, - info: $"{{\"max_payload\":{1024 * 4}}}", - cancellationToken: cts.Token); - - var tokenCount = 0; - - await using var wsServer = new WebSocketMockServer( - server.Url, - connectHandler: (httpContext) => - { - var token = httpContext.Request.Headers.Authorization; - if (token.Contains("Bearer TOKEN_1") || token.Contains("Bearer TOKEN_4")) - { - return true; - } - else - { - httpContext.Response.StatusCode = (int)HttpStatusCode.Forbidden; - return false; // reject connection - } - }, - Log, - cts.Token); - - Log("__________________________________"); - - var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => - { - Log($"[NC] {m.Message}"); - }); - - var natsOpts = new NatsOpts - { - Url = wsServer.WebSocketUrl, - LoggerFactory = testLogger, - WebSocketOpts = new NatsWebSocketOpts() - { - ConfigureClientWebSocketOptions = (serverUri, clientWsOpts, _) => - { - tokenCount++; - Log($"[C] ConfigureWebSocketOpts {serverUri}, accessToken TOKEN_{tokenCount}"); - clientWsOpts.SetRequestHeader("authorization", $"Bearer TOKEN_{tokenCount}"); - return ValueTask.CompletedTask; - }, - }, - }; - - await using var nats = new NatsConnection(natsOpts); - - Log($"[C] connect {server.Url}"); - - // close connection and trigger reconnect - Log($"[C] publishing close ..."); - await nats.PublishAsync("close", "x", cancellationToken: cts.Token); - - for (var i = 1; i <= 10; i++) - { - try - { - await nats.PingAsync(cts.Token); - break; - } - catch (OperationCanceledException) - { - if (i == 10) - throw; - await Task.Delay(100 * i, cts.Token); - } - } - - Log($"[C] publishing reconnected"); - await nats.PublishAsync("reconnected", "rc", cancellationToken: cts.Token); - await Task.Delay(100); // short delay to allow log to be collected for reconnect - - // Expect to see in logs: - // 1st callback and TOKEN_1 - // Initial Connect - // 2nd callback with rejected TOKEN_2 - // NC reconnect - // 3rd callback with rejected TOKEN_3 - // NC reconnect - // 4th callback with good TOKEN_4 - // Successful Publish after reconnect - - // 4 tokens - var logs = GetLogs(); - var tokens = logs.Where(l => l.Contains("Bearer")).ToList(); - Assert.Equal(4, tokens.Count); - Assert.Single(tokens.Where(t => t.Contains("TOKEN_1"))); - Assert.Single(tokens.Where(t => t.Contains("TOKEN_2"))); - Assert.Single(tokens.Where(t => t.Contains("TOKEN_3"))); - Assert.Single(tokens.Where(t => t.Contains("TOKEN_4"))); - - // 2 errors in NATS.Client triggering the reconnect - var failures = logs.Where(l => l.Contains("[NC] Failed to connect NATS")); - Assert.Equal(2, failures.Count()); - - // 2 connects in MockServer - var connects = logs.Where(l => l.Contains("RCV CONNECT")); - Assert.Equal(2, failures.Count()); - - // 1 reconnect in MockServer - var reconnectPublish = logs.Where(l => l.Contains("RCV PUB reconnected")); - Assert.Single(reconnectPublish); - } - - [Fact] - public async Task ExceptionThrownInCallback_ShouldThrowNatsException() - { - // void Log(string m) => TmpFileLogger.Log(m); - List logs = new(); - void Log(string m) - { - lock (logs) - logs.Add(m); - } - - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - - var testLogger = new InMemoryTestLoggerFactory(LogLevel.Warning, m => - { - Log($"[NC] {m.Message}"); - }); - - var natsOpts = new NatsOpts - { - Url = "ws://localhost:1234", - LoggerFactory = testLogger, - WebSocketOpts = new NatsWebSocketOpts() { ConfigureClientWebSocketOptions = (_, _, _) => throw new Exception("Error in callback"), }, }; - await using var nats = new NatsConnection(natsOpts); + await using var nats = new NatsConnection(clientOpts); - // expect: NATS.Client.Core.NatsException : can not connect uris: ws://localhost:1234 await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); } [Fact] - public async Task HttpHeadersWebSocketServer_ShouldBeConsistsOfRequestHeadersHeaders() + public async Task Request_headers_are_correct() { - var expectedHeaderValue = "HeaderFromDictionary"; + var server = new TcpListener(IPAddress.Parse("127.0.0.1"), 0); + server.Start(); - void AssertAction(IHeaderDictionary requestHeaders) - { - var (_, value) = requestHeaders.Single(h => h.Key == "Header"); - Assert.Equal(expectedHeaderValue, value); - } + var port = ((IPEndPoint)server.LocalEndpoint).Port; - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var headers = new List(); + var serverTask = Task.Run(async () => + { + using var client = await server.AcceptTcpClientAsync(); + var stream = client.GetStream(); + using var sr = new StreamReader(stream); - await using var server = new MockServer( - handler: (client, cmd) => + while (true) { - if (cmd is { Name: "PUB", Subject: "close" }) - { - client.Close(); - } + var line = await sr.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(line)) + return; - return Task.CompletedTask; - }, - Log, - info: $"{{\"max_payload\":{1024 * 4}}}", - cancellationToken: cts.Token); - - await using var wsServer = new WebSocketMockServer( - server.Url, - connectHandler: (httpContext) => - { - AssertAction(httpContext.Request.Headers); - return true; - }, - Log, - cts.Token); + headers.Add(line); + } + }); - var natsOpts = new NatsOpts + await using var nats = new NatsConnection(new NatsOpts { - Url = wsServer.WebSocketUrl, + Url = $"ws://127.0.0.1:{port}", WebSocketOpts = new NatsWebSocketOpts { - RequestHeaders = new Dictionary { { "Header", expectedHeaderValue } }, + RequestHeaders = new Dictionary { { "Header1", "Header1" }, { "Header2", new StringValues(["Header2.1", "Header2.2"]) }, }, ConfigureClientWebSocketOptions = (_, clientWsOpts, _) => { - clientWsOpts.SetRequestHeader("Header", "HeaderFromCallBack"); + clientWsOpts.SetRequestHeader("Header3", "Header3"); return ValueTask.CompletedTask; }, }, - }; - await using var nats = new NatsConnection(natsOpts); - - // expect: NATS.Client.Core.NatsException : can not connect uris: ws://localhost:1234 - await nats.ConnectAsync(); - - await nats.PublishAsync("close", "x", cancellationToken: cts.Token); + }); - for (var i = 1; i <= 10; i++) - { - try - { - await nats.PingAsync(cts.Token); - break; - } - catch (OperationCanceledException) - { - if (i == 10) - throw; - await Task.Delay(100 * i, cts.Token); - } - } - } + // not connecting to an actual nats server so this throws + await Assert.ThrowsAsync(async () => await nats.ConnectAsync()); - private void Log(string m) - { - lock (_logs) - _logs.Add(m); - } + await serverTask; - private List GetLogs() - { - lock (_logs) - return _logs.ToList(); + Assert.Contains("Header1: Header1", headers); + Assert.Contains("Header2: Header2.1,Header2.2", headers); + Assert.Contains("Header3: Header3", headers); } }