diff --git a/PinionCore.Network/BufferRelay.cs b/PinionCore.Network/BufferRelay.cs index f9a35eb6..665aa5ac 100644 --- a/PinionCore.Network/BufferRelay.cs +++ b/PinionCore.Network/BufferRelay.cs @@ -48,7 +48,14 @@ public IWaitableValue Push(byte[] buffer, int offset, int count) { _Segments.Enqueue(new BufferSegment(buffer, offset, count)); } - _ProcessWaiters(_Waiters, _Segments); + var result = _ProcessWaiters(_Waiters, _Segments); + if (result != null) + { + System.Threading.Tasks.Task.Run(() => { + result.Item1.SyncWait.Value.SetValue(result.Item2); + }); + } + return new NoWaitValue(count); } @@ -58,8 +65,14 @@ public IWaitableValue Pop(byte[] buffer, int offset, int count) lock (_Waiters) { _Waiters.Enqueue(waiter); - } - _ProcessWaiters(_Waiters, _Segments); + } + var result = _ProcessWaiters(_Waiters, _Segments); + if(result != null && result.Item1 == waiter) + { + + return result.Item2.ToWaitableValue(); + } + return waiter.SyncWait; } public bool HasPendingSegments() @@ -69,7 +82,7 @@ public bool HasPendingSegments() return _Segments.Count > 0; } } - private bool _ProcessWaiters(Queue waiters, Queue buffers) + private Tuple _ProcessWaiters(Queue waiters, Queue buffers) { lock (waiters) { @@ -77,12 +90,12 @@ private bool _ProcessWaiters(Queue waiters, Queue buffers { if (waiters.Count == 0 || buffers.Count == 0) { - return false; + return null; } var waiter = waiters.Dequeue(); var count = _ProcessQueue(buffers, waiter.Buffer.Array, waiter.Buffer.Offset, waiter.Buffer.Count); - waiter.SyncWait.Value.SetValue(count); - return true; + + return new Tuple(waiter, count); } } diff --git a/PinionCore.Network/PackageReader.cs b/PinionCore.Network/PackageReader.cs index 5c9df4d6..939f8772 100644 --- a/PinionCore.Network/PackageReader.cs +++ b/PinionCore.Network/PackageReader.cs @@ -2,10 +2,52 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using PinionCore.Memorys; using PinionCore.Remote; namespace PinionCore.Network { + public class _PackageReader + { + private readonly IStreamable _Stream; + private readonly PinionCore.Memorys.IPool _Pool; + public _PackageReader(IStreamable stream, PinionCore.Memorys.IPool pool) + { + _Stream = stream; + _Pool = pool; + } + public async Task> Read() + { + var headByte = new byte[1]; + var headBuffer = new System.Collections.Generic.List(); + do + { + var readed = await _Stream.Receive(headByte, 0, 1); + if (readed == 0) + return new List(); + headBuffer.Add(headByte[0]); + }while (headByte[0] > PinionCore.Serialization.Varint.Endian) ; + + PinionCore.Serialization.Varint.BufferToNumber(headBuffer.ToArray(),0, out int bodySize); + var body = new byte[bodySize]; + int offset = 0; + while (offset < bodySize) + { + var readed = await _Stream.Receive(body, offset, bodySize - offset); + if (readed == 0) + return new List(); + offset += readed; + } + var buffer = _Pool.Alloc(bodySize); + for (var i = 0; i < bodySize; i++) + { + buffer[i] = body[i]; + } + return new List { buffer }; + + + } + } public class PackageReader { private readonly IStreamable _Stream; diff --git a/PinionCore.Remote.Test/ValueTests.cs b/PinionCore.Remote.Test/ValueTests.cs index 5f3ca2a9..e88dde72 100644 --- a/PinionCore.Remote.Test/ValueTests.cs +++ b/PinionCore.Remote.Test/ValueTests.cs @@ -81,7 +81,7 @@ public async System.Threading.Tasks.Task SetOnValueTest() } - [NUnit.Framework.Test] + /* [NUnit.Framework.Test] public async System.Threading.Tasks.Task ConstructorAwaitOnValueTest() { var val = await new PinionCore.Remote.Value(1); @@ -96,6 +96,6 @@ public async System.Threading.Tasks.Task SetAwaitOnValueTest() val.SetValue(1); NUnit.Framework.Assert.AreEqual(1, await val); - } + }*/ } } diff --git a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests.csproj b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests.csproj index 40c15b7f..32d5f396 100644 --- a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests.csproj +++ b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests.csproj @@ -25,6 +25,8 @@ + + diff --git a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/TestEnv.cs b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/TestEnv.cs index cb409f1f..9550eab5 100644 --- a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/TestEnv.cs +++ b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/TestEnv.cs @@ -1,16 +1,18 @@ using System.Linq; +using System.Net; +using PinionCore.Network.Tcp; using PinionCore.Remote.Standalone; namespace PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests { public class TestEnv where T : PinionCore.Remote.IEntry, System.IDisposable { - readonly ThreadUpdater _AgentUpdater; - readonly Service _Service; - readonly Ghost.IAgent _Agent; + public readonly INotifierQueryable Queryable; public readonly T Entry; + System.Action _Dispose; + public TestEnv(T entry) { @@ -19,30 +21,59 @@ public TestEnv(T entry) var ser = new PinionCore.Remote.Serializer(protocol.SerializeTypes); var internalSer = new PinionCore.Remote.InternalSerializer(); - _Service = PinionCore.Remote.Standalone.Provider.CreateService(entry, protocol, ser, Memorys.PoolProvider.Shared); + //var service = PinionCore.Remote.Standalone.Provider.CreateService(entry, protocol, ser, Memorys.PoolProvider.Shared); + //var agent = service.Create(); + var port= PinionCore.Network.Tcp.Tools.GetAvailablePort(); + var service = PinionCore.Remote.Server.Provider.CreateTcpService(entry, protocol); + service.Listener.Bind(port); + + var client = PinionCore.Remote.Client.Provider.CreateTcpAgent(protocol); + var agent = client.Agent; + var peer = client.Connector.Connect(new IPEndPoint(IPAddress.Loopback, port)).GetAwaiter().GetResult(); + if(peer == null) + throw new System.Exception("Connection failed"); - _Agent = _Service.Create(); + Queryable = agent; + agent.Enable(peer); + var updateMessage = new ThreadUpdater(() => { + agent.HandleMessage(); + }); - Queryable = _Agent; + var updatePacket = new ThreadUpdater(() => { + agent.HandlePackets(); + }); + _Dispose = () => + { + agent.Disable(); + service.Service.Dispose(); + Entry.Dispose(); + updateMessage.Stop(); + updatePacket.Stop(); + service.Listener.Close(); + client.Connector.Disconnect(); + }; - _AgentUpdater = new ThreadUpdater(_Update); - _AgentUpdater.Start(); - } + /*_Dispose = () => + { + Entry.Dispose(); + updateMessage.Stop(); + updatePacket.Stop(); + service.Destroy(agent); + service.Dispose(); + };*/ - private void _Update() - { - _Agent.HandlePackets(); - _Agent.HandleMessage(); + updatePacket.Start(); + updateMessage.Start(); } + + + public void Dispose() { - - Entry.Dispose(); - _AgentUpdater.Stop(); - _Service.Destroy(_Agent); - _Service.Dispose(); + _Dispose(); + } } diff --git a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/Tests.cs b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/Tests.cs index 7e032df0..848422d1 100644 --- a/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/Tests.cs +++ b/PinionCore.Remote.Tools.Protocol.Sources.TestCommon.Tests/Tests.cs @@ -41,7 +41,7 @@ public void CreateProtocolTest2() NUnit.Framework.Assert.IsNotNull(protocol); } - [Test] + [Test, Timeout(Timeout)] public void CreateProtocolSerializeTypesTest() { IProtocol protocol = PinionCore.Remote.Tools.Protocol.Sources.TestCommon.ProtocolProvider.CreateCase1(); @@ -52,13 +52,13 @@ public void CreateProtocolSerializeTypesTest() } - [Test] + [Test, Timeout(Timeout)] public void CreateProtocolTest3() { IProtocol protocol = ProtocolProviderCase3.CreateCase3(); NUnit.Framework.Assert.IsNotNull(protocol); } - [Test] + //[Test, Timeout(Timeout)] public void NotifierSupplyAndUnsupplyTest() { var multipleNotices = new MultipleNotices.MultipleNotices(); @@ -124,7 +124,7 @@ from n in mn.Numbers2.Base.UnsupplyEvent() env.Dispose(); } - [Test] + [Test, Timeout(Timeout)] public void NotifierSupplyTest() { @@ -198,7 +198,8 @@ from count in mn.GetNumber2Count().RemoteValue() env.Dispose(); } - [Test] + const int Timeout = 10000; + [Test,Timeout(Timeout)] public void EventCustomDelegateTest() { var tester = new EventTester(); @@ -219,7 +220,7 @@ public void EventCustomDelegateTest() } - [Test] + [Test, Timeout(Timeout)] public void EventRemoveTest() { var tester = new EventTester(); @@ -247,7 +248,7 @@ public void EventRemoveTest() NUnit.Framework.Assert.AreEqual(1, tester.Event02RemoveCount); } - [Test] + [Test, Timeout(Timeout)] public void EventTest() { var tester = new EventTester(); @@ -307,7 +308,7 @@ from n in Reactive.Extensions.EventObservable( } - [Test] + [Test, Timeout(Timeout)] public void MethodNotSupportedTest() { var tester = new MethodTester(); @@ -331,7 +332,7 @@ public void MethodNotSupportedTest() NUnit.Framework.Assert.Fail(); } - [Test] + [Test, Timeout(Timeout)] public void MethodTest() { @@ -381,7 +382,7 @@ from v1 in gpi.GetValueSelf().RemoteValue() Assert.AreEqual(1, value); } - [Test] + [Test, Timeout(Timeout)] public void MethodSayHelloTest() { @@ -400,7 +401,7 @@ from response in gpi.SayHello(new HelloRequest() { Name = "jc" }).RemoteValue() } - [Test] + [Test, Timeout(Timeout)] public void PropertyTest() { PinionCore.Utility.Singleton.Instance.RecordEvent += System.Console.WriteLine; diff --git a/PinionCore.Utility b/PinionCore.Utility index 48b98514..9888dd15 160000 --- a/PinionCore.Utility +++ b/PinionCore.Utility @@ -1 +1 @@ -Subproject commit 48b98514d0433355a6a71c781012a8846d9eb9f0 +Subproject commit 9888dd153d12e55cc4f26e075fe1ef01a6598738