From 093cd64fded4f4a5b942de3f6dfdeb183292ebfb Mon Sep 17 00:00:00 2001 From: Bruce Wayne Date: Wed, 11 Aug 2021 10:56:53 +0800 Subject: [PATCH] refactor: Better pipe implement --- ...HandleReadOnlySequence.cs => Delegates.cs} | 3 ++ Pipelines.Extensions/PipeWriterExtensions.cs | 18 ++++++--- Pipelines.Extensions/PipelinesExtensions.cs | 1 + .../LocalTcpServices/Socks5Service.cs | 7 +--- .../TcpClients/IPipeClient.cs | 4 +- .../TcpClients/ShadowsocksDuplexPipe.cs | 22 +++++----- .../TcpClients/ShadowsocksPipeExtensions.cs | 13 +++--- .../TcpClients/ShadowsocksTcpClient.cs | 40 +++++++------------ Socks5/Clients/Socks5Client.cs | 12 +++--- Socks5/Servers/Socks5ServerConnection.cs | 12 +++--- TestConsoleApp/TestServersController.cs | 7 +--- 11 files changed, 67 insertions(+), 72 deletions(-) rename Pipelines.Extensions/{HandleReadOnlySequence.cs => Delegates.cs} (68%) diff --git a/Pipelines.Extensions/HandleReadOnlySequence.cs b/Pipelines.Extensions/Delegates.cs similarity index 68% rename from Pipelines.Extensions/HandleReadOnlySequence.cs rename to Pipelines.Extensions/Delegates.cs index bb596ad..6e7045a 100644 --- a/Pipelines.Extensions/HandleReadOnlySequence.cs +++ b/Pipelines.Extensions/Delegates.cs @@ -1,6 +1,9 @@ +using System; using System.Buffers; namespace Pipelines.Extensions { public delegate ParseResult HandleReadOnlySequence(ref ReadOnlySequence buffer); + + public delegate int CopyToSpan(Span buffer); } diff --git a/Pipelines.Extensions/PipeWriterExtensions.cs b/Pipelines.Extensions/PipeWriterExtensions.cs index 9f5feeb..2806132 100644 --- a/Pipelines.Extensions/PipeWriterExtensions.cs +++ b/Pipelines.Extensions/PipeWriterExtensions.cs @@ -11,17 +11,23 @@ namespace Pipelines.Extensions public static partial class PipelinesExtensions { [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static async ValueTask WriteAsync( - this PipeWriter writer, - int maxBufferSize, - Func, int> copyTo, - CancellationToken token = default) + public static void Write(this PipeWriter writer, int maxBufferSize, CopyToSpan copyTo) { - var memory = writer.GetMemory(maxBufferSize); + var memory = writer.GetSpan(maxBufferSize); var length = copyTo(memory); writer.Advance(length); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static async ValueTask WriteAsync( + this PipeWriter writer, + int maxBufferSize, + CopyToSpan copyTo, + CancellationToken token = default) + { + writer.Write(maxBufferSize, copyTo); return await writer.FlushAsync(token); } diff --git a/Pipelines.Extensions/PipelinesExtensions.cs b/Pipelines.Extensions/PipelinesExtensions.cs index 9eea342..6829363 100644 --- a/Pipelines.Extensions/PipelinesExtensions.cs +++ b/Pipelines.Extensions/PipelinesExtensions.cs @@ -16,6 +16,7 @@ public static async ValueTask LinkToAsync(this IDuplexPipe pipe1, IDuplexPipe pi var b = pipe2.Input.CopyToAsync(pipe1.Output, token); var task = await Task.WhenAny(a, b); + // A pipe run to an exception cause another pend reading permanently if (task.IsCompletedSuccessfully) { await Task.WhenAll(a, b); diff --git a/Shadowsocks.Protocol/LocalTcpServices/Socks5Service.cs b/Shadowsocks.Protocol/LocalTcpServices/Socks5Service.cs index 43bd1e8..e5a1347 100644 --- a/Shadowsocks.Protocol/LocalTcpServices/Socks5Service.cs +++ b/Shadowsocks.Protocol/LocalTcpServices/Socks5Service.cs @@ -2,7 +2,6 @@ using Microsoft.Extensions.Logging; using Pipelines.Extensions; using Shadowsocks.Protocol.ServersControllers; -using Shadowsocks.Protocol.TcpClients; using Socks5.Enums; using Socks5.Models; using Socks5.Servers; @@ -70,11 +69,9 @@ public async ValueTask HandleAsync(IDuplexPipe pipe, CancellationToken token = d }; await socks5.SendReplyAsync(Socks5Reply.Succeeded, bound, token); - Verify.Operation(client.Pipe is not null, @"You should TryConnect successfully first!"); + var clientPipe = client.GetPipe(target, socks5.Target.Port); - await client.Pipe.Output.SendShadowsocksHeaderAsync(target, socks5.Target.Port, token); - - await client.Pipe.LinkToAsync(pipe, token); + await clientPipe.LinkToAsync(pipe, token); break; } diff --git a/Shadowsocks.Protocol/TcpClients/IPipeClient.cs b/Shadowsocks.Protocol/TcpClients/IPipeClient.cs index 3bbca40..3248f7c 100644 --- a/Shadowsocks.Protocol/TcpClients/IPipeClient.cs +++ b/Shadowsocks.Protocol/TcpClients/IPipeClient.cs @@ -7,8 +7,8 @@ namespace Shadowsocks.Protocol.TcpClients { public interface IPipeClient : IAsyncDisposable { - IDuplexPipe? Pipe { get; } + ValueTask ConnectAsync(CancellationToken token); - ValueTask TryConnectAsync(CancellationToken token); + IDuplexPipe GetPipe(string targetAddress, ushort targetPort); } } diff --git a/Shadowsocks.Protocol/TcpClients/ShadowsocksDuplexPipe.cs b/Shadowsocks.Protocol/TcpClients/ShadowsocksDuplexPipe.cs index 3a5146a..582cb3f 100644 --- a/Shadowsocks.Protocol/TcpClients/ShadowsocksDuplexPipe.cs +++ b/Shadowsocks.Protocol/TcpClients/ShadowsocksDuplexPipe.cs @@ -22,14 +22,17 @@ public class ShadowsocksDuplexPipe : IDuplexPipe public ShadowsocksDuplexPipe( IDuplexPipe pipe, ShadowsocksServerInfo serverInfo, + string targetAddress, ushort targetPort, PipeOptions? pipeOptions = null, CancellationToken cancellationToken = default) { Requires.NotNull(pipe, nameof(pipe)); Requires.NotNull(serverInfo, nameof(serverInfo)); + Requires.NotNullAllowStructs(serverInfo.Method, nameof(serverInfo)); + Requires.NotNullAllowStructs(serverInfo.Password, nameof(serverInfo)); - var encryptor = ShadowsocksCrypto.Create(serverInfo.Method!, serverInfo.Password!); - var decryptor = ShadowsocksCrypto.Create(serverInfo.Method!, serverInfo.Password!); + var encryptor = ShadowsocksCrypto.Create(serverInfo.Method, serverInfo.Password); + var decryptor = ShadowsocksCrypto.Create(serverInfo.Method, serverInfo.Password); pipeOptions ??= PipeOptions.Default; @@ -37,6 +40,7 @@ public ShadowsocksDuplexPipe( Input = WrapReader(decryptor, pipeOptions, cancellationToken); Output = WrapWriter(encryptor, pipeOptions, cancellationToken); + Output.WriteShadowsocksHeader(targetAddress, targetPort); } private PipeWriter WrapWriter( @@ -58,7 +62,7 @@ private PipeWriter WrapWriter( { foreach (var segment in buffer) { - SendToRemote(encryptor, _upPipe.Output, segment); + SendToRemote(encryptor, _upPipe.Output, segment.Span); } var flushResult = await _upPipe.Output.FlushAsync(cancellationToken); @@ -141,16 +145,16 @@ private PipeReader WrapReader( return pipe.Reader; } - private void SendToRemote( + private static void SendToRemote( IShadowsocksCrypto encryptor, PipeWriter writer, - ReadOnlyMemory buffer) + ReadOnlySpan buffer) { while (!buffer.IsEmpty) { - var memory = writer.GetMemory(BufferSize); + var span = writer.GetSpan(BufferSize); - encryptor.EncryptTCP(buffer.Span, memory.Span, out var p, out var outLength); + encryptor.EncryptTCP(buffer, span, out var p, out var outLength); writer.Advance(outLength); @@ -173,9 +177,9 @@ private static bool ReceiveFromRemote( { var oldLength = sequence.Length; - var memory = writer.GetMemory(BufferSize); + var span = writer.GetSpan(BufferSize); - var outLength = decryptor.DecryptTCP(ref sequence, memory.Span); + var outLength = decryptor.DecryptTCP(ref sequence, span); writer.Advance(outLength); if (outLength > 0) diff --git a/Shadowsocks.Protocol/TcpClients/ShadowsocksPipeExtensions.cs b/Shadowsocks.Protocol/TcpClients/ShadowsocksPipeExtensions.cs index 645fa83..e39aded 100644 --- a/Shadowsocks.Protocol/TcpClients/ShadowsocksPipeExtensions.cs +++ b/Shadowsocks.Protocol/TcpClients/ShadowsocksPipeExtensions.cs @@ -2,7 +2,6 @@ using Socks5.Utils; using System.IO.Pipelines; using System.Threading; -using System.Threading.Tasks; namespace Shadowsocks.Protocol.TcpClients { @@ -11,20 +10,20 @@ public static class ShadowsocksPipeExtensions public static IDuplexPipe AsShadowsocksPipe( this IDuplexPipe pipe, ShadowsocksServerInfo serverInfo, + string targetAddress, ushort targetPort, PipeOptions? pipeOptions = null, CancellationToken cancellationToken = default) { - return new ShadowsocksDuplexPipe(pipe, serverInfo, pipeOptions, cancellationToken); + return new ShadowsocksDuplexPipe(pipe, serverInfo, targetAddress, targetPort, pipeOptions, cancellationToken); } - public static async ValueTask SendShadowsocksHeaderAsync( + public static void WriteShadowsocksHeader( this PipeWriter writer, - string targetAddress, ushort targetPort, CancellationToken token) + string targetAddress, ushort targetPort) { - var memory = writer.GetMemory(1 + 1 + byte.MaxValue + 2); - var addressLength = Pack.DestinationAddressAndPort(targetAddress, default, targetPort, memory.Span); + var span = writer.GetSpan(1 + 1 + byte.MaxValue + 2); + var addressLength = Pack.DestinationAddressAndPort(targetAddress, default, targetPort, span); writer.Advance(addressLength); - return await writer.FlushAsync(token); } } } diff --git a/Shadowsocks.Protocol/TcpClients/ShadowsocksTcpClient.cs b/Shadowsocks.Protocol/TcpClients/ShadowsocksTcpClient.cs index 9775863..4d1d35e 100644 --- a/Shadowsocks.Protocol/TcpClients/ShadowsocksTcpClient.cs +++ b/Shadowsocks.Protocol/TcpClients/ShadowsocksTcpClient.cs @@ -1,8 +1,6 @@ using Microsoft; -using Microsoft.Extensions.Logging; using Pipelines.Extensions; using Shadowsocks.Protocol.Models; -using System; using System.IO.Pipelines; using System.Net.Sockets; using System.Threading; @@ -14,38 +12,28 @@ public sealed class ShadowsocksTcpClient : IPipeClient { private TcpClient? _client; - public IDuplexPipe? Pipe { get; private set; } + private IDuplexPipe? _pipe; - private readonly ILogger _logger; private readonly ShadowsocksServerInfo _serverInfo; - private const string LogHeader = @"[ShadowsocksTcpClient]"; - - public ShadowsocksTcpClient(ILogger logger, ShadowsocksServerInfo serverInfo) + public ShadowsocksTcpClient(ShadowsocksServerInfo serverInfo) { - _logger = logger; _serverInfo = serverInfo; } - public async ValueTask TryConnectAsync(CancellationToken token) + public async ValueTask ConnectAsync(CancellationToken token) { - try - { - Requires.NotNullAllowStructs(_serverInfo.Address, nameof(_serverInfo.Address)); + Requires.NotNullAllowStructs(_serverInfo.Address, nameof(_serverInfo.Address)); - _client = new TcpClient { NoDelay = true }; - await _client.ConnectAsync(_serverInfo.Address, _serverInfo.Port, token); + _client = new TcpClient { NoDelay = true }; + await _client.ConnectAsync(_serverInfo.Address, _serverInfo.Port, token); + } - Pipe = _client.GetStream().AsDuplexPipe().AsShadowsocksPipe(_serverInfo); + public IDuplexPipe GetPipe(string targetAddress, ushort targetPort) + { + Verify.Operation(_client is not null && _client.Connected, @"You must connect to the server first!"); - _logger.LogDebug(@"{0} TryConnect success", LogHeader); - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, @"{0} TryConnect error", LogHeader); - return false; - } + return _pipe ??= _client.GetStream().AsDuplexPipe().AsShadowsocksPipe(_serverInfo, targetAddress, targetPort); } public override string? ToString() @@ -56,10 +44,10 @@ public async ValueTask TryConnectAsync(CancellationToken token) public async ValueTask DisposeAsync() { _client?.Dispose(); - if (Pipe is not null) + if (_pipe is not null) { - await Pipe.Input.CompleteAsync(); - await Pipe.Output.CompleteAsync(); + await _pipe.Input.CompleteAsync(); + await _pipe.Output.CompleteAsync(); } } } diff --git a/Socks5/Clients/Socks5Client.cs b/Socks5/Clients/Socks5Client.cs index e642264..2817d21 100644 --- a/Socks5/Clients/Socks5Client.cs +++ b/Socks5/Clients/Socks5Client.cs @@ -231,9 +231,9 @@ private static async ValueTask HandshakeMethodAsync(IDuplexPipe pipe, IR return method; - int PackHandshake(Memory memory) + int PackHandshake(Span span) { - return Pack.Handshake(clientMethods, memory.Span); + return Pack.Handshake(clientMethods, span); } ParseResult HandleResponse(ref ReadOnlySequence buffer) @@ -250,9 +250,9 @@ private static async ValueTask AuthAsync(IDuplexPipe pipe, UsernamePassword cred await pipe.Input.ReadAsync(HandleResponse, token); - int PackUsernamePassword(Memory memory) + int PackUsernamePassword(Span span) { - return Pack.UsernamePasswordAuth(credential, memory.Span); + return Pack.UsernamePasswordAuth(credential, span); } static ParseResult HandleResponse(ref ReadOnlySequence buffer) @@ -277,9 +277,9 @@ private static async ValueTask SendCommandAsync( return bound; - int PackClientCommand(Memory memory) + int PackClientCommand(Span span) { - return Pack.ClientCommand(command, dst, dstAddress, dstPort, memory.Span); + return Pack.ClientCommand(command, dst, dstAddress, dstPort, span); } ParseResult HandleResponse(ref ReadOnlySequence buffer) diff --git a/Socks5/Servers/Socks5ServerConnection.cs b/Socks5/Servers/Socks5ServerConnection.cs index 956060d..7c98356 100644 --- a/Socks5/Servers/Socks5ServerConnection.cs +++ b/Socks5/Servers/Socks5ServerConnection.cs @@ -100,9 +100,9 @@ ParseResult TryReadClientHandshake(ref ReadOnlySequence buffer) return Unpack.ReadClientHandshake(ref buffer, ref methods) ? ParseResult.Success : ParseResult.NeedsMoreData; } - int PackMethod(Memory memory) + int PackMethod(Span span) { - return Pack.Handshake(method, memory.Span); + return Pack.Handshake(method, span); } } @@ -124,9 +124,9 @@ ParseResult TryReadClientAuth(ref ReadOnlySequence buffer) return Unpack.ReadClientAuth(ref buffer, ref clientCredential) ? ParseResult.Success : ParseResult.NeedsMoreData; } - int PackReply(Memory memory) + int PackReply(Span span) { - return Pack.AuthReply(isAuth, memory.Span); + return Pack.AuthReply(isAuth, span); } } @@ -199,9 +199,9 @@ public async ValueTask SendReplyAsync(Socks5Reply reply, ServerBound bound, Canc { await _pipe.Output.WriteAsync(Constants.MaxCommandLength, PackCommand, token); - int PackCommand(Memory memory) + int PackCommand(Span span) { - return Pack.ServerReply(reply, bound, memory.Span); + return Pack.ServerReply(reply, bound, span); } } } diff --git a/TestConsoleApp/TestServersController.cs b/TestConsoleApp/TestServersController.cs index 8eafd49..2822678 100644 --- a/TestConsoleApp/TestServersController.cs +++ b/TestConsoleApp/TestServersController.cs @@ -30,14 +30,11 @@ public async ValueTask GetServerAsync(string target) Remark = @"" }; - IPipeClient client = new ShadowsocksTcpClient(_logger, info); + IPipeClient client = new ShadowsocksTcpClient(info); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); - if (!await client.TryConnectAsync(cts.Token)) - { - throw new Exception($@"Connect to {info} Error"); - } + await client.ConnectAsync(cts.Token); return client; }