Skip to content

Commit

Permalink
implemented Microsoft.Extensions.DependencyInjection based actor crea…
Browse files Browse the repository at this point in the history
…tion pipeline
  • Loading branch information
Horusiath committed Sep 1, 2017
1 parent 491e2d0 commit b4093ec
Show file tree
Hide file tree
Showing 24 changed files with 275 additions and 616 deletions.
2 changes: 1 addition & 1 deletion src/benchmark/PingPong/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private static async Task<Tuple<bool, long, int>> Benchmark<TActor>(int factor,

var ts = new TaskCompletionSource<bool>();
tasks.Add(ts.Task);
var client = (RepointableActorRef)system.ActorOf(new Props(typeof(TActor), null, destination, repeatsPerClient, ts), "client-" + i);
var client = (RepointableActorRef)system.ActorOf(Props.Create(typeof(TActor), null, destination, repeatsPerClient, ts), "client-" + i);
SpinWait.SpinUntil(() => client.IsStarted);
client.Underlying.AsInstanceOf<ActorCell>().Dispatcher.Throughput = factor;
clients.Add(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ private void InitializeFSM()
{
if (e.FsmEvent is StartOldestChangedBuffer)
{
_oldestChangedBuffer = Context.ActorOf(Actor.Props.Create<OldestChangedBuffer>(_settings.Role).WithDispatcher(Context.Props.Dispatcher));
_oldestChangedBuffer = Context.ActorOf(Actor.Props.Create<OldestChangedBuffer>(new object[] { _settings.Role }).WithDispatcher(Context.Props.Dispatcher));
GetNextOldestChange();
return Stay();
}
Expand Down Expand Up @@ -751,7 +751,8 @@ private void InitializeFSM()
// transition when OldestChanged
return Stay().Using(new YoungerData(null));
}
else if (e.FsmEvent is HandOverToMe) {
else if (e.FsmEvent is HandOverToMe)
{
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
Sender.Tell(HandOverDone.Instance);
Expand Down
8 changes: 2 additions & 6 deletions src/core/Akka.Cluster.Tests/AutoDownSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,8 @@ public override void Down(Address node)
}
}

private IActorRef AutoDownActor(TimeSpan autoDownUnreachableAfter)
{
return
Sys.ActorOf(new Props(typeof(AutoDownTestActor),
new object[] { autoDownUnreachableAfter, this.TestActor }));
}
private IActorRef AutoDownActor(TimeSpan autoDownUnreachableAfter) => Sys.ActorOf(Props.Create(typeof(AutoDownTestActor),
new object[] { autoDownUnreachableAfter, this.TestActor }));

[Fact]
public void AutoDown_must_down_unreachable_when_leader()
Expand Down
18 changes: 8 additions & 10 deletions src/core/Akka.Cluster/AutoDown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ internal class AutoDown : AutoDownBase
/// </summary>
/// <param name="autoDownUnreachableAfter">TBD</param>
/// <returns>TBD</returns>
public static Props Props(TimeSpan autoDownUnreachableAfter)
{
return Actor.Props.Create<AutoDown>(autoDownUnreachableAfter);
}
public static Props Props(TimeSpan autoDownUnreachableAfter) =>
Actor.Props.Create<AutoDown>(new object[] { autoDownUnreachableAfter });

/// <summary>
/// TBD
Expand Down Expand Up @@ -106,7 +104,7 @@ public override IScheduler Scheduler
/// </summary>
protected override void PreStart()
{
_cluster.Subscribe(Self,new []{ typeof(ClusterEvent.IClusterDomainEvent)});
_cluster.Subscribe(Self, new[] { typeof(ClusterEvent.IClusterDomainEvent) });
base.PreStart();
}

Expand All @@ -128,7 +126,7 @@ protected override void PostStop()
/// </exception>
public override void Down(Address node)
{
if(!_leader) throw new InvalidOperationException("Must be leader to down node");
if (!_leader) throw new InvalidOperationException("Must be leader to down node");
_cluster.LogInfo("Leader is auto-downing unreachable node [{0}]", node);
_cluster.Down(node);
}
Expand Down Expand Up @@ -226,7 +224,7 @@ protected override void OnReceive(object message)
_leader = leaderChanged.Leader != null && leaderChanged.Leader.Equals(SelfAddress);
if (_leader)
{
foreach(var node in _pendingUnreachable) Down(node.Address);
foreach (var node in _pendingUnreachable) Down(node.Address);
_pendingUnreachable = ImmutableHashSet.Create<UniqueAddress>();
}
return;
Expand All @@ -246,7 +244,7 @@ protected override void OnReceive(object message)

private void UnreachableMember(Member m)
{
if(!_skipMemberStatus.Contains(m.Status) && !_scheduledUnreachable.ContainsKey(m.UniqueAddress))
if (!_skipMemberStatus.Contains(m.Status) && !_scheduledUnreachable.ContainsKey(m.UniqueAddress))
ScheduleUnreachable(m.UniqueAddress);
}

Expand Down Expand Up @@ -279,7 +277,7 @@ private void DownOrAddPending(UniqueAddress node)

private void Remove(UniqueAddress node)
{
if(_scheduledUnreachable.TryGetValue(node, out var source))
if (_scheduledUnreachable.TryGetValue(node, out var source))
source.Cancel();
_scheduledUnreachable = _scheduledUnreachable.Remove(node);
_pendingUnreachable = _pendingUnreachable.Remove(node);
Expand Down Expand Up @@ -321,7 +319,7 @@ public Props DowningActorProps
{
if (_clusterSettings.AutoDownUnreachableAfter.HasValue)
return AutoDown.Props(_clusterSettings.AutoDownUnreachableAfter.Value);
else
else
throw new ConfigurationException("AutoDowning downing provider selected but 'akka.cluster.auto-down-unreachable-after' not set");
}
}
Expand Down
15 changes: 6 additions & 9 deletions src/core/Akka.Cluster/ClusterRemoteWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,13 @@ public static Props Props(
IFailureDetectorRegistry<Address> failureDetector,
TimeSpan heartbeatInterval,
TimeSpan unreachableReaperInterval,
TimeSpan heartbeatExpectedResponseAfter)
TimeSpan heartbeatExpectedResponseAfter) => Actor.Props.Create(typeof(ClusterRemoteWatcher), new object[]
{
return new Props(typeof(ClusterRemoteWatcher), new object[]
{
failureDetector,
heartbeatInterval,
unreachableReaperInterval,
heartbeatExpectedResponseAfter
}).WithDeploy(Deploy.Local);
}
failureDetector,
heartbeatInterval,
unreachableReaperInterval,
heartbeatExpectedResponseAfter
}).WithDeploy(Deploy.Local);

private readonly Cluster _cluster;
private ImmutableHashSet<Address> _clusterNodes = ImmutableHashSet.Create<Address>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void PersistentActor_should_stop_if_recovery_from_persisted_events_fails(

// recover by creating another with same name
var sup = Sys.ActorOf(Props.Create(() => new Supervisor(TestActor)));
sup.Tell(new Props(typeof (BehaviorOneActor), new object[] {Name}));
sup.Tell(Props.Create(typeof (BehaviorOneActor), new object[] {Name}));
var newPref = ExpectMsg<IActorRef>();
Watch(newPref);
ExpectTerminated(newPref);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private static IActorRef CreatePlugin(ExtendedActorSystem system, string configP
var pluginType = Type.GetType(pluginTypeName, true);
var pluginDispatcherId = pluginConfig.GetString("plugin-dispatcher");
object[] pluginActorArgs = pluginType.GetConstructor(new[] { typeof(Config) }) != null ? new object[] { pluginConfig } : null;
var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId);
var pluginActorProps = Props.Create(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId);

return system.SystemActorOf(pluginActorProps, pluginActorName);
}
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Remote.TestKit.Tests/BarrierSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ public void A_BarrierCoordinator_must_fail_if_a_node_registers_twice()
private IActorRef GetBarrier()
{
var actor =
Sys.ActorOf(
new Props(typeof (BarrierCoordinatorSupervisor), new object[] {TestActor}).WithDeploy(Deploy.Local));
Sys.ActorOf(Props.Create(typeof (BarrierCoordinatorSupervisor), new object[] {TestActor}).WithDeploy(Deploy.Local));
actor.Tell("", TestActor);
return ExpectMsg<IActorRef>();
}
Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka.Remote.Tests/RemoteWatcherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void A_RemoteWatcher_must_have_correct_interaction_when_watching()
var fd = CreateFailureDetectorRegistry();
var monitorA = Sys.ActorOf(Props.Create<TestRemoteWatcher>(), "monitor1");
//TODO: Better way to write this?
var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[]{TestActor}), "monitor1");
var monitorB = CreateRemoteActor(Props.Create(() => new TestActorProxy(TestActor)), "monitor1");

var a1 = Sys.ActorOf(Props.Create<MyActor>(), "a1").AsInstanceOf<IInternalActorRef>();
var a2 = Sys.ActorOf(Props.Create<MyActor>(), "a2").AsInstanceOf<IInternalActorRef>();
Expand Down Expand Up @@ -278,7 +278,7 @@ public void A_RemoteWatcher_must_generate_address_terminated_when_missing_heartb
Sys.EventStream.Subscribe(q.Ref, typeof(TestRemoteWatcher.Quarantined));

var monitorA = Sys.ActorOf(Props.Create<TestRemoteWatcher>(), "monitor4");
var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor4");
var monitorB = CreateRemoteActor(Props.Create(() => new TestActorProxy(TestActor)), "monitor4");

var a = Sys.ActorOf(Props.Create<MyActor>(), "a4").AsInstanceOf<IInternalActorRef>();
var b = CreateRemoteActor(Props.Create<MyActor>(), "b4");
Expand Down Expand Up @@ -320,8 +320,8 @@ public void A_RemoteWatcher_must_generate_address_terminated_when_missing_first_

var fd = CreateFailureDetectorRegistry();
var heartbeatExpectedResponseAfter = TimeSpan.FromSeconds(2);
var monitorA = Sys.ActorOf(new Props(new Deploy(), typeof(TestRemoteWatcher), new object[] {heartbeatExpectedResponseAfter}), "monitor5");
var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor5");
var monitorA = Sys.ActorOf(Props.Create(() => new TestRemoteWatcher(heartbeatExpectedResponseAfter)), "monitor5");
var monitorB = CreateRemoteActor(Props.Create(() => new TestActorProxy(TestActor)), "monitor5");

var a = Sys.ActorOf(Props.Create<MyActor>(), "a5").AsInstanceOf<IInternalActorRef>();
var b = CreateRemoteActor(Props.Create<MyActor>(), "b5");
Expand Down Expand Up @@ -360,7 +360,7 @@ public void
Sys.EventStream.Subscribe(q.Ref, typeof(TestRemoteWatcher.Quarantined));

var monitorA = Sys.ActorOf(Props.Create<TestRemoteWatcher>(), "monitor6");
var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor6");
var monitorB = CreateRemoteActor(Props.Create(() => new TestActorProxy(TestActor)), "monitor6");

var a = Sys.ActorOf(Props.Create<MyActor>(), "a6").AsInstanceOf<IInternalActorRef>();
var b = CreateRemoteActor(Props.Create<MyActor>(), "b6");
Expand Down
42 changes: 1 addition & 41 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,39 +380,7 @@ public void Remoting_must_create_and_supervise_children_on_remote_Node()
var r = Sys.ActorOf<Echo1>("blub");
Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/blub", r.Path.ToString());
}

[Fact]
public void Remoting_must_create_by_IndirectActorProducer()
{
try
{
Resolve.SetResolver(new TestResolver());
var r = Sys.ActorOf(Props.CreateBy<Resolve<Echo2>>(), "echo");
Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString());
}
finally
{
Resolve.SetResolver(null);
}
}

[Fact()]
public void Remoting_must_create_by_IndirectActorProducer_and_ping()
{
try
{
Resolve.SetResolver(new TestResolver());
var r = Sys.ActorOf(Props.CreateBy<Resolve<Echo2>>(), "echo");
Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString());
r.Tell("ping", TestActor);
ExpectMsg(Tuple.Create("pong", TestActor), TimeSpan.FromSeconds(1.5));
}
finally
{
Resolve.SetResolver(null);
}
}


[Fact]
public async Task Bug_884_Remoting_must_support_reply_to_Routee()
{
Expand Down Expand Up @@ -700,14 +668,6 @@ protected override void OnReceive(object message)
}
}

class TestResolver : IResolver
{
public T Resolve<T>(object[] args)
{
return Activator.CreateInstance(typeof(T), args).AsInstanceOf<T>();
}
}


#endregion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private Props PropsFromProto(Proto.Msg.PropsData protoProps)
);
}

return new Props(DeployFromProto(protoProps.Deploy), actorClass, args);
return Props.Create(actorClass, args).WithDeploy(DeployFromProto(protoProps.Deploy));
}

//
Expand Down
Loading

0 comments on commit b4093ec

Please sign in to comment.