Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats web socket opts improvements #623

Merged
merged 12 commits into from
Sep 9, 2024
10 changes: 2 additions & 8 deletions .github/workflows/perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ jobs:
steps:
- name: Install nats
run: |
rel=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
rel=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//)
wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip
unzip nats-$rel-linux-amd64.zip
sudo mv nats-$rel-linux-amd64/nats /usr/local/bin
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
16 changes: 2 additions & 14 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@ jobs:
steps:
- name: Install nats-server
run: |
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down Expand Up @@ -119,13 +113,7 @@ jobs:
shell: bash
run: |
mkdir tools-nats-server && cd tools-nats-server
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
16 changes: 4 additions & 12 deletions src/NATS.Client.Core/Internal/WebSocketConnection.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Net.Security;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -39,13 +40,13 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
/// <summary>
/// Connect with Timeout. When failed, Dispose this connection.
/// </summary>
public async ValueTask ConnectAsync(Uri uri, NatsOpts opts)
public async ValueTask ConnectAsync(NatsUri uri, NatsOpts opts)
{
using var cts = new CancellationTokenSource(opts.ConnectTimeout);
try
{
await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri, _socket.Options, cts.Token).ConfigureAwait(false);
await _socket.ConnectAsync(uri, cts.Token).ConfigureAwait(false);
await opts.WebSocketOpts.ApplyClientWebSocketOptionsAsync(_socket.Options, uri, opts.TlsOpts, cts.Token).ConfigureAwait(false);
await _socket.ConnectAsync(uri.Uri, cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -131,13 +132,4 @@ public void SignalDisconnected(Exception exception)
{
_waitForClosedSource.TrySetResult(exception);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions options, CancellationToken token)
{
if (opts.ConfigureWebSocketOpts != null)
{
await opts.ConfigureWebSocketOpts(uri, options, token).ConfigureAwait(false);
}
}
}
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private async ValueTask InitialConnectAsync()
if (uri.IsWebSocket)
{
var conn = new WebSocketConnection();
await conn.ConnectAsync(uri.Uri, Opts).ConfigureAwait(false);
await conn.ConnectAsync(uri, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down Expand Up @@ -606,7 +606,7 @@ private async void ReconnectLoop()
{
_logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount);
var conn = new WebSocketConnection();
await conn.ConnectAsync(url.Uri, Opts).ConfigureAwait(false);
await conn.ConnectAsync(url, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down
24 changes: 2 additions & 22 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public sealed record NatsOpts

public NatsTlsOpts TlsOpts { get; init; } = NatsTlsOpts.Default;

public NatsWebSocketOpts WebSocketOpts { get; init; } = NatsWebSocketOpts.Default;

public INatsSerializerRegistry SerializerRegistry { get; init; } = NatsDefaultSerializerRegistry.Default;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;
Expand Down Expand Up @@ -115,28 +117,6 @@ public sealed record NatsOpts
/// </remarks>
public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest;

/// <summary>
/// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections.
/// </summary>
/// <remarks>
/// This can be used to set authorization header and other HTTP header values.
/// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket.
/// The callback's execution time contributes to the connection establishment subject to the <see cref="ConnectTimeout"/>.
/// Implementors should use the passed CancellationToken for async operations called by this handler.
/// </remarks>
/// <example>
/// await using var nats = new NatsConnection(new NatsOpts
/// {
/// Url = "ws://localhost:8080",
/// ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) =>
/// {
/// clientWsOpts.SetRequestHeader("authorization", $"Bearer MY_TOKEN");
/// return ValueTask.CompletedTask;
/// },
/// });
/// </example>
public Func<Uri, ClientWebSocketOptions, CancellationToken, ValueTask>? ConfigureWebSocketOpts { get; init; } = null;

internal NatsUri[] GetSeedUris()
{
var urls = Url.Split(',');
Expand Down
79 changes: 79 additions & 0 deletions src/NATS.Client.Core/NatsWebSocketOpts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

/// <summary>
/// Options for ClientWebSocketOptions
/// </summary>
public sealed record NatsWebSocketOpts
{
public static readonly NatsWebSocketOpts Default = new();

/// <summary>
/// An optional dictionary of HTTP request headers to be sent with the WebSocket request.
/// </summary>
/// <remarks>
/// Not supported when running in the Browser, such as when using Blazor WebAssembly,
/// as the underlying Browser implementation does not support adding headers to a WebSocket.
/// </remarks>
public IDictionary<string, StringValues>? RequestHeaders { get; init; }

/// <summary>
/// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections.
/// Implementors should use the passed CancellationToken for async operations called by this handler.
/// </summary>
public Func<Uri, ClientWebSocketOptions, CancellationToken, ValueTask>? ConfigureClientWebSocketOptions { get; init; } = null;

internal async ValueTask ApplyClientWebSocketOptionsAsync(
ClientWebSocketOptions clientWebSocketOptions,
NatsUri uri,
NatsTlsOpts tlsOpts,
CancellationToken cancellationToken)
{
if (RequestHeaders != null)
{
foreach (var entry in RequestHeaders)
{
// SetRequestHeader overwrites if called multiple times;
// RFC7230 Section 3.2.2 allows for combining them with a comma
// https://www.rfc-editor.org/rfc/rfc7230#section-3.2.2
clientWebSocketOptions.SetRequestHeader(entry.Key, string.Join(",", entry.Value));
}
}

if (tlsOpts.HasTlsCerts)
{
var authenticateAsClientOptions = await tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(false);
var collection = new X509CertificateCollection();

// must match LoadClientCertFromX509 method in SslClientAuthenticationOptions.cs
#if NET8_0_OR_GREATER
if (authenticateAsClientOptions.ClientCertificateContext != null)
{
collection.Add(authenticateAsClientOptions.ClientCertificateContext.TargetCertificate);
}
#else
if (authenticateAsClientOptions.ClientCertificates != null)
{
collection.AddRange(authenticateAsClientOptions.ClientCertificates);
}
#endif
if (collection.Count > 0)
{
clientWebSocketOptions.ClientCertificates = collection;
}

#if !NETSTANDARD2_0
clientWebSocketOptions.RemoteCertificateValidationCallback = authenticateAsClientOptions.RemoteCertificateValidationCallback;
#endif
}

if (ConfigureClientWebSocketOptions != null)
{
await ConfigureClientWebSocketOptions(uri.Uri, clientWebSocketOptions, cancellationToken).ConfigureAwait(false);
}
}
}
8 changes: 8 additions & 0 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.Transports.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ public NatsConnectionTestWs(ITestOutputHelper output)
{
}
}

public class NatsConnectionTestWss : NatsConnectionTest
{
public NatsConnectionTestWss(ITestOutputHelper output)
: base(output, TransportType.WebSocketSecure)
{
}
}
16 changes: 10 additions & 6 deletions tests/NATS.Client.Core.Tests/TlsClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ public class TlsClientTest

public TlsClientTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Client_connect_using_certificate()
[Theory]
[InlineData(TransportType.Tls)]
[InlineData(TransportType.WebSocketSecure)]
public async Task Client_connect_using_certificate(TransportType transportType)
{
await using var server = NatsServer.Start(
new NullOutputHelper(),
new NatsServerOptsBuilder()
.UseTransport(TransportType.Tls, tlsVerify: true)
.UseTransport(transportType, tlsVerify: true)
.Build());

var clientOpts = server.ClientOpts(NatsOpts.Default with { Name = "tls-test-client" });
Expand Down Expand Up @@ -56,13 +58,15 @@ public async Task Client_connect_using_certificate_and_revocation_check()
Assert.Contains("remote certificate was rejected", exception.InnerException!.InnerException!.Message);
}

[Fact]
public async Task Client_cannot_connect_without_certificate()
[Theory]
[InlineData(TransportType.Tls)]
[InlineData(TransportType.WebSocketSecure)]
public async Task Client_cannot_connect_without_certificate(TransportType transportType)
{
await using var server = NatsServer.Start(
new NullOutputHelper(),
new NatsServerOptsBuilder()
.UseTransport(TransportType.Tls, tlsVerify: true)
.UseTransport(transportType, tlsVerify: true)
.Build());

var clientOpts = server.ClientOpts(NatsOpts.Default);
Expand Down
Loading
Loading