From de0a847927d15600129e55249d1fce341b13d0e9 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 10 Oct 2023 19:11:06 +0100 Subject: [PATCH] Actor state TTL support Signed-off-by: joshvanl --- examples/Actor/DemoActor/DemoActor.cs | 2 +- .../Communication/ActorStateResponse.cs | 50 ++++++++++ src/Dapr.Actors/Constants.cs | 3 +- src/Dapr.Actors/DaprHttpInteractor.cs | 15 ++- src/Dapr.Actors/IDaprInteractor.cs | 4 +- src/Dapr.Actors/Runtime/ActorStateChange.cs | 40 +++++++- src/Dapr.Actors/Runtime/ActorStateManager.cs | 65 +++++++------ src/Dapr.Actors/Runtime/ConditionalValue.cs | 4 +- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 26 ++++-- src/Dapr.Actors/Runtime/IActorStateManager.cs | 18 ++-- .../Runtime/IActorStateSerializer.cs | 2 +- .../Protos/dapr/proto/common/v1/common.proto | 2 +- .../Protos/dapr/proto/dapr/v1/dapr.proto | 92 ++++++++++++++----- test/Dapr.Actors.Test/Runtime/ActorTests.cs | 2 +- .../Dapr.E2E.Test.Actors/State/IStateActor.cs | 22 +++++ .../Actors/ReentrantActor.cs | 4 +- .../Dapr.E2E.Test.App/Actors/ReminderActor.cs | 2 +- test/Dapr.E2E.Test.App/Actors/StateActor.cs | 38 ++++++++ test/Dapr.E2E.Test.App/Actors/TimerActor.cs | 2 +- .../Actors/E2ETests.StateTests.cs | 44 +++++++++ .../configuration/featureconfig.yaml | 2 + 21 files changed, 357 insertions(+), 82 deletions(-) create mode 100644 src/Dapr.Actors/Communication/ActorStateResponse.cs create mode 100644 test/Dapr.E2E.Test.Actors/State/IStateActor.cs create mode 100644 test/Dapr.E2E.Test.App/Actors/StateActor.cs create mode 100644 test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 0ab633fcd..19d30cc92 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.Actors/Communication/ActorStateResponse.cs b/src/Dapr.Actors/Communication/ActorStateResponse.cs new file mode 100644 index 000000000..1a990ce46 --- /dev/null +++ b/src/Dapr.Actors/Communication/ActorStateResponse.cs @@ -0,0 +1,50 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors.Communication +{ + using System; + + /// + /// Represents a response from fetching an actor state key. + /// + internal class ActorStateResponse + { + /// + /// Initializes a new instance of the class. + /// + /// The response value. + /// The time to live expiration time. + public ActorStateResponse(T value, DateTime? ttlExpireTime) + { + this.Value = value; + this.TTLExpireTime = ttlExpireTime; + } + + /// + /// Gets the response value as a string. + /// + /// + /// The response value as a string. + /// + public T Value { get; } + + /// + /// Gets the time to live expiration time. + /// + /// + /// The time to live expiration time. + /// + public DateTime? TTLExpireTime { get; } + } +} diff --git a/src/Dapr.Actors/Constants.cs b/src/Dapr.Actors/Constants.cs index be2d8f49f..038caf101 100644 --- a/src/Dapr.Actors/Constants.cs +++ b/src/Dapr.Actors/Constants.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ internal static class Constants public const string RequestHeaderName = "X-DaprRequestHeader"; public const string ErrorResponseHeaderName = "X-DaprErrorResponseHeader"; public const string ReentrancyRequestHeaderName = "Dapr-Reentrancy-Id"; + public const string TTLResponseHeaderName = "Metadata.ttlExpireTime"; public const string Dapr = "dapr"; public const string Config = "config"; public const string State = "state"; diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index 4695375fb..b8cc74cdb 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -57,7 +57,7 @@ public DaprHttpInteractor( this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout; } - public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + public async Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateKeyRelativeUrlFormat, actorType, actorId, keyName); @@ -72,7 +72,18 @@ HttpRequestMessage RequestFunc() using var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken); var stringResponse = await response.Content.ReadAsStringAsync(); - return stringResponse; + + var ttlExpireTime = new DateTime(); + if (response.Headers.TryGetValues(Constants.TTLResponseHeaderName, out IEnumerable headerValues)) + { + var ttlExpireTimeString = headerValues.First(); + if (!string.IsNullOrEmpty(ttlExpireTimeString)) + { + ttlExpireTime = DateTime.Parse(ttlExpireTimeString, CultureInfo.InvariantCulture); + } + } + + return new ActorStateResponse(stringResponse, ttlExpireTime); } public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index 8f30aa18f..5849328a8 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ internal interface IDaprInteractor /// Name of key to get value for. /// Cancels the operation. /// A task that represents the asynchronous operation. - Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); + Task> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default); /// /// Invokes Actor method. diff --git a/src/Dapr.Actors/Runtime/ActorStateChange.cs b/src/Dapr.Actors/Runtime/ActorStateChange.cs index c09e48df6..18c7eaac1 100644 --- a/src/Dapr.Actors/Runtime/ActorStateChange.cs +++ b/src/Dapr.Actors/Runtime/ActorStateChange.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,7 +27,8 @@ public sealed class ActorStateChange /// The type of value associated with given actor state name. /// The value associated with given actor state name. /// The kind of state change for given actor state name. - public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind) + /// The time to live for the state. + public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTime? ttlExpireTime) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -35,6 +36,7 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi this.Type = type; this.Value = value; this.ChangeKind = changeKind; + this.TTLExpireTime = ttlExpireTime; } /// @@ -68,5 +70,39 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi /// The kind of state change for given actor state name. /// public StateChangeKind ChangeKind { get; } + + /// + /// Gets the time to live for the state. + /// + /// + /// The time to live for the state. + /// + /// + /// If null, the state will not expire. + /// + public DateTime? TTLExpireTime { get; } + + /// + /// Gets the time to live in seconds for the state. + /// + /// + /// The time to live for the state. + /// + /// + /// If null, the state will not expire. + /// + public int? TTLInSeconds { + get + { + if (this.TTLExpireTime.HasValue) + { + return (int)Math.Ceiling((this.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds); + } + else + { + return null; + } + } + } } } diff --git a/src/Dapr.Actors/Runtime/ActorStateManager.cs b/src/Dapr.Actors/Runtime/ActorStateManager.cs index 9c752f56b..1b873e9af 100644 --- a/src/Dapr.Actors/Runtime/ActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/ActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ using System.Threading; using System.Threading.Tasks; using Dapr.Actors.Resources; +using Dapr.Actors.Communication; namespace Dapr.Actors.Runtime { @@ -35,17 +36,17 @@ internal ActorStateManager(Actor actor) this.defaultTracker = new Dictionary(); } - public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { EnsureStateProviderInitialized(); - - if (!(await this.TryAddStateAsync(stateName, value, cancellationToken))) + + if (!(await this.TryAddStateAsync(stateName, value, cancellationToken, ttlInSeconds))) { throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName)); } } - public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -60,7 +61,7 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); return true; } @@ -72,7 +73,7 @@ public async Task TryAddStateAsync(string stateName, T value, Cancellat return false; } - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); return true; } @@ -102,8 +103,8 @@ public async Task> TryGetStateAsync(string stateName, Can { var stateMetadata = stateChangeTracker[stateName]; - // Check if the property was marked as remove in the cache - if (stateMetadata.ChangeKind == StateChangeKind.Remove) + // Check if the property was marked as remove in the cache or is expired + if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTime.UtcNow) { return new ConditionalValue(false, default); } @@ -114,13 +115,13 @@ public async Task> TryGetStateAsync(string stateName, Can var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value, StateChangeKind.None)); + stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, conditionalResult.Value.TTLExpireTime)); } - return conditionalResult; + return new ConditionalValue(false, default); } - public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -141,11 +142,11 @@ public async Task SetStateAsync(string stateName, T value, CancellationToken } else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken)) { - stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update)); + stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); } else { - stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); } } @@ -217,7 +218,7 @@ public async Task ContainsStateAsync(string stateName, CancellationToken c return false; } - public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken) + public async Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds) { EnsureStateProviderInitialized(); @@ -231,7 +232,7 @@ public async Task GetOrAddStateAsync(string stateName, T value, Cancellati var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add; var stateChangeTracker = GetContextualStateTracker(); - stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind); + stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttlInSeconds: ttlInSeconds); return value; } @@ -239,7 +240,8 @@ public async Task AddOrUpdateStateAsync( string stateName, T addValue, Func updateValueFactory, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + int? ttlInSeconds = null) { ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName)); @@ -254,7 +256,7 @@ public async Task AddOrUpdateStateAsync( // Check if the property was marked as remove in the cache if (stateMetadata.ChangeKind == StateChangeKind.Remove) { - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds); return addValue; } @@ -272,13 +274,13 @@ public async Task AddOrUpdateStateAsync( var conditionalResult = await this.TryGetStateFromStateProviderAsync(stateName, cancellationToken); if (conditionalResult.HasValue) { - var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value); - stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update)); + var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value); + stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds)); return newValue; } - stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add); + stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttlInSeconds: ttlInSeconds); return addValue; } @@ -310,7 +312,7 @@ public async Task SaveStateAsync(CancellationToken cancellationToken = default) if (stateMetadata.ChangeKind != StateChangeKind.None) { stateChangeList.Add( - new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind)); + new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind, stateMetadata.TTLExpireTime)); if (stateMetadata.ChangeKind == StateChangeKind.Remove) { @@ -362,7 +364,7 @@ private bool IsStateMarkedForRemove(string stateName) return false; } - private Task> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) + private Task>> TryGetStateFromStateProviderAsync(string stateName, CancellationToken cancellationToken) { EnsureStateProviderInitialized(); return this.actor.Host.StateProvider.TryLoadStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken); @@ -392,11 +394,20 @@ private Dictionary GetContextualStateTracker() private sealed class StateMetadata { - private StateMetadata(object value, Type type, StateChangeKind changeKind) + private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) { this.Value = value; this.Type = type; this.ChangeKind = changeKind; + + if (ttlExpireTime.HasValue && ttlInSeconds.HasValue) { + throw new ArgumentException("Cannot specify both TTLExpireTime and TTLInSeconds"); + } + if (ttlInSeconds.HasValue) { + this.TTLExpireTime = DateTime.UtcNow.AddSeconds(ttlInSeconds.Value); + } else { + this.TTLExpireTime = ttlExpireTime; + } } public object Value { get; set; } @@ -405,9 +416,11 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind) public Type Type { get; } - public static StateMetadata Create(T value, StateChangeKind changeKind) + public DateTime? TTLExpireTime { get; set; } + + public static StateMetadata Create(T value, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null) { - return new StateMetadata(value, typeof(T), changeKind); + return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttlInSeconds); } public static StateMetadata CreateForRemove() diff --git a/src/Dapr.Actors/Runtime/ConditionalValue.cs b/src/Dapr.Actors/Runtime/ConditionalValue.cs index 1d2a197eb..ec4f3a5a6 100644 --- a/src/Dapr.Actors/Runtime/ConditionalValue.cs +++ b/src/Dapr.Actors/Runtime/ConditionalValue.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,4 +42,4 @@ public ConditionalValue(bool hasValue, TValue value) /// The value of the object. If HasValue is false, returns the default value for type of the TValue parameter. public TValue Value { get; } } -} \ No newline at end of file +} diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index ae86fb28b..9ace60fbf 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ namespace Dapr.Actors.Runtime using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// State Provider to interact with Dapr runtime. @@ -43,27 +44,27 @@ public DaprStateProvider(IDaprInteractor daprInteractor, JsonSerializerOptions j this.daprInteractor = daprInteractor; } - public async Task> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) + public async Task>> TryLoadStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var result = new ConditionalValue(false, default); - var stringResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + var result = new ConditionalValue>(false, default); + var response = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - if (stringResult.Length != 0) + if (response.Value.Length != 0 && response.TTLExpireTime > DateTime.UtcNow) { T typedResult; // perform default json de-serialization if custom serializer was not provided. if (this.actorStateSerializer != null) { - var byteResult = Convert.FromBase64String(stringResult.Trim('"')); + var byteResult = Convert.FromBase64String(response.Value.Trim('"')); typedResult = this.actorStateSerializer.Deserialize(byteResult); } else { - typedResult = JsonSerializer.Deserialize(stringResult, jsonSerializerOptions); + typedResult = JsonSerializer.Deserialize(response.Value, jsonSerializerOptions); } - result = new ConditionalValue(true, typedResult); + result = new ConditionalValue>(true, new ActorStateResponse(typedResult, response.TTLExpireTime)); } return result; @@ -71,8 +72,8 @@ public async Task> TryLoadStateAsync(string actorType, st public async Task ContainsStateAsync(string actorType, string actorId, string stateName, CancellationToken cancellationToken = default) { - var byteResult = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); - return byteResult.Length != 0; + var result = await this.daprInteractor.GetStateAsync(actorType, actorId, stateName, cancellationToken); + return result.Value.Length != 0; } public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) @@ -132,6 +133,11 @@ private async Task DoStateChangesTransactionallyAsync(string actorType, string a writer.WritePropertyName("value"); JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); } + + var ttlInSeconds = stateChange.TTLInSeconds; + if (ttlInSeconds.HasValue) { + writer.WriteString("ttlInSeconds", ttlInSeconds.Value.ToString()); + } break; default: break; diff --git a/src/Dapr.Actors/Runtime/IActorStateManager.cs b/src/Dapr.Actors/Runtime/IActorStateManager.cs index df1eb3356..75d2a5996 100644 --- a/src/Dapr.Actors/Runtime/IActorStateManager.cs +++ b/src/Dapr.Actors/Runtime/IActorStateManager.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ namespace Dapr.Actors.Runtime using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Dapr.Actors.Communication; /// /// Represents an interface that exposes methods to manage state of an . @@ -31,6 +32,7 @@ public interface IActorStateManager /// Name of the actor state to add. /// Value of the actor state to add. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous add operation. /// @@ -43,7 +45,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task AddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Gets an actor state with specified state name. @@ -73,6 +75,7 @@ public interface IActorStateManager /// Type of value associated with given state name. /// Name of the actor state to set. /// Value of the actor state. + /// The time to live for the state. If null, the state will not expire. /// The token to monitor for cancellation requests. /// /// A task that represents the asynchronous set operation. @@ -83,7 +86,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task SetStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Removes an actor state with specified state name. @@ -107,6 +110,7 @@ public interface IActorStateManager /// Value of the actor state to add. /// The token to monitor for cancellation requests. /// This is optional and defaults to . + /// The time to live for the state. If null, the state will not expire. /// /// A boolean task that represents the asynchronous add operation. Returns true if the /// value was successfully added and false if an actor state with the same name already exists. @@ -119,7 +123,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task TryAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Attempts to get an actor state with specified state name. @@ -174,6 +178,7 @@ public interface IActorStateManager /// Name of the actor state to get or add. /// Value of the actor state to add if it does not exist. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous get or add operation. The value of TResult /// parameter contains value of actor state with given state name. @@ -186,7 +191,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default); + Task GetOrAddStateAsync(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Adds an actor state with given state name, if it does not already exist or updates @@ -197,6 +202,7 @@ public interface IActorStateManager /// Value of the actor state to add if it does not exist. /// Factory function to generate value of actor state to update if it exists. /// The token to monitor for cancellation requests. + /// The time to live for the state. If null, the state will not expire. /// /// A task that represents the asynchronous add/update operation. The value of TResult /// parameter contains value of actor state that was added/updated. @@ -207,7 +213,7 @@ public interface IActorStateManager /// The type of state value must be /// Data Contract serializable. /// - Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default); + Task AddOrUpdateStateAsync(string stateName, T addValue, Func updateValueFactory, CancellationToken cancellationToken = default, int? ttlInSeconds = null); /// /// Clears all the cached actor states and any operation(s) performed on diff --git a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs index c6136c057..cff3b7c26 100644 --- a/src/Dapr.Actors/Runtime/IActorStateSerializer.cs +++ b/src/Dapr.Actors/Runtime/IActorStateSerializer.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto index 3faea3016..1e63b885d 100644 --- a/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto +++ b/src/Dapr.Client/Protos/dapr/proto/common/v1/common.proto @@ -77,7 +77,7 @@ message InvokeRequest { HTTPExtension http_extension = 4; } -// InvokeResponse is the response message inclduing data and its content type +// InvokeResponse is the response message including data and its content type // from app callback. // This message is used in InvokeService of Dapr gRPC Service and OnInvoke // of AppCallback gRPC service. diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index 883527adb..eafb5452e 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -169,6 +169,26 @@ service Dapr { // Raise an event to a running workflow instance rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} + // Starts a new instance of a workflow + rpc StartWorkflowBeta1 (StartWorkflowRequest) returns (StartWorkflowResponse) {} + + // Gets details about a started workflow instance + rpc GetWorkflowBeta1 (GetWorkflowRequest) returns (GetWorkflowResponse) {} + + // Purge Workflow + rpc PurgeWorkflowBeta1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Terminates a running workflow instance + rpc TerminateWorkflowBeta1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {} + + // Pauses a running workflow instance + rpc PauseWorkflowBeta1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {} + + // Resumes a paused workflow instance + rpc ResumeWorkflowBeta1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {} + + // Raise an event to a running workflow instance + rpc RaiseEventWorkflowBeta1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {} // Shutdown the sidecar rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {} } @@ -542,6 +562,9 @@ message GetActorStateRequest { // GetActorStateResponse is the response conveying the actor's state value. message GetActorStateResponse { bytes data = 1; + + // The metadata which will be sent to app. + map metadata = 2; } // ExecuteActorStateTransactionRequest is the message to execute multiple operations on a specified actor. @@ -580,10 +603,14 @@ message InvokeActorResponse { // GetMetadataResponse is a message that is returned on GetMetadata rpc call message GetMetadataResponse { string id = 1; - repeated ActiveActorsCount active_actors_count = 2; - repeated RegisteredComponents registered_components = 3; - map extended_metadata = 4; - repeated PubsubSubscription subscriptions = 5; + repeated ActiveActorsCount active_actors_count = 2 [json_name = "actors"]; + repeated RegisteredComponents registered_components = 3 [json_name = "components"]; + map extended_metadata = 4 [json_name = "extended"]; + repeated PubsubSubscription subscriptions = 5 [json_name = "subscriptions"]; + repeated MetadataHTTPEndpoint http_endpoints = 6 [json_name = "httpEndpoints"]; + AppConnectionProperties app_connection_properties = 7 [json_name = "appConnectionProperties"]; + string runtime_version = 8 [json_name = "runtimeVersion"]; + repeated string enabled_features = 9 [json_name = "enabledFeatures"]; } message ActiveActorsCount { @@ -598,12 +625,31 @@ message RegisteredComponents { repeated string capabilities = 4; } +message MetadataHTTPEndpoint { + string name = 1 [json_name = "name"]; +} + +message AppConnectionProperties { + int32 port = 1; + string protocol = 2; + string channel_address = 3 [json_name = "channelAddress"]; + int32 max_concurrency = 4 [json_name = "maxConcurrency"]; + AppConnectionHealthProperties health = 5; +} + +message AppConnectionHealthProperties { + string health_check_path = 1 [json_name = "healthCheckPath"]; + string health_probe_interval = 2 [json_name = "healthProbeInterval"]; + string health_probe_timeout = 3 [json_name = "healthProbeTimeout"]; + int32 health_threshold = 4 [json_name = "healthThreshold"]; +} + message PubsubSubscription { - string pubsub_name = 1; - string topic = 2; - map metadata = 3; - PubsubSubscriptionRules rules = 4; - string dead_letter_topic = 5; + string pubsub_name = 1 [json_name = "pubsubname"]; + string topic = 2 [json_name = "topic"]; + map metadata = 3 [json_name = "metadata"]; + PubsubSubscriptionRules rules = 4 [json_name = "rules"]; + string dead_letter_topic = 5 [json_name = "deadLetterTopic"]; } message PubsubSubscriptionRules { @@ -900,7 +946,7 @@ message EncryptRequest { // Request details. Must be present in the first message only. EncryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // EncryptRequestOptions contains options for the first message in the EncryptAlpha1 request. @@ -928,7 +974,7 @@ message EncryptRequestOptions { // EncryptResponse is the response for EncryptAlpha1. message EncryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } // DecryptRequest is the request for DecryptAlpha1. @@ -936,7 +982,7 @@ message DecryptRequest { // Request details. Must be present in the first message only. DecryptRequestOptions options = 1; // Chunk of data of arbitrary size. - // common.v1.StreamPayload payload = 2; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 2; } // DecryptRequestOptions contains options for the first message in the DecryptAlpha1 request. @@ -952,10 +998,10 @@ message DecryptRequestOptions { // DecryptResponse is the response for DecryptAlpha1. message DecryptResponse { // Chunk of data. - // common.v1.StreamPayload payload = 1; // TODO: Commented out since it was causing an issue + common.v1.StreamPayload payload = 1; } -// GetWorkflowRequest is the request for GetWorkflowAlpha1. +// GetWorkflowRequest is the request for GetWorkflowBeta1. message GetWorkflowRequest { // ID of the workflow instance to query. string instance_id = 1 [json_name = "instanceID"]; @@ -963,7 +1009,7 @@ message GetWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// GetWorkflowResponse is the response for GetWorkflowAlpha1. +// GetWorkflowResponse is the response for GetWorkflowBeta1. message GetWorkflowResponse { // ID of the workflow instance. string instance_id = 1 [json_name = "instanceID"]; @@ -979,7 +1025,7 @@ message GetWorkflowResponse { map properties = 6; } -// StartWorkflowRequest is the request for StartWorkflowAlpha1. +// StartWorkflowRequest is the request for StartWorkflowBeta1. message StartWorkflowRequest { // The ID to assign to the started workflow instance. If empty, a random ID is generated. string instance_id = 1 [json_name = "instanceID"]; @@ -993,13 +1039,13 @@ message StartWorkflowRequest { bytes input = 5; } -// StartWorkflowResponse is the response for StartWorkflowAlpha1. +// StartWorkflowResponse is the response for StartWorkflowBeta1. message StartWorkflowResponse { // ID of the started workflow instance. string instance_id = 1 [json_name = "instanceID"]; } -// TerminateWorkflowRequest is the request for TerminateWorkflowAlpha1. +// TerminateWorkflowRequest is the request for TerminateWorkflowBeta1. message TerminateWorkflowRequest { // ID of the workflow instance to terminate. string instance_id = 1 [json_name = "instanceID"]; @@ -1007,7 +1053,7 @@ message TerminateWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// PauseWorkflowRequest is the request for PauseWorkflowAlpha1. +// PauseWorkflowRequest is the request for PauseWorkflowBeta1. message PauseWorkflowRequest { // ID of the workflow instance to pause. string instance_id = 1 [json_name = "instanceID"]; @@ -1015,7 +1061,7 @@ message PauseWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// ResumeWorkflowRequest is the request for ResumeWorkflowAlpha1. +// ResumeWorkflowRequest is the request for ResumeWorkflowBeta1. message ResumeWorkflowRequest { // ID of the workflow instance to resume. string instance_id = 1 [json_name = "instanceID"]; @@ -1023,7 +1069,7 @@ message ResumeWorkflowRequest { string workflow_component = 2 [json_name = "workflowComponent"]; } -// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowAlpha1. +// RaiseEventWorkflowRequest is the request for RaiseEventWorkflowBeta1. message RaiseEventWorkflowRequest { // ID of the workflow instance to raise an event for. string instance_id = 1 [json_name = "instanceID"]; @@ -1035,10 +1081,10 @@ message RaiseEventWorkflowRequest { bytes event_data = 4; } -// PurgeWorkflowRequest is the request for PurgeWorkflowAlpha1. +// PurgeWorkflowRequest is the request for PurgeWorkflowBeta1. message PurgeWorkflowRequest { // ID of the workflow instance to purge. string instance_id = 1 [json_name = "instanceID"]; // Name of the workflow component. string workflow_component = 2 [json_name = "workflowComponent"]; -} \ No newline at end of file +} diff --git a/test/Dapr.Actors.Test/Runtime/ActorTests.cs b/test/Dapr.Actors.Test/Runtime/ActorTests.cs index b18800f0e..f88b4e03f 100644 --- a/test/Dapr.Actors.Test/Runtime/ActorTests.cs +++ b/test/Dapr.Actors.Test/Runtime/ActorTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.Actors/State/IStateActor.cs b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs new file mode 100644 index 000000000..e453f7fa3 --- /dev/null +++ b/test/Dapr.E2E.Test.Actors/State/IStateActor.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Threading.Tasks; +using Dapr.Actors; + +namespace Dapr.E2E.Test.Actors.State +{ + public interface IStateActor : IPingActor, IActor + { } +} diff --git a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs index 5f2d5db86..58776fe28 100644 --- a/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs +++ b/test/Dapr.E2E.Test.App.ReentrantActor/Actors/ReentrantActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -67,4 +67,4 @@ private async Task UpdateState(bool isEnter, int callNumber) } } } -} \ No newline at end of file +} diff --git a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs index b08e483c2..57536377d 100644 --- a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test.App/Actors/StateActor.cs b/test/Dapr.E2E.Test.App/Actors/StateActor.cs new file mode 100644 index 000000000..f8bff85e7 --- /dev/null +++ b/test/Dapr.E2E.Test.App/Actors/StateActor.cs @@ -0,0 +1,38 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dapr.Actors.Runtime; + +namespace Dapr.E2E.Test.Actors.State +{ + public class StateActor : Actor + { + public StateActor(ActorHost host) + : base(host) + { + } + + public Task GetState(string key) + { + return this.StateManager.GetStateAsync(key); + } + + public Task SetState(string key, string value, int? ttlInSeconds) + { + return this.StateManager.SetStateAsync(key, value, ttlInSeconds = ttlInSeconds); + } + } +} diff --git a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs index bbe6cf7ae..4c6589965 100644 --- a/test/Dapr.E2E.Test.App/Actors/TimerActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/TimerActor.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs new file mode 100644 index 000000000..853acf944 --- /dev/null +++ b/test/Dapr.E2E.Test/Actors/E2ETests.StateTests.cs @@ -0,0 +1,44 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ +namespace Dapr.E2E.Test +{ + using System; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using Dapr.Actors; + using Dapr.E2E.Test.Actors.State; + using Xunit; + + public partial class E2ETests : IAsyncLifetime + { + [Fact] + public async Task ActorCanSaveStateWithTTL() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "StateActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + await proxy.SaveState("key", "value", 2, cts.Token); + + state = await.proxy.GetState("key", cts.Token); + Assert.Equal("value", state.Value); + + await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + + state = await.proxy.GetState("key", cts.Token); + Assert.Null(state.Value); + } + } +} diff --git a/test/Dapr.E2E.Test/configuration/featureconfig.yaml b/test/Dapr.E2E.Test/configuration/featureconfig.yaml index 81ef1ecb1..4806c630f 100644 --- a/test/Dapr.E2E.Test/configuration/featureconfig.yaml +++ b/test/Dapr.E2E.Test/configuration/featureconfig.yaml @@ -12,3 +12,5 @@ spec: enabled: true - name: "proxy.grpc" enabled: true + - name: "ActorStateTTL" + enabled: true