Skip to content

Commit

Permalink
Merge pull request #2098 from alexvaluyskiy/clusterspecs
Browse files Browse the repository at this point in the history
Cluster specs update
  • Loading branch information
Aaronontheweb authored Jun 20, 2016
2 parents f4cbf3a + c614ad8 commit 297ac0e
Show file tree
Hide file tree
Showing 25 changed files with 758 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public ClusterShardingFailureSpecConfig()
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider""
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = [""backend""]
akka.persistence.journal.plugin = ""akka.persistence.journal.in-mem""
akka.persistence.journal.in-mem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public ClusterShardingLeavingSpecConfig()
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider""
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.persistence.journal.plugin = ""Akka.Persistence.Journal.in-mem""
akka.persistence.journal.in-mem {
timeout = 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public ClusterShardingSpecConfig()
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider""
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = [""backend""]
akka.persistence.journal.plugin = ""akka.persistence.journal.leveldb-shared""
akka.persistence.journal.leveldb-shared.store {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void ClusterSingletonManagerSettings_must_have_default_config()
clusterSingletonManagerSettings.SingletonName.ShouldBe("singleton");
clusterSingletonManagerSettings.Role.ShouldBe(null);
clusterSingletonManagerSettings.HandOverRetryInterval.TotalSeconds.ShouldBe(1);
clusterSingletonManagerSettings.RemovalMargin.TotalSeconds.ShouldBe(20);
clusterSingletonManagerSettings.RemovalMargin.TotalSeconds.ShouldBe(0);

var config = Sys.Settings.Config.GetConfig("akka.cluster.singleton");
config.GetInt("min-number-of-hand-over-retries").ShouldBe(10);
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ namespace Akka.Actor
{
public static readonly Akka.Actor.Address AllSystems;
public Address(string protocol, string system, string host = null, System.Nullable<int> port = null) { }
public bool HasGlobalScope { get; }
public bool HasLocalScope { get; }
public string Host { get; }
public System.Nullable<int> Port { get; }
public string Protocol { get; }
Expand Down
16 changes: 7 additions & 9 deletions src/core/Akka.Cluster.Tests/AutoDownSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using Akka.Actor;
using Akka.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Tests
Expand Down Expand Up @@ -113,7 +114,7 @@ public void AutoDown_must_down_unreachable_after_specified_duration()
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectNoMsg(1.Seconds());
ExpectMsg(new DownCalled(MemberB.Address));
}

Expand All @@ -124,7 +125,7 @@ public void AutoDown_must_down_unreachable_when_becoming_leader_inbetween_detect
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectNoMsg(1.Seconds());
ExpectMsg(new DownCalled(MemberC.Address));
}

Expand All @@ -135,7 +136,7 @@ public void AutoDown_must_not_down_unreachable_when_loosing_leadership_inbetween
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberC));
a.Tell(new ClusterEvent.LeaderChanged(MemberB.Address));
ExpectNoMsg(TimeSpan.FromSeconds(3));
ExpectNoMsg(3.Seconds());
}

[Fact]
Expand All @@ -145,18 +146,17 @@ public void AutoDown_must_not_down_when_unreachable_become_reachable_inbetween_d
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.ReachableMember(MemberB));
ExpectNoMsg(TimeSpan.FromSeconds(3));
ExpectNoMsg(3.Seconds());
}


[Fact]
public void AutoDown_must_not_down_unreachable_is_removed_inbetween_detection_and_specified_duration()
{
var a = AutoDownActor(TimeSpan.FromSeconds(2));
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB));
a.Tell(new ClusterEvent.MemberRemoved(MemberB.Copy(MemberStatus.Removed), MemberStatus.Exiting));
ExpectNoMsg(TimeSpan.FromSeconds(3));
ExpectNoMsg(3.Seconds());
}

[Fact]
Expand All @@ -165,9 +165,7 @@ public void AutoDown_must_not_down_when_unreachable_is_already_down()
var a = AutoDownActor(TimeSpan.Zero);
a.Tell(new ClusterEvent.LeaderChanged(MemberA.Address));
a.Tell(new ClusterEvent.UnreachableMember(MemberB.Copy(MemberStatus.Down)));
ExpectNoMsg(TimeSpan.FromSeconds(1));
ExpectNoMsg(1.Seconds());
}

}
}

79 changes: 45 additions & 34 deletions src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Remote;
using Akka.TestKit;
using Xunit;
using Assert = Xunit.Assert;
using FluentAssertions;

namespace Akka.Cluster.Tests
{
Expand All @@ -23,40 +25,49 @@ public ClusterConfigSpec() : base(@"akka.actor.provider = ""Akka.Cluster.Cluster
public void Clustering_must_be_able_to_parse_generic_cluster_config_elements()
{
var settings = new ClusterSettings(Sys.Settings.Config, Sys.Name);
Assert.True(settings.LogInfo);
Assert.Equal(8, settings.FailureDetectorConfig.GetDouble("threshold"));
Assert.Equal(1000, settings.FailureDetectorConfig.GetInt("max-sample-size"));
Assert.Equal(TimeSpan.FromMilliseconds(100), settings.FailureDetectorConfig.GetTimeSpan("min-std-deviation"));
Assert.Equal(TimeSpan.FromSeconds(3), settings.FailureDetectorConfig.GetTimeSpan("acceptable-heartbeat-pause"));
Assert.Equal(typeof(PhiAccrualFailureDetector), Type.GetType(settings.FailureDetectorImplementationClass));
Assert.Equal(ImmutableList.Create<Address>(), settings.SeedNodes);
Assert.Equal(TimeSpan.FromSeconds(5), settings.SeedNodeTimeout);
Assert.Equal(TimeSpan.FromSeconds(10), settings.RetryUnsuccessfulJoinAfter);
Assert.Equal(TimeSpan.FromSeconds(1), settings.PeriodicTasksInitialDelay);
Assert.Equal(TimeSpan.FromSeconds(1), settings.GossipInterval);
Assert.Equal(TimeSpan.FromSeconds(2), settings.GossipTimeToLive);
Assert.Equal(TimeSpan.FromSeconds(1), settings.HeartbeatInterval);
Assert.Equal(5, settings.MonitoredByNrOfMembers);
Assert.Equal(TimeSpan.FromSeconds(5), settings.HeartbeatExpectedResponseAfter);
Assert.Equal(TimeSpan.FromSeconds(1), settings.LeaderActionsInterval);
Assert.Equal(TimeSpan.FromSeconds(1), settings.UnreachableNodesReaperInterval);
Assert.Null(settings.PublishStatsInterval);
Assert.Null(settings.AutoDownUnreachableAfter);
Assert.Equal(1, settings.MinNrOfMembers);
//TODO:
//Assert.AreEqual(ImmutableDictionary.Create<string, int>(), settings.);
Assert.Equal(ImmutableHashSet.Create<string>(), settings.Roles);
Assert.Equal("akka.cluster.default-cluster-dispatcher", settings.UseDispatcher);
Assert.Equal(.8, settings.GossipDifferentViewProbability);
Assert.Equal(400, settings.ReduceGossipDifferentViewProbability);
Assert.Equal(TimeSpan.FromMilliseconds(33), settings.SchedulerTickDuration);
Assert.Equal(512, settings.SchedulerTicksPerWheel);
Assert.True(settings.MetricsEnabled);
//TODO:
//Assert.AreEqual(typeof(SigarMetricsCollector).FullName, settings.MetricsCollectorClass);
Assert.Equal(TimeSpan.FromSeconds(3), settings.MetricsGossipInterval);
Assert.Equal(TimeSpan.FromSeconds(12), settings.MetricsMovingAverageHalfLife);
Assert.Equal(false, settings.VerboseHeartbeatLogging);
settings.LogInfo.Should().BeTrue();

settings.SeedNodes.Should().BeEquivalentTo(ImmutableList.Create<Address>());
settings.SeedNodeTimeout.Should().Be(5.Seconds());
settings.RetryUnsuccessfulJoinAfter.Should().Be(10.Seconds());
settings.PeriodicTasksInitialDelay.Should().Be(1.Seconds());
settings.GossipInterval.Should().Be(1.Seconds());
settings.GossipTimeToLive.Should().Be(2.Seconds());
settings.HeartbeatInterval.Should().Be(1.Seconds());
settings.MonitoredByNrOfMembers.Should().Be(5);
settings.HeartbeatExpectedResponseAfter.Should().Be(1.Seconds());
settings.LeaderActionsInterval.Should().Be(1.Seconds());
settings.UnreachableNodesReaperInterval.Should().Be(1.Seconds());
settings.PublishStatsInterval.Should().NotHaveValue();
settings.AutoDownUnreachableAfter.Should().NotHaveValue();
settings.DownRemovalMargin.Should().Be(TimeSpan.Zero);
settings.MinNrOfMembers.Should().Be(1);
settings.MinNrOfMembersOfRole.Should().Equal(ImmutableDictionary<string, int>.Empty);
settings.Roles.Should().BeEquivalentTo(ImmutableHashSet<string>.Empty);
// settings.UseDispatcher.Should().Be(Dispatchers.DefaultDispatcherId);
settings.GossipDifferentViewProbability.Should().Be(0.8);
settings.ReduceGossipDifferentViewProbability.Should().Be(400);

Type.GetType(settings.FailureDetectorImplementationClass).Should().Be(typeof(PhiAccrualFailureDetector));
settings.FailureDetectorConfig.GetTimeSpan("heartbeat-interval").Should().Be(1.Seconds());
settings.FailureDetectorConfig.GetDouble("threshold").Should().Be(8.0d);
settings.FailureDetectorConfig.GetInt("max-sample-size").Should().Be(1000);
settings.FailureDetectorConfig.GetTimeSpan("min-std-deviation").Should().Be(100.Milliseconds());
settings.FailureDetectorConfig.GetTimeSpan("acceptable-heartbeat-pause").Should().Be(3.Seconds());
settings.FailureDetectorConfig.GetInt("monitored-by-nr-of-members").Should().Be(5);
settings.FailureDetectorConfig.GetTimeSpan("expected-response-after").Should().Be(1.Seconds());

// TODO remove metrics
settings.MetricsEnabled.Should().BeTrue();
Type.GetType(settings.MetricsCollectorClass).Should().Be(typeof(PerformanceCounterMetricsCollector));
settings.MetricsInterval.Should().Be(3.Seconds());
settings.MetricsGossipInterval.Should().Be(3.Seconds());
settings.MetricsMovingAverageHalfLife.Should().Be(12.Seconds());

settings.SchedulerTickDuration.Should().Be(33.Milliseconds());
settings.SchedulerTicksPerWheel.Should().Be(512);

settings.VerboseHeartbeatLogging.Should().BeFalse();
}

[Fact]
Expand Down
105 changes: 52 additions & 53 deletions src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,81 +13,80 @@
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;

namespace Akka.Cluster.Tests
{
public class ClusterDeployerSpec : AkkaSpec
{
public static readonly Config deployConf = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.actor.deployment {
/user/service1 {
router = round-robin-pool
nr-of-instances = 20
cluster.enabled = on
cluster.max-nr-of-instances-per-node = 3
cluster.max-total-nr-of-instances = 20
cluster.allow-local-routees = off
}
/user/service2 {
dispatcher = mydispatcher
mailbox = mymailbox
router = round-robin-group
nr-of-instances = 20
routees.paths = [""/user/myservice""]
cluster.enabled = on
cluster.max-total-nr-of-instances = 20
cluster.allow-local-routees = off
cluster.use-role = backend
}
}
akka.remote.helios.tcp.port = 0
");
public static readonly Config deployerConf = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.actor.deployment {
/user/service1 {
router = round-robin-pool
cluster.enabled = on
cluster.max-nr-of-instances-per-node = 3
cluster.max-total-nr-of-instances = 20
cluster.allow-local-routees = off
}
/user/service2 {
dispatcher = mydispatcher
mailbox = mymailbox
router = round-robin-group
nr-of-instances = 20
routees.paths = [""/user/myservice""]
cluster.enabled = on
cluster.max-total-nr-of-instances = 20
cluster.allow-local-routees = off
}
}
akka.remote.helios.tcp.port = 0");

public ClusterDeployerSpec() : base(deployConf) { }
public ClusterDeployerSpec() : base(deployerConf) { }

[Fact]
public void RemoteDeployer_must_be_able_to_parse_akka_actor_deployment_with_specified_cluster_pool()
{
var service = "/user/service1";
var deployment = Sys.AsInstanceOf<ActorSystemImpl>().Provider.Deployer.Lookup(service.Split('/').Drop(1));
deployment.ShouldNotBe(null);
deployment.Should().NotBeNull();

deployment.Path.ShouldBe(service);
deployment.RouterConfig.GetType().ShouldBe(typeof(ClusterRouterPool));
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Local.GetType().ShouldBe(typeof(RoundRobinPool));
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Local.AsInstanceOf<RoundRobinPool>().NrOfInstances.ShouldBe(20);
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Settings.TotalInstances.ShouldBe(20);
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Settings.AllowLocalRoutees.ShouldBe(false);
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Settings.UseRole.ShouldBe(string.Empty);
deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>().Settings.AsInstanceOf<ClusterRouterPoolSettings>().MaxInstancesPerNode.ShouldBe(3);
deployment.Scope.ShouldBe(ClusterScope.Instance);
deployment.Mailbox.ShouldBe(Deploy.NoMailboxGiven);
deployment.Dispatcher.ShouldBe(Deploy.NoDispatcherGiven);
deployment.Path.Should().Be(service);
deployment.RouterConfig.Should().BeOfType<ClusterRouterPool>();

var routerConfig = deployment.RouterConfig.AsInstanceOf<ClusterRouterPool>();
routerConfig.Local.Should().BeOfType<RoundRobinPool>();
routerConfig.Local.AsInstanceOf<RoundRobinPool>().NrOfInstances.Should().Be(20);
routerConfig.Settings.TotalInstances.Should().Be(20);
routerConfig.Settings.AllowLocalRoutees.Should().BeFalse();
routerConfig.Settings.UseRole.Should().BeNull();
routerConfig.Settings.AsInstanceOf<ClusterRouterPoolSettings>().MaxInstancesPerNode.Should().Be(3);
deployment.Scope.Should().Be(ClusterScope.Instance);
deployment.Mailbox.Should().Be(Deploy.NoMailboxGiven);
deployment.Dispatcher.Should().Be(Deploy.NoDispatcherGiven);
}

[Fact]
public void RemoteDeployer_must_be_able_to_parse_akka_actor_deployment_with_specified_cluster_group()
{
var service = "/user/service2";
var deployment = Sys.AsInstanceOf<ActorSystemImpl>().Provider.Deployer.Lookup(service.Split('/').Drop(1));
deployment.ShouldNotBe(null);
deployment.Should().NotBeNull();

deployment.Path.ShouldBe(service);
deployment.RouterConfig.GetType().ShouldBe(typeof(ClusterRouterGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.GetType().ShouldBe(typeof(RoundRobinGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.AsInstanceOf<RoundRobinGroup>().Paths.ShouldBe(new[]{ "/user/myservice" });
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.TotalInstances.ShouldBe(20);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AllowLocalRoutees.ShouldBe(false);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.UseRole.ShouldBe("backend");
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AsInstanceOf<ClusterRouterGroupSettings>().RouteesPaths.ShouldBe(new[] { "/user/myservice" });
deployment.Scope.ShouldBe(ClusterScope.Instance);
deployment.Mailbox.ShouldBe("mymailbox");
deployment.Dispatcher.ShouldBe("mydispatcher");
}
deployment.Path.Should().Be(service);
deployment.RouterConfig.Should().BeOfType<ClusterRouterGroup>();

//todo: implement "have correct router mappings" test for adaptive load-balancing routers (not yet implemented)
var routerConfig = deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>();
routerConfig.Local.Should().BeOfType<RoundRobinGroup>();
routerConfig.Local.AsInstanceOf<RoundRobinGroup>().Paths.Should().BeEquivalentTo("/user/myservice");
routerConfig.Settings.TotalInstances.Should().Be(20);
routerConfig.Settings.AllowLocalRoutees.Should().BeFalse();
routerConfig.Settings.UseRole.Should().BeNull();
routerConfig.Settings.AsInstanceOf<ClusterRouterGroupSettings>().RouteesPaths.Should().BeEquivalentTo("/user/myservice");
deployment.Scope.Should().Be(ClusterScope.Instance);
deployment.Mailbox.Should().Be("mymailbox");
deployment.Dispatcher.Should().Be("mydispatcher");
}
}
}

Loading

0 comments on commit 297ac0e

Please sign in to comment.