From 23ac2c11cdd8b58d2e8484b8da10e6d2e55ed176 Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sat, 30 Jan 2016 13:38:47 +0400 Subject: [PATCH] #1251 added RemoteNodeRestartDeathWatchSpec --- src/core/Akka.Remote.TestKit/Conductor.cs | 6 +- src/core/Akka.Remote.TestKit/MsgEncoder.cs | 1 + src/core/Akka.Remote.TestKit/MultiNodeSpec.cs | 3 +- src/core/Akka.Remote.TestKit/Player.cs | 4 +- .../Akka.Remote.Tests.MultiNode.csproj | 1 + .../RemoteNodeRestartDeathWatchSpec.cs | 154 ++++++++++++++++++ 6 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 src/core/Akka.Remote.Tests.MultiNode/RemoteNodeRestartDeathWatchSpec.cs diff --git a/src/core/Akka.Remote.TestKit/Conductor.cs b/src/core/Akka.Remote.TestKit/Conductor.cs index b05386967f6..635b961181b 100644 --- a/src/core/Akka.Remote.TestKit/Conductor.cs +++ b/src/core/Akka.Remote.TestKit/Conductor.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Event; using Akka.Pattern; using Akka.Remote.Transport; @@ -127,10 +128,9 @@ public Task Blackhole(RoleName node, RoleName target, ThrottleTransportAda private void RequireTestConductorTransport() { - //TODO: What is helios equivalent of this? - /*if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin.")) + if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin.")) throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + - "(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig.");*/ + "(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig."); } /// diff --git a/src/core/Akka.Remote.TestKit/MsgEncoder.cs b/src/core/Akka.Remote.TestKit/MsgEncoder.cs index f64f8371483..88925188c7c 100644 --- a/src/core/Akka.Remote.TestKit/MsgEncoder.cs +++ b/src/core/Akka.Remote.TestKit/MsgEncoder.cs @@ -76,6 +76,7 @@ public void Encode(IConnection connection, object message, out List en throttle => w.SetFailure( InjectFailure.CreateBuilder() + .SetFailure(TCP.FailType.Throttle) .SetAddress(Address2Proto(throttle.Target)) .SetDirection(Direction2Proto(throttle.Direction)) .SetRateMBit(throttle.RateMBit))) diff --git a/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs b/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs index cf1c4468b5b..a703778c782 100644 --- a/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs +++ b/src/core/Akka.Remote.TestKit/MultiNodeSpec.cs @@ -126,9 +126,8 @@ internal Config Config { get { - //TODO: Equivalent in Helios? var transportConfig = _testTransport ? - ConfigurationFactory.ParseString("akka.remote.helios.tcp.applied-adapters = []") + ConfigurationFactory.ParseString("akka.remote.helios.tcp.applied-adapters = [trttl, gremlin]") : ConfigurationFactory.Empty; var builder = ImmutableList.CreateBuilder(); diff --git a/src/core/Akka.Remote.TestKit/Player.cs b/src/core/Akka.Remote.TestKit/Player.cs index 919ad42b6c1..c3aa6a1fa1f 100644 --- a/src/core/Akka.Remote.TestKit/Player.cs +++ b/src/core/Akka.Remote.TestKit/Player.cs @@ -14,6 +14,7 @@ using Akka.Event; using Akka.Pattern; using Akka.Remote.Transport; +using Akka.Util; using Helios.Exceptions; using Helios.Net; using Helios.Topology; @@ -439,9 +440,8 @@ public void InitFSM() { ThrottleMode mode; if (throttleMsg.RateMBit < 0.0f) mode = Unthrottled.Instance; - else if (throttleMsg.RateMBit < 0.0f) mode = Blackhole.Instance; + else if (throttleMsg.RateMBit == 0.0f) mode = Blackhole.Instance; else mode = new TokenBucket(1000, throttleMsg.RateMBit*125000, 0, 0); - var cmdTask = TestConductor.Get(Context.System) .Transport.ManagementCommand(new SetThrottle(throttleMsg.Target, throttleMsg.Direction, diff --git a/src/core/Akka.Remote.Tests.MultiNode/Akka.Remote.Tests.MultiNode.csproj b/src/core/Akka.Remote.Tests.MultiNode/Akka.Remote.Tests.MultiNode.csproj index e3ed929fd23..534afbe42ef 100644 --- a/src/core/Akka.Remote.Tests.MultiNode/Akka.Remote.Tests.MultiNode.csproj +++ b/src/core/Akka.Remote.Tests.MultiNode/Akka.Remote.Tests.MultiNode.csproj @@ -65,6 +65,7 @@ + diff --git a/src/core/Akka.Remote.Tests.MultiNode/RemoteNodeRestartDeathWatchSpec.cs b/src/core/Akka.Remote.Tests.MultiNode/RemoteNodeRestartDeathWatchSpec.cs new file mode 100644 index 00000000000..5e2b52d72ce --- /dev/null +++ b/src/core/Akka.Remote.Tests.MultiNode/RemoteNodeRestartDeathWatchSpec.cs @@ -0,0 +1,154 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Typesafe Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Text; +using Akka.Actor; +using Akka.Configuration; +using Akka.Remote.TestKit; +using Akka.Remote.Transport; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Akka.Util; +using Akka.Util.Internal; + +namespace Akka.Remote.Tests.MultiNode +{ + public abstract class RemoteNodeRestartDeathWatchSpec : MultiNodeSpec + { + private readonly RemoteNodeRestartDeathWatchSpecConfig _specConfig; + + protected RemoteNodeRestartDeathWatchSpec() + : this(new RemoteNodeRestartDeathWatchSpecConfig()) + { + } + + protected RemoteNodeRestartDeathWatchSpec(RemoteNodeRestartDeathWatchSpecConfig specConfig) + : base(specConfig) + { + _specConfig = specConfig; + } + + protected override int InitialParticipantsValueFactory + { + get { return Roles.Count; } + } + + protected IActorRef Identify(RoleName role, string actorName) + { + Sys.ActorSelection(Node(role)/"user"/actorName).Tell(new Identify(actorName)); + return ExpectMsg().Subject; + } + + + [MultiNodeFact] + public void MustReceiveTerminatedWhenRemoteActorSystemIsRestarted() + { + + RunOn(() => + { + var secondAddress = Node(_specConfig.Second).Address; + EnterBarrier("actors-started"); + + var subject = Identify(_specConfig.Second, "subject"); + Watch(subject); + subject.Tell("hello"); + ExpectMsg("hello"); + EnterBarrier("watch-established"); + + // simulate a hard shutdown, nothing sent from the shutdown node + TestConductor.Blackhole(_specConfig.Second, _specConfig.First, ThrottleTransportAdapter.Direction.Send) + .GetAwaiter() + .GetResult(); + TestConductor.Shutdown(_specConfig.Second).GetAwaiter().GetResult(); + ExpectTerminated(subject, TimeSpan.FromSeconds(20)); + Within(TimeSpan.FromSeconds(10), () => + { + // retry because the Subject actor might not be started yet + AwaitAssert(() => + { + Sys.ActorSelection(new RootActorPath(secondAddress)/"user"/ + "subject").Tell("shutdown"); + ExpectMsg(msg => { return "shutdown-ack" == msg; }, TimeSpan.FromSeconds(1)); + }); + }); + }, _specConfig.First); + + RunOn(() => + { + var addr = Sys.AsInstanceOf().Provider.DefaultAddress; + Sys.ActorOf(Props.Create(() => new Subject()), "subject"); + EnterBarrier("actors-started"); + + EnterBarrier("watch-established"); + Sys.WhenTerminated.Wait(TimeSpan.FromSeconds(30)); + + var sb = new StringBuilder().AppendLine("akka.remote.helios.tcp {").AppendLine("hostname = " + addr.Host) + .AppendLine("port = " + addr.Port) + .AppendLine("}"); + var freshSystem = ActorSystem.Create(Sys.Name, + ConfigurationFactory.ParseString(sb.ToString()).WithFallback(Sys.Settings.Config)); + freshSystem.ActorOf(Props.Create(() => new Subject()), "subject"); + + freshSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(30)); + }, _specConfig.Second); + } + + private class Subject : ActorBase + { + protected override bool Receive(object message) + { + if ("shutdown".Equals(message)) + { + Sender.Tell("shutdown-ack"); + Context.System.Terminate(); + } + else + { + Sender.Tell(message); + } + return true; + } + } + } + + #region Several different variations of the test + + public class RemoteNodeRestartDeathWatchMultiNode1 : RemoteNodeRestartDeathWatchSpec + { + } + + public class RemoteNodeRestartDeathWatchMultiNode2 : RemoteNodeRestartDeathWatchSpec + { + } + + #endregion + + #region Config + + public class RemoteNodeRestartDeathWatchSpecConfig : MultiNodeConfig + { + public RemoteNodeRestartDeathWatchSpecConfig() + { + First = Role("first"); + Second = Role("second"); + + CommonConfig = DebugConfig(false).WithFallback(ConfigurationFactory.ParseString( + @"akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + akka.remote.transport-failure-detector.heartbeat-interval = 1 s + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s" + )); + TestTransport = true; + } + + public RoleName First { get; } + public RoleName Second { get; } + } + + #endregion +} \ No newline at end of file