From de0a847927d15600129e55249d1fce341b13d0e9 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 10 Oct 2023 19:11:06 +0100 Subject: [PATCH 01/15] Actor state TTL support Signed-off-by: joshvanl --- examples/Actor/DemoActor/DemoActor.cs | 2 +- .../Communication/ActorStateResponse.cs | 50 ++++++++++ src/Dapr.Actors/Constants.cs | 3 +- src/Dapr.Actors/DaprHttpInteractor.cs | 15 ++- src/Dapr.Actors/IDaprInteractor.cs | 4 +- src/Dapr.Actors/Runtime/ActorStateChange.cs | 40 +++++++- src/Dapr.Actors/Runtime/ActorStateManager.cs | 65 +++++++------ src/Dapr.Actors/Runtime/ConditionalValue.cs | 4 +- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 26 ++++-- src/Dapr.Actors/Runtime/IActorStateManager.cs | 18 ++-- .../Runtime/IActorStateSerializer.cs | 2 +- .../Protos/dapr/proto/common/v1/common.proto | 2 +- .../Protos/dapr/proto/dapr/v1/dapr.proto | 92 ++++++++++++++----- test/Dapr.Actors.Test/Runtime/ActorTests.cs | 2 +- .../Dapr.E2E.Test.Actors/State/IStateActor.cs | 22 +++++ .../Actors/ReentrantActor.cs | 4 +- .../Dapr.E2E.Test.App/Actors/ReminderActor.cs | 2 +- test/Dapr.E2E.Test.App/Actors/StateActor.cs | 38 ++++++++ test/Dapr.E2E.Test.App/Actors/TimerActor.cs | 2 +- .../Actors/E2ETests.StateTests.cs | 44 +++++++++ .../configuration/featureconfig.yaml | 2 + 21 files changed, 357 insertions(+), 82 deletions(-) create mode 100644 src/Dapr.Actors/Communication/ActorStateResponse.cs create mode 100644 test/Dapr.E2E.Test.Actors/State/IStateActor.cs create mode 100644 test/Dapr.E2E.Test.App/Actors/StateActor.cs create mode 100644 test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 0ab633fcd..19d30cc92 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs new file mode 100644 index 000000000..1a990ce46 --- /dev/null +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -0,0 +1,50 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Communication +{ + using System; + + /// + /// Represents a response from fetching an actor state key. + /// + internal class ActorStateResponse + { + /// + /// Initializes a new instance of the class. + /// + /// The response value. + /// The time to live expiration time. + public ActorStateResponse(T value, DateTime? ttlExpireTime) + { + this.Value = value; + this.TTLExpireTime = ttlExpireTime; + } + + /// + /// Gets the response value as a string. + /// + /// + /// The response value as a string. + /// + public T Value { get; } + + /// + /// Gets the time to live expiration time. + /// + /// + /// The time to live expiration time. + /// + public DateTime? TTLExpireTime { get; } + } +} diff --git a/src/Dapr.Actors/Constants.cs b/src/Dapr.Actors/Constants.cs index be2d8f49f..038caf101 100644 --- a/src/Dapr.Actors/Constants.cs +++ b/src/Dapr.Actors/Constants.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ internal static class Constants public const string RequestHeaderName = "X-DaprRequestHeader"; public const string ErrorResponseHeaderName = "X-DaprErrorResponseHeader"; public const string ReentrancyRequestHeaderName = "Dapr-Reentrancy-Id"; + public const string TTLResponseHeaderName = "Metadata.ttlExpireTime"; public const string Dapr = "dapr"; public const string Config = "config"; public const string State = "state"; diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index 4695375fb..b8cc74cdb 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -57,7 +57,7 @@ public DaprHttpInteractor( this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout; } - public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public async Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateKeyRelativeUrlFormat, actorType, actorId, keyName); @@ -72,7 +72,18 @@ HttpRequestMessage RequestFunc() using var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken); var stringResponse = await response.Content.ReadAsStringAsync(); - return stringResponse; + + var ttlExpireTime = new DateTime(); + if (response.Headers.TryGetValues(Constants.TTLResponseHeaderName, out IEnumerable headerValues)) + { + var ttlExpireTimeString = headerValues.First(); + if (!string.IsNullOrEmpty(ttlExpireTimeString)) + { + ttlExpireTime = DateTime.Parse(ttlExpireTimeString, CultureInfo.InvariantCulture); + } + } + + return new ActorStateResponse(stringResponse, ttlExpireTime); } public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index 8f30aa18f..5849328a8 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ internal interface IDaprInteractor /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); + Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); /// /// Invokes Actor method. diff --git a/src/Dapr.Actors/Runtime/ActorStateChange.cs b/src/Dapr.Actors/Runtime/ActorStateChange.cs index c09e48df6..18c7eaac1 100644 --- a/src/Dapr.Actors/Runtime/ActorStateChange.cs +++ b/src/Dapr.Actors/Runtime/ActorStateChange.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,7 +27,8 @@ public sealed class ActorStateChange /// The type of value associated with given actor state name. /// The value associated with given actor state name. /// The kind of state change for given actor state name. - public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind) + /// The time to live for the state. + public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTime? ttlExpireTime) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -35,6 +36,7 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi this.Type = type; this.Value = value; this.ChangeKind = changeKind; + this.TTLExpireTime = ttlExpireTime; } /// @@ -68,5 +70,39 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi /// The kind of state change for given actor state name. /// public StateChangeKind ChangeKind { get; } + + /// + /// Gets the time to live for the state. + /// + /// + /// The time to live for the state. + /// + /// + /// If null, the state will not expire. + /// + public DateTime? TTLExpireTime { get; } + + /// + /// Gets the time to live in seconds for the state. + /// + /// + /// The time to live for the state. + /// + /// + /// If null, the state will not expire. + /// + public int? TTLInSeconds { + get + { + if (this.TTLExpireTime.HasValue) + { + return (int)Math.Ceiling((this.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds); + } + else + { + return null; + } + } + } } } diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index 9c752f56b..1b873e9af 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ using System.Threading; using System.Threading.Tasks; using Dapr.Actors.Resources; +using Dapr.Actors.Communication; namespace Dapr.Actors.Runtime { @@ -35,17 +36,17 @@ internal ActorStateManager(Actor actor) this.defaultTracker = new Dictionary(); } - public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { EnsureStateProviderInitialized(); - - if (!(await this.TryAddStateAsync(stateName, value, cancellationToken))) + + if (!(await this.TryAddStateAsync(stateName, value, cancellationToken, ttlInSeconds))) { throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); } } - public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -60,7 +61,7 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); return true; } @@ -72,7 +73,7 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat return false; } - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); return true; } @@ -102,8 +103,8 @@ public async Task> TryGetStateAsync(string stateName, Can { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove in the cache or is expired + if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTime.UtcNow) { return new ConditionalValue(false, default); } @@ -114,13 +115,13 @@ public async Task> TryGetStateAsync(string stateName, Can var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value, StateChangeKind.None)); + stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, conditionalResult.Value.TTLExpireTime)); } - return conditionalResult; + return new ConditionalValue(false, default); } - public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -141,11 +142,11 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken } else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) { - stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update)); + stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); } else { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); } } @@ -217,7 +218,7 @@ public async Task ContainsStateAsync(string stateName, CancellationToken c return false; } - public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { EnsureStateProviderInitialized(); @@ -231,7 +232,7 @@ public async Task GetOrAddStateAsync(string stateName, T value, Cancellati var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add; var stateChangeTracker = GetContextualStateTracker(); - stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind); + stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttlInSeconds: ttlInSeconds); return value; } @@ -239,7 +240,8 @@ public async Task AddOrUpdateStateAsync( string stateName, T addValue, Func updateValueFactory, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int? ttlInSeconds = null) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -254,7 +256,7 @@ public async Task AddOrUpdateStateAsync( // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); return addValue; } @@ -272,13 +274,13 @@ public async Task AddOrUpdateStateAsync( var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value); - stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update)); + var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); + stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); return newValue; } - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); return addValue; } @@ -310,7 +312,7 @@ public async Task SaveStateAsync(CancellationToken cancellationToken = default) if (stateMetadata.ChangeKind != StateChangeKind.None) { stateChangeList.Add( - new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind)); + new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind, stateMetadata.TTLExpireTime)); if (stateMetadata.ChangeKind == StateChangeKind.Remove) { @@ -362,7 +364,7 @@ private bool IsStateMarkedForRemove(string stateName) return false; } - private Task> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) + private Task>> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); return this.actor.Host.StateProvider.TryLoadStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken); @@ -392,11 +394,20 @@ private Dictionary GetContextualStateTracker() private sealed class StateMetadata { - private StateMetadata(object value, Type type, StateChangeKind changeKind) + private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) { this.Value = value; this.Type = type; this.ChangeKind = changeKind; + + if (ttlExpireTime.HasValue && ttlInSeconds.HasValue) { + throw new ArgumentException("Cannot specify both TTLExpireTime and TTLInSeconds"); + } + if (ttlInSeconds.HasValue) { + this.TTLExpireTime = DateTime.UtcNow.AddSeconds(ttlInSeconds.Value); + } else { + this.TTLExpireTime = ttlExpireTime; + } } public object Value { get; set; } @@ -405,9 +416,11 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind) public Type Type { get; } - public static StateMetadata Create(T value, StateChangeKind changeKind) + public DateTime? TTLExpireTime { get; set; } + + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) { - return new StateMetadata(value, typeof(T), changeKind); + return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttlInSeconds); } public static StateMetadata CreateForRemove() diff --git a/src/Dapr.Actors/Runtime/ConditionalValue.cs b/src/Dapr.Actors/Runtime/ConditionalValue.cs index 1d2a197eb..ec4f3a5a6 100644 --- a/src/Dapr.Actors/Runtime/ConditionalValue.cs +++ b/src/Dapr.Actors/Runtime/ConditionalValue.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,4 +42,4 @@ public ConditionalValue(bool hasValue, TValue value) /// The value of the object. If HasValue is false, returns the default value for type of the TValue parameter. public TValue Value { get; } } -} \ No newline at end of file +} diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index ae86fb28b..9ace60fbf 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ namespace Dapr.Actors.Runtime using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// State Provider to interact with Dapr runtime. @@ -43,27 +44,27 @@ public DaprStateProvider(IDaprInteractor daprInteractor, JsonSerializerOptions j this.daprInteractor = daprInteractor; } - public async Task> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) + public async Task>> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var result = new ConditionalValue(false, default); - var stringResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + var result = new ConditionalValue>(false, default); + var response = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - if (stringResult.Length != 0) + if (response.Value.Length != 0 && response.TTLExpireTime > DateTime.UtcNow) { T typedResult; // perform default json de-serialization if custom serializer was not provided. if (this.actorStateSerializer != null) { - var byteResult = Convert.FromBase64String(stringResult.Trim('"')); + var byteResult = Convert.FromBase64String(response.Value.Trim('"')); typedResult = this.actorStateSerializer.Deserialize(byteResult); } else { - typedResult = JsonSerializer.Deserialize(stringResult, jsonSerializerOptions); + typedResult = JsonSerializer.Deserialize(response.Value, jsonSerializerOptions); } - result = new ConditionalValue(true, typedResult); + result = new ConditionalValue>(true, new ActorStateResponse(typedResult, response.TTLExpireTime)); } return result; @@ -71,8 +72,8 @@ public async Task> TryLoadStateAsync(string actorType, st public async Task ContainsStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var byteResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - return byteResult.Length != 0; + var result = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + return result.Value.Length != 0; } public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) @@ -132,6 +133,11 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a writer.WritePropertyName("value"); JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); } + + var ttlInSeconds = stateChange.TTLInSeconds; + if (ttlInSeconds.HasValue) { + writer.WriteString("ttlInSeconds", ttlInSeconds.Value.ToString()); + } break; default: break; diff --git a/src/Dapr.Actors/Runtime/IActorStateManager.cs b/src/Dapr.Actors/Runtime/IActorStateManager.cs index df1eb3356..75d2a5996 100644 --- a/src/Dapr.Actors/Runtime/IActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/IActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ namespace Dapr.Actors.Runtime using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// Represents an interface that exposes methods to manage state of an . @@ -31,6 +32,7 @@ public interface IActorStateManager /// Name of the actor state to add. /// Value of the actor state to add. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous add operation. /// @@ -43,7 +45,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Gets an actor state with specified state name. @@ -73,6 +75,7 @@ public interface IActorStateManager /// Type of value associated with given state name. /// Name of the actor state to set. /// Value of the actor state. + /// The time to live for the state. If null, the state will not expire. /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous set operation. @@ -83,7 +86,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Removes an actor state with specified state name. @@ -107,6 +110,7 @@ public interface IActorStateManager /// Value of the actor state to add. /// The token to monitor for cancellation requests. /// This is optional and defaults to . + /// The time to live for the state. If null, the state will not expire. /// /// A boolean task that represents the asynchronous add operation. Returns true if the /// value was successfully added and false if an actor state with the same name already exists. @@ -119,7 +123,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Attempts to get an actor state with specified state name. @@ -174,6 +178,7 @@ public interface IActorStateManager /// Name of the actor state to get or add. /// Value of the actor state to add if it does not exist. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous get or add operation. The value of TResult /// parameter contains value of actor state with given state name. @@ -186,7 +191,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Adds an actor state with given state name, if it does not already exist or updates @@ -197,6 +202,7 @@ public interface IActorStateManager /// Value of the actor state to add if it does not exist. /// Factory function to generate value of actor state to update if it exists. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous add/update operation. The value of TResult /// parameter contains value of actor state that was added/updated. @@ -207,7 +213,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default); + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Clears all the cached actor states and any operation(s) performed on diff --git a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs index c6136c057..cff3b7c26 100644 --- a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs +++ b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto index 3faea3016..1e63b885d 100644 --- a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto +++ b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto @@ -77,7 +77,7 @@ message InvokeRequest { HTTPExtension http_extension = 4; } -// InvokeResponse is the response message inclduing data and its content type +// InvokeResponse is the response message including data and its content type // from app callback. // This message is used in InvokeService of Dapr gRPC Service and OnInvoke // of AppCallback gRPC service. diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index 883527adb..eafb5452e 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -169,6 +169,26 @@ service Dapr { // Raise an event to a running workflow instance rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} + // Starts a new instance of a workflow + rpc StartWorkflowBeta1 (StartWorkflowRequest) returns (StartWorkflowResponse) {} + + // Gets details about a started workflow instance + rpc GetWorkflowBeta1 (GetWorkflowRequest) returns (GetWorkflowResponse) {} + + // Purge Workflow + rpc PurgeWorkflowBeta1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Terminates a running workflow instance + rpc TerminateWorkflowBeta1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {} + + // Pauses a running workflow instance + rpc PauseWorkflowBeta1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {} + + // Resumes a paused workflow instance + rpc ResumeWorkflowBeta1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Raise an event to a running workflow instance + rpc RaiseEventWorkflowBeta1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} // Shutdown the sidecar rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {} } @@ -542,6 +562,9 @@ message GetActorStateRequest { // GetActorStateResponse is the response conveying the actor's state value. message GetActorStateResponse { bytes data = 1; + + // The metadata which will be sent to app. + map metadata = 2; } // ExecuteActorStateTransactionRequest is the message to execute multiple operations on a specified actor. @@ -580,10 +603,14 @@ message InvokeActorResponse { // GetMetadataResponse is a message that is returned on GetMetadata rpc call message GetMetadataResponse { string id = 1; - repeated ActiveActorsCount active_actors_count = 2; - repeated RegisteredComponents registered_components = 3; - map extended_metadata = 4; - repeated PubsubSubscription subscriptions = 5; + repeated ActiveActorsCount active_actors_count = 2 [json_name = "actors"]; + repeated RegisteredComponents registered_components = 3 [json_name = "components"]; + map extended_metadata = 4 [json_name = "extended"]; + repeated PubsubSubscription subscriptions = 5 [json_name = "subscriptions"]; + repeated MetadataHTTPEndpoint http_endpoints = 6 [json_name = "httpEndpoints"]; + AppConnectionProperties app_connection_properties = 7 [json_name = "appConnectionProperties"]; + string runtime_version = 8 [json_name = "runtimeVersion"]; + repeated string enabled_features = 9 [json_name = "enabledFeatures"]; } message ActiveActorsCount { @@ -598,12 +625,31 @@ message RegisteredComponents { repeated string capabilities = 4; } +message MetadataHTTPEndpoint { + string name = 1 [json_name = "name"]; +} + +message AppConnectionProperties { + int32 port = 1; + string protocol = 2; + string channel_address = 3 [json_name = "channelAddress"]; + int32 max_concurrency = 4 [json_name = "maxConcurrency"]; + AppConnectionHealthProperties health = 5; +} + +message AppConnectionHealthProperties { + string health_check_path = 1 [json_name = "healthCheckPath"]; + string health_probe_interval = 2 [json_name = "healthProbeInterval"]; + string health_probe_timeout = 3 [json_name = "healthProbeTimeout"]; + int32 health_threshold = 4 [json_name = "healthThreshold"]; +} + message PubsubSubscription { - string pubsub_name = 1; - string topic = 2; - map metadata = 3; - PubsubSubscriptionRules rules = 4; - string dead_letter_topic = 5; + string pubsub_name = 1 [json_name = "pubsubname"]; + string topic = 2 [json_name = "topic"]; + map metadata = 3 [json_name = "metadata"]; + PubsubSubscriptionRules rules = 4 [json_name = "rules"]; + string dead_letter_topic = 5 [json_name = "deadLetterTopic"]; } message PubsubSubscriptionRules { @@ -900,7 +946,7 @@ message EncryptRequest { // Request details. Must be present in the first message only. EncryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // EncryptRequestOptions contains options for the first message in the EncryptAlpha1 request. @@ -928,7 +974,7 @@ message EncryptRequestOptions { // EncryptResponse is the response for EncryptAlpha1. message EncryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } // DecryptRequest is the request for DecryptAlpha1. @@ -936,7 +982,7 @@ message DecryptRequest { // Request details. Must be present in the first message only. DecryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // DecryptRequestOptions contains options for the first message in the DecryptAlpha1 request. @@ -952,10 +998,10 @@ message DecryptRequestOptions { // DecryptResponse is the response for DecryptAlpha1. message DecryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } -// GetWorkflowRequest is the request for GetWorkflowAlpha1. +// GetWorkflowRequest is the request for GetWorkflowBeta1. message GetWorkflowRequest { // ID of the workflow instance to query. string instance_id = 1 [json_name = "instanceID"]; @@ -963,7 +1009,7 @@ message GetWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// GetWorkflowResponse is the response for GetWorkflowAlpha1. +// GetWorkflowResponse is the response for GetWorkflowBeta1. message GetWorkflowResponse { // ID of the workflow instance. string instance_id = 1 [json_name = "instanceID"]; @@ -979,7 +1025,7 @@ message GetWorkflowResponse { map properties = 6; } -// StartWorkflowRequest is the request for StartWorkflowAlpha1. +// StartWorkflowRequest is the request for StartWorkflowBeta1. message StartWorkflowRequest { // The ID to assign to the started workflow instance. If empty, a random ID is generated. string instance_id = 1 [json_name = "instanceID"]; @@ -993,13 +1039,13 @@ message StartWorkflowRequest { bytes input = 5; } -// StartWorkflowResponse is the response for StartWorkflowAlpha1. +// StartWorkflowResponse is the response for StartWorkflowBeta1. message StartWorkflowResponse { // ID of the started workflow instance. string instance_id = 1 [json_name = "instanceID"]; } -// TerminateWorkflowRequest is the request for TerminateWorkflowAlpha1. +// TerminateWorkflowRequest is the request for TerminateWorkflowBeta1. message TerminateWorkflowRequest { // ID of the workflow instance to terminate. string instance_id = 1 [json_name = "instanceID"]; @@ -1007,7 +1053,7 @@ message TerminateWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// PauseWorkflowRequest is the request for PauseWorkflowAlpha1. +// PauseWorkflowRequest is the request for PauseWorkflowBeta1. message PauseWorkflowRequest { // ID of the workflow instance to pause. string instance_id = 1 [json_name = "instanceID"]; @@ -1015,7 +1061,7 @@ message PauseWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// ResumeWorkflowRequest is the request for ResumeWorkflowAlpha1. +// ResumeWorkflowRequest is the request for ResumeWorkflowBeta1. message ResumeWorkflowRequest { // ID of the workflow instance to resume. string instance_id = 1 [json_name = "instanceID"]; @@ -1023,7 +1069,7 @@ message ResumeWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowAlpha1. +// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowBeta1. message RaiseEventWorkflowRequest { // ID of the workflow instance to raise an event for. string instance_id = 1 [json_name = "instanceID"]; @@ -1035,10 +1081,10 @@ message RaiseEventWorkflowRequest { bytes event_data = 4; } -// PurgeWorkflowRequest is the request for PurgeWorkflowAlpha1. +// PurgeWorkflowRequest is the request for PurgeWorkflowBeta1. message PurgeWorkflowRequest { // ID of the workflow instance to purge. string instance_id = 1 [json_name = "instanceID"]; // Name of the workflow component. string workflow_component = 2 [json_name = "workflowComponent"]; -} \ No newline at end of file +} diff --git a/test/Dapr.Actors.Test/Runtime/ActorTests.cs b/test/Dapr.Actors.Test/Runtime/ActorTests.cs index b18800f0e..f88b4e03f 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.Actors/State/IStateActor.cs b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs new file mode 100644 index 000000000..e453f7fa3 --- /dev/null +++ b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Threading.Tasks; +using Dapr.Actors; + +namespace Dapr.E2E.Test.Actors.State +{ + public interface IStateActor : IPingActor, IActor + { } +} diff --git a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs index 5f2d5db86..58776fe28 100644 --- a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs +++ b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -67,4 +67,4 @@ private async Task UpdateState(bool isEnter, int callNumber) } } } -} \ No newline at end of file +} diff --git a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs index b08e483c2..57536377d 100644 --- a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.App/Actors/StateActor.cs b/test/Dapr.E2E.Test.App/Actors/StateActor.cs new file mode 100644 index 000000000..f8bff85e7 --- /dev/null +++ b/test/Dapr.E2E.Test.App/Actors/StateActor.cs @@ -0,0 +1,38 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dapr.Actors.Runtime; + +namespace Dapr.E2E.Test.Actors.State +{ + public class StateActor : Actor + { + public StateActor(ActorHost host) + : base(host) + { + } + + public Task GetState(string key) + { + return this.StateManager.GetStateAsync(key); + } + + public Task SetState(string key, string value, int? ttlInSeconds) + { + return this.StateManager.SetStateAsync(key, value, ttlInSeconds = ttlInSeconds); + } + } +} diff --git a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs index bbe6cf7ae..4c6589965 100644 --- a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs new file mode 100644 index 000000000..853acf944 --- /dev/null +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -0,0 +1,44 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ +namespace Dapr.E2E.Test +{ + using System; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using Dapr.Actors; + using Dapr.E2E.Test.Actors.State; + using Xunit; + + public partial class E2ETests : IAsyncLifetime + { + [Fact] + public async Task ActorCanSaveStateWithTTL() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + await proxy.SaveState("key", "value", 2, cts.Token); + + state = await.proxy.GetState("key", cts.Token); + Assert.Equal("value", state.Value); + + await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + + state = await.proxy.GetState("key", cts.Token); + Assert.Null(state.Value); + } + } +} diff --git a/test/Dapr.E2E.Test/configuration/featureconfig.yaml b/test/Dapr.E2E.Test/configuration/featureconfig.yaml index 81ef1ecb1..4806c630f 100644 --- a/test/Dapr.E2E.Test/configuration/featureconfig.yaml +++ b/test/Dapr.E2E.Test/configuration/featureconfig.yaml @@ -12,3 +12,5 @@ spec: enabled: true - name: "proxy.grpc" enabled: true + - name: "ActorStateTTL" + enabled: true From 2f0cd2a8f56b36dd9d81d1494e8e1c69a56396fa Mon Sep 17 00:00:00 2001 From: joshvanl Date: Thu, 12 Oct 2023 15:04:40 +0100 Subject: [PATCH 02/15] Updates Dapr version to 1.12.0 Signed-off-by: joshvanl --- .github/workflows/itests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 870264f40..67674681f 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -37,11 +37,11 @@ jobs: GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org - DAPR_CLI_VER: 1.9.1 - DAPR_RUNTIME_VER: 1.10.5 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh + DAPR_CLI_VER: 1.12.0 + DAPR_RUNTIME_VER: 1.12.0 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/release-1.12/install/install.sh DAPR_CLI_REF: '' - DAPR_REF: '4181de0edc65fc98a836ae7abc6042c575c8fae5' + DAPR_REF: '2149fca96cdf11627c387bda26dcc027d1c47354' steps: - name: Set up Dapr CLI run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }} From 07188342a0652994ff1ceb20ffba42f02ec0a0ba Mon Sep 17 00:00:00 2001 From: joshvanl Date: Thu, 12 Oct 2023 15:16:00 +0100 Subject: [PATCH 03/15] Adds `ttlInSeconds` to example Dapr application Signed-off-by: joshvanl --- examples/Actor/DemoActor/DemoActor.cs | 8 ++++---- examples/Actor/IDemoActor/IDemoActor.cs | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 19d30cc92..8e1cfe975 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -41,12 +41,12 @@ public DemoActor(ActorHost host, BankService bank) this.bank = bank; } - public async Task SaveData(MyData data) + public async Task SaveData(MyData data, int? ttlInSeconds = null) { Console.WriteLine($"This is Actor id {this.Id} with data {data}."); // Set State using StateManager, state is saved after the method execution. - await this.StateManager.SetStateAsync(StateName, data); + await this.StateManager.SetStateAsync(StateName, data, ttlInSeconds: ttlInSeconds); } public Task GetData() @@ -100,7 +100,7 @@ public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSp // This method is invoked when an actor reminder is fired. var actorState = await this.StateManager.GetStateAsync(StateName); actorState.PropertyB = $"Reminder triggered at '{DateTime.Now:yyyy-MM-ddTHH:mm:ss}'"; - await this.StateManager.SetStateAsync(StateName, actorState); + await this.StateManager.SetStateAsync(StateName, actorState, ttlInSeconds: 360); } class TimerParams @@ -164,7 +164,7 @@ public async Task TimerCallback(byte[] data) { var state = await this.StateManager.GetStateAsync(StateName); state.PropertyA = $"Timer triggered at '{DateTime.Now:yyyyy-MM-ddTHH:mm:s}'"; - await this.StateManager.SetStateAsync(StateName, state); + await this.StateManager.SetStateAsync(StateName, state, ttlInSeconds: 360); var timerParams = JsonSerializer.Deserialize(data); Console.WriteLine("Timer parameter1: " + timerParams.IntParam); Console.WriteLine("Timer parameter2: " + timerParams.StringParam); diff --git a/examples/Actor/IDemoActor/IDemoActor.cs b/examples/Actor/IDemoActor/IDemoActor.cs index adec6df68..af24b7d03 100644 --- a/examples/Actor/IDemoActor/IDemoActor.cs +++ b/examples/Actor/IDemoActor/IDemoActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,8 +27,9 @@ public interface IDemoActor : IActor /// Method to save data. /// /// DAta to save. + /// Optional TTL in seconds. /// A task that represents the asynchronous save operation. - Task SaveData(MyData data); + Task SaveData(MyData data, int? ttlInSeconds = null); /// /// Method to get data. From 9e4ada71c488d670f4114e0314a526f5eca7196a Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 13 Oct 2023 09:27:32 +0100 Subject: [PATCH 04/15] Use overloads when setting ttlInSeconds. Don't use nullable int Signed-off-by: joshvanl --- src/Dapr.Actors/Runtime/ActorStateManager.cs | 149 +++++++++++++++++- src/Dapr.Actors/Runtime/IActorStateManager.cs | 116 +++++++++++++- 2 files changed, 253 insertions(+), 12 deletions(-) diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index 1b873e9af..aa5b62376 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -36,17 +36,58 @@ internal ActorStateManager(Actor actor) this.defaultTracker = new Dictionary(); } - public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) + public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); - if (!(await this.TryAddStateAsync(stateName, value, cancellationToken, ttlInSeconds))) + if (!(await this.TryAddStateAsync(stateName, value, cancellationToken))) { throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); } } - public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null) + public async Task AddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) + { + EnsureStateProviderInitialized(); + + if (!(await this.TryAddStateAsync(stateName, value, ttlInSeconds, cancellationToken))) + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); + } + } + + public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + + // Check if the property was marked as remove in the cache + if (stateMetadata.ChangeKind == StateChangeKind.Remove) + { + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update); + return true; + } + + return false; + } + + if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) + { + return false; + } + + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + return true; + } + + public async Task TryAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -121,7 +162,36 @@ public async Task> TryGetStateAsync(string stateName, Can return new ConditionalValue(false, default); } - public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) + public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + stateMetadata.Value = value; + + if (stateMetadata.ChangeKind == StateChangeKind.None || + stateMetadata.ChangeKind == StateChangeKind.Remove) + { + stateMetadata.ChangeKind = StateChangeKind.Update; + } + } + else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) + { + stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update)); + } + else + { + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + } + } + + public async Task SetStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -218,7 +288,25 @@ public async Task ContainsStateAsync(string stateName, CancellationToken c return false; } - public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) + public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + { + EnsureStateProviderInitialized(); + + var condRes = await this.TryGetStateAsync(stateName, cancellationToken); + + if (condRes.HasValue) + { + return condRes.Value; + } + + var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add; + + var stateChangeTracker = GetContextualStateTracker(); + stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind); + return value; + } + + public async Task GetOrAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); @@ -240,8 +328,55 @@ public async Task AddOrUpdateStateAsync( string stateName, T addValue, Func updateValueFactory, - CancellationToken cancellationToken = default, - int? ttlInSeconds = null) + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); + + EnsureStateProviderInitialized(); + + var stateChangeTracker = GetContextualStateTracker(); + + if (stateChangeTracker.ContainsKey(stateName)) + { + var stateMetadata = stateChangeTracker[stateName]; + + // Check if the property was marked as remove in the cache + if (stateMetadata.ChangeKind == StateChangeKind.Remove) + { + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update); + return addValue; + } + + var newValue = updateValueFactory.Invoke(stateName, (T)stateMetadata.Value); + stateMetadata.Value = newValue; + + if (stateMetadata.ChangeKind == StateChangeKind.None) + { + stateMetadata.ChangeKind = StateChangeKind.Update; + } + + return newValue; + } + + var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); + if (conditionalResult.HasValue) + { + var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); + stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update)); + + return newValue; + } + + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add); + return addValue; + } + + public async Task AddOrUpdateStateAsync( + string stateName, + T addValue, + Func updateValueFactory, + int ttlInSeconds, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); diff --git a/src/Dapr.Actors/Runtime/IActorStateManager.cs b/src/Dapr.Actors/Runtime/IActorStateManager.cs index 75d2a5996..204cfc56a 100644 --- a/src/Dapr.Actors/Runtime/IActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/IActorStateManager.cs @@ -25,6 +25,27 @@ namespace Dapr.Actors.Runtime /// public interface IActorStateManager { + /// + /// Adds an actor state with given state name. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add. + /// Value of the actor state to add. + /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous add operation. + /// + /// + /// An actor state with given state name already exists. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// /// Adds an actor state with given state name. /// @@ -45,7 +66,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); + Task AddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); /// /// Gets an actor state with specified state name. @@ -68,6 +89,25 @@ public interface IActorStateManager /// Task GetStateAsync(string stateName, CancellationToken cancellationToken = default); + /// + /// Sets an actor state with given state name to specified value. + /// If an actor state with specified name does not exist, it is added. + /// + /// Type of value associated with given state name. + /// Name of the actor state to set. + /// Value of the actor state. + /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous set operation. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + /// /// Sets an actor state with given state name to specified value. /// If an actor state with specified name does not exist, it is added. @@ -86,7 +126,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); + Task SetStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); /// /// Removes an actor state with specified state name. @@ -110,7 +150,30 @@ public interface IActorStateManager /// Value of the actor state to add. /// The token to monitor for cancellation requests. /// This is optional and defaults to . + /// + /// A boolean task that represents the asynchronous add operation. Returns true if the + /// value was successfully added and false if an actor state with the same name already exists. + /// + /// The specified state name is null. + /// Provide a valid state name string. + /// The request was canceled using the specified + /// . + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + + /// + /// Attempts to add an actor state with given state name and value. Returns false if an actor state with + /// the same name already exists. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add. + /// Value of the actor state to add. /// The time to live for the state. If null, the state will not expire. + /// The token to monitor for cancellation requests. + /// This is optional and defaults to . /// /// A boolean task that represents the asynchronous add operation. Returns true if the /// value was successfully added and false if an actor state with the same name already exists. @@ -123,7 +186,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); + Task TryAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); /// /// Attempts to get an actor state with specified state name. @@ -178,7 +241,29 @@ public interface IActorStateManager /// Name of the actor state to get or add. /// Value of the actor state to add if it does not exist. /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous get or add operation. The value of TResult + /// parameter contains value of actor state with given state name. + /// + /// The specified state name is null. + /// Provide a valid state name string. + /// The request was canceled using the specified + /// . + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + + /// + /// Gets an actor state with the given state name if it exists. If it does not + /// exist, creates and new state with the specified name and value. + /// + /// Type of value associated with given state name. + /// Name of the actor state to get or add. + /// Value of the actor state to add if it does not exist. /// The time to live for the state. If null, the state will not expire. + /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous get or add operation. The value of TResult /// parameter contains value of actor state with given state name. @@ -191,7 +276,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); + Task GetOrAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); /// /// Adds an actor state with given state name, if it does not already exist or updates @@ -202,7 +287,28 @@ public interface IActorStateManager /// Value of the actor state to add if it does not exist. /// Factory function to generate value of actor state to update if it exists. /// The token to monitor for cancellation requests. + /// + /// A task that represents the asynchronous add/update operation. The value of TResult + /// parameter contains value of actor state that was added/updated. + /// + /// The specified state name is null. + /// The operation was canceled. + /// + /// The type of state value must be + /// Data Contract serializable. + /// + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default); + + /// + /// Adds an actor state with given state name, if it does not already exist or updates + /// the state with specified state name, if it exists. + /// + /// Type of value associated with given state name. + /// Name of the actor state to add or update. + /// Value of the actor state to add if it does not exist. + /// Factory function to generate value of actor state to update if it exists. /// The time to live for the state. If null, the state will not expire. + /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous add/update operation. The value of TResult /// parameter contains value of actor state that was added/updated. @@ -213,7 +319,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default, int? ttlInSeconds = null); + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, int ttlInSeconds, CancellationToken cancellationToken = default); /// /// Clears all the cached actor states and any operation(s) performed on From f3866e716dfa71b4fd52a928373ea82aa355d007 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 13 Oct 2023 09:36:00 +0100 Subject: [PATCH 05/15] Use `DateTimeOffset?` rather than `DateTime?` Signed-off-by: joshvanl --- src/Dapr.Actors/Communication/ActorStateResponse.cs | 4 ++-- src/Dapr.Actors/Runtime/ActorStateChange.cs | 4 ++-- src/Dapr.Actors/Runtime/ActorStateManager.cs | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs index 1a990ce46..e60878b6a 100644 --- a/src/Dapr.Actors/Communication/ActorStateResponse.cs +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -25,7 +25,7 @@ internal class ActorStateResponse /// /// The response value. /// The time to live expiration time. - public ActorStateResponse(T value, DateTime? ttlExpireTime) + public ActorStateResponse(T value, DateTimeOffset? ttlExpireTime) { this.Value = value; this.TTLExpireTime = ttlExpireTime; @@ -45,6 +45,6 @@ public ActorStateResponse(T value, DateTime? ttlExpireTime) /// /// The time to live expiration time. /// - public DateTime? TTLExpireTime { get; } + public DateTimeOffset? TTLExpireTime { get; } } } diff --git a/src/Dapr.Actors/Runtime/ActorStateChange.cs b/src/Dapr.Actors/Runtime/ActorStateChange.cs index 18c7eaac1..32a3ca7fe 100644 --- a/src/Dapr.Actors/Runtime/ActorStateChange.cs +++ b/src/Dapr.Actors/Runtime/ActorStateChange.cs @@ -28,7 +28,7 @@ public sealed class ActorStateChange /// The value associated with given actor state name. /// The kind of state change for given actor state name. /// The time to live for the state. - public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTime? ttlExpireTime) + public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -80,7 +80,7 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi /// /// If null, the state will not expire. /// - public DateTime? TTLExpireTime { get; } + public DateTimeOffset? TTLExpireTime { get; } /// /// Gets the time to live in seconds for the state. diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index aa5b62376..f6c561566 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -529,7 +529,7 @@ private Dictionary GetContextualStateTracker() private sealed class StateMetadata { - private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) + private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, int? ttlInSeconds = null) { this.Value = value; this.Type = type; @@ -551,9 +551,9 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind, DateT public Type Type { get; } - public DateTime? TTLExpireTime { get; set; } + public DateTimeOffset? TTLExpireTime { get; set; } - public static StateMetadata Create(T value, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, int? ttlInSeconds = null) { return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttlInSeconds); } From ea9999b5d0b63b82f20e8e15a93e35f8caa0bdb1 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 17 Oct 2023 11:11:55 +0100 Subject: [PATCH 06/15] Use TimeSpan instead of int seconds, and only expose TTLExpireTime in change metadata Signed-off-by: joshvanl --- examples/Actor/ActorClient/Program.cs | 4 +- examples/Actor/DemoActor/DemoActor.cs | 8 ++-- examples/Actor/IDemoActor/IDemoActor.cs | 4 +- src/Dapr.Actors/Runtime/ActorStateChange.cs | 23 ---------- src/Dapr.Actors/Runtime/ActorStateManager.cs | 44 +++++++++---------- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 7 +-- src/Dapr.Actors/Runtime/IActorStateManager.cs | 20 ++++----- test/Dapr.E2E.Test.App/Actors/StateActor.cs | 4 +- 8 files changed, 46 insertions(+), 68 deletions(-) diff --git a/examples/Actor/ActorClient/Program.cs b/examples/Actor/ActorClient/Program.cs index aeee28386..ad66677c0 100644 --- a/examples/Actor/ActorClient/Program.cs +++ b/examples/Actor/ActorClient/Program.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ public static async Task Main(string[] args) var proxy = ActorProxy.Create(actorId, "DemoActor"); Console.WriteLine("Making call using actor proxy to save data."); - await proxy.SaveData(data); + await proxy.SaveData(data, TimeSpan.FromMinutes(10)); Console.WriteLine("Making call using actor proxy to get data."); var receivedData = await proxy.GetData(); Console.WriteLine($"Received data is {receivedData}."); diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 8e1cfe975..a999d4c1e 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -41,12 +41,12 @@ public DemoActor(ActorHost host, BankService bank) this.bank = bank; } - public async Task SaveData(MyData data, int? ttlInSeconds = null) + public async Task SaveData(MyData data, TimeSpan ttl) { Console.WriteLine($"This is Actor id {this.Id} with data {data}."); // Set State using StateManager, state is saved after the method execution. - await this.StateManager.SetStateAsync(StateName, data, ttlInSeconds: ttlInSeconds); + await this.StateManager.SetStateAsync(StateName, data, ttl); } public Task GetData() @@ -100,7 +100,7 @@ public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSp // This method is invoked when an actor reminder is fired. var actorState = await this.StateManager.GetStateAsync(StateName); actorState.PropertyB = $"Reminder triggered at '{DateTime.Now:yyyy-MM-ddTHH:mm:ss}'"; - await this.StateManager.SetStateAsync(StateName, actorState, ttlInSeconds: 360); + await this.StateManager.SetStateAsync(StateName, actorState, ttl: TimeSpan.FromMinutes(5)); } class TimerParams @@ -164,7 +164,7 @@ public async Task TimerCallback(byte[] data) { var state = await this.StateManager.GetStateAsync(StateName); state.PropertyA = $"Timer triggered at '{DateTime.Now:yyyyy-MM-ddTHH:mm:s}'"; - await this.StateManager.SetStateAsync(StateName, state, ttlInSeconds: 360); + await this.StateManager.SetStateAsync(StateName, state, ttl: TimeSpan.FromMinutes(5)); var timerParams = JsonSerializer.Deserialize(data); Console.WriteLine("Timer parameter1: " + timerParams.IntParam); Console.WriteLine("Timer parameter2: " + timerParams.StringParam); diff --git a/examples/Actor/IDemoActor/IDemoActor.cs b/examples/Actor/IDemoActor/IDemoActor.cs index af24b7d03..a32178c0f 100644 --- a/examples/Actor/IDemoActor/IDemoActor.cs +++ b/examples/Actor/IDemoActor/IDemoActor.cs @@ -27,9 +27,9 @@ public interface IDemoActor : IActor /// Method to save data. /// /// DAta to save. - /// Optional TTL in seconds. + /// TTL of state key. /// A task that represents the asynchronous save operation. - Task SaveData(MyData data, int? ttlInSeconds = null); + Task SaveData(MyData data, TimeSpan ttl); /// /// Method to get data. diff --git a/src/Dapr.Actors/Runtime/ActorStateChange.cs b/src/Dapr.Actors/Runtime/ActorStateChange.cs index 32a3ca7fe..34fa68fdf 100644 --- a/src/Dapr.Actors/Runtime/ActorStateChange.cs +++ b/src/Dapr.Actors/Runtime/ActorStateChange.cs @@ -81,28 +81,5 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi /// If null, the state will not expire. /// public DateTimeOffset? TTLExpireTime { get; } - - /// - /// Gets the time to live in seconds for the state. - /// - /// - /// The time to live for the state. - /// - /// - /// If null, the state will not expire. - /// - public int? TTLInSeconds { - get - { - if (this.TTLExpireTime.HasValue) - { - return (int)Math.Ceiling((this.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds); - } - else - { - return null; - } - } - } } } diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index f6c561566..e25314566 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -46,11 +46,11 @@ public async Task AddStateAsync(string stateName, T value, CancellationToken } } - public async Task AddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) + public async Task AddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); - if (!(await this.TryAddStateAsync(stateName, value, ttlInSeconds, cancellationToken))) + if (!(await this.TryAddStateAsync(stateName, value, ttl, cancellationToken))) { throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); } @@ -87,7 +87,7 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat return true; } - public async Task TryAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default) + public async Task TryAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -102,7 +102,7 @@ public async Task TryAddStateAsync(string stateName, T value, int ttlIn // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttl: ttl); return true; } @@ -114,7 +114,7 @@ public async Task TryAddStateAsync(string stateName, T value, int ttlIn return false; } - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttl: ttl); return true; } @@ -156,7 +156,7 @@ public async Task> TryGetStateAsync(string stateName, Can var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, conditionalResult.Value.TTLExpireTime)); + stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, ttlExpireTime: conditionalResult.Value.TTLExpireTime)); } return new ConditionalValue(false, default); @@ -191,7 +191,7 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken } } - public async Task SetStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) + public async Task SetStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -212,11 +212,11 @@ public async Task SetStateAsync(string stateName, T value, int ttlInSeconds, } else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) { - stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); + stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttl: ttl)); } else { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttl: ttl); } } @@ -306,7 +306,7 @@ public async Task GetOrAddStateAsync(string stateName, T value, Cancellati return value; } - public async Task GetOrAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken) + public async Task GetOrAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); @@ -320,7 +320,7 @@ public async Task GetOrAddStateAsync(string stateName, T value, int ttlInS var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add; var stateChangeTracker = GetContextualStateTracker(); - stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttl: ttl); return value; } @@ -375,7 +375,7 @@ public async Task AddOrUpdateStateAsync( string stateName, T addValue, Func updateValueFactory, - int ttlInSeconds, + TimeSpan ttl, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -391,7 +391,7 @@ public async Task AddOrUpdateStateAsync( // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttl: ttl); return addValue; } @@ -410,12 +410,12 @@ public async Task AddOrUpdateStateAsync( if (conditionalResult.HasValue) { var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); - stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); + stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttl: ttl)); return newValue; } - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttl: ttl); return addValue; } @@ -529,17 +529,17 @@ private Dictionary GetContextualStateTracker() private sealed class StateMetadata { - private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, int? ttlInSeconds = null) + private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, TimeSpan? ttl = null) { this.Value = value; this.Type = type; this.ChangeKind = changeKind; - if (ttlExpireTime.HasValue && ttlInSeconds.HasValue) { - throw new ArgumentException("Cannot specify both TTLExpireTime and TTLInSeconds"); + if (ttlExpireTime.HasValue && ttl.HasValue) { + throw new ArgumentException("Cannot specify both TTLExpireTime and TTL"); } - if (ttlInSeconds.HasValue) { - this.TTLExpireTime = DateTime.UtcNow.AddSeconds(ttlInSeconds.Value); + if (ttl.HasValue) { + this.TTLExpireTime = DateTime.UtcNow.Add(ttl.Value); } else { this.TTLExpireTime = ttlExpireTime; } @@ -553,9 +553,9 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind, DateT public DateTimeOffset? TTLExpireTime { get; set; } - public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, int? ttlInSeconds = null) + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, TimeSpan? ttl = null) { - return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttlInSeconds); + return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttl); } public static StateMetadata CreateForRemove() diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index 9ace60fbf..a6b81cd8f 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -134,10 +134,11 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); } - var ttlInSeconds = stateChange.TTLInSeconds; - if (ttlInSeconds.HasValue) { - writer.WriteString("ttlInSeconds", ttlInSeconds.Value.ToString()); + if (stateChange.TTLExpireTime.HasValue) { + var ttl = (int)Math.Ceiling((stateChange.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds); + writer.WriteString("ttlInSeconds", ttl.ToString()); } + break; default: break; diff --git a/src/Dapr.Actors/Runtime/IActorStateManager.cs b/src/Dapr.Actors/Runtime/IActorStateManager.cs index 204cfc56a..b85fa2a06 100644 --- a/src/Dapr.Actors/Runtime/IActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/IActorStateManager.cs @@ -53,7 +53,7 @@ public interface IActorStateManager /// Name of the actor state to add. /// Value of the actor state to add. /// The token to monitor for cancellation requests. - /// The time to live for the state. If null, the state will not expire. + /// The time to live for the state. /// /// A task that represents the asynchronous add operation. /// @@ -66,7 +66,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); + Task AddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); /// /// Gets an actor state with specified state name. @@ -115,7 +115,7 @@ public interface IActorStateManager /// Type of value associated with given state name. /// Name of the actor state to set. /// Value of the actor state. - /// The time to live for the state. If null, the state will not expire. + /// The time to live for the state. /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous set operation. @@ -126,7 +126,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task SetStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); + Task SetStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); /// /// Removes an actor state with specified state name. @@ -171,7 +171,7 @@ public interface IActorStateManager /// Type of value associated with given state name. /// Name of the actor state to add. /// Value of the actor state to add. - /// The time to live for the state. If null, the state will not expire. + /// The time to live for the state. /// The token to monitor for cancellation requests. /// This is optional and defaults to . /// @@ -186,7 +186,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task TryAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); + Task TryAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); /// /// Attempts to get an actor state with specified state name. @@ -262,7 +262,7 @@ public interface IActorStateManager /// Type of value associated with given state name. /// Name of the actor state to get or add. /// Value of the actor state to add if it does not exist. - /// The time to live for the state. If null, the state will not expire. + /// The time to live for the state. /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous get or add operation. The value of TResult @@ -276,7 +276,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task GetOrAddStateAsync(string stateName, T value, int ttlInSeconds, CancellationToken cancellationToken = default); + Task GetOrAddStateAsync(string stateName, T value, TimeSpan ttl, CancellationToken cancellationToken = default); /// /// Adds an actor state with given state name, if it does not already exist or updates @@ -307,7 +307,7 @@ public interface IActorStateManager /// Name of the actor state to add or update. /// Value of the actor state to add if it does not exist. /// Factory function to generate value of actor state to update if it exists. - /// The time to live for the state. If null, the state will not expire. + /// The time to live for the state. /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous add/update operation. The value of TResult @@ -319,7 +319,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, int ttlInSeconds, CancellationToken cancellationToken = default); + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, TimeSpan ttl, CancellationToken cancellationToken = default); /// /// Clears all the cached actor states and any operation(s) performed on diff --git a/test/Dapr.E2E.Test.App/Actors/StateActor.cs b/test/Dapr.E2E.Test.App/Actors/StateActor.cs index f8bff85e7..8468771ce 100644 --- a/test/Dapr.E2E.Test.App/Actors/StateActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/StateActor.cs @@ -30,9 +30,9 @@ public Task GetState(string key) return this.StateManager.GetStateAsync(key); } - public Task SetState(string key, string value, int? ttlInSeconds) + public Task SetState(string key, string value, TimeSpan? ttl) { - return this.StateManager.SetStateAsync(key, value, ttlInSeconds = ttlInSeconds); + return this.StateManager.SetStateAsync(key, value, ttl: ttl); } } } From df5a6c0212a2a7e8ac189d0b8ba53c34e4d1453e Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 27 Oct 2023 18:13:44 +0100 Subject: [PATCH 07/15] Address feedback Signed-off-by: joshvanl --- .../Communication/ActorStateResponse.cs | 2 +- src/Dapr.Actors/Runtime/ActorStateManager.cs | 18 ++++++++++++++---- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 4 ++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs index e60878b6a..f75da0626 100644 --- a/src/Dapr.Actors/Communication/ActorStateResponse.cs +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -18,7 +18,7 @@ namespace Dapr.Actors.Communication /// /// Represents a response from fetching an actor state key. /// - internal class ActorStateResponse + class ActorStateResponse { /// /// Initializes a new instance of the class. diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index e25314566..0f1ac43e4 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -145,7 +145,7 @@ public async Task> TryGetStateAsync(string stateName, Can var stateMetadata = stateChangeTracker[stateName]; // Check if the property was marked as remove in the cache or is expired - if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTime.UtcNow) + if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTimeOffset.UtcNow) { return new ConditionalValue(false, default); } @@ -539,7 +539,7 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind, DateT throw new ArgumentException("Cannot specify both TTLExpireTime and TTL"); } if (ttl.HasValue) { - this.TTLExpireTime = DateTime.UtcNow.Add(ttl.Value); + this.TTLExpireTime = DateTimeOffset.UtcNow.Add(ttl.Value); } else { this.TTLExpireTime = ttlExpireTime; } @@ -553,9 +553,19 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind, DateT public DateTimeOffset? TTLExpireTime { get; set; } - public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime = null, TimeSpan? ttl = null) + public static StateMetadata Create(T value, StateChangeKind changeKind) { - return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttl); + return new StateMetadata(value, typeof(T), changeKind); + } + + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTimeOffset? ttlExpireTime) + { + return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime: ttlExpireTime); + } + + public static StateMetadata Create(T value, StateChangeKind changeKind, TimeSpan? ttl) + { + return new StateMetadata(value, typeof(T), changeKind, ttl: ttl); } public static StateMetadata CreateForRemove() diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index a6b81cd8f..d8d5ed2b5 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -49,7 +49,7 @@ public async Task>> TryLoadStateAsync( var result = new ConditionalValue>(false, default); var response = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - if (response.Value.Length != 0 && response.TTLExpireTime > DateTime.UtcNow) + if (response.Value.Length != 0 && response.TTLExpireTime > DateTimeOffset.UtcNow) { T typedResult; @@ -135,7 +135,7 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a } if (stateChange.TTLExpireTime.HasValue) { - var ttl = (int)Math.Ceiling((stateChange.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds); + var ttl = (int)Math.Ceiling((stateChange.TTLExpireTime.Value - DateTimeOffset.UtcNow).TotalSeconds); writer.WriteString("ttlInSeconds", ttl.ToString()); } From 5922940ad24b6cf365ff566281077d5d1cef54b9 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 31 Oct 2023 14:02:26 +0000 Subject: [PATCH 08/15] Fix TestDaprInteractor Signed-off-by: joshvanl --- .../Communication/ActorStateResponse.cs | 2 +- test/Dapr.Actors.Test/TestDaprInteractor.cs | 2 +- .../Reminders/IReminderActor.cs | 2 +- test/Dapr.E2E.Test.Actors/State/IStateActor.cs | 6 +++++- test/Dapr.E2E.Test.App/Actors/StateActor.cs | 15 ++++++++++++--- test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs | 16 ++++++++++------ 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs index f75da0626..22b3bf20e 100644 --- a/src/Dapr.Actors/Communication/ActorStateResponse.cs +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -18,7 +18,7 @@ namespace Dapr.Actors.Communication /// /// Represents a response from fetching an actor state key. /// - class ActorStateResponse + public class ActorStateResponse { /// /// Initializes a new instance of the class. diff --git a/test/Dapr.Actors.Test/TestDaprInteractor.cs b/test/Dapr.Actors.Test/TestDaprInteractor.cs index 92cfa7096..1bfd46e5e 100644 --- a/test/Dapr.Actors.Test/TestDaprInteractor.cs +++ b/test/Dapr.Actors.Test/TestDaprInteractor.cs @@ -81,7 +81,7 @@ public Task SaveStateTransactionallyAsync(string actorType, string actorId, stri /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - public Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { throw new System.NotImplementedException(); } diff --git a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs index 0bf57f64c..c0e3f86a2 100644 --- a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs +++ b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.Actors/State/IStateActor.cs b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs index e453f7fa3..f19122102 100644 --- a/test/Dapr.E2E.Test.Actors/State/IStateActor.cs +++ b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs @@ -18,5 +18,9 @@ namespace Dapr.E2E.Test.Actors.State { public interface IStateActor : IPingActor, IActor - { } + { + Task GetState(string key); + + Task SetState(string key, string value, TimeSpan? ttl); + } } diff --git a/test/Dapr.E2E.Test.App/Actors/StateActor.cs b/test/Dapr.E2E.Test.App/Actors/StateActor.cs index 8468771ce..71a952e0f 100644 --- a/test/Dapr.E2E.Test.App/Actors/StateActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/StateActor.cs @@ -18,21 +18,30 @@ namespace Dapr.E2E.Test.Actors.State { - public class StateActor : Actor + public class StateActor : Actor, IStateActor { public StateActor(ActorHost host) : base(host) { } + public Task Ping() + { + return Task.CompletedTask; + } + public Task GetState(string key) { return this.StateManager.GetStateAsync(key); } - public Task SetState(string key, string value, TimeSpan? ttl) + public Task SetState(string key, string value, TimeSpan? ttl) { - return this.StateManager.SetStateAsync(key, value, ttl: ttl); + if (ttl.HasValue) + { + return this.StateManager.SetStateAsync(key, value, ttl: ttl.Value); + } + return this.StateManager.SetStateAsync(key, value); } } } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs index 853acf944..735d1594a 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -30,15 +30,19 @@ public async Task ActorCanSaveStateWithTTL() await WaitForActorRuntimeAsync(proxy, cts.Token); - await proxy.SaveState("key", "value", 2, cts.Token); + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); - state = await.proxy.GetState("key", cts.Token); - Assert.Equal("value", state.Value); + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(2)); - state = await.proxy.GetState("key", cts.Token); - Assert.Null(state.Value); + resp = await proxy.GetState("key"); + Assert.Null(resp); + + await proxy.SetState("key", "new-value", null); + resp = await proxy.GetState("key"); + Assert.Equal("new-value", resp); } } } From 982d1004ef069699d329d5c360a45edac9ecefcc Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 13 Nov 2023 13:56:05 +0000 Subject: [PATCH 09/15] Revert github actions Dapr CI changes Signed-off-by: joshvanl --- .github/workflows/itests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 67674681f..870264f40 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -37,11 +37,11 @@ jobs: GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org - DAPR_CLI_VER: 1.12.0 - DAPR_RUNTIME_VER: 1.12.0 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/release-1.12/install/install.sh + DAPR_CLI_VER: 1.9.1 + DAPR_RUNTIME_VER: 1.10.5 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_CLI_REF: '' - DAPR_REF: '2149fca96cdf11627c387bda26dcc027d1c47354' + DAPR_REF: '4181de0edc65fc98a836ae7abc6042c575c8fae5' steps: - name: Set up Dapr CLI run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }} From 5bb19da32056a8c74d76b5bd82a4b2b61095fd97 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 13 Nov 2023 17:52:45 +0000 Subject: [PATCH 10/15] Register StateActor in E2E Signed-off-by: joshvanl --- test/Dapr.E2E.Test.App/Startup.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Dapr.E2E.Test.App/Startup.cs b/test/Dapr.E2E.Test.App/Startup.cs index d1f291bf9..948dc2c43 100644 --- a/test/Dapr.E2E.Test.App/Startup.cs +++ b/test/Dapr.E2E.Test.App/Startup.cs @@ -98,6 +98,7 @@ public void ConfigureServices(IServiceCollection services) options.Actors.RegisterActor(); options.Actors.RegisterActor(); options.Actors.RegisterActor(); + options.Actors.RegisterActor(); }); } From c4135c75c637b8f349899b7fad5a20ba896a8781 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 13 Nov 2023 18:24:28 +0000 Subject: [PATCH 11/15] Add State namespace Signed-off-by: joshvanl --- test/Dapr.E2E.Test.App/Startup.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Dapr.E2E.Test.App/Startup.cs b/test/Dapr.E2E.Test.App/Startup.cs index 948dc2c43..8412e57c1 100644 --- a/test/Dapr.E2E.Test.App/Startup.cs +++ b/test/Dapr.E2E.Test.App/Startup.cs @@ -16,6 +16,7 @@ namespace Dapr.E2E.Test using Dapr.E2E.Test.Actors.Reentrancy; using Dapr.E2E.Test.Actors.Reminders; using Dapr.E2E.Test.Actors.Timers; + using Dapr.E2E.Test.Actors.State; using Dapr.E2E.Test.Actors.ExceptionTesting; using Dapr.E2E.Test.Actors.Serialization; using Dapr.E2E.Test.App.ErrorTesting; From cd58e97b89244f82db24e62ed70e52ca38572160 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 14 Nov 2023 16:24:13 +0000 Subject: [PATCH 12/15] Fix Actor state test TTL exception check Signed-off-by: joshvanl --- test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs index 735d1594a..2db7e3a2c 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -37,8 +37,11 @@ public async Task ActorCanSaveStateWithTTL() await Task.Delay(TimeSpan.FromSeconds(2)); - resp = await proxy.GetState("key"); - Assert.Null(resp); + // Assert key no longer exists. + try { + await proxy.GetState("key"); + Assert.True(false, "Expected exception"); + } catch (Exception) { } await proxy.SetState("key", "new-value", null); resp = await proxy.GetState("key"); From 6d5728da865f28f8c749a53d3a735cf9458cdf9b Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 17 Nov 2023 20:24:38 +0000 Subject: [PATCH 13/15] Fix-up actor ttl state bugs and add unit tests Signed-off-by: joshvanl --- src/Dapr.Actors/DaprHttpInteractor.cs | 2 +- src/Dapr.Actors/Runtime/ActorStateManager.cs | 19 +- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 5 +- .../Dapr.Actors.Test/ActorCodeBuilderTests.cs | 2 +- .../Dapr.Actors.Test/ActorStateManagerTest.cs | 199 ++++++++++++++++++ .../DaprHttpInteractorTest.cs | 57 ++++- .../Dapr.Actors.Test/DaprStateProviderTest.cs | 125 +++++++++++ test/Dapr.Actors.Test/TestDaprInteractor.cs | 8 +- test/Dapr.Client.Test/StateApiTest.cs | 2 +- .../Actors/E2ETests.StateTests.cs | 80 ++++++- 10 files changed, 481 insertions(+), 18 deletions(-) create mode 100644 test/Dapr.Actors.Test/ActorStateManagerTest.cs create mode 100644 test/Dapr.Actors.Test/DaprStateProviderTest.cs diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index b8cc74cdb..2565bab62 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -73,7 +73,7 @@ HttpRequestMessage RequestFunc() using var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken); var stringResponse = await response.Content.ReadAsStringAsync(); - var ttlExpireTime = new DateTime(); + DateTimeOffset? ttlExpireTime = null; if (response.Headers.TryGetValues(Constants.TTLResponseHeaderName, out IEnumerable headerValues)) { var ttlExpireTimeString = headerValues.First(); diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index 0f1ac43e4..111bb80f4 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -68,8 +68,8 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove or is expired in the cache + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) { stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update); return true; @@ -99,8 +99,8 @@ public async Task TryAddStateAsync(string stateName, T value, TimeSpan { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove in the cache or has been expired. + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) { stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttl: ttl); return true; @@ -145,7 +145,7 @@ public async Task> TryGetStateAsync(string stateName, Can var stateMetadata = stateChangeTracker[stateName]; // Check if the property was marked as remove in the cache or is expired - if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTimeOffset.UtcNow) + if (stateMetadata.ChangeKind == StateChangeKind.Remove || (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow)) { return new ConditionalValue(false, default); } @@ -157,6 +157,7 @@ public async Task> TryGetStateAsync(string stateName, Can if (conditionalResult.HasValue) { stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, ttlExpireTime: conditionalResult.Value.TTLExpireTime)); + return new ConditionalValue(true, conditionalResult.Value.Value); } return new ConditionalValue(false, default); @@ -174,6 +175,7 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken { var stateMetadata = stateChangeTracker[stateName]; stateMetadata.Value = value; + stateMetadata.TTLExpireTime = null; if (stateMetadata.ChangeKind == StateChangeKind.None || stateMetadata.ChangeKind == StateChangeKind.Remove) @@ -203,6 +205,7 @@ public async Task SetStateAsync(string stateName, T value, TimeSpan ttl, Canc { var stateMetadata = stateChangeTracker[stateName]; stateMetadata.Value = value; + stateMetadata.TTLExpireTime = DateTimeOffset.UtcNow.Add(ttl); if (stateMetadata.ChangeKind == StateChangeKind.None || stateMetadata.ChangeKind == StateChangeKind.Remove) @@ -242,6 +245,12 @@ public async Task TryRemoveStateAsync(string stateName, CancellationToken { var stateMetadata = stateChangeTracker[stateName]; + if (stateMetadata.TTLExpireTime.HasValue && stateMetadata.TTLExpireTime.Value <= DateTimeOffset.UtcNow) + { + stateChangeTracker.Remove(stateName); + return false; + } + switch (stateMetadata.ChangeKind) { case StateChangeKind.Remove: diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index d8d5ed2b5..84ab693a6 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -49,7 +49,7 @@ public async Task>> TryLoadStateAsync( var result = new ConditionalValue>(false, default); var response = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - if (response.Value.Length != 0 && response.TTLExpireTime > DateTimeOffset.UtcNow) + if (response.Value.Length != 0 && (!response.TTLExpireTime.HasValue || response.TTLExpireTime.Value > DateTimeOffset.UtcNow)) { T typedResult; @@ -136,7 +136,10 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a if (stateChange.TTLExpireTime.HasValue) { var ttl = (int)Math.Ceiling((stateChange.TTLExpireTime.Value - DateTimeOffset.UtcNow).TotalSeconds); + writer.WritePropertyName("metadata"); + writer.WriteStartObject(); writer.WriteString("ttlInSeconds", ttl.ToString()); + writer.WriteEndObject(); } break; diff --git a/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs b/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs index 93f6ba92f..6bb3c827d 100644 --- a/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs +++ b/test/Dapr.Actors.Test/ActorCodeBuilderTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.Actors.Test/ActorStateManagerTest.cs b/test/Dapr.Actors.Test/ActorStateManagerTest.cs new file mode 100644 index 000000000..a6517a6b4 --- /dev/null +++ b/test/Dapr.Actors.Test/ActorStateManagerTest.cs @@ -0,0 +1,199 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Test +{ + using System; + using System.Globalization; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Security; + using System.Security.Authentication; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using FluentAssertions; + using Xunit; + using Dapr.Actors.Communication; + using Dapr.Actors.Runtime; + using Moq; + + /// + /// Contains tests for ActorStateManager. + /// + public class ActorStateManagerTest + { + [Fact] + public async Task SetGet() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key1", "value3", token)); + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key2", "value4", token)); + + await mngr.SetStateAsync("key1", "value5", token); + await mngr.SetStateAsync("key2", "value6", token); + Assert.Equal("value5", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value6", await mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateWithTTL() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await Task.Delay(TimeSpan.FromSeconds(1.5)); + + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + + // Should be able to add state again after expiry and should not expire. + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + await Task.Delay(TimeSpan.FromSeconds(1.5)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateRemoveAddTTL() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await mngr.SetStateAsync("key1", "value1", token); + await mngr.SetStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + // TTL is removed so state should not expire. + await Task.Delay(TimeSpan.FromSeconds(1.5)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + // Adding TTL back should expire state. + await mngr.SetStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.SetStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + await Task.Delay(TimeSpan.FromSeconds(1.5)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + } + + [Fact] + public async Task StateDaprdExpireTime() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + // Existing key which has an expiry time. + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value1\"", DateTime.UtcNow.AddSeconds(1)))); + + await Assert.ThrowsAsync(() => mngr.AddStateAsync("key1", "value3", token)); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + + // No longer return the value from the state provider. + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + // Key should be expired after 1 seconds. + await Task.Delay(TimeSpan.FromSeconds(1.5)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.RemoveStateAsync("key1", token)); + await mngr.AddStateAsync("key1", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value2", await mngr.GetStateAsync("key1", token)); + } + + [Fact] + public async Task RemoveState() + { + var interactor = new Mock(); + var host = ActorHost.CreateForTest(); + host.StateProvider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var mngr = new ActorStateManager(new TestActor(host)); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + + await Assert.ThrowsAsync(() => mngr.RemoveStateAsync("key1", token)); + + await mngr.AddStateAsync("key1", "value1", token); + await mngr.AddStateAsync("key2", "value2", token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + + await mngr.RemoveStateAsync("key1", token); + await mngr.RemoveStateAsync("key2", token); + + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key1", token)); + await Assert.ThrowsAsync(() => mngr.GetStateAsync("key2", token)); + + // Should be able to add state again after removal. + await mngr.AddStateAsync("key1", "value1", TimeSpan.FromSeconds(1), token); + await mngr.AddStateAsync("key2", "value2", TimeSpan.FromSeconds(1), token); + Assert.Equal("value1", await mngr.GetStateAsync("key1", token)); + Assert.Equal("value2", await mngr.GetStateAsync("key2", token)); + } + } +} diff --git a/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs b/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs index 80dae342f..21c142267 100644 --- a/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs +++ b/test/Dapr.Actors.Test/DaprHttpInteractorTest.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ namespace Dapr.Actors.Test { + using System; using System.Globalization; using System.Linq; using System.Net; @@ -23,6 +24,7 @@ namespace Dapr.Actors.Test using System.Threading.Tasks; using FluentAssertions; using Xunit; + using Dapr.Actors.Communication; /// /// Contains tests for DaprHttpInteractor. @@ -350,5 +352,58 @@ public async Task InvokeActorMethodOmitsReentrancyIdIfNotSet_ValidateHeaders() request.Dismiss(); Assert.False(request.Request.Headers.Contains(Constants.ReentrancyRequestHeaderName)); } + + [Fact] + public async Task GetState_TTLExpireTimeExists() + { + await using var client = TestClient.CreateForDaprHttpInterator(); + + var actorType = "ActorType_Test"; + var actorId = "ActorId_Test"; + var keyName = "StateKey_Test"; + + var request = await client.CaptureHttpRequestAsync(async httpInteractor => + { + return await httpInteractor.GetStateAsync(actorType, actorId, keyName); + }); + + var message = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent("test"), + Headers = + { + { "Metadata.ttlExpireTime", "2023-04-05T23:22:21Z" }, + }, + }; + + var actual = await request.CompleteAsync(message); + Assert.Equal("test", actual.Value); + var expTTL = new DateTimeOffset(2023, 04, 05, 23, 22, 21, 0, new GregorianCalendar(), new TimeSpan(0, 0, 0)); + Assert.Equal(expTTL, actual.TTLExpireTime); + } + + [Fact] + public async Task GetState_TTLExpireTimeNotExists() + { + await using var client = TestClient.CreateForDaprHttpInterator(); + + var actorType = "ActorType_Test"; + var actorId = "ActorId_Test"; + var keyName = "StateKey_Test"; + + var request = await client.CaptureHttpRequestAsync(async httpInteractor => + { + return await httpInteractor.GetStateAsync(actorType, actorId, keyName); + }); + + var message = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StringContent("test"), + }; + + var actual = await request.CompleteAsync(message); + Assert.Equal("test", actual.Value); + Assert.False(actual.TTLExpireTime.HasValue); + } } } diff --git a/test/Dapr.Actors.Test/DaprStateProviderTest.cs b/test/Dapr.Actors.Test/DaprStateProviderTest.cs new file mode 100644 index 000000000..61f0ad14c --- /dev/null +++ b/test/Dapr.Actors.Test/DaprStateProviderTest.cs @@ -0,0 +1,125 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Test +{ + using System; + using System.Globalization; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Security; + using System.Security.Authentication; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using FluentAssertions; + using Xunit; + using Dapr.Actors.Communication; + using Dapr.Actors.Runtime; + using Moq; + + /// + /// Contains tests for DaprStateProvider. + /// + public class DaprStateProviderTest + { + [Fact] + public async Task SaveStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + var stateChangeList = new List(); + stateChangeList.Add( + new ActorStateChange("key1", typeof(string), "value1", StateChangeKind.Add, DateTimeOffset.UtcNow.Add(TimeSpan.FromSeconds(2)))); + stateChangeList.Add( + new ActorStateChange("key2", typeof(string), "value2", StateChangeKind.Add, null)); + + string content = null; + interactor + .Setup(d => d.SaveStateTransactionallyAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Callback((actorType, actorId, data, token) => content = data) + .Returns(Task.FromResult(true)); + + await provider.SaveStateAsync("actorType", "actorId", stateChangeList, token); + Assert.Equal( + "[{\"operation\":\"upsert\",\"request\":{\"key\":\"key1\",\"value\":\"value1\",\"metadata\":{\"ttlInSeconds\":\"2\"}}},{\"operation\":\"upsert\",\"request\":{\"key\":\"key2\",\"value\":\"value2\"}}]", + content + ); + } + + [Fact] + public async Task ContainsStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + Assert.False(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", null))); + Assert.True(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + } + + [Fact] + public async Task TryLoadStateAsync() + { + var interactor = new Mock(); + var provider = new DaprStateProvider(interactor.Object, new JsonSerializerOptions()); + var token = new CancellationToken(); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("", null))); + var resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.False(resp.HasValue); + + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", null))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.True(resp.HasValue); + Assert.Equal("value", resp.Value.Value); + Assert.False(resp.Value.TTLExpireTime.HasValue); + + var ttl = DateTime.UtcNow.AddSeconds(1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.True(resp.HasValue); + Assert.Equal("value", resp.Value.Value); + Assert.True(resp.Value.TTLExpireTime.HasValue); + Assert.Equal(ttl, resp.Value.TTLExpireTime.Value); + + ttl = DateTime.UtcNow.AddSeconds(-1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + resp = await provider.TryLoadStateAsync("actorType", "actorId", "key", token); + Assert.False(resp.HasValue); + } + } +} diff --git a/test/Dapr.Actors.Test/TestDaprInteractor.cs b/test/Dapr.Actors.Test/TestDaprInteractor.cs index 1bfd46e5e..11f88e684 100644 --- a/test/Dapr.Actors.Test/TestDaprInteractor.cs +++ b/test/Dapr.Actors.Test/TestDaprInteractor.cs @@ -67,10 +67,10 @@ public Task InvokeActorMethodWithoutRemotingAsync(string actorType, stri /// JSON data with state changes as per the Dapr spec for transaction state update. /// Cancels the operation. /// A task that represents the asynchronous operation. - public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, + public virtual async Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) { - throw new System.NotImplementedException(); + await _testDaprInteractor.SaveStateTransactionallyAsync(actorType, actorId, data); } /// @@ -81,9 +81,9 @@ public Task SaveStateTransactionallyAsync(string actorType, string actorId, stri /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - public Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public virtual async Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { - throw new System.NotImplementedException(); + return await _testDaprInteractor.GetStateAsync(actorType, actorId, keyName); } /// diff --git a/test/Dapr.Client.Test/StateApiTest.cs b/test/Dapr.Client.Test/StateApiTest.cs index 90c06e6b1..cfa664663 100644 --- a/test/Dapr.Client.Test/StateApiTest.cs +++ b/test/Dapr.Client.Test/StateApiTest.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs index 2db7e3a2c..25d0c70bf 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -38,14 +38,86 @@ public async Task ActorCanSaveStateWithTTL() await Task.Delay(TimeSpan.FromSeconds(2)); // Assert key no longer exists. - try { - await proxy.GetState("key"); - Assert.True(false, "Expected exception"); - } catch (Exception) { } + await Assert.ThrowsAsync(() => proxy.GetState("key")); + // Can create key again await proxy.SetState("key", "new-value", null); resp = await proxy.GetState("key"); Assert.Equal("new-value", resp); } + + [Fact] + public async Task ActorStateTTLOverridesExisting() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // TLL 4 seconds + await proxy.SetState("key", "value", TimeSpan.FromSeconds(4)); + + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 2 seconds + await Task.Delay(TimeSpan.FromSeconds(2)); + resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 4 seconds + await proxy.SetState("key", "value", TimeSpan.FromSeconds(4)); + + // TLL 2 seconds + await Task.Delay(TimeSpan.FromSeconds(2)); + resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + + // TLL 0 seconds + await Task.Delay(TimeSpan.FromSeconds(2.5)); + + // Assert key no longer exists. + await Assert.ThrowsAsync(() => proxy.GetState("key")); + } + + [Fact] + public async Task ActorStateTTLRemoveTTL() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Can remove TTL and then add again + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); + await proxy.SetState("key", "value", null); + await Task.Delay(TimeSpan.FromSeconds(2)); + var resp = await proxy.GetState("key"); + Assert.Equal("value", resp); + await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(2)); + await Assert.ThrowsAsync(() => proxy.GetState("key")); + } + + [Fact] + public async Task ActorStateBetweenProxies() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var actorId = ActorId.CreateRandom(); + var proxy1 = this.ProxyFactory.CreateActorProxy(actorId, "StateActor"); + var proxy2 = this.ProxyFactory.CreateActorProxy(actorId, "StateActor"); + + await WaitForActorRuntimeAsync(proxy1, cts.Token); + + await proxy1.SetState("key", "value", TimeSpan.FromSeconds(2)); + var resp = await proxy1.GetState("key"); + Assert.Equal("value", resp); + resp = await proxy2.GetState("key"); + Assert.Equal("value", resp); + + await Task.Delay(TimeSpan.FromSeconds(2)); + await Assert.ThrowsAsync(() => proxy1.GetState("key")); + await Assert.ThrowsAsync(() => proxy2.GetState("key")); + } } } From ba7ab5e9d40bc8a88a1c37f46709c2ec40abbb84 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 17 Nov 2023 20:29:45 +0000 Subject: [PATCH 14/15] Return false in state provider if key is expired Signed-off-by: joshvanl --- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 2 +- test/Dapr.Actors.Test/DaprStateProviderTest.cs | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index 84ab693a6..e81308dbd 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -73,7 +73,7 @@ public async Task>> TryLoadStateAsync( public async Task ContainsStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { var result = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - return result.Value.Length != 0; + return (result.Value.Length != 0 && (!result.TTLExpireTime.HasValue || result.TTLExpireTime.Value > DateTimeOffset.UtcNow)); } public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) diff --git a/test/Dapr.Actors.Test/DaprStateProviderTest.cs b/test/Dapr.Actors.Test/DaprStateProviderTest.cs index 61f0ad14c..63be89e95 100644 --- a/test/Dapr.Actors.Test/DaprStateProviderTest.cs +++ b/test/Dapr.Actors.Test/DaprStateProviderTest.cs @@ -81,6 +81,18 @@ public async Task ContainsStateAsync() .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(new ActorStateResponse("\"value\"", null))); Assert.True(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + var ttl = DateTime.UtcNow.AddSeconds(1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + Assert.True(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); + + ttl = DateTime.UtcNow.AddSeconds(-1); + interactor + .Setup(d => d.GetStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ActorStateResponse("\"value\"", ttl))); + Assert.False(await provider.ContainsStateAsync("actorType", "actorId", "key", token)); } [Fact] From 2967d4a1b79b31ea0c64b87a3464eaf1d800028b Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 17 Nov 2023 20:38:18 +0000 Subject: [PATCH 15/15] Ensure enough time has passed before checking key Signed-off-by: joshvanl --- test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs index 25d0c70bf..184a40448 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -35,7 +35,7 @@ public async Task ActorCanSaveStateWithTTL() var resp = await proxy.GetState("key"); Assert.Equal("value", resp); - await Task.Delay(TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(2.5)); // Assert key no longer exists. await Assert.ThrowsAsync(() => proxy.GetState("key")); @@ -95,7 +95,7 @@ public async Task ActorStateTTLRemoveTTL() var resp = await proxy.GetState("key"); Assert.Equal("value", resp); await proxy.SetState("key", "value", TimeSpan.FromSeconds(2)); - await Task.Delay(TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(2.5)); await Assert.ThrowsAsync(() => proxy.GetState("key")); } @@ -115,7 +115,7 @@ public async Task ActorStateBetweenProxies() resp = await proxy2.GetState("key"); Assert.Equal("value", resp); - await Task.Delay(TimeSpan.FromSeconds(2)); + await Task.Delay(TimeSpan.FromSeconds(2.5)); await Assert.ThrowsAsync(() => proxy1.GetState("key")); await Assert.ThrowsAsync(() => proxy2.GetState("key")); }