diff --git a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj
index d958d1276af..5f880f33c5c 100644
--- a/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj
+++ b/src/core/Akka.Cluster.Tests.MultiNode/Akka.Cluster.Tests.MultiNode.csproj
@@ -80,6 +80,7 @@
+
diff --git a/src/core/Akka.Cluster.Tests.MultiNode/LeaderDowningAllOtherNodesSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/LeaderDowningAllOtherNodesSpec.cs
new file mode 100644
index 00000000000..8c25091a659
--- /dev/null
+++ b/src/core/Akka.Cluster.Tests.MultiNode/LeaderDowningAllOtherNodesSpec.cs
@@ -0,0 +1,88 @@
+using System;
+using System.Collections.Immutable;
+using System.Linq;
+using Akka.Cluster.TestKit;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Util.Internal;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tests.MultiNode
+{
+ public class LeaderDowningAllOtherNodesConfig : MultiNodeConfig
+ {
+ public RoleName First { get; }
+ public RoleName Second { get; }
+ public RoleName Third { get; }
+ public RoleName Fourth { get; }
+ public RoleName Fifth { get; }
+ public RoleName Sixth { get; }
+
+ public LeaderDowningAllOtherNodesConfig()
+ {
+ First = Role("first");
+ Second = Role("second");
+ Third = Role("third");
+ Fourth = Role("fourth");
+ Fifth = Role("fifth");
+ Sixth = Role("sixth");
+
+ CommonConfig = DebugConfig(false)
+ .WithFallback(ConfigurationFactory.ParseString(@"
+ akka.cluster.failure-detector.monitored-by-nr-of-members = 2
+ akka.cluster.auto-down-unreachable-after = 1s"))
+ .WithFallback(MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
+ }
+ }
+
+ public class LeaderDowningAllOtherNodesSpecNode1 : LeaderDowningAllOtherNodesSpec { }
+ public class LeaderDowningAllOtherNodesSpecNode2 : LeaderDowningAllOtherNodesSpec { }
+ public class LeaderDowningAllOtherNodesSpecNode3 : LeaderDowningAllOtherNodesSpec { }
+ public class LeaderDowningAllOtherNodesSpecNode4 : LeaderDowningAllOtherNodesSpec { }
+ public class LeaderDowningAllOtherNodesSpecNode5 : LeaderDowningAllOtherNodesSpec { }
+ public class LeaderDowningAllOtherNodesSpecNode6 : LeaderDowningAllOtherNodesSpec { }
+
+ public abstract class LeaderDowningAllOtherNodesSpec : MultiNodeClusterSpec
+ {
+ private readonly LeaderDowningAllOtherNodesConfig _config;
+
+ protected LeaderDowningAllOtherNodesSpec() : this(new LeaderDowningAllOtherNodesConfig())
+ {
+ }
+
+ protected LeaderDowningAllOtherNodesSpec(LeaderDowningAllOtherNodesConfig config) : base(config)
+ {
+ _config = config;
+ }
+
+ [MultiNodeFact]
+ public void LeaderDowningAllOtherNodesSpecs()
+ {
+ A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_setup();
+ A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_remove_all_shutdown_nodes();
+ }
+
+ public void A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_setup()
+ {
+ AwaitClusterUp(Roles.ToArray());
+ EnterBarrier("after-1");
+ }
+
+ public void A_Cluster_of_6_nodes_with_monitored_by_nr_of_members_2_must_remove_all_shutdown_nodes()
+ {
+ var others = Roles.Drop(1).ToList();
+ var shutdownAddresses = others.Select(c => GetAddress(c)).ToImmutableHashSet();
+ EnterBarrier("before-all-other-shutdown");
+
+ RunOn(() =>
+ {
+ foreach (var node in others)
+ {
+ TestConductor.Exit(node, 0).Wait();
+ }
+ }, _config.First);
+ EnterBarrier("all-other-shutdown");
+ AwaitMembersUp(numbersOfMembers: 1, canNotBePartOfMemberRing: shutdownAddresses, timeout: 30.Seconds());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs b/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
index 8d50ea29ae7..60dcc41614a 100644
--- a/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
+++ b/src/core/Akka.Cluster.Tests/ClusterHeartBeatSenderStateSpec.cs
@@ -15,6 +15,7 @@
using Akka.Util;
using Akka.Util.Internal;
using Xunit;
+using FluentAssertions;
namespace Akka.Cluster.Tests
{
@@ -91,49 +92,69 @@ private FailureDetectorStub Fd(ClusterHeartbeatSenderState state, UniqueAddress
.AsInstanceOf();
}
- #region Tests
-
[Fact]
public void ClusterHeartbeatSenderState_must_return_empty_active_set_when_no_nodes()
{
- _emptyState.ActiveReceivers.IsEmpty.ShouldBeTrue();
+ _emptyState
+ .ActiveReceivers.IsEmpty.Should().BeTrue();
}
[Fact]
public void ClusterHeartbeatSenderState_must_init_with_empty()
{
- _emptyState.Init(ImmutableHashSet.Create()).ActiveReceivers.IsEmpty.ShouldBeTrue();
+ _emptyState.Init(ImmutableHashSet.Empty, ImmutableSortedSet.Empty)
+ .ActiveReceivers.IsEmpty.Should().BeTrue();
}
[Fact]
public void ClusterHeartbeatSenderState_must_init_with_self()
{
- _emptyState.Init(ImmutableHashSet.Create(aa, bb, cc)).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb, cc));
+ _emptyState.Init(ImmutableHashSet.Create(aa, bb, cc), ImmutableSortedSet.Empty)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}
[Fact]
public void ClusterHeartbeatSenderState_must_init_without_self()
{
- _emptyState.Init(ImmutableHashSet.Create(bb, cc)).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb, cc));
+ _emptyState.Init(ImmutableHashSet.Create(bb, cc), ImmutableSortedSet.Empty)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}
[Fact]
- public void ClusterHeartbeatSenderState_must_use_add_members()
+ public void ClusterHeartbeatSenderState_must_use_added_members()
{
- _emptyState.AddMember(bb).AddMember(cc).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb, cc));
+ _emptyState.AddMember(bb).AddMember(cc)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
+ }
+
+ [Fact]
+ public void ClusterHeartbeatSenderState_must_use_added_members_also_when_unreachable()
+ {
+ _emptyState.AddMember(bb).AddMember(cc).UnreachableMember(bb)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc));
}
[Fact]
public void ClusterHeartbeatSenderState_must_not_use_removed_members()
{
- _emptyState.AddMember(bb).AddMember(cc).RemoveMember(bb).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(cc));
+ _emptyState.AddMember(bb).AddMember(cc).RemoveMember(bb)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(cc));
}
[Fact]
public void ClusterHeartbeatSenderState_must_use_specified_number_of_members()
{
// they are sorted by the hash (UID) of the UniqueAddress
- _emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd));
+ _emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd));
+ }
+
+ [Fact]
+ public void ClusterHeartbeatSenderState_must_use_specified_number_of_members_unreachable()
+ {
+ // they are sorted by the hash (UID) of the UniqueAddress
+ _emptyState.AddMember(cc).AddMember(dd).AddMember(bb).AddMember(ee).UnreachableMember(cc)
+ .ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
}
[Fact]
@@ -141,10 +162,10 @@ public void ClusterHeartbeatSenderState_must_update_FailureDetector_in_active_se
{
var s1 = _emptyState.AddMember(bb).AddMember(cc).AddMember(dd);
var s2 = s1.HeartbeatRsp(bb).HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
- s2.FailureDetector.IsMonitoring(bb.Address).ShouldBeTrue();
- s2.FailureDetector.IsMonitoring(cc.Address).ShouldBeTrue();
- s2.FailureDetector.IsMonitoring(dd.Address).ShouldBeTrue();
- s2.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse("Never added (ee) to active set, so we should not be monitoring it even if we did receive HeartbeatRsp from it");
+ s2.FailureDetector.IsMonitoring(bb.Address).Should().BeTrue();
+ s2.FailureDetector.IsMonitoring(cc.Address).Should().BeTrue();
+ s2.FailureDetector.IsMonitoring(dd.Address).Should().BeTrue();
+ s2.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse("Never added (ee) to active set, so we should not be monitoring it even if we did receive HeartbeatRsp from it");
}
[Fact]
@@ -153,8 +174,8 @@ public void ClusterHeartbeatSenderState_must_continue_to_use_Unreachable()
var s1 = _emptyState.AddMember(cc).AddMember(dd).AddMember(ee);
var s2 = s1.HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
Fd(s2, ee).MarkNodeAsUnavailable();
- s2.FailureDetector.IsAvailable(ee.Address).ShouldBeFalse();
- s2.AddMember(bb).ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
+ s2.FailureDetector.IsAvailable(ee.Address).Should().BeFalse();
+ s2.AddMember(bb).ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
}
[Fact]
@@ -165,10 +186,10 @@ public void ClusterHeartbeatSenderState_must_remove_unreachable_when_coming_back
Fd(s2,dd).MarkNodeAsUnavailable();
Fd(s2,ee).MarkNodeAsUnavailable();
var s3 = s2.AddMember(bb);
- s3.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
+ s3.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
var s4 = s3.HeartbeatRsp(cc).HeartbeatRsp(dd).HeartbeatRsp(ee);
- s4.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd));
- s4.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse();
+ s4.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd));
+ s4.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse();
}
[Fact]
@@ -179,19 +200,18 @@ public void ClusterHeartbeatSenderState_must_remove_unreachable_member_when_remo
Fd(s2, cc).MarkNodeAsUnavailable();
Fd(s2, ee).MarkNodeAsUnavailable();
var s3 = s2.AddMember(bb).HeartbeatRsp(bb);
- s3.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,cc,dd,ee));
+ s3.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, cc, dd, ee));
var s4 = s3.RemoveMember(cc).RemoveMember(ee);
- s4.ActiveReceivers.ShouldBe(ImmutableHashSet.Create(bb,dd));
- s4.FailureDetector.IsMonitoring(cc.Address).ShouldBeFalse();
- s4.FailureDetector.IsMonitoring(ee.Address).ShouldBeFalse();
+ s4.ActiveReceivers.Should().BeEquivalentTo(ImmutableHashSet.Create(bb, dd));
+ s4.FailureDetector.IsMonitoring(cc.Address).Should().BeFalse();
+ s4.FailureDetector.IsMonitoring(ee.Address).Should().BeFalse();
}
[Fact]
public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operations()
{
var rnd = ThreadLocalRandom.Current;
- var nodes =
- Enumerable.Range(1, rnd.Next(10, 200))
+ var nodes = Enumerable.Range(1, rnd.Next(10, 200))
.Select(n => new UniqueAddress(new Address("akka.tcp", "sys", "n" + n, 2552), n))
.ToList();
Func rndNode = () => nodes[rnd.Next(0, nodes.Count)];
@@ -212,28 +232,28 @@ public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operati
case Add:
if (node != selfUniqueAddress && !state.Ring.NodeRing.Contains(node))
{
- var oldUnreachable = state.Unreachable;
+ var oldUnreachable = state.OldReceiversNowUnreachable;
state = state.AddMember(node);
//keep unreachable
- (oldUnreachable.Except(state.ActiveReceivers)).ShouldBe(ImmutableHashSet.Create());
- state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
- state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
+ oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet.Empty);
+ state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();
+ state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
}
break;
case Remove:
if (node != selfUniqueAddress && state.Ring.NodeRing.Contains(node))
{
- var oldUnreachable = state.Unreachable;
+ var oldUnreachable = state.OldReceiversNowUnreachable;
state = state.RemoveMember(node);
// keep unreachable, unless it was the removed
- if(oldUnreachable.Contains(node))
- oldUnreachable.Except(state.ActiveReceivers).ShouldBe(ImmutableHashSet.Create(node));
+ if (oldUnreachable.Contains(node))
+ oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet.Create(node));
else
- (oldUnreachable.Except(state.ActiveReceivers)).ShouldBe(ImmutableHashSet.Create());
+ oldUnreachable.Except(state.ActiveReceivers).Should().BeEquivalentTo(ImmutableHashSet.Empty);
- state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
- state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
- Assert.False(state.ActiveReceivers.Any(x => x == node));
+ state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();
+ state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
+ state.ActiveReceivers.Should().NotContain(node);
}
break;
case Unreachable:
@@ -241,27 +261,30 @@ public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operati
{
state.FailureDetector.Heartbeat(node.Address); //make sure the FD is created
Fd(state, node).MarkNodeAsUnavailable();
- state.FailureDetector.IsMonitoring(node.Address).ShouldBeTrue();
- state.FailureDetector.IsAvailable(node.Address).ShouldBeFalse();
+ state.FailureDetector.IsMonitoring(node.Address).Should().BeTrue();
+ state.FailureDetector.IsAvailable(node.Address).Should().BeFalse();
+ state = state.UnreachableMember(node);
}
break;
case HeartbeatRsp:
if (node != selfUniqueAddress && state.Ring.NodeRing.Contains(node))
{
- var oldUnreachable = state.Unreachable;
+ var oldUnreachable = state.OldReceiversNowUnreachable;
var oldReceivers = state.ActiveReceivers;
var oldRingReceivers = state.Ring.MyReceivers.Value;
state = state.HeartbeatRsp(node);
- if(oldUnreachable.Contains(node))
- Assert.False(state.Unreachable.Contains(node));
- if(oldUnreachable.Contains(node) && !oldRingReceivers.Contains(node))
- state.FailureDetector.IsMonitoring(node.Address).ShouldBeFalse();
- if(oldRingReceivers.Contains(node))
- state.FailureDetector.IsMonitoring(node.Address).ShouldBeTrue();
+ if (oldUnreachable.Contains(node))
+ state.OldReceiversNowUnreachable.Should().NotContain(node);
+
+ if (oldUnreachable.Contains(node) && !oldRingReceivers.Contains(node))
+ state.FailureDetector.IsMonitoring(node.Address).Should().BeFalse();
- state.Ring.MyReceivers.Value.ShouldBe(oldRingReceivers);
- state.FailureDetector.IsAvailable(node.Address).ShouldBeTrue();
+ if (oldRingReceivers.Contains(node))
+ state.FailureDetector.IsMonitoring(node.Address).Should().BeTrue();
+
+ state.Ring.MyReceivers.Value.Should().BeEquivalentTo(oldRingReceivers);
+ state.FailureDetector.IsAvailable(node.Address).Should().BeTrue();
}
break;
}
@@ -269,15 +292,13 @@ public void ClusterHeartbeatSenderState_must_behave_correctly_for_random_operati
catch (Exception)
{
Debug.WriteLine("Failure context: i = {0}, node = {1}, op={2}, unreachable={3}, ringReceivers={4}, ringNodes={5}", i, node, operation,
- string.Join(",",state.Unreachable),
+ string.Join(",",state.OldReceiversNowUnreachable),
string.Join(",", state.Ring.MyReceivers.Value),
string.Join(",", state.Ring.NodeRing));
throw;
}
}
}
-
- #endregion
}
}
diff --git a/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs b/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
index 36130094dd3..4846c72e796 100644
--- a/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
+++ b/src/core/Akka.Cluster.Tests/HeartbeatNodeRingSpec.cs
@@ -7,8 +7,8 @@
using System.Collections.Immutable;
using Akka.Actor;
-using Akka.TestKit;
using Xunit;
+using FluentAssertions;
namespace Akka.Cluster.Tests
{
@@ -16,7 +16,7 @@ public class HeartbeatNodeRingSpec : ClusterSpecBase
{
public HeartbeatNodeRingSpec()
{
- _nodes = ImmutableHashSet.Create(aa, bb, cc, dd, ee);
+ _nodes = ImmutableSortedSet.Create(aa, bb, cc, dd, ee, ff);
}
private UniqueAddress aa = new UniqueAddress(new Address("akka.tcp", "sys", "aa", 2552), 1);
@@ -24,38 +24,52 @@ public HeartbeatNodeRingSpec()
private UniqueAddress cc = new UniqueAddress(new Address("akka.tcp", "sys", "cc", 2552), 3);
private UniqueAddress dd = new UniqueAddress(new Address("akka.tcp", "sys", "dd", 2552), 4);
private UniqueAddress ee = new UniqueAddress(new Address("akka.tcp", "sys", "ee", 2552), 5);
+ private UniqueAddress ff = new UniqueAddress(new Address("akka.tcp", "sys", "ff", 2552), 6);
- private readonly ImmutableHashSet _nodes;
+ private readonly ImmutableSortedSet _nodes;
[Fact]
public void HeartbeatNodeRing_must_pick_specified_number_of_nodes_as_receivers()
{
- var ring = new HeartbeatNodeRing(cc, _nodes, 3);
- ring.MyReceivers.Value.ShouldBe(ring.Receivers(cc));
+ var ring = new HeartbeatNodeRing(cc, _nodes, ImmutableSortedSet.Empty, 3);
+ ring.MyReceivers.Value.Should().BeEquivalentTo(ring.Receivers(cc));
foreach (var node in _nodes)
{
var receivers = ring.Receivers(node);
- receivers.Count.ShouldBe(3);
- receivers.Contains(node).ShouldBeFalse();
+ receivers.Count.Should().Be(3);
+ receivers.Should().NotContain(node);
}
}
+ [Fact]
+ public void HeartbeatNodeRing_must_pick_specified_number_of_nodes_plus_unreachable_as_receivers()
+ {
+ var ring = new HeartbeatNodeRing(cc, _nodes, ImmutableSortedSet.Create(aa, dd, ee), 3);
+ ring.MyReceivers.Value.Should().BeEquivalentTo(ring.Receivers(cc));
+
+ ring.Receivers(aa).Should().BeEquivalentTo(ImmutableSortedSet.Create(bb, cc, dd, ff)); // unreachable ee skipped
+ ring.Receivers(bb).Should().BeEquivalentTo(ImmutableSortedSet.Create(cc, dd, ee, ff)); // unreachable ee skipped
+ ring.Receivers(cc).Should().BeEquivalentTo(ImmutableSortedSet.Create(dd, ee, ff, bb)); // unreachable ee skipped
+ ring.Receivers(dd).Should().BeEquivalentTo(ImmutableSortedSet.Create(ee, ff, aa, bb, cc));
+ ring.Receivers(ee).Should().BeEquivalentTo(ImmutableSortedSet.Create(ff, aa, bb, cc));
+ ring.Receivers(ff).Should().BeEquivalentTo(ImmutableSortedSet.Create(aa, bb, cc)); // unreachable dd and ee skipped
+ }
+
[Fact]
public void HeartbeatNodeRing_must_pick_all_except_own_as_receivers_when_less_than_total_number_of_nodes()
{
- var expected = ImmutableHashSet.Create(aa, bb, dd, ee);
- new HeartbeatNodeRing(cc, _nodes, 4).MyReceivers.Value.ShouldBe(expected);
- new HeartbeatNodeRing(cc, _nodes, 5).MyReceivers.Value.ShouldBe(expected);
- new HeartbeatNodeRing(cc, _nodes, 6).MyReceivers.Value.ShouldBe(expected);
+ var expected = ImmutableHashSet.Create(aa, bb, dd, ee, ff);
+ new HeartbeatNodeRing(cc, _nodes, ImmutableSortedSet.Empty, 5).MyReceivers.Value.Should().BeEquivalentTo(expected);
+ new HeartbeatNodeRing(cc, _nodes, ImmutableSortedSet.Empty, 6).MyReceivers.Value.Should().BeEquivalentTo(expected);
+ new HeartbeatNodeRing(cc, _nodes, ImmutableSortedSet.Empty, 7).MyReceivers.Value.Should().BeEquivalentTo(expected);
}
[Fact]
public void HeartbeatNodeRing_must_pick_none_when_alone()
{
- var ring = new HeartbeatNodeRing(cc, new[] {cc}, 3);
- ring.MyReceivers.Value.ShouldBe(ImmutableHashSet.Create());
+ var ring = new HeartbeatNodeRing(cc, new[] { cc }, 3);
+ ring.MyReceivers.Value.Should().BeEquivalentTo(ImmutableHashSet.Empty);
}
}
}
-
diff --git a/src/core/Akka.Cluster/ClusterHeartbeat.cs b/src/core/Akka.Cluster/ClusterHeartbeat.cs
index 9572d8011d3..5040b48924b 100644
--- a/src/core/Akka.Cluster/ClusterHeartbeat.cs
+++ b/src/core/Akka.Cluster/ClusterHeartbeat.cs
@@ -73,7 +73,7 @@ public ClusterHeartbeatSender()
protected override void PreStart()
{
- _cluster.Subscribe(Self, new[] { typeof(ClusterEvent.IMemberEvent) });
+ _cluster.Subscribe(Self, new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.IReachabilityEvent) });
}
protected override void PostStop()
@@ -102,6 +102,8 @@ private void Active()
Receive(rsp => DoHeartbeatRsp(rsp.From));
Receive(removed => RemoveMember(removed.Member));
Receive(evt => AddMember(evt.Member));
+ Receive(m => UnreachableMember(m.Member));
+ Receive(m => ReachableMember(m.Member));
Receive(heartbeat => TriggerFirstHeart(heartbeat.From));
}
@@ -116,7 +118,8 @@ private ActorSelection HeartbeatReceiver(Address address)
private void Init(ClusterEvent.CurrentClusterState snapshot)
{
var nodes = snapshot.Members.Select(x => x.UniqueAddress).ToImmutableHashSet();
- _state = _state.Init(nodes);
+ var unreachable = snapshot.Unreachable.Select(c => c.UniqueAddress).ToImmutableSortedSet();
+ _state = _state.Init(nodes, unreachable);
}
private void AddMember(Member m)
@@ -139,6 +142,16 @@ private void RemoveMember(Member m)
}
}
+ private void UnreachableMember(Member m)
+ {
+ _state = _state.UnreachableMember(m.UniqueAddress);
+ }
+
+ private void ReachableMember(Member m)
+ {
+ _state = _state.ReachableMember(m.UniqueAddress);
+ }
+
private void DoHeartbeat()
{
foreach (var to in _state.ActiveReceivers)
@@ -270,17 +283,17 @@ public ExpectedFirstHeartbeat(UniqueAddress @from)
///
internal sealed class ClusterHeartbeatSenderState
{
- public ClusterHeartbeatSenderState(HeartbeatNodeRing ring, ImmutableHashSet unreachable, IFailureDetectorRegistry failureDetector)
+ public ClusterHeartbeatSenderState(HeartbeatNodeRing ring, ImmutableHashSet oldReceiversNowUnreachable, IFailureDetectorRegistry failureDetector)
{
FailureDetector = failureDetector;
- Unreachable = unreachable;
+ OldReceiversNowUnreachable = oldReceiversNowUnreachable;
Ring = ring;
- ActiveReceivers = Ring.MyReceivers.Value.Union(Unreachable);
+ ActiveReceivers = Ring.MyReceivers.Value.Union(OldReceiversNowUnreachable);
}
public HeartbeatNodeRing Ring { get; private set; }
- public ImmutableHashSet Unreachable { get; private set; }
+ public ImmutableHashSet OldReceiversNowUnreachable { get; private set; }
public IFailureDetectorRegistry FailureDetector { get; private set; }
@@ -288,14 +301,14 @@ public ClusterHeartbeatSenderState(HeartbeatNodeRing ring, ImmutableHashSet unreachable = null, IFailureDetectorRegistry failureDetector = null)
+ public ClusterHeartbeatSenderState Copy(HeartbeatNodeRing ring = null, ImmutableHashSet oldReceiversNowUnreachable = null, IFailureDetectorRegistry failureDetector = null)
{
- return new ClusterHeartbeatSenderState(ring ?? Ring, unreachable ?? Unreachable, failureDetector ?? FailureDetector);
+ return new ClusterHeartbeatSenderState(ring ?? Ring, oldReceiversNowUnreachable ?? OldReceiversNowUnreachable, failureDetector ?? FailureDetector);
}
- public ClusterHeartbeatSenderState Init(ImmutableHashSet nodes)
+ public ClusterHeartbeatSenderState Init(ImmutableHashSet nodes, ImmutableSortedSet unreachable)
{
- return Copy(ring: Ring.Copy(nodes: nodes.Add(SelfAddress)));
+ return Copy(ring: Ring.Copy(nodes: nodes.Add(SelfAddress).ToImmutableSortedSet(), unreachable: unreachable));
}
public bool Contains(UniqueAddress node)
@@ -312,26 +325,36 @@ public ClusterHeartbeatSenderState RemoveMember(UniqueAddress node)
{
var newState = MembershipChange(Ring - node);
FailureDetector.Remove(node.Address);
- if (newState.Unreachable.Contains(node))
- return newState.Copy(unreachable: newState.Unreachable.Remove(node));
+ if (newState.OldReceiversNowUnreachable.Contains(node))
+ return newState.Copy(oldReceiversNowUnreachable: newState.OldReceiversNowUnreachable.Remove(node));
return newState;
}
+ public ClusterHeartbeatSenderState UnreachableMember(UniqueAddress node)
+ {
+ return MembershipChange(Ring.Copy(unreachable: Ring.Unreachable.Add(node)));
+ }
+
+ public ClusterHeartbeatSenderState ReachableMember(UniqueAddress node)
+ {
+ return MembershipChange(Ring.Copy(unreachable: Ring.Unreachable.Remove(node)));
+ }
+
private ClusterHeartbeatSenderState MembershipChange(HeartbeatNodeRing newRing)
{
var oldReceivers = Ring.MyReceivers.Value;
var removedReceivers = oldReceivers.Except(newRing.MyReceivers.Value);
- var newUnreachable = Unreachable;
+ var adjustedOldReceiversNowUnreachable = OldReceiversNowUnreachable;
foreach (var r in removedReceivers)
{
if (FailureDetector.IsAvailable(r.Address))
FailureDetector.Remove(r.Address);
else
{
- newUnreachable = newUnreachable.Add(r);
+ adjustedOldReceiversNowUnreachable = adjustedOldReceiversNowUnreachable.Add(r);
}
}
- return Copy(newRing, newUnreachable);
+ return Copy(newRing, oldReceiversNowUnreachable: adjustedOldReceiversNowUnreachable);
}
public ClusterHeartbeatSenderState HeartbeatRsp(UniqueAddress from)
@@ -339,14 +362,14 @@ public ClusterHeartbeatSenderState HeartbeatRsp(UniqueAddress from)
if (ActiveReceivers.Contains(from))
{
FailureDetector.Heartbeat(from.Address);
- if (Unreachable.Contains(from))
+ if (OldReceiversNowUnreachable.Contains(from))
{
//back from unreachable, ok to stop heartbeating to it
if (!Ring.MyReceivers.Value.Contains(from))
{
FailureDetector.Remove(from.Address);
}
- return Copy(unreachable: Unreachable.Remove(from));
+ return Copy(oldReceiversNowUnreachable: OldReceiversNowUnreachable.Remove(from));
}
return this;
}
@@ -364,17 +387,17 @@ public ClusterHeartbeatSenderState HeartbeatRsp(UniqueAddress from)
///
internal sealed class HeartbeatNodeRing
{
- public HeartbeatNodeRing(UniqueAddress selfAddress, IEnumerable nodes,
- int monitoredByNumberOfNodes)
- : this(selfAddress, ImmutableSortedSet.Create(nodes.ToArray()), monitoredByNumberOfNodes)
+ public HeartbeatNodeRing(UniqueAddress selfAddress, IEnumerable nodes, int monitoredByNumberOfNodes)
+ : this(selfAddress, ImmutableSortedSet.Create(nodes.ToArray()), ImmutableSortedSet.Empty, monitoredByNumberOfNodes)
{
}
- public HeartbeatNodeRing(UniqueAddress selfAddress, ImmutableSortedSet nodes, int monitoredByNumberOfNodes)
+ public HeartbeatNodeRing(UniqueAddress selfAddress, ImmutableSortedSet nodes, ImmutableSortedSet unreachable, int monitoredByNumberOfNodes)
{
MonitoredByNumberOfNodes = monitoredByNumberOfNodes;
NodeRing = nodes;
+ Unreachable = unreachable;
SelfAddress = selfAddress;
_useAllAsReceivers = MonitoredByNumberOfNodes >= (NodeRing.Count - 1);
MyReceivers = new Lazy>(() => Receivers(SelfAddress));
@@ -384,6 +407,8 @@ public HeartbeatNodeRing(UniqueAddress selfAddress, ImmutableSortedSet NodeRing { get; private set; }
+ public ImmutableSortedSet Unreachable { get; private set; }
+
public int MonitoredByNumberOfNodes { get; private set; }
///
@@ -396,20 +421,55 @@ public HeartbeatNodeRing(UniqueAddress selfAddress, ImmutableSortedSet Receivers(UniqueAddress sender)
{
if (_useAllAsReceivers)
+ {
return NodeRing.Remove(sender).ToImmutableHashSet();
- var slice = NodeRing.From(sender).Skip(1).Take(MonitoredByNumberOfNodes).ToList(); //grab members furthest from this peer
- if (slice.Count < MonitoredByNumberOfNodes)
+ }
+ else
{
- slice = slice.Concat(NodeRing.Take(MonitoredByNumberOfNodes - slice.Count)).ToList();
+ Func, ImmutableSortedSet, Tuple>> take = null;
+ take = (n, iter, acc) =>
+ {
+ if (iter.MoveNext() == false || n == 0)
+ {
+ return Tuple.Create(n, acc);
+ }
+ else
+ {
+ UniqueAddress next = iter.Current;
+ var isUnreachable = Unreachable.Contains(next);
+ if (isUnreachable && acc.Count >= MonitoredByNumberOfNodes)
+ {
+ return take(n, iter, acc); // skip the unreachable, since we have already picked `MonitoredByNumberOfNodes`
+ }
+ else if (isUnreachable)
+ {
+ return take(n, iter, acc.Add(next)); // include the unreachable, but don't count it
+ }
+ else
+ {
+ return take(n - 1, iter, acc.Add(next)); // include the reachable
+ }
+ }
+ };
+
+ var tuple = take(MonitoredByNumberOfNodes, NodeRing.From(sender).Skip(1).GetEnumerator(), ImmutableSortedSet.Empty);
+ var remaining = tuple.Item1;
+ var slice1 = tuple.Item2;
+
+ ImmutableSortedSet slice = remaining == 0
+ ? slice1
+ : take(remaining, NodeRing.Where(c => c != sender).GetEnumerator(), slice1).Item2;
+
+ return slice.ToImmutableHashSet();
}
- return slice.ToImmutableHashSet();
}
- public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, IEnumerable nodes = null,
- int? monitoredByNumberOfNodes = null)
+ public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, ImmutableSortedSet nodes = null, ImmutableSortedSet unreachable = null, int? monitoredByNumberOfNodes = null)
{
- return new HeartbeatNodeRing(selfAddress ?? SelfAddress,
+ return new HeartbeatNodeRing(
+ selfAddress ?? SelfAddress,
nodes ?? NodeRing,
+ unreachable ?? Unreachable,
monitoredByNumberOfNodes.HasValue ? monitoredByNumberOfNodes.Value : MonitoredByNumberOfNodes);
}
@@ -422,7 +482,9 @@ public HeartbeatNodeRing Copy(UniqueAddress selfAddress = null, IEnumerable