Skip to content

Commit

Permalink
. (#1770)
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing authored Sep 25, 2022
1 parent 260cc5f commit dc7ec97
Show file tree
Hide file tree
Showing 15 changed files with 36 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Proto.Cluster.Identity.MongoDb/MongoIdentityStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ CancellationToken ct
Logger.LogWarning(x, "Mongo connection failure, retrying");
}

throw new StorageFailure($"Failed to connect to MongoDB while looking up key {key}");
throw new StorageFailureException($"Failed to connect to MongoDB while looking up key {key}");
}

private string GetKey(ClusterIdentity clusterIdentity) => $"{_clusterName}/{clusterIdentity}";
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/DefaultClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
return pid;
}
}
catch (Exception e) when (e is not IdentityIsBlocked)
catch (Exception e) when (e is not IdentityIsBlockedException)
{
e.CheckFailFast();

Expand Down
8 changes: 4 additions & 4 deletions src/Proto.Cluster/Gossip/GossipStateManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ Func<T, TV> extractValue
{
logger?.LogDebug("No members found for consensus check");

return (false, default);
return (false, default!);
}

logger?.LogDebug("Checking consensus");
Expand All @@ -155,7 +155,7 @@ Func<T, TV> extractValue
{
logger?.LogDebug("I can't find myself");

return (false, default);
return (false, default!);
}

var ownValue = GetConsensusValue(ownMemberState);
Expand All @@ -164,7 +164,7 @@ Func<T, TV> extractValue
{
logger?.LogDebug("I don't have any value for {Key}", valueKey);

return (false, default);
return (false, default!);
}

foreach (var (memberId, memberState) in state.Members)
Expand All @@ -181,7 +181,7 @@ Func<T, TV> extractValue

if (consensusValue is null || !ownValue.Equals(consensusValue))
{
return (false, default);
return (false, default!);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Cluster/Gossip/Gossiper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public ConsensusCheckBuilder<T> InConsensusWith<TE>(string key, Func<TE, T> getV
var (member, state) = kv;
var value = state.Values.TryGetValue(key, out var any) ? unpack(any.Value) : default;

return (member, key, value);
return (member, key, value!);
};
}

Expand Down Expand Up @@ -371,7 +371,7 @@ private ConsensusCheck<T> Build()
);
}

return result;
return result!;
};
}

Expand Down Expand Up @@ -403,7 +403,7 @@ private ConsensusCheck<T> Build()
}

// ReSharper enable PossibleMultipleEnumeration
return consensus;
return consensus!;
};

KeyValuePair<string, GossipState.Types.GossipMemberState>[] GetValidMemberStates(GossipState state,
Expand Down
12 changes: 8 additions & 4 deletions src/Proto.Cluster/Identity/IIdentityStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,22 @@ public StoredActivation(string memberId, PID pid)
public string MemberId { get; }
}

public class StorageFailure : Exception
#pragma warning disable RCS1194
public class StorageFailureException : Exception
#pragma warning restore RCS1194
{
public StorageFailure(string message) : base(message)
public StorageFailureException(string message) : base(message)
{
}

public StorageFailure(string message, Exception innerException) : base(message, innerException)
public StorageFailureException(string message, Exception innerException) : base(message, innerException)
{
}
}

public class LockNotFoundException : StorageFailure
#pragma warning disable RCS1194
public class LockNotFoundException : StorageFailureException
#pragma warning restore RCS1194
{
public LockNotFoundException(string message) : base(message)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ namespace Proto.Cluster.Identity;
/// <summary>
/// Lets the caller know that the identity is not available to spawn.
/// </summary>
public class IdentityIsBlocked : Exception
#pragma warning disable RCS1194
public class IdentityIsBlockedException : Exception
#pragma warning restore RCS1194
{
public IdentityIsBlocked(ClusterIdentity blockedIdentity)
public IdentityIsBlockedException(ClusterIdentity blockedIdentity)
{
BlockedIdentity = blockedIdentity;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/Identity/IdentityStorageLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public IdentityStorageLookup(IIdentityStorage storage)

if (res?.IdentityBlocked == true)
{
throw new IdentityIsBlocked(clusterIdentity);
throw new IdentityIsBlockedException(clusterIdentity);
}

return res?.Pid;
Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Cluster/Identity/IdentityStorageWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async Task<PidResult> Inner()

if (activator == null)
{
return null;
return null!;
}

//try to acquire global lock
Expand Down Expand Up @@ -178,7 +178,7 @@ async Task<PidResult> Inner()
{
if (_cluster.System.Shutdown.IsCancellationRequested)
{
return null;
return null!;
}

if (_shouldThrottle().IsOpen())
Expand All @@ -192,7 +192,7 @@ async Task<PidResult> Inner()
{
if (_cluster.System.Shutdown.IsCancellationRequested)
{
return null;
return null!;
}

if (_shouldThrottle().IsOpen())
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/LegacyClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private async ValueTask RemoveFromSource(ClusterIdentity clusterIdentity, PidSou
return pid;
}
}
catch (Exception e) when (e is not IdentityIsBlocked)
catch (Exception e) when (e is not IdentityIsBlockedException)
{
e.CheckFailFast();

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Cluster/Partition/PartitionIdentityLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public PartitionIdentityLookup(PartitionConfig? config)

if (resp?.InvalidIdentity == true)
{
throw new IdentityIsBlocked(clusterIdentity);
throw new IdentityIsBlockedException(clusterIdentity);
}

if (_config.DeveloperLogging)
Expand Down Expand Up @@ -167,7 +167,7 @@ public PartitionIdentityLookup(PartitionConfig? config)

return null;
}
catch (Exception e) when (e is not IdentityIsBlocked)
catch (Exception e) when (e is not IdentityIsBlockedException)
{
e.CheckFailFast();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout)

if (resp.InvalidIdentity)
{
throw new IdentityIsBlocked(clusterIdentity);
throw new IdentityIsBlockedException(clusterIdentity);
}

return resp?.Pid;
Expand All @@ -89,7 +89,7 @@ public PartitionActivatorLookup(TimeSpan getPidTimeout)

return null;
}
catch (Exception e) when (e is not IdentityIsBlocked)
catch (Exception e) when (e is not IdentityIsBlockedException)
{
e.CheckFailFast();

Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ private class PubSubBatchWithReceipts
}
}

#pragma warning disable RCS1194
public class ProducerQueueFullException : Exception
#pragma warning restore RCS1194
{
public ProducerQueueFullException(string topic) : base($"Producer for topic {topic} has full queue")
{
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Cluster/SingleNode/SingleNodeLookup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SingleNodeLookup(TimeSpan getPidTimeout)

if (resp.InvalidIdentity)
{
throw new IdentityIsBlocked(clusterIdentity);
throw new IdentityIsBlockedException(clusterIdentity);
}

return resp?.Pid;
Expand All @@ -67,7 +67,7 @@ public SingleNodeLookup(TimeSpan getPidTimeout)

return null;
}
catch (Exception e) when (e is not IdentityIsBlocked)
catch (Exception e) when (e is not IdentityIsBlockedException)
{
e.CheckFailFast();
Logger.LogError(e, "[SingleNode] Error occured requesting remote PID {@Request}", req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ ChaosMonkeyRedisIdentityClusterFixture clusterFixture
internal static class RedisFixture
{
private static readonly Lazy<ConnectionMultiplexer> LazyConnection = new(()
=> ConnectionMultiplexer.Connect(TestConfig.Configuration.GetConnectionString("Redis")));
=> ConnectionMultiplexer.Connect(TestConfig.Configuration.GetConnectionString("Redis")!));

static RedisFixture()
{
Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Cluster.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ await Tracing.Trace(async () =>

await member.Invoking(async m => await m.RequestAsync<Pong>(invalidIdentity, message, timeout))
.Should()
.ThrowExactlyAsync<IdentityIsBlocked>();
.ThrowExactlyAsync<IdentityIsBlockedException>();
}, _testOutputHelper
);

Expand Down

0 comments on commit dc7ec97

Please sign in to comment.