Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1251 Port Akka.Remote MultiNodeSpec: RemoteNodeRestartDeathWatchSpec #1600

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/core/Akka.Remote.TestKit/Conductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote.Transport;
Expand Down Expand Up @@ -127,10 +128,9 @@ public Task<Done> Blackhole(RoleName node, RoleName target, ThrottleTransportAda

private void RequireTestConductorTransport()
{
//TODO: What is helios equivalent of this?
/*if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin."))
if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig.");*/
"(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig.");
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Remote.TestKit/MsgEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void Encode(IConnection connection, object message, out List<IByteBuf> en
throttle =>
w.SetFailure(
InjectFailure.CreateBuilder()
.SetFailure(TCP.FailType.Throttle)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch

.SetAddress(Address2Proto(throttle.Target))
.SetDirection(Direction2Proto(throttle.Direction))
.SetRateMBit(throttle.RateMBit)))
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Remote.TestKit/MultiNodeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,8 @@ internal Config Config
{
get
{
//TODO: Equivalent in Helios?
var transportConfig = _testTransport ?
ConfigurationFactory.ParseString("akka.remote.helios.tcp.applied-adapters = []")
ConfigurationFactory.ParseString("akka.remote.helios.tcp.applied-adapters = [trttl, gremlin]")
: ConfigurationFactory.Empty;

var builder = ImmutableList.CreateBuilder<Config>();
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.TestKit/Player.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Event;
using Akka.Pattern;
using Akka.Remote.Transport;
using Akka.Util;
using Helios.Exceptions;
using Helios.Net;
using Helios.Topology;
Expand Down Expand Up @@ -439,9 +440,8 @@ public void InitFSM()
{
ThrottleMode mode;
if (throttleMsg.RateMBit < 0.0f) mode = Unthrottled.Instance;
else if (throttleMsg.RateMBit < 0.0f) mode = Blackhole.Instance;
else if (throttleMsg.RateMBit == 0.0f) mode = Blackhole.Instance;
else mode = new TokenBucket(1000, throttleMsg.RateMBit*125000, 0, 0);

var cmdTask =
TestConductor.Get(Context.System)
.Transport.ManagementCommand(new SetThrottle(throttleMsg.Target, throttleMsg.Direction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<Compile Include="PiercingShouldKeepQuarantineSpec.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RemoteDeliverySpec.cs" />
<Compile Include="RemoteNodeRestartDeathWatchSpec.cs" />
<Compile Include="RemoteDeploymentDeathWatchSpec.cs" />
<Compile Include="RemoteRoundRobinSpec.cs" />
<Compile Include="RemoteRandomSpec.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//-----------------------------------------------------------------------
// <copyright file="RemoteWatcherSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Text;
using Akka.Actor;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Remote.Tests.MultiNode
{
public abstract class RemoteNodeRestartDeathWatchSpec : MultiNodeSpec
{
private readonly RemoteNodeRestartDeathWatchSpecConfig _specConfig;

protected RemoteNodeRestartDeathWatchSpec()
: this(new RemoteNodeRestartDeathWatchSpecConfig())
{
}

protected RemoteNodeRestartDeathWatchSpec(RemoteNodeRestartDeathWatchSpecConfig specConfig)
: base(specConfig)
{
_specConfig = specConfig;
}

protected override int InitialParticipantsValueFactory
{
get { return Roles.Count; }
}

protected IActorRef Identify(RoleName role, string actorName)
{
Sys.ActorSelection(Node(role)/"user"/actorName).Tell(new Identify(actorName));
return ExpectMsg<ActorIdentity>().Subject;
}


[MultiNodeFact]
public void MustReceiveTerminatedWhenRemoteActorSystemIsRestarted()
{

RunOn(() =>
{
var secondAddress = Node(_specConfig.Second).Address;
EnterBarrier("actors-started");

var subject = Identify(_specConfig.Second, "subject");
Watch(subject);
subject.Tell("hello");
ExpectMsg("hello");
EnterBarrier("watch-established");

// simulate a hard shutdown, nothing sent from the shutdown node
TestConductor.Blackhole(_specConfig.Second, _specConfig.First, ThrottleTransportAdapter.Direction.Send)
.GetAwaiter()
.GetResult();
TestConductor.Shutdown(_specConfig.Second).GetAwaiter().GetResult();
ExpectTerminated(subject, TimeSpan.FromSeconds(20));
Within(TimeSpan.FromSeconds(10), () =>
{
// retry because the Subject actor might not be started yet
AwaitAssert(() =>
{
Sys.ActorSelection(new RootActorPath(secondAddress)/"user"/
"subject").Tell("shutdown");
ExpectMsg<string>(msg => { return "shutdown-ack" == msg; }, TimeSpan.FromSeconds(1));
});
});
}, _specConfig.First);

RunOn(() =>
{
var addr = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress;
Sys.ActorOf(Props.Create(() => new Subject()), "subject");
EnterBarrier("actors-started");

EnterBarrier("watch-established");
Sys.WhenTerminated.Wait(TimeSpan.FromSeconds(30));

var sb = new StringBuilder().AppendLine("akka.remote.helios.tcp {").AppendLine("hostname = " + addr.Host)
.AppendLine("port = " + addr.Port)
.AppendLine("}");
var freshSystem = ActorSystem.Create(Sys.Name,
ConfigurationFactory.ParseString(sb.ToString()).WithFallback(Sys.Settings.Config));
freshSystem.ActorOf(Props.Create(() => new Subject()), "subject");

freshSystem.WhenTerminated.Wait(TimeSpan.FromSeconds(30));
}, _specConfig.Second);
}

private class Subject : ActorBase
{
protected override bool Receive(object message)
{
if ("shutdown".Equals(message))
{
Sender.Tell("shutdown-ack");
Context.System.Terminate();
}
else
{
Sender.Tell(message);
}
return true;
}
}
}

#region Several different variations of the test

public class RemoteNodeRestartDeathWatchMultiNode1 : RemoteNodeRestartDeathWatchSpec
{
}

public class RemoteNodeRestartDeathWatchMultiNode2 : RemoteNodeRestartDeathWatchSpec
{
}

#endregion

#region Config

public class RemoteNodeRestartDeathWatchSpecConfig : MultiNodeConfig
{
public RemoteNodeRestartDeathWatchSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = DebugConfig(false).WithFallback(ConfigurationFactory.ParseString(
@"akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s"
));
TestTransport = true;
}

public RoleName First { get; }
public RoleName Second { get; }
}

#endregion
}