Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1700 Feature/reverted endpoint managment files to fix rejoin issues #1755

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void EndpointRegistry_must_be_able_to_register_a_writeable_endpoint_and_p
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null,null));
Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null));

Assert.IsType<EndpointManager.Pass>(reg.WritableEndpointWithPolicyFor(address1));
Assert.Equal(actorA, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);
Expand All @@ -52,8 +52,8 @@ public void EndpointRegistry_must_be_able_to_register_a_readonly_endpoint()
var reg = new EndpointRegistry();
Assert.Null(reg.ReadOnlyEndpointFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 0));
Assert.Equal(Tuple.Create(actorA, 0), reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
Assert.False(reg.IsWritable(actorA));
Assert.True(reg.IsReadOnly(actorA));
Expand All @@ -67,10 +67,10 @@ public void EndpointRegistry_must_be_able_to_register_writable_and_readonly_endp
Assert.Null(reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 1));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null,null));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null));

Assert.Equal(Tuple.Create(actorA,1), reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorB, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);

Assert.False(reg.IsWritable(actorA));
Expand All @@ -85,7 +85,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
{
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
Expand All @@ -97,7 +97,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed()
{
var reg = new EndpointRegistry();
reg.RegisterReadOnlyEndpoint(address1, actorA, 2);
reg.RegisterReadOnlyEndpoint(address1, actorA);
reg.MarkAsFailed(actorA, Deadline.Now);
Assert.Null(reg.ReadOnlyEndpointFor(address1));
}
Expand All @@ -106,8 +106,8 @@ public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed(
public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
reg.RegisterWritableEndpoint(address2, actorB, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
reg.MarkAsQuarantined(address2, 42, deadline);
Expand All @@ -124,8 +124,8 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
public void EndpointRegistry_should_prune_outdated_Gated_directives_properly()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.RegisterWritableEndpoint(address1, actorA, null);
reg.RegisterWritableEndpoint(address2, actorB, null);
reg.MarkAsFailed(actorA, Deadline.Now);
var farIntheFuture = Deadline.Now + TimeSpan.FromSeconds(60);
reg.MarkAsFailed(actorB, farIntheFuture);
Expand Down
136 changes: 31 additions & 105 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,16 @@ protected EndpointPolicy(bool isTombstone)
/// </summary>
public class Pass : EndpointPolicy
{
public Pass(IActorRef endpoint, int? uid, int? refuseUid)
public Pass(IActorRef endpoint, int? uid)
: base(false)
{
Uid = uid;
Endpoint = endpoint;
RefuseUid = refuseUid;
}

public IActorRef Endpoint { get; private set; }

public int? Uid { get; private set; }

public int? RefuseUid { get; private set; }
}

/// <summary>
Expand Down Expand Up @@ -346,24 +343,6 @@ private void HandleStashedInbound(IActorRef endpoint, bool writerIsIdle)
HandleInboundAssociation(ia, writerIsIdle);
}

private void KeepQuarantinedOr(Address remoteAddress, Action body)
{
var uid = _endpoints.RefuseUid(remoteAddress);
if (uid.HasValue)
{
_log.Info(
"Quarantined address [{0}] is still unreachable or has not been restarted. Keeping it quarantined.",
remoteAddress);
// Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system.
_endpoints.MarkAsQuarantined(remoteAddress, uid.Value, Deadline.Now + _settings.QuarantineDuration);
}
else
{
body();
}
}


#region ActorBase overrides

protected override SupervisorStrategy SupervisorStrategy()
Expand All @@ -375,30 +354,18 @@ protected override SupervisorStrategy SupervisorStrategy()
ex.Match()
.With<InvalidAssociation>(ia =>
{
KeepQuarantinedOr(ia.RemoteAddress, () =>
{
var causedBy = ia.InnerException == null
? ""
: string.Format("Caused by: [{0}]", ia.InnerException);
_log.Warning("Tried to associate with unreachable remote address [{0}]. Address is now gated for {1} ms, all messages to this address will be delivered to dead letters. Reason: [{2}] {3}",
ia.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds, ia.Message, causedBy);
_endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor);
});

if (ia.DisassociationInfo.HasValue && ia.DisassociationInfo == DisassociateInfo.Quarantined)
{
//TODO: add context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
}
_log.Warning("Tried to associate with unreachable remote address [{0}]. Address is now gated for {1} ms, all messages to this address will be delivered to dead letters. Reason: [{2}]",
ia.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds, ia.Message);
_endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor);
AddressTerminatedTopic.Get(Context.System).Publish(new AddressTerminated(ia.RemoteAddress));
directive = Directive.Stop;
})
.With<ShutDownAssociation>(shutdown =>
{
KeepQuarantinedOr(shutdown.RemoteAddress, () =>
{
_log.Debug("Remote system with address [{0}] has shut down. Address is now gated for {1}ms, all messages to this address will be delivered to dead letters.",
shutdown.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds);
_endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor);
});
_log.Debug("Remote system with address [{0}] has shut down. Address is now gated for {1}ms, all messages to this address will be delivered to dead letters.",
shutdown.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds);
_endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor);
AddressTerminatedTopic.Get(Context.System).Publish(new AddressTerminated(shutdown.RemoteAddress));
directive = Directive.Stop;
})
.With<HopelessAssociation>(hopeless =>
Expand Down Expand Up @@ -549,66 +516,25 @@ protected void Accepting()

Receive<Quarantine>(quarantine =>
{
//Stop writers
var policy =
Tuple.Create(_endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress), quarantine.Uid);
if (policy.Item1 is Pass && policy.Item2 == null)
{
var endpoint = policy.Item1.AsInstanceOf<Pass>().Endpoint;
Context.Stop(endpoint);
_log.Warning("Association to [{0}] with unknown UID is reported as quarantined, but " +
"address cannot be quarantined without knowing the UID, gating instead for {1} ms.", quarantine.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds);
_endpoints.MarkAsFailed(endpoint, Deadline.Now + _settings.RetryGateClosedFor);
}
else if (policy.Item1 is Pass && policy.Item2 != null)
{
var pass = policy.Item1 as Pass;
if (pass.Uid == quarantine.Uid)
Context.Stop(pass.Endpoint);
}
else
{
// Do nothing, because either:
// A: we don't know yet the UID of the writer, it will be checked against current quarantine state later
// B: we know the UID, but it does not match with the UID to be quarantined
}

// Stop inbound read-only associations
var readPolicy = Tuple.Create(_endpoints.ReadOnlyEndpointFor(quarantine.RemoteAddress), quarantine.Uid);
if (readPolicy.Item1?.Item1 != null && quarantine.Uid == null)
Context.Stop(readPolicy.Item1.Item1);
else if (readPolicy.Item1?.Item1 != null && quarantine.Uid != null && readPolicy.Item1?.Item2 == quarantine.Uid) { Context.Stop(readPolicy.Item1.Item1); }
else { } // nothing to stop

Func<AkkaProtocolHandle, bool> matchesQuarantine = handle => handle.RemoteAddress.Equals(quarantine.RemoteAddress) &&
quarantine.Uid == handle.HandshakeInfo.Uid;

// Stop all matching pending read handoffs
_pendingReadHandoffs = _pendingReadHandoffs.Where(x =>
//Stop writers
if (_endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress) is Pass)
{
var drop = matchesQuarantine(x.Value);
// Side-effecting here
if (drop)
var pass = (Pass)_endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress);
Context.Stop(pass.Endpoint);
if (!pass.Uid.HasValue)
{
x.Value.Disassociate();
Context.Stop(x.Key);
_log.Warning("Association to [{0}] with unknown UID is reported as quarantined, but address cannot be quarantined without knowing the UID, gated instead for {0} ms",
quarantine.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds);
_endpoints.MarkAsFailed(pass.Endpoint, Deadline.Now + _settings.RetryGateClosedFor);
}
return !drop;
}).ToDictionary(key => key.Key, value => value.Value);
}

// Stop all matching stashed connections
_stashedInbound = _stashedInbound.Select(x =>
//Stop inbound read-only association
var read = _endpoints.ReadOnlyEndpointFor(quarantine.RemoteAddress);
if (read != null)
{
var associations = x.Value.Where(assoc =>
{
var handle = assoc.Association.AsInstanceOf<AkkaProtocolHandle>();
var drop = matchesQuarantine(handle);
if (drop)
handle.Disassociate();
return !drop;
}).ToList();
return new KeyValuePair<IActorRef, List<InboundAssociation>>(x.Key, associations);
}).ToDictionary(k => k.Key, v => v.Value);
Context.Stop((IInternalActorRef)read);
}

if (quarantine.Uid.HasValue)
{
Expand All @@ -623,7 +549,7 @@ protected void Accepting()
Func<int?, IActorRef> createAndRegisterWritingEndpoint = refuseUid => _endpoints.RegisterWritableEndpoint(recipientAddress,
CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse,
_transportMapping[send.Recipient.LocalAddressToUse], _settings, writing: true,
handleOption: null, refuseUid: refuseUid), uid: null, refuseUid: refuseUid);
handleOption: null, refuseUid: refuseUid), uid: null);

_endpoints.WritableEndpointWithPolicyFor(recipientAddress).Match()
.With<Pass>(
Expand Down Expand Up @@ -725,7 +651,7 @@ private void HandleInboundAssociation(InboundAssociation ia, bool writerIsIdle)
var handle = ((AkkaProtocolHandle)ia.Association);
if (readonlyEndpoint != null)
{
var endpoint = readonlyEndpoint.Item1;
var endpoint = readonlyEndpoint;
if (_pendingReadHandoffs.ContainsKey(endpoint)) _pendingReadHandoffs[endpoint].Disassociate();
_pendingReadHandoffs.AddOrSet(endpoint, handle);
endpoint.Tell(new EndpointWriter.TakeOver(handle, Self));
Expand Down Expand Up @@ -757,7 +683,7 @@ private void HandleInboundAssociation(InboundAssociation ia, bool writerIsIdle)
}
else
{
CreateAndRegisterEndpoint(handle, _endpoints.RefuseUid(handle.RemoteAddress));
CreateAndRegisterEndpoint(handle, pass.Uid);
}
}
else if (pass != null) // has a UID value
Expand All @@ -779,7 +705,7 @@ private void HandleInboundAssociation(InboundAssociation ia, bool writerIsIdle)
}
else
{
CreateAndRegisterEndpoint(handle, _endpoints.RefuseUid(handle.RemoteAddress));
CreateAndRegisterEndpoint(handle, null);
}
}
}
Expand Down Expand Up @@ -871,7 +797,7 @@ private void AcceptPendingReader(IActorRef takingOverFrom)
_eventPublisher.NotifyListeners(new AssociatedEvent(handle.LocalAddress, handle.RemoteAddress, inbound: true));
var endpoint = CreateEndpoint(handle.RemoteAddress, handle.LocalAddress,
_transportMapping[handle.LocalAddress], _settings, false, handle, refuseUid: null);
_endpoints.RegisterReadOnlyEndpoint(handle.RemoteAddress, endpoint, handle.HandshakeInfo.Uid);
_endpoints.RegisterReadOnlyEndpoint(handle.RemoteAddress, endpoint);
}
}

Expand Down Expand Up @@ -899,11 +825,11 @@ private void CreateAndRegisterEndpoint(AkkaProtocolHandle handle, int? refuseUid

if (writing)
{
_endpoints.RegisterWritableEndpoint(handle.RemoteAddress, endpoint, handle.HandshakeInfo.Uid, refuseUid);
_endpoints.RegisterWritableEndpoint(handle.RemoteAddress, endpoint, handle.HandshakeInfo.Uid);
}
else
{
_endpoints.RegisterReadOnlyEndpoint(handle.RemoteAddress, endpoint, handle.HandshakeInfo.Uid);
_endpoints.RegisterReadOnlyEndpoint(handle.RemoteAddress, endpoint);
if(!_endpoints.HasWriteableEndpointFor(handle.RemoteAddress))
_endpoints.RemovePolicy(handle.RemoteAddress);
}
Expand All @@ -919,7 +845,7 @@ private IActorRef CreateEndpoint(
int? refuseUid = null)
{
System.Diagnostics.Debug.Assert(_transportMapping.ContainsKey(localAddress));
System.Diagnostics.Debug.Assert(writing || refuseUid == null);
//System.Diagnostics.Debug.Assert(writing || refuseUid == null);

IActorRef endpointActor;

Expand Down
Loading