Skip to content

Commit

Permalink
Merge pull request #35 from Angeling3/synchronous-job-implementations
Browse files Browse the repository at this point in the history
Add Task.Run-based implementations to the job scheduler and enqueuer
  • Loading branch information
montyclt authored Nov 11, 2024
2 parents a72fb8d + c45a2be commit dc88b3b
Show file tree
Hide file tree
Showing 21 changed files with 815 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../MSBuild/Base.props"/>
<Import Project="../MSBuild/Packable.props"/>

<ItemGroup Label="Project references">
<ProjectReference Include="..\Bootstrapping\Bootstrapping.csproj" />
<ProjectReference Include="..\Foundation\Foundation.csproj" />
</ItemGroup>
</Project>
39 changes: 39 additions & 0 deletions src/ContractImplementations.TaskRunJobs/RetryHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using IOKode.OpinionatedFramework.Jobs;

namespace IOKode.OpinionatedFramework.ContractImplementations.TaskRunJobs;

internal static class RetryHelper
{
public static async Task RetryOnExceptionAsync(IJob job, int maxAttempts)
{
bool shouldRetry = false;
int attempt = 0;
var exceptions = new List<Exception>();

do
{
try
{
attempt++;
await job.InvokeAsync(default);
shouldRetry = false;
}
catch (Exception ex)
{
exceptions.Add(ex);

if (attempt < maxAttempts)
{
shouldRetry = true;
}
else
{
throw new AggregateException(exceptions);
}
}
} while (shouldRetry);
}
}
29 changes: 29 additions & 0 deletions src/ContractImplementations.TaskRunJobs/ServiceExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using IOKode.OpinionatedFramework.Bootstrapping;
using IOKode.OpinionatedFramework.Configuration;
using IOKode.OpinionatedFramework.Jobs;
using IOKode.OpinionatedFramework.Logging;
using Microsoft.Extensions.DependencyInjection;

namespace IOKode.OpinionatedFramework.ContractImplementations.TaskRunJobs;

public static class ServiceExtensions
{
public static void AddTaskRunJobScheduler(this IOpinionatedServiceCollection services)
{
services.AddSingleton<IJobScheduler>(serviceProvider =>
{
var configurationProvider = serviceProvider.GetRequiredService<IConfigurationProvider>();
var logging = serviceProvider.GetRequiredService<ILogging>();
return new TaskRunJobScheduler(configurationProvider, logging);
});
}

public static void AddTaskRunJobEnqueuer(this IOpinionatedServiceCollection services)
{
services.AddSingleton<IJobEnqueuer>(serviceProvider =>
{
var configurationProvider = serviceProvider.GetRequiredService<IConfigurationProvider>();
return new TaskRunJobEnqueuer(configurationProvider);
});
}
}
33 changes: 33 additions & 0 deletions src/ContractImplementations.TaskRunJobs/TaskRunJobEnqueuer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using IOKode.OpinionatedFramework.Configuration;
using IOKode.OpinionatedFramework.Jobs;

namespace IOKode.OpinionatedFramework.ContractImplementations.TaskRunJobs;

public class TaskRunJobEnqueuer(IConfigurationProvider configuration) : IJobEnqueuer
{
public Task EnqueueAsync(Queue queue, IJob job, CancellationToken cancellationToken)
{
_ = RetryHelper.RetryOnExceptionAsync(job, configuration.GetValue<int>("TaskRun:JobEnqueuer:MaxAttempts"));
return Task.CompletedTask;
}

public Task EnqueueWithDelayAsync(Queue queue, IJob job, TimeSpan delay, CancellationToken cancellationToken)
{
_ = Task.Run(async () =>
{
await Task.Delay(delay, cancellationToken);

if (cancellationToken.IsCancellationRequested)
{
return;
}

await RetryHelper.RetryOnExceptionAsync(job, configuration.GetValue<int>("TaskRun:JobEnqueuer:MaxAttempts"));
}, cancellationToken);

return Task.CompletedTask;
}
}
83 changes: 83 additions & 0 deletions src/ContractImplementations.TaskRunJobs/TaskRunJobScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cronos;
using IOKode.OpinionatedFramework.Configuration;
using IOKode.OpinionatedFramework.Ensuring;
using IOKode.OpinionatedFramework.Jobs;
using IOKode.OpinionatedFramework.Logging;

namespace IOKode.OpinionatedFramework.ContractImplementations.TaskRunJobs;

public class TaskRunJobScheduler(IConfigurationProvider configuration, ILogging logging) : IJobScheduler
{
private List<TaskRunMutableScheduledJob> registeredJobs = new();

public Task<ScheduledJob> ScheduleAsync(IJob job, CronExpression interval, CancellationToken cancellationToken)
{
var scheduledJob = new TaskRunMutableScheduledJob(interval, job);
this.registeredJobs.Add(scheduledJob);
Task.Run(async () =>
{
while (!scheduledJob.IsFinalized)
{
var now = DateTime.UtcNow;
var nextOccurrence = scheduledJob.Interval.GetNextOccurrence(scheduledJob.LastInvocation);

if (nextOccurrence == null)
{
throw new FormatException("The cron expression next occurrence is not found.");
}

if (now >= nextOccurrence)
{
try
{
await RetryHelper.RetryOnExceptionAsync(job, configuration.GetValue<int>("TaskRun:JobScheduler:MaxAttempts"));
}
catch (AggregateException ex)
{
logging.Error(ex, "An scheduled job reached max attempts.");
}

scheduledJob.LastInvocation = now;
}

var delay = scheduledJob.Interval.GetNextOccurrence(scheduledJob.LastInvocation)! - now;
await Task.Delay(delay.Value);
}
}, cancellationToken);

return Task.FromResult((ScheduledJob) scheduledJob);
}

public Task RescheduleAsync(ScheduledJob scheduledJob, CronExpression interval, CancellationToken cancellationToken)
{
Ensure.Type.IsAssignableTo(scheduledJob.GetType(), typeof(MutableScheduledJob))
.ElseThrowsIllegalArgument($"Type must be assignable to {nameof(MutableScheduledJob)} type.", nameof(scheduledJob));

var mutableScheduledJob = this.registeredJobs.Find(j => j.Identifier == scheduledJob.Identifier);
Ensure.Object.NotNull(mutableScheduledJob)
.ElseThrowsIllegalArgument($"The {nameof(ScheduledJob.Identifier)} value was not found on the schedule jobs.", nameof(scheduledJob));

mutableScheduledJob!.ChangeInterval(interval);
((MutableScheduledJob) scheduledJob).ChangeInterval(interval);

return Task.CompletedTask;
}

public Task UnscheduleAsync(ScheduledJob scheduledJob, CancellationToken cancellationToken)
{
var identifier = scheduledJob.Identifier;

var mutableScheduledJob = this.registeredJobs.Find(j => j.Identifier == identifier);
Ensure.Object.NotNull(mutableScheduledJob)
.ElseThrowsIllegalArgument($"The {nameof(ScheduledJob.Identifier)} value was not found on the schedule jobs.", nameof(scheduledJob));

mutableScheduledJob!.CancelLoop();
this.registeredJobs.Remove(mutableScheduledJob);

return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using Cronos;
using IOKode.OpinionatedFramework.Jobs;

namespace IOKode.OpinionatedFramework.ContractImplementations.TaskRunJobs;

internal class TaskRunMutableScheduledJob : MutableScheduledJob
{
public DateTime LastInvocation { get; set; }
public bool IsFinalized { get; private set; }

public TaskRunMutableScheduledJob(CronExpression interval, IJob job) : base(interval, job)
{
LastInvocation = DateTime.UtcNow;
}

public void CancelLoop()
{
IsFinalized = true;
}
}
2 changes: 1 addition & 1 deletion src/Foundation/Emailing/EmailExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static async Task SendAsync(this Email email, CancellationToken cancellat
/// <summary>
/// Enqueue the email.
/// </summary>
public static async Task QueueAsync(this Email email, string queue, CancellationToken cancellationToken = default)
public static async Task QueueAsync(this Email email, Queue queue, CancellationToken cancellationToken = default)
{
var enqueuer = Locator.Resolve<IJobEnqueuer>();
await enqueuer.EnqueueAsync(queue, new AsyncDelegateJob(async () => await email.SendAsync()), cancellationToken);
Expand Down
46 changes: 44 additions & 2 deletions src/Foundation/Jobs/IJobEnqueuer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,51 @@

namespace IOKode.OpinionatedFramework.Jobs;

/// <summary>
/// Defines a job enqueuer contract that allows enqueuing jobs with optional delay, in either the default or a specified queue.
/// </summary>
[AddToFacade("Job")]
public interface IJobEnqueuer
{
public Task EnqueueAsync(string queue, IJob job, CancellationToken cancellationToken);
public Task EnqueueWithDelayAsync(string queue, IJob job, TimeSpan delay, CancellationToken cancellationToken);
/// <summary>
/// Enqueues a job in the default queue.
/// </summary>
/// <param name="job">The job to be enqueued.</param>
/// <param name="cancellationToken">A token to cancel the enqueuing process, but it does NOT cancel the job once it is enqueued.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of enqueueing.</returns>
public Task EnqueueAsync(IJob job, CancellationToken cancellationToken = default)
{
return EnqueueAsync(Queue.Default, job, cancellationToken);
}

/// <summary>
/// Enqueues a job with a specified delay in the default queue.
/// </summary>
/// <param name="job">The job to be enqueued.</param>
/// <param name="delay">The delay before the job is enqueued.</param>
/// <param name="cancellationToken">A token to cancel the enqueuing process, but it does NOT cancel the job once it is enqueued.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of enqueueing.</returns>
public Task EnqueueWithDelayAsync(IJob job, TimeSpan delay, CancellationToken cancellationToken = default)
{
return EnqueueWithDelayAsync(Queue.Default, job, delay, cancellationToken);
}

/// <summary>
/// Enqueues a job in a specific queue.
/// </summary>
/// <param name="queue">The queue where the job will be enqueued.</param>
/// <param name="job">The job to be enqueued.</param>
/// <param name="cancellationToken">A token to cancel the enqueuing process, but it does NOT cancel the job once it is enqueued.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of enqueueing.</returns>
public Task EnqueueAsync(Queue queue, IJob job, CancellationToken cancellationToken = default);

/// <summary>
/// Enqueues a job with a specified delay in a specific queue.
/// </summary>
/// <param name="queue">The queue where the job will be enqueued.</param>
/// <param name="job">The job to be enqueued.</param>
/// <param name="delay">The delay before the job is enqueued.</param>
/// <param name="cancellationToken">A token to cancel the enqueuing process, but it does NOT cancel the job once it is enqueued.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of enqueueing.</returns>
public Task EnqueueWithDelayAsync(Queue queue, IJob job, TimeSpan delay, CancellationToken cancellationToken = default);
}
36 changes: 35 additions & 1 deletion src/Foundation/Jobs/IJobScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,46 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Cronos;
using IOKode.OpinionatedFramework.Facades;

namespace IOKode.OpinionatedFramework.Jobs;

/// <summary>
/// Defines a job scheduler contract that allows scheduling, rescheduling, and unscheduling jobs based on a cron expression.
/// </summary>
[AddToFacade("Job")]
public interface IJobScheduler
{
public Task ScheduleAsync(IJob job, CronExpression interval, CancellationToken cancellationToken);
/// <summary>
/// Schedules a job to run at intervals specified by a <see cref="CronExpression"/>.
/// </summary>
/// <param name="job">The job to be scheduled.</param>
/// <param name="interval">The cron expression representing the interval for the job to run.</param>
/// <param name="cancellationToken">A token to cancel the scheduling process, but it does NOT cancel the job once it is scheduled.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of scheduling. The task result contains a <see cref="ScheduledJob"/> representing the scheduled job.</returns>
Task<ScheduledJob> ScheduleAsync(IJob job, CronExpression interval, CancellationToken cancellationToken = default);

/// <summary>
/// Reschedules an existing scheduled job to run at a new interval.
/// </summary>
/// <param name="scheduledJob">The job to be rescheduled.</param>
/// <param name="interval">The new cron expression representing the interval for the job to run.</param>
/// <param name="cancellationToken">A token to cancel the rescheduling process, but it does NOT cancel the job itself once it is rescheduled.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of rescheduling.</returns>
/// <exception cref="ArgumentException">Thrown if the <paramref name="scheduledJob"/> was created with a different
/// <see cref="IJobScheduler"/> implementation than the one being used to reschedule it, or if it was not generated
/// by the <see cref="ScheduleAsync"/> method (e.g., manually instantiated).</exception>
Task RescheduleAsync(ScheduledJob scheduledJob, CronExpression interval, CancellationToken cancellationToken = default);

/// <summary>
/// Unschedules an existing scheduled job.
/// </summary>
/// <param name="scheduledJob">The job to be unscheduled.</param>
/// <param name="cancellationToken">A token to cancel the unscheduling process, but it does NOT cancel the job itself if it is already running.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous operation of unscheduling.</returns>
/// <exception cref="ArgumentException">Thrown if the <paramref name="scheduledJob"/> was created with a different
/// <see cref="IJobScheduler"/> implementation than the one being used to unschedule it, or if it was not generated
/// by the <see cref="ScheduleAsync"/> method (e.g., manually instantiated).</exception>
Task UnscheduleAsync(ScheduledJob scheduledJob, CancellationToken cancellationToken = default);
}
4 changes: 2 additions & 2 deletions src/Foundation/Jobs/JobExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ public static async Task ScheduleAsync(this IJob job, CronExpression interval,
await scheduler.ScheduleAsync(job, interval, cancellationToken);
}

public static async Task EnqueueAsync(this IJob job, string queue, CancellationToken cancellationToken)
public static async Task EnqueueAsync(this IJob job, Queue queue, CancellationToken cancellationToken)
{
var enqueuer = Locator.Resolve<IJobEnqueuer>();
await enqueuer.EnqueueAsync(queue, job, cancellationToken);
}

public static async Task EnqueueWithDelayAsync(this IJob job, string queue, TimeSpan delay, CancellationToken cancellationToken)
public static async Task EnqueueWithDelayAsync(this IJob job, Queue queue, TimeSpan delay, CancellationToken cancellationToken)
{
var enqueuer = Locator.Resolve<IJobEnqueuer>();
await enqueuer.EnqueueWithDelayAsync(queue, job, delay, cancellationToken);
Expand Down
Loading

0 comments on commit dc88b3b

Please sign in to comment.