Skip to content

Commit

Permalink
Implemented LeaderDowningAllOtherNodesSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvaluyskiy committed Jun 20, 2016
1 parent 297ac0e commit 1bae754
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
<Compile Include="JoinSeedNodeSpec.cs" />
<Compile Include="LeaderDowningNodeThatIsUnreachableSpec.cs" />
<Compile Include="LeaderElectionSpec.cs" />
<Compile Include="LeaderDowningAllOtherNodesSpec.cs" />
<Compile Include="LeaderLeavingSpec.cs" />
<Compile Include="MembershipChangeListenerExitingSpec.cs" />
<Compile Include="MembershipChangeListenerUpSpec.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Util.Internal;
using FluentAssertions;

namespace Akka.Cluster.Tests.MultiNode
{
public class LeaderDowningAllOtherNodesConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }
public RoleName Fourth { get; }
public RoleName Fifth { get; }
public RoleName Sixth { get; }

public LeaderDowningAllOtherNodesConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");
Fourth = Role("fourth");
Fifth = Role("fifth");
Sixth = Role("sixth");

CommonConfig = DebugConfig(false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.cluster.failure-detector.monitored-by-nr-of-members = 2
akka.cluster.auto-down-unreachable-after = 1s"))
.WithFallback(MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
}
}

public class LeaderDowningAllOtherNodesSpecNode1 : LeaderDowningAllOtherNodesSpec { }
public class LeaderDowningAllOtherNodesSpecNode2 : LeaderDowningAllOtherNodesSpec { }
public class LeaderDowningAllOtherNodesSpecNode3 : LeaderDowningAllOtherNodesSpec { }
public class LeaderDowningAllOtherNodesSpecNode4 : LeaderDowningAllOtherNodesSpec { }
public class LeaderDowningAllOtherNodesSpecNode5 : LeaderDowningAllOtherNodesSpec { }
public class LeaderDowningAllOtherNodesSpecNode6 : LeaderDowningAllOtherNodesSpec { }

public abstract class LeaderDowningAllOtherNodesSpec : MultiNodeClusterSpec
{
private readonly LeaderDowningAllOtherNodesConfig _config;

protected LeaderDowningAllOtherNodesSpec() : this(new LeaderDowningAllOtherNodesConfig())
{
}

protected LeaderDowningAllOtherNodesSpec(LeaderDowningAllOtherNodesConfig config) : base(config)
{
_config = config;
}

[MultiNodeFact]
public void LeaderDowningAllOtherNodesSpecs()
{
A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_setup();
A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_remove_all_shutdown_nodes();
}

public void A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_setup()
{
AwaitClusterUp(Roles.ToArray());
EnterBarrier("after-1");
}

public void A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_remove_all_shutdown_nodes()
{
var others = Roles.Drop(1).ToList();
var shutdownAddresses = others.Select(c => GetAddress(c)).ToImmutableHashSet();
EnterBarrier("before-all-other-shutdown");

RunOn(() =>
{
foreach (var node in others)
{
TestConductor.Exit(node, 0).Wait();
}
}, _config.First);
EnterBarrier("all-other-shutdown");
AwaitMembersUp(numbersOfMembers: 1, canNotBePartOfMemberRing: shutdownAddresses, timeout: 30.Seconds());
}
}
}
121 changes: 71 additions & 50 deletions src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Util;
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;

namespace Akka.Cluster.Tests
{
Expand Down Expand Up @@ -91,60 +92,80 @@ private FailureDetectorStub Fd(ClusterHeartbeatSenderState state, UniqueAddress
.AsInstanceOf<FailureDetectorStub>();
}

#region Tests

[Fact]
public void ClusterHeartbeatSenderState_must_return_empty_active_set_when_no_nodes()
{
_emptyState.ActiveReceivers.IsEmpty.ShouldBeTrue();
_emptyState
.ActiveReceivers.IsEmpty.Should().BeTrue();
}

[Fact]
public void ClusterHeartbeatSenderState_must_init_with_empty()
{
_emptyState.Init(ImmutableHashSet.Create<UniqueAddress>()).ActiveReceivers.IsEmpty.ShouldBeTrue();
_emptyState.Init(ImmutableHashSet<UniqueAddress>.Empty, ImmutableSortedSet<UniqueAddress>.Empty)
.ActiveReceivers.IsEmpty.Should().BeTrue();
}

[Fact]
public void ClusterHeartbeatSenderState_must_init_with_self()
{
_emptyState.Init(ImmutableHashSet.Create<UniqueAddress>(aa, bb, cc)).ActiveReceivers.ShouldBe(ImmutableHashSet.Create<UniqueAddress>(bb, cc));
_emptyState.Init(ImmutableHashSet.Create(aa, bb, cc), ImmutableSortedSet<UniqueAddress>.Empty)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}

[Fact]
public void ClusterHeartbeatSenderState_must_init_without_self()
{
_emptyState.Init(ImmutableHashSet.Create<UniqueAddress>(bb, cc)).ActiveReceivers.ShouldBe(ImmutableHashSet.Create<UniqueAddress>(bb, cc));
_emptyState.Init(ImmutableHashSet.Create(bb, cc), ImmutableSortedSet<UniqueAddress>.Empty)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}

[Fact]
public void ClusterHeartbeatSenderState_must_use_add_members()
public void ClusterHeartbeatSenderState_must_use_added_members()
{
_emptyState.AddMember(bb).AddMember(cc).ActiveReceivers.ShouldBe(ImmutableHashSet.Create<UniqueAddress>(bb, cc));
_emptyState.AddMember(bb).AddMember(cc)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}

[Fact]
public void ClusterHeartbeatSenderState_must_use_added_members_also_when_unreachable()
{
_emptyState.AddMember(bb).AddMember(cc).UnreachableMember(bb)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}

[Fact]
public void ClusterHeartbeatSenderState_must_not_use_removed_members()
{
_emptyState.AddMember(bb).AddMember(cc).RemoveMember(bb).ActiveReceivers.ShouldBe(ImmutableHashSet.Create<UniqueAddress>(cc));
_emptyState.AddMember(bb).AddMember(cc).RemoveMember(bb)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(cc));
}

[Fact]
public void ClusterHeartbeatSenderState_must_use_specified_number_of_members()
{
// they are sorted by the hash (UID) of the UniqueAddress
_emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee).ActiveReceivers.ShouldBe(ImmutableHashSet.Create<UniqueAddress>(bb,cc,dd));
_emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd));
}

[Fact]
public void ClusterHeartbeatSenderState_must_use_specified_number_of_members_unreachable()
{
// they are sorted by the hash (UID) of the UniqueAddress
_emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee).UnreachableMember(cc)
.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
}

[Fact]
public void ClusterHeartbeatSenderState_must_update_FailureDetector_in_active_set()
{
var s1 = _emptyState.AddMember(bb).AddMember(cc).AddMember(dd);
var s2 = s1.HeartbeatRsp(bb).HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
s2.FailureDetector.IsMonitoring(bb.Address).ShouldBeTrue();
s2.FailureDetector.IsMonitoring(cc.Address).ShouldBeTrue();
s2.FailureDetector.IsMonitoring(dd.Address).ShouldBeTrue();
s2.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse("Never added (ee) to active set, so we should not be monitoring it even if we did receive HeartbeatRsp from it");
s2.FailureDetector.IsMonitoring(bb.Address).Should().BeTrue();
s2.FailureDetector.IsMonitoring(cc.Address).Should().BeTrue();
s2.FailureDetector.IsMonitoring(dd.Address).Should().BeTrue();
s2.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse("Never added (ee) to active set, so we should not be monitoring it even if we did receive HeartbeatRsp from it");
}

[Fact]
Expand All @@ -153,8 +174,8 @@ public void ClusterHeartbeatSenderState_must_continue_to_use_Unreachable()
var s1 = _emptyState.AddMember(cc).AddMember(dd).AddMember(ee);
var s2 = s1.HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
Fd(s2, ee).MarkNodeAsUnavailable();
s2.FailureDetector.IsAvailable(ee.Address).ShouldBeFalse();
s2.AddMember(bb).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
s2.FailureDetector.IsAvailable(ee.Address).Should().BeFalse();
s2.AddMember(bb).ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
}

[Fact]
Expand All @@ -165,10 +186,10 @@ public void ClusterHeartbeatSenderState_must_remove_unreachable_when_coming_back
Fd(s2,dd).MarkNodeAsUnavailable();
Fd(s2,ee).MarkNodeAsUnavailable();
var s3 = s2.AddMember(bb);
s3.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
s3.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
var s4 = s3.HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
s4.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd));
s4.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse();
s4.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd));
s4.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse();
}

[Fact]
Expand All @@ -179,19 +200,18 @@ public void ClusterHeartbeatSenderState_must_remove_unreachable_member_when_remo
Fd(s2, cc).MarkNodeAsUnavailable();
Fd(s2, ee).MarkNodeAsUnavailable();
var s3 = s2.AddMember(bb).HeartbeatRsp(bb);
s3.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
s3.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
var s4 = s3.RemoveMember(cc).RemoveMember(ee);
s4.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,dd));
s4.FailureDetector.IsMonitoring(cc.Address).ShouldBeFalse();
s4.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse();
s4.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, dd));
s4.FailureDetector.IsMonitoring(cc.Address).Should().BeFalse();
s4.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse();
}

[Fact]
public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operations()
{
var rnd = ThreadLocalRandom.Current;
var nodes =
Enumerable.Range(1, rnd.Next(10, 200))
var nodes = Enumerable.Range(1, rnd.Next(10, 200))
.Select(n => new UniqueAddress(new Address("akka.tcp", "sys", "n" + n, 2552), n))
.ToList();
Func<UniqueAddress> rndNode = () => nodes[rnd.Next(0, nodes.Count)];
Expand All @@ -212,72 +232,73 @@ public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operati
case Add:
if (node != selfUniqueAddress && !state.Ring.NodeRing.Contains(node))
{
var oldUnreachable = state.Unreachable;
var oldUnreachable = state.OldReceiversNowUnreachable;
state = state.AddMember(node);
//keep unreachable
(oldUnreachable.Except(state.ActiveReceivers)).ShouldBe(ImmutableHashSet.Create<UniqueAddress>());
state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet<UniqueAddress>.Empty);
state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();
state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
}
break;
case Remove:
if (node != selfUniqueAddress && state.Ring.NodeRing.Contains(node))
{
var oldUnreachable = state.Unreachable;
var oldUnreachable = state.OldReceiversNowUnreachable;
state = state.RemoveMember(node);
// keep unreachable, unless it was the removed
if(oldUnreachable.Contains(node))
oldUnreachable.Except(state.ActiveReceivers).ShouldBe(ImmutableHashSet.Create(node));
if (oldUnreachable.Contains(node))
oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet.Create(node));
else
(oldUnreachable.Except(state.ActiveReceivers)).ShouldBe(ImmutableHashSet.Create<UniqueAddress>());
oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet<UniqueAddress>.Empty);

state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
Assert.False(state.ActiveReceivers.Any(x => x == node));
state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();
state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
state.ActiveReceivers.Should().NotContain(node);
}
break;
case Unreachable:
if (node != selfUniqueAddress && state.ActiveReceivers.Contains(node))
{
state.FailureDetector.Heartbeat(node.Address); //make sure the FD is created
Fd(state, node).MarkNodeAsUnavailable();
state.FailureDetector.IsMonitoring(node.Address).ShouldBeTrue();
state.FailureDetector.IsAvailable(node.Address).ShouldBeFalse();
state.FailureDetector.IsMonitoring(node.Address).Should().BeTrue();
state.FailureDetector.IsAvailable(node.Address).Should().BeFalse();
state = state.UnreachableMember(node);
}
break;
case HeartbeatRsp:
if (node != selfUniqueAddress && state.Ring.NodeRing.Contains(node))
{
var oldUnreachable = state.Unreachable;
var oldUnreachable = state.OldReceiversNowUnreachable;
var oldReceivers = state.ActiveReceivers;
var oldRingReceivers = state.Ring.MyReceivers.Value;
state = state.HeartbeatRsp(node);

if(oldUnreachable.Contains(node))
Assert.False(state.Unreachable.Contains(node));
if(oldUnreachable.Contains(node) && !oldRingReceivers.Contains(node))
state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
if(oldRingReceivers.Contains(node))
state.FailureDetector.IsMonitoring(node.Address).ShouldBeTrue();
if (oldUnreachable.Contains(node))
state.OldReceiversNowUnreachable.Should().NotContain(node);

if (oldUnreachable.Contains(node) && !oldRingReceivers.Contains(node))
state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();

state.Ring.MyReceivers.Value.ShouldBe(oldRingReceivers);
state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
if (oldRingReceivers.Contains(node))
state.FailureDetector.IsMonitoring(node.Address).Should().BeTrue();

state.Ring.MyReceivers.Value.Should().BeEquivalentTo(oldRingReceivers);
state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
}
break;
}
}
catch (Exception)
{
Debug.WriteLine("Failure context: i = {0}, node = {1}, op={2}, unreachable={3}, ringReceivers={4}, ringNodes={5}", i, node, operation,
string.Join(",",state.Unreachable),
string.Join(",",state.OldReceiversNowUnreachable),
string.Join(",", state.Ring.MyReceivers.Value),
string.Join(",", state.Ring.NodeRing));
throw;
}
}
}

#endregion
}
}

Loading

0 comments on commit 1bae754

Please sign in to comment.