Skip to content

Commit

Permalink
Refactor job blocking functionality to use BlockRepository strategy
Browse files Browse the repository at this point in the history
The commit introduces a new `IBlockRepository` interface to abstract job blocking operations. Depending on the environment, the system now uses either `PersistentBlockRepository` or `MemoryBlockRepository` for these operations. This refactoring aims to improve performance for non-clustered schedulers.
  • Loading branch information
JezhikLaas committed Dec 18, 2023
1 parent ad4b8b2 commit 22153be
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 77 deletions.
46 changes: 26 additions & 20 deletions Quartz.Impl.RavenJobStore.UnitTests/ImplementationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,13 @@ public async Task If_a_trigger_is_added_and_the_assigned_job_is_blocked_Then_the

using (var arrangeSession = Target.DocumentStore!.OpenAsyncSession())
{
await arrangeSession.StoreAsync
await Target.BlockRepository!.BlockJobAsync
(
new BlockedJob(Target.InstanceName,
job.Key.GetDatabaseId(Target.InstanceName))
arrangeSession,
job.Key.GetDatabaseId(Target.InstanceName),
CancellationToken.None
);

await arrangeSession.SaveChangesAsync();
}

Expand Down Expand Up @@ -600,11 +602,13 @@ public async Task If_a_trigger_is_added_and_the_assigned_job_is_blocked_paused_T

using (var arrangeSession = Target.DocumentStore!.OpenAsyncSession())
{
await arrangeSession.StoreAsync
await Target.BlockRepository!.BlockJobAsync
(
new BlockedJob(Target.InstanceName,
job.Key.GetDatabaseId(Target.InstanceName))
arrangeSession,
job.Key.GetDatabaseId(Target.InstanceName),
CancellationToken.None
);

await arrangeSession.SaveChangesAsync();
}

Expand Down Expand Up @@ -1274,7 +1278,7 @@ public async Task If_a_blocked_job_exists_Then_ClearAllSchedulingData_removes_it
await Target.ClearAllSchedulingDataAsync(CancellationToken.None);

using var session = Target.DocumentStore!.OpenAsyncSession();
var anyBlocks = await Target.GetBlockedJobsAsync(session, CancellationToken.None);
var anyBlocks = await Target.BlockRepository!.GetBlockedJobsAsync(session, CancellationToken.None);

anyBlocks.Should().BeEmpty();
}
Expand Down Expand Up @@ -1788,10 +1792,7 @@ await Target.StoreTriggerAsync
);
entity.State = InternalTriggerState.Error;

await session.StoreAsync
(
new BlockedJob(Target.InstanceName, job.Key.GetDatabaseId(Target.InstanceName))
);
await Target.BlockRepository!.BlockJobAsync(session, entity.JobId, CancellationToken.None);

await session.SaveChangesAsync();
}
Expand Down Expand Up @@ -2152,9 +2153,11 @@ await Target.StoreTriggerAsync

using (var session = Target.DocumentStore!.OpenAsyncSession())
{
await session.StoreAsync
await Target.BlockRepository!.BlockJobAsync
(
new BlockedJob(Target.InstanceName, job.Key.GetDatabaseId(Target.InstanceName))
session,
job.Key.GetDatabaseId(Target.InstanceName),
CancellationToken.None
);

await session.SaveChangesAsync();
Expand Down Expand Up @@ -2324,11 +2327,13 @@ await Target.StoreTriggerAsync

using (var session = Target.DocumentStore!.OpenAsyncSession())
{
await session.StoreAsync
await Target.BlockRepository!.BlockJobAsync
(
new BlockedJob(Target.InstanceName, Job.GetId(Target.InstanceName, "Group", "Job"))
session,
job.Key.GetDatabaseId(Target.InstanceName),
CancellationToken.None
);

await session.SaveChangesAsync();
}

Expand Down Expand Up @@ -3166,13 +3171,14 @@ await arrangeSession.StoreAsync

using (var session = Target.DocumentStore!.OpenAsyncSession())
{
var blockedJobExists = await session.Advanced.ExistsAsync
var isJobBlocked = await Target.BlockRepository!.IsJobBlockedAsync
(
BlockedJob.GetId(Target.InstanceName, job.Key.GetDatabaseId(Target.InstanceName)),
session,
job.Key.GetDatabaseId(Target.InstanceName),
CancellationToken.None
);

blockedJobExists.Should().BeFalse();
isJobBlocked.Should().BeFalse();
}
}

Expand All @@ -3195,7 +3201,7 @@ await arrangeSession.StoreAsync
}

using var session = Target.DocumentStore!.OpenAsyncSession();
var result = await Target.IsJobBlockedAsync
var result = await Target.BlockRepository!.IsJobBlockedAsync
(
session,
new JobKey("Job-BusinessTransactions/InvoiceToIntercompany-DispatchToIntercompany", "Group").GetDatabaseId(Target.InstanceName),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System.Collections.Concurrent;
using Domla.Quartz.Raven.Strategies;
using Raven.Client.Documents.Session;

namespace Domla.Quartz.Raven.ConcreteStrategies;

internal class MemoryBlockRepository : IBlockRepository
{
private ConcurrentDictionary<string, byte> BlockedJobs { get; } = new();

public Task BlockJobAsync(IAsyncDocumentSession session, string jobId, CancellationToken token)
{
BlockedJobs.TryAdd(jobId, 0);
return Task.CompletedTask;
}

public Task ReleaseJobAsync(IAsyncDocumentSession session, string jobId, CancellationToken token)
{
BlockedJobs.TryRemove(jobId, out _);
return Task.CompletedTask;
}

public Task<bool> IsJobBlockedAsync(IAsyncDocumentSession session, string jobId, CancellationToken token) =>
Task.FromResult(BlockedJobs.ContainsKey(jobId));

public Task<IReadOnlyList<string>> GetBlockedJobsAsync(IAsyncDocumentSession session, CancellationToken token) =>
Task.FromResult<IReadOnlyList<string>>(BlockedJobs.Keys.ToList());

public Task ReleaseAllJobsAsync(IAsyncDocumentSession session, CancellationToken token)
{
BlockedJobs.Clear();
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Domla.Quartz.Raven.Entities;
using Domla.Quartz.Raven.Indexes;
using Domla.Quartz.Raven.Strategies;
using Raven.Client.Documents;
using Raven.Client.Documents.Session;

namespace Domla.Quartz.Raven.ConcreteStrategies;

internal class PersistentBlockRepository : IBlockRepository
{
private string InstanceName { get; }

public PersistentBlockRepository(string instanceName)
{
InstanceName = instanceName;
}

public Task BlockJobAsync(IAsyncDocumentSession session, string jobId, CancellationToken token) =>
session.StoreAsync(new BlockedJob(InstanceName, jobId), token);

public Task ReleaseJobAsync(IAsyncDocumentSession session, string jobId, CancellationToken token)
{
session.Delete(BlockedJob.GetId(InstanceName, jobId));
return Task.CompletedTask;
}

public Task<bool> IsJobBlockedAsync(IAsyncDocumentSession session, string jobId, CancellationToken token) =>
session.Advanced.ExistsAsync(BlockedJob.GetId(InstanceName, jobId), token);

public async Task<IReadOnlyList<string>> GetBlockedJobsAsync(
IAsyncDocumentSession session,
CancellationToken token) =>
await (
from blocked in session.Query<BlockedJob>(nameof(BlockedJobIndex))
where blocked.Scheduler == InstanceName
select blocked.JobId
).ToListAsync(token).ConfigureAwait(false);

public async Task ReleaseAllJobsAsync(IAsyncDocumentSession session, CancellationToken token)
{
var ids = await (
from blocked in session.Query<BlockedJob>(nameof(BlockedJobIndex))
where blocked.Scheduler == InstanceName
select blocked.Id
).ToListAsync(token).ConfigureAwait(false);

ids.ForEach(session.Delete);
}
}
16 changes: 16 additions & 0 deletions Quartz.Impl.RavenJobStore/Indexes/BlockedJobIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Domla.Quartz.Raven.Entities;
using Raven.Client.Documents.Indexes;

namespace Domla.Quartz.Raven.Indexes;

internal class BlockedJobIndex : AbstractIndexCreationTask<BlockedJob>
{
internal BlockedJobIndex()
{
Map = blockedJobs => from blockedJob in blockedJobs
select new
{
blockedJob.Scheduler
};
}
}
11 changes: 0 additions & 11 deletions Quartz.Impl.RavenJobStore/Indexes/JobIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,4 @@ internal JobIndex()
job.RequestsRecovery
};
}
}
internal class BlockedJobIndex : AbstractIndexCreationTask<BlockedJob>
{
internal BlockedJobIndex()
{
Map = blockedJobs => from blockedJob in blockedJobs
select new
{
blockedJob.Scheduler
};
}
}
7 changes: 5 additions & 2 deletions Quartz.Impl.RavenJobStore/Quartz.Impl.RavenJobStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Nullable>enable</Nullable>
<TargetFrameworks>net7.0;net6.0</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>1.0.0-beta.2</Version>
<Version>1.0.0-beta.3</Version>
<Title>Quartz Job store using RavenDB</Title>
<Authors>Uwe Laas</Authors>
<PackageProjectUrl>https://github.com/JezhikLaas/quartznet-RavenJobStore</PackageProjectUrl>
Expand All @@ -19,7 +19,10 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<RootNamespace>Domla.Quartz.Raven</RootNamespace>
<PackageIcon>crow.png</PackageIcon>
<PackageReleaseNotes>* 1.0 - beta.2
<PackageReleaseNotes>* 1.0-bet.3
Uses in-memory blocks for non-clustered schedulers to improve performance.

* 1.0 - beta.2
Waiting for indexes before streaming seems to be a performance killer - try to wait only for the needed index(es).

* 1.0 - beta.1
Expand Down
Loading

0 comments on commit 22153be

Please sign in to comment.