Skip to content

Commit

Permalink
Actor state TTL support
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Oct 12, 2023
1 parent 3b979e6 commit de0a847
Show file tree
Hide file tree
Showing 21 changed files with 357 additions and 82 deletions.
2 changes: 1 addition & 1 deletion examples/Actor/DemoActor/DemoActor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
50 changes: 50 additions & 0 deletions src/Dapr.Actors/Communication/ActorStateResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Actors.Communication
{
using System;

/// <summary>
/// Represents a response from fetching an actor state key.
/// </summary>
internal class ActorStateResponse<T>
{
/// <summary>
/// Initializes a new instance of the <see cref="ActorStateResponse{T}"/> class.
/// </summary>
/// <param name="value">The response value.</param>
/// <param name="ttlExpireTime">The time to live expiration time.</param>
public ActorStateResponse(T value, DateTime? ttlExpireTime)
{
this.Value = value;
this.TTLExpireTime = ttlExpireTime;
}

/// <summary>
/// Gets the response value as a string.
/// </summary>
/// <value>
/// The response value as a string.
/// </value>
public T Value { get; }

/// <summary>
/// Gets the time to live expiration time.
/// </summary>
/// <value>
/// The time to live expiration time.
/// </value>
public DateTime? TTLExpireTime { get; }
}
}
3 changes: 2 additions & 1 deletion src/Dapr.Actors/Constants.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ internal static class Constants
public const string RequestHeaderName = "X-DaprRequestHeader";
public const string ErrorResponseHeaderName = "X-DaprErrorResponseHeader";
public const string ReentrancyRequestHeaderName = "Dapr-Reentrancy-Id";
public const string TTLResponseHeaderName = "Metadata.ttlExpireTime";
public const string Dapr = "dapr";
public const string Config = "config";
public const string State = "state";
Expand Down
15 changes: 13 additions & 2 deletions src/Dapr.Actors/DaprHttpInteractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public DaprHttpInteractor(
this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout;
}

public async Task<string> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default)
public async Task<ActorStateResponse<string>> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateKeyRelativeUrlFormat, actorType, actorId, keyName);

Expand All @@ -72,7 +72,18 @@ HttpRequestMessage RequestFunc()

using var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
var stringResponse = await response.Content.ReadAsStringAsync();
return stringResponse;

var ttlExpireTime = new DateTime();
if (response.Headers.TryGetValues(Constants.TTLResponseHeaderName, out IEnumerable<string> headerValues))
{
var ttlExpireTimeString = headerValues.First();
if (!string.IsNullOrEmpty(ttlExpireTimeString))
{
ttlExpireTime = DateTime.Parse(ttlExpireTimeString, CultureInfo.InvariantCulture);
}
}

return new ActorStateResponse<string>(stringResponse, ttlExpireTime);
}

public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default)
Expand Down
4 changes: 2 additions & 2 deletions src/Dapr.Actors/IDaprInteractor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,7 @@ internal interface IDaprInteractor
/// <param name="keyName">Name of key to get value for.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
Task<string> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default);
Task<ActorStateResponse<string>> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default);

/// <summary>
/// Invokes Actor method.
Expand Down
40 changes: 38 additions & 2 deletions src/Dapr.Actors/Runtime/ActorStateChange.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,14 +27,16 @@ public sealed class ActorStateChange
/// <param name="type">The type of value associated with given actor state name.</param>
/// <param name="value">The value associated with given actor state name.</param>
/// <param name="changeKind">The kind of state change for given actor state name.</param>
public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind)
/// <param name="ttlExpireTime">The time to live for the state.</param>
public ActorStateChange(string stateName, Type type, object value, StateChangeKind changeKind, DateTime? ttlExpireTime)
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));

this.StateName = stateName;
this.Type = type;
this.Value = value;
this.ChangeKind = changeKind;
this.TTLExpireTime = ttlExpireTime;
}

/// <summary>
Expand Down Expand Up @@ -68,5 +70,39 @@ public ActorStateChange(string stateName, Type type, object value, StateChangeKi
/// The kind of state change for given actor state name.
/// </value>
public StateChangeKind ChangeKind { get; }

/// <summary>
/// Gets the time to live for the state.
/// </summary>
/// <value>
/// The time to live for the state.
/// </value>
/// <remarks>
/// If null, the state will not expire.
/// </remarks>
public DateTime? TTLExpireTime { get; }

/// <summary>
/// Gets the time to live in seconds for the state.
/// </summary>
/// <value>
/// The time to live for the state.
/// </value>
/// <remarks>
/// If null, the state will not expire.
/// </remarks>
public int? TTLInSeconds {
get
{
if (this.TTLExpireTime.HasValue)
{
return (int)Math.Ceiling((this.TTLExpireTime.Value - DateTime.UtcNow).TotalSeconds);
}
else
{
return null;
}
}
}
}
}
65 changes: 39 additions & 26 deletions src/Dapr.Actors/Runtime/ActorStateManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors.Resources;
using Dapr.Actors.Communication;

namespace Dapr.Actors.Runtime
{
Expand All @@ -35,17 +36,17 @@ internal ActorStateManager(Actor actor)
this.defaultTracker = new Dictionary<string, StateMetadata>();
}

public async Task AddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
public async Task AddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds)
{
EnsureStateProviderInitialized();
if (!(await this.TryAddStateAsync(stateName, value, cancellationToken)))

if (!(await this.TryAddStateAsync(stateName, value, cancellationToken, ttlInSeconds)))
{
throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName));
}
}

public async Task<bool> TryAddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
public async Task<bool> TryAddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken = default, int? ttlInSeconds = null)
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));

Expand All @@ -60,7 +61,7 @@ public async Task<bool> TryAddStateAsync<T>(string stateName, T value, Cancellat
// Check if the property was marked as remove in the cache
if (stateMetadata.ChangeKind == StateChangeKind.Remove)
{
stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update);
stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds);
return true;
}

Expand All @@ -72,7 +73,7 @@ public async Task<bool> TryAddStateAsync<T>(string stateName, T value, Cancellat
return false;
}

stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add);
stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds);
return true;
}

Expand Down Expand Up @@ -102,8 +103,8 @@ public async Task<ConditionalValue<T>> TryGetStateAsync<T>(string stateName, Can
{
var stateMetadata = stateChangeTracker[stateName];

// Check if the property was marked as remove in the cache
if (stateMetadata.ChangeKind == StateChangeKind.Remove)
// Check if the property was marked as remove in the cache or is expired
if (stateMetadata.ChangeKind == StateChangeKind.Remove || stateMetadata.TTLExpireTime <= DateTime.UtcNow)
{
return new ConditionalValue<T>(false, default);
}
Expand All @@ -114,13 +115,13 @@ public async Task<ConditionalValue<T>> TryGetStateAsync<T>(string stateName, Can
var conditionalResult = await this.TryGetStateFromStateProviderAsync<T>(stateName, cancellationToken);
if (conditionalResult.HasValue)
{
stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value, StateChangeKind.None));
stateChangeTracker.Add(stateName, StateMetadata.Create(conditionalResult.Value.Value, StateChangeKind.None, conditionalResult.Value.TTLExpireTime));
}

return conditionalResult;
return new ConditionalValue<T>(false, default);
}

public async Task SetStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
public async Task SetStateAsync<T>(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds)
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));

Expand All @@ -141,11 +142,11 @@ public async Task SetStateAsync<T>(string stateName, T value, CancellationToken
}
else if (await this.actor.Host.StateProvider.ContainsStateAsync(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken))
{
stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update));
stateChangeTracker.Add(stateName, StateMetadata.Create(value, StateChangeKind.Update, ttlInSeconds: ttlInSeconds));
}
else
{
stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add);
stateChangeTracker[stateName] = StateMetadata.Create(value, StateChangeKind.Add, ttlInSeconds: ttlInSeconds);
}
}

Expand Down Expand Up @@ -217,7 +218,7 @@ public async Task<bool> ContainsStateAsync(string stateName, CancellationToken c
return false;
}

public async Task<T> GetOrAddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
public async Task<T> GetOrAddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken, int? ttlInSeconds)
{
EnsureStateProviderInitialized();

Expand All @@ -231,15 +232,16 @@ public async Task<T> GetOrAddStateAsync<T>(string stateName, T value, Cancellati
var changeKind = this.IsStateMarkedForRemove(stateName) ? StateChangeKind.Update : StateChangeKind.Add;

var stateChangeTracker = GetContextualStateTracker();
stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind);
stateChangeTracker[stateName] = StateMetadata.Create(value, changeKind, ttlInSeconds: ttlInSeconds);
return value;
}

public async Task<T> AddOrUpdateStateAsync<T>(
string stateName,
T addValue,
Func<string, T, T> updateValueFactory,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
int? ttlInSeconds = null)
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));

Expand All @@ -254,7 +256,7 @@ public async Task<T> AddOrUpdateStateAsync<T>(
// Check if the property was marked as remove in the cache
if (stateMetadata.ChangeKind == StateChangeKind.Remove)
{
stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update);
stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds);
return addValue;
}

Expand All @@ -272,13 +274,13 @@ public async Task<T> AddOrUpdateStateAsync<T>(
var conditionalResult = await this.TryGetStateFromStateProviderAsync<T>(stateName, cancellationToken);
if (conditionalResult.HasValue)
{
var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value);
stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update));
var newValue = updateValueFactory.Invoke(stateName, conditionalResult.Value.Value);
stateChangeTracker.Add(stateName, StateMetadata.Create(newValue, StateChangeKind.Update, ttlInSeconds: ttlInSeconds));

return newValue;
}

stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add);
stateChangeTracker[stateName] = StateMetadata.Create(addValue, StateChangeKind.Add, ttlInSeconds: ttlInSeconds);
return addValue;
}

Expand Down Expand Up @@ -310,7 +312,7 @@ public async Task SaveStateAsync(CancellationToken cancellationToken = default)
if (stateMetadata.ChangeKind != StateChangeKind.None)
{
stateChangeList.Add(
new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind));
new ActorStateChange(stateName, stateMetadata.Type, stateMetadata.Value, stateMetadata.ChangeKind, stateMetadata.TTLExpireTime));

if (stateMetadata.ChangeKind == StateChangeKind.Remove)
{
Expand Down Expand Up @@ -362,7 +364,7 @@ private bool IsStateMarkedForRemove(string stateName)
return false;
}

private Task<ConditionalValue<T>> TryGetStateFromStateProviderAsync<T>(string stateName, CancellationToken cancellationToken)
private Task<ConditionalValue<ActorStateResponse<T>>> TryGetStateFromStateProviderAsync<T>(string stateName, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
return this.actor.Host.StateProvider.TryLoadStateAsync<T>(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken);
Expand Down Expand Up @@ -392,11 +394,20 @@ private Dictionary<string, StateMetadata> GetContextualStateTracker()

private sealed class StateMetadata
{
private StateMetadata(object value, Type type, StateChangeKind changeKind)
private StateMetadata(object value, Type type, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null)
{
this.Value = value;
this.Type = type;
this.ChangeKind = changeKind;

if (ttlExpireTime.HasValue && ttlInSeconds.HasValue) {
throw new ArgumentException("Cannot specify both TTLExpireTime and TTLInSeconds");
}
if (ttlInSeconds.HasValue) {
this.TTLExpireTime = DateTime.UtcNow.AddSeconds(ttlInSeconds.Value);
} else {
this.TTLExpireTime = ttlExpireTime;
}
}

public object Value { get; set; }
Expand All @@ -405,9 +416,11 @@ private StateMetadata(object value, Type type, StateChangeKind changeKind)

public Type Type { get; }

public static StateMetadata Create<T>(T value, StateChangeKind changeKind)
public DateTime? TTLExpireTime { get; set; }

public static StateMetadata Create<T>(T value, StateChangeKind changeKind, DateTime? ttlExpireTime = null, int? ttlInSeconds = null)
{
return new StateMetadata(value, typeof(T), changeKind);
return new StateMetadata(value, typeof(T), changeKind, ttlExpireTime, ttlInSeconds);
}

public static StateMetadata CreateForRemove()
Expand Down
4 changes: 2 additions & 2 deletions src/Dapr.Actors/Runtime/ConditionalValue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,4 +42,4 @@ public ConditionalValue(bool hasValue, TValue value)
/// <returns>The value of the object. If HasValue is <languageKeyword>false</languageKeyword>, returns the default value for type of the TValue parameter.</returns>
public TValue Value { get; }
}
}
}
Loading

0 comments on commit de0a847

Please sign in to comment.