Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable to activate Grain after clearing its state #9165

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Orleans.Configuration;
using Orleans.Persistence.DynamoDB;
using Orleans.Runtime;
using Orleans.Serialization.Serializers;

namespace Orleans.Storage
{
Expand All @@ -33,6 +34,7 @@ public class DynamoDBGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLi
private readonly DynamoDBStorageOptions options;
private readonly ILogger logger;
private readonly IServiceProvider serviceProvider;
private readonly IActivatorProvider activatorProvider;
private readonly string name;

private DynamoDBStorage storage;
Expand All @@ -50,6 +52,7 @@ public DynamoDBGrainStorage(
this.logger = logger;
this.options = options;
this.serviceProvider = serviceProvider;
this.activatorProvider = this.serviceProvider.GetRequiredService<IActivatorProvider>();
}

public void Participate(ISiloLifecycle lifecycle)
Expand Down Expand Up @@ -152,6 +155,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
grainState.State = loadedState ?? Activator.CreateInstance<T>();
grainState.ETag = record.ETag.ToString();
}
else
{
grainState.RecordExists = false;
grainState.ETag = null;
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}

// Else leave grainState in previous default condition
}
Expand Down Expand Up @@ -294,6 +303,7 @@ public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainSt
else
{
await WriteStateInternal(grainState, record, true);
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}
}
catch (Exception exc)
Expand Down Expand Up @@ -330,7 +340,8 @@ internal T ConvertFromStorageFormat<T>(GrainStateRecord entity)
T dataValue = default;
try
{
dataValue = this.options.GrainStorageSerializer.Deserialize<T>(entity.State);
if (entity.State is { Length: > 0 })
dataValue = this.options.GrainStorageSerializer.Deserialize<T>(entity.State);
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Orleans.Persistence.AzureStorage;
using Orleans.Providers.Azure;
using Orleans.Runtime;
using Orleans.Serialization.Serializers;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Orleans.Storage
Expand All @@ -28,6 +29,7 @@ public class AzureTableGrainStorage : IGrainStorage, IRestExceptionDecoder, ILif
private readonly AzureTableStorageOptions options;
private readonly ClusterOptions clusterOptions;
private readonly IGrainStorageSerializer storageSerializer;
private readonly IActivatorProvider activatorProvider;
private readonly ILogger logger;

private GrainStateTableDataManager tableDataManager;
Expand Down Expand Up @@ -55,6 +57,7 @@ public AzureTableGrainStorage(
this.clusterOptions = clusterOptions.Value;
this.name = name;
this.storageSerializer = options.GrainStorageSerializer;
this.activatorProvider = services.GetRequiredService<IActivatorProvider>();
this.logger = logger;
}

Expand All @@ -81,6 +84,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
grainState.State = loadedState ?? Activator.CreateInstance<T>();
grainState.ETag = entity.ETag.ToString();
}
else
{
grainState.RecordExists = false;
grainState.ETag = null;
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}
// Else leave grainState in previous default condition
}

Expand Down Expand Up @@ -160,6 +169,7 @@ public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainSt
else
{
await DoOptimisticUpdate(() => tableDataManager.Write(entity), grainType, grainId, this.options.TableName, grainState.ETag).ConfigureAwait(false);
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}

grainState.ETag = entity.ETag.ToString(); // Update in-memory data to the new ETag
Expand Down Expand Up @@ -351,7 +361,8 @@ internal T ConvertFromStorageFormat<T>(TableEntity entity)
var input = binaryData.Length > 0
? new BinaryData(binaryData)
: new BinaryData(stringData);
dataValue = this.storageSerializer.Deserialize<T>(input);
if(input.Length > 0)
dataValue = this.storageSerializer.Deserialize<T>(input);
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Storage;
using TestExtensions;
using UnitTests.Persistence;
Expand Down Expand Up @@ -54,21 +56,22 @@ public async Task PersistenceProvider_DynamoDB_WriteRead(int? stringLength, bool
}

[SkippableTheory, TestCategory("Functional")]
[InlineData(null, false)]
[InlineData(null, true)]
[InlineData(400_000, false)]
[InlineData(400_000, true)]
public async Task PersistenceProvider_DynamoDB_WriteClearRead(int? stringLength, bool useJson)
[InlineData(null, false, false)]
[InlineData(null, true, false)]
[InlineData(400_000, false, false)]
[InlineData(400_000, true, false)]
public async Task PersistenceProvider_DynamoDB_WriteClearRead(int? stringLength, bool useJson, bool useFallback)
{
var testName = string.Format("{0}({1} = {2}, {3} = {4})",
var testName = string.Format("{0}({1} = {2}, {3} = {4}, {5} = {6})",
nameof(PersistenceProvider_DynamoDB_WriteClearRead),
nameof(stringLength), stringLength == null ? "default" : stringLength.ToString(),
nameof(useJson), useJson);
nameof(useJson), useJson,
nameof(useFallback), useFallback);

var grainState = TestStoreGrainState.NewRandomState(stringLength);
EnsureEnvironmentSupportsState(grainState);

var store = await InitDynamoDBGrainStorage(useJson);
var store = await InitDynamoDBGrainStorage(useJson, useFallback);

await Test_PersistenceProvider_WriteClearRead(testName, store, grainState);
}
Expand Down Expand Up @@ -161,20 +164,31 @@ public async Task DynamoDBStorage_ConvertToFromStorageFormat(int? stringLength,

private async Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(DynamoDBStorageOptions options)
{
options.GrainStorageSerializer = ActivatorUtilities.CreateInstance<JsonGrainStorageSerializer>(this.providerRuntime.ServiceProvider);
DynamoDBGrainStorage store = ActivatorUtilities.CreateInstance<DynamoDBGrainStorage>(this.providerRuntime.ServiceProvider, "StorageProviderTests", options);
ISiloLifecycleSubject lifecycle = ActivatorUtilities.CreateInstance<SiloLifecycleSubject>(this.providerRuntime.ServiceProvider, NullLogger<SiloLifecycleSubject>.Instance);
store.Participate(lifecycle);
await lifecycle.OnStart();
return store;
}

private Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(bool useJson = false)
private Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(bool useJson = false, bool useFallback = true)
{
var options = new DynamoDBStorageOptions
{
Service = AWSTestConstants.DynamoDbService,
};

var jsonOptions = this.providerRuntime.ServiceProvider.GetService<IOptions<OrleansJsonSerializerOptions>>();
var binarySerializer = new OrleansGrainStorageSerializer(this.providerRuntime.ServiceProvider.GetRequiredService<Serializer>());
var jsonSerializer = new JsonGrainStorageSerializer(new OrleansJsonSerializer(jsonOptions));

if (useFallback)
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
else
options.GrainStorageSerializer = useJson ? jsonSerializer : binarySerializer;

return InitDynamoDBGrainStorage(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,22 @@ public async Task PersistenceProvider_Azure_WriteRead(int? stringLength, bool us
}

[SkippableTheory, TestCategory("Functional"), TestCategory("AzureStorage")]
[InlineData(null, false)]
[InlineData(null, true)]
[InlineData(15 * 64 * 1024 - 256, false)]
[InlineData(15 * 32 * 1024 - 256, true)]
public async Task PersistenceProvider_Azure_WriteClearRead(int? stringLength, bool useJson)
[InlineData(null, false, false)]
[InlineData(null, true, false)]
[InlineData(15 * 64 * 1024 - 256, false, false)]
[InlineData(15 * 32 * 1024 - 256, true, false)]
public async Task PersistenceProvider_Azure_WriteClearRead(int? stringLength, bool useJson, bool useFallback)
{
var testName = string.Format("{0}({1} = {2}, {3} = {4})",
var testName = string.Format("{0}({1} = {2}, {3} = {4}, {5} = {6})",
nameof(PersistenceProvider_Azure_WriteClearRead),
nameof(stringLength), stringLength == null ? "default" : stringLength.ToString(),
nameof(useJson), useJson);
nameof(useJson), useJson,
nameof(useFallback), useFallback);

var grainState = TestStoreGrainState.NewRandomState(stringLength);
EnsureEnvironmentSupportsState(grainState);

var store = await InitAzureTableGrainStorage(useJson);
var store = await InitAzureTableGrainStorage(useJson, useFallback);

await Test_PersistenceProvider_WriteClearRead(testName, store, grainState);
}
Expand Down Expand Up @@ -300,7 +301,7 @@ private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(AzureTable
return store;
}

private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJson = false, bool useStringFormat = false, TypeNameHandling? typeNameHandling = null)
private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJson = false, bool useFallback = true, bool useStringFormat = false, TypeNameHandling? typeNameHandling = null)
{
if (useStringFormat && !useJson)
{
Expand All @@ -320,9 +321,12 @@ private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJs
// TODO change test to include more serializer?
var binarySerializer = new OrleansGrainStorageSerializer(this.providerRuntime.ServiceProvider.GetRequiredService<Serializer>());
var jsonSerializer = new JsonGrainStorageSerializer(new OrleansJsonSerializer(jsonOptions));
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
if (useFallback)
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
else
options.GrainStorageSerializer = useJson ? jsonSerializer : binarySerializer;

AzureTableGrainStorage store = ActivatorUtilities.CreateInstance<AzureTableGrainStorage>(this.providerRuntime.ServiceProvider, options, "TestStorage");
ISiloLifecycleSubject lifecycle = ActivatorUtilities.CreateInstance<SiloLifecycleSubject>(this.providerRuntime.ServiceProvider);
Expand Down
Loading