Skip to content

Commit

Permalink
refactor: Better pipe implement
Browse files Browse the repository at this point in the history
  • Loading branch information
HMBSbige committed Aug 11, 2021
1 parent 5198c97 commit 093cd64
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Buffers;

namespace Pipelines.Extensions
{
public delegate ParseResult HandleReadOnlySequence(ref ReadOnlySequence<byte> buffer);

public delegate int CopyToSpan(Span<byte> buffer);
}
18 changes: 12 additions & 6 deletions Pipelines.Extensions/PipeWriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ namespace Pipelines.Extensions
public static partial class PipelinesExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async ValueTask<FlushResult> WriteAsync(
this PipeWriter writer,
int maxBufferSize,
Func<Memory<byte>, int> copyTo,
CancellationToken token = default)
public static void Write(this PipeWriter writer, int maxBufferSize, CopyToSpan copyTo)
{
var memory = writer.GetMemory(maxBufferSize);
var memory = writer.GetSpan(maxBufferSize);

var length = copyTo(memory);

writer.Advance(length);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async ValueTask<FlushResult> WriteAsync(
this PipeWriter writer,
int maxBufferSize,
CopyToSpan copyTo,
CancellationToken token = default)
{
writer.Write(maxBufferSize, copyTo);
return await writer.FlushAsync(token);
}

Expand Down
1 change: 1 addition & 0 deletions Pipelines.Extensions/PipelinesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public static async ValueTask LinkToAsync(this IDuplexPipe pipe1, IDuplexPipe pi
var b = pipe2.Input.CopyToAsync(pipe1.Output, token);

var task = await Task.WhenAny(a, b);
// A pipe run to an exception cause another pend reading permanently
if (task.IsCompletedSuccessfully)
{
await Task.WhenAll(a, b);
Expand Down
7 changes: 2 additions & 5 deletions Shadowsocks.Protocol/LocalTcpServices/Socks5Service.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Microsoft.Extensions.Logging;
using Pipelines.Extensions;
using Shadowsocks.Protocol.ServersControllers;
using Shadowsocks.Protocol.TcpClients;
using Socks5.Enums;
using Socks5.Models;
using Socks5.Servers;
Expand Down Expand Up @@ -70,11 +69,9 @@ public async ValueTask HandleAsync(IDuplexPipe pipe, CancellationToken token = d
};
await socks5.SendReplyAsync(Socks5Reply.Succeeded, bound, token);

Verify.Operation(client.Pipe is not null, @"You should TryConnect successfully first!");
var clientPipe = client.GetPipe(target, socks5.Target.Port);

await client.Pipe.Output.SendShadowsocksHeaderAsync(target, socks5.Target.Port, token);

await client.Pipe.LinkToAsync(pipe, token);
await clientPipe.LinkToAsync(pipe, token);

break;
}
Expand Down
4 changes: 2 additions & 2 deletions Shadowsocks.Protocol/TcpClients/IPipeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace Shadowsocks.Protocol.TcpClients
{
public interface IPipeClient : IAsyncDisposable
{
IDuplexPipe? Pipe { get; }
ValueTask ConnectAsync(CancellationToken token);

ValueTask<bool> TryConnectAsync(CancellationToken token);
IDuplexPipe GetPipe(string targetAddress, ushort targetPort);
}
}
22 changes: 13 additions & 9 deletions Shadowsocks.Protocol/TcpClients/ShadowsocksDuplexPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ public class ShadowsocksDuplexPipe : IDuplexPipe
public ShadowsocksDuplexPipe(
IDuplexPipe pipe,
ShadowsocksServerInfo serverInfo,
string targetAddress, ushort targetPort,
PipeOptions? pipeOptions = null,
CancellationToken cancellationToken = default)
{
Requires.NotNull(pipe, nameof(pipe));
Requires.NotNull(serverInfo, nameof(serverInfo));
Requires.NotNullAllowStructs(serverInfo.Method, nameof(serverInfo));
Requires.NotNullAllowStructs(serverInfo.Password, nameof(serverInfo));

var encryptor = ShadowsocksCrypto.Create(serverInfo.Method!, serverInfo.Password!);
var decryptor = ShadowsocksCrypto.Create(serverInfo.Method!, serverInfo.Password!);
var encryptor = ShadowsocksCrypto.Create(serverInfo.Method, serverInfo.Password);
var decryptor = ShadowsocksCrypto.Create(serverInfo.Method, serverInfo.Password);

pipeOptions ??= PipeOptions.Default;

_upPipe = pipe;

Input = WrapReader(decryptor, pipeOptions, cancellationToken);
Output = WrapWriter(encryptor, pipeOptions, cancellationToken);
Output.WriteShadowsocksHeader(targetAddress, targetPort);
}

private PipeWriter WrapWriter(
Expand All @@ -58,7 +62,7 @@ private PipeWriter WrapWriter(
{
foreach (var segment in buffer)
{
SendToRemote(encryptor, _upPipe.Output, segment);
SendToRemote(encryptor, _upPipe.Output, segment.Span);
}

var flushResult = await _upPipe.Output.FlushAsync(cancellationToken);
Expand Down Expand Up @@ -141,16 +145,16 @@ private PipeReader WrapReader(
return pipe.Reader;
}

private void SendToRemote(
private static void SendToRemote(
IShadowsocksCrypto encryptor,
PipeWriter writer,
ReadOnlyMemory<byte> buffer)
ReadOnlySpan<byte> buffer)
{
while (!buffer.IsEmpty)
{
var memory = writer.GetMemory(BufferSize);
var span = writer.GetSpan(BufferSize);

encryptor.EncryptTCP(buffer.Span, memory.Span, out var p, out var outLength);
encryptor.EncryptTCP(buffer, span, out var p, out var outLength);

writer.Advance(outLength);

Expand All @@ -173,9 +177,9 @@ private static bool ReceiveFromRemote(
{
var oldLength = sequence.Length;

var memory = writer.GetMemory(BufferSize);
var span = writer.GetSpan(BufferSize);

var outLength = decryptor.DecryptTCP(ref sequence, memory.Span);
var outLength = decryptor.DecryptTCP(ref sequence, span);

writer.Advance(outLength);
if (outLength > 0)
Expand Down
13 changes: 6 additions & 7 deletions Shadowsocks.Protocol/TcpClients/ShadowsocksPipeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Socks5.Utils;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

namespace Shadowsocks.Protocol.TcpClients
{
Expand All @@ -11,20 +10,20 @@ public static class ShadowsocksPipeExtensions
public static IDuplexPipe AsShadowsocksPipe(
this IDuplexPipe pipe,
ShadowsocksServerInfo serverInfo,
string targetAddress, ushort targetPort,
PipeOptions? pipeOptions = null,
CancellationToken cancellationToken = default)
{
return new ShadowsocksDuplexPipe(pipe, serverInfo, pipeOptions, cancellationToken);
return new ShadowsocksDuplexPipe(pipe, serverInfo, targetAddress, targetPort, pipeOptions, cancellationToken);
}

public static async ValueTask<FlushResult> SendShadowsocksHeaderAsync(
public static void WriteShadowsocksHeader(
this PipeWriter writer,
string targetAddress, ushort targetPort, CancellationToken token)
string targetAddress, ushort targetPort)
{
var memory = writer.GetMemory(1 + 1 + byte.MaxValue + 2);
var addressLength = Pack.DestinationAddressAndPort(targetAddress, default, targetPort, memory.Span);
var span = writer.GetSpan(1 + 1 + byte.MaxValue + 2);
var addressLength = Pack.DestinationAddressAndPort(targetAddress, default, targetPort, span);
writer.Advance(addressLength);
return await writer.FlushAsync(token);
}
}
}
40 changes: 14 additions & 26 deletions Shadowsocks.Protocol/TcpClients/ShadowsocksTcpClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Microsoft;
using Microsoft.Extensions.Logging;
using Pipelines.Extensions;
using Shadowsocks.Protocol.Models;
using System;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading;
Expand All @@ -14,38 +12,28 @@ public sealed class ShadowsocksTcpClient : IPipeClient
{
private TcpClient? _client;

public IDuplexPipe? Pipe { get; private set; }
private IDuplexPipe? _pipe;

private readonly ILogger _logger;
private readonly ShadowsocksServerInfo _serverInfo;

private const string LogHeader = @"[ShadowsocksTcpClient]";

public ShadowsocksTcpClient(ILogger logger, ShadowsocksServerInfo serverInfo)
public ShadowsocksTcpClient(ShadowsocksServerInfo serverInfo)
{
_logger = logger;
_serverInfo = serverInfo;
}

public async ValueTask<bool> TryConnectAsync(CancellationToken token)
public async ValueTask ConnectAsync(CancellationToken token)
{
try
{
Requires.NotNullAllowStructs(_serverInfo.Address, nameof(_serverInfo.Address));
Requires.NotNullAllowStructs(_serverInfo.Address, nameof(_serverInfo.Address));

_client = new TcpClient { NoDelay = true };
await _client.ConnectAsync(_serverInfo.Address, _serverInfo.Port, token);
_client = new TcpClient { NoDelay = true };
await _client.ConnectAsync(_serverInfo.Address, _serverInfo.Port, token);
}

Pipe = _client.GetStream().AsDuplexPipe().AsShadowsocksPipe(_serverInfo);
public IDuplexPipe GetPipe(string targetAddress, ushort targetPort)
{
Verify.Operation(_client is not null && _client.Connected, @"You must connect to the server first!");

_logger.LogDebug(@"{0} TryConnect success", LogHeader);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, @"{0} TryConnect error", LogHeader);
return false;
}
return _pipe ??= _client.GetStream().AsDuplexPipe().AsShadowsocksPipe(_serverInfo, targetAddress, targetPort);
}

public override string? ToString()
Expand All @@ -56,10 +44,10 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken token)
public async ValueTask DisposeAsync()
{
_client?.Dispose();
if (Pipe is not null)
if (_pipe is not null)
{
await Pipe.Input.CompleteAsync();
await Pipe.Output.CompleteAsync();
await _pipe.Input.CompleteAsync();
await _pipe.Output.CompleteAsync();
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions Socks5/Clients/Socks5Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ private static async ValueTask<Method> HandshakeMethodAsync(IDuplexPipe pipe, IR

return method;

int PackHandshake(Memory<byte> memory)
int PackHandshake(Span<byte> span)
{
return Pack.Handshake(clientMethods, memory.Span);
return Pack.Handshake(clientMethods, span);
}

ParseResult HandleResponse(ref ReadOnlySequence<byte> buffer)
Expand All @@ -250,9 +250,9 @@ private static async ValueTask AuthAsync(IDuplexPipe pipe, UsernamePassword cred

await pipe.Input.ReadAsync(HandleResponse, token);

int PackUsernamePassword(Memory<byte> memory)
int PackUsernamePassword(Span<byte> span)
{
return Pack.UsernamePasswordAuth(credential, memory.Span);
return Pack.UsernamePasswordAuth(credential, span);
}

static ParseResult HandleResponse(ref ReadOnlySequence<byte> buffer)
Expand All @@ -277,9 +277,9 @@ private static async ValueTask<ServerBound> SendCommandAsync(

return bound;

int PackClientCommand(Memory<byte> memory)
int PackClientCommand(Span<byte> span)
{
return Pack.ClientCommand(command, dst, dstAddress, dstPort, memory.Span);
return Pack.ClientCommand(command, dst, dstAddress, dstPort, span);
}

ParseResult HandleResponse(ref ReadOnlySequence<byte> buffer)
Expand Down
12 changes: 6 additions & 6 deletions Socks5/Servers/Socks5ServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ ParseResult TryReadClientHandshake(ref ReadOnlySequence<byte> buffer)
return Unpack.ReadClientHandshake(ref buffer, ref methods) ? ParseResult.Success : ParseResult.NeedsMoreData;
}

int PackMethod(Memory<byte> memory)
int PackMethod(Span<byte> span)
{
return Pack.Handshake(method, memory.Span);
return Pack.Handshake(method, span);
}
}

Expand All @@ -124,9 +124,9 @@ ParseResult TryReadClientAuth(ref ReadOnlySequence<byte> buffer)
return Unpack.ReadClientAuth(ref buffer, ref clientCredential) ? ParseResult.Success : ParseResult.NeedsMoreData;
}

int PackReply(Memory<byte> memory)
int PackReply(Span<byte> span)
{
return Pack.AuthReply(isAuth, memory.Span);
return Pack.AuthReply(isAuth, span);
}
}

Expand Down Expand Up @@ -199,9 +199,9 @@ public async ValueTask SendReplyAsync(Socks5Reply reply, ServerBound bound, Canc
{
await _pipe.Output.WriteAsync(Constants.MaxCommandLength, PackCommand, token);

int PackCommand(Memory<byte> memory)
int PackCommand(Span<byte> span)
{
return Pack.ServerReply(reply, bound, memory.Span);
return Pack.ServerReply(reply, bound, span);
}
}
}
Expand Down
7 changes: 2 additions & 5 deletions TestConsoleApp/TestServersController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ public async ValueTask<IPipeClient> GetServerAsync(string target)
Remark = @""
};

IPipeClient client = new ShadowsocksTcpClient(_logger, info);
IPipeClient client = new ShadowsocksTcpClient(info);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));

if (!await client.TryConnectAsync(cts.Token))
{
throw new Exception($@"Connect to {info} Error");
}
await client.ConnectAsync(cts.Token);

return client;
}
Expand Down

0 comments on commit 093cd64

Please sign in to comment.