Skip to content

Commit

Permalink
Fix blocking issue for self-terminating jobs
Browse files Browse the repository at this point in the history
This change resolves an orphaned job block problem specific to self-terminating jobs. The code now ensures that upon job termination, job blocking status is correctly updated, eliminating potential lingering blocks. The change also includes a unit test to verify the fix and the project version has been updated to 1.0.0-beta.1.
  • Loading branch information
JezhikLaas committed Dec 17, 2023
1 parent 1a8c965 commit 1d18fae
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 5 deletions.
43 changes: 43 additions & 0 deletions Quartz.Impl.RavenJobStore.UnitTests/SingleSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,49 @@ public async Task If_a_durable_job_terminates_itself_Then_no_orphaned_blocks_rem
counter.Should().BeGreaterThan(0);
}

[Fact(DisplayName = "If a non-durable job terminates itself Then no orphaned blocks remain")]
public async Task If_a_non_durable_job_terminates_itself_Then_no_orphaned_blocks_remain()
{
Scheduler = await CreateSingleSchedulerAsync("Test");
await Scheduler.Start();

var watcher = new ControllingWatcher(Scheduler.SchedulerInstanceId, SchedulerExecutionStep.Completed);

var store = GetStore(Scheduler);
store.DebugWatcher = watcher;

var job = JobBuilder
.Create(typeof(TerminatingJob))
.WithIdentity("Job", "Group")
.Build();

var trigger = (IOperableTrigger)TriggerBuilder.Create()
.WithIdentity("Trigger", "Group")
.StartNow()
.WithSimpleSchedule(schedule => schedule
.WithInterval(TimeSpan.FromSeconds(1))
.RepeatForever()
)
.ForJob(job)
.Build();

await Scheduler.ScheduleJob(job, trigger, CancellationToken.None);

watcher.WaitForEvent(TimeSpan.FromSeconds(15));

using var session = store.DocumentStore.ThrowIfNull().OpenAsyncSession();

var counter = 10;
var block = await session.Query<BlockedJob>().AnyAsync();
while (counter-- > 0 && block)
{
await Task.Delay(TimeSpan.FromSeconds(0.1));
block = await session.Query<BlockedJob>().AnyAsync();
}

counter.Should().BeGreaterThan(0);
}

[Fact(DisplayName = "If a non concurrent job reschedules itself Then the replaced trigger is not blocked")]
public async Task If_a_non_concurrent_job_reschedules_itself_Then_the_replaced_trigger_is_not_blocked()
{
Expand Down
7 changes: 5 additions & 2 deletions Quartz.Impl.RavenJobStore/Quartz.Impl.RavenJobStore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Nullable>enable</Nullable>
<TargetFrameworks>net7.0;net6.0</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>0.9.0-prerelease</Version>
<Version>1.0.0-beta.1</Version>
<Title>Quartz Job store using RavenDB</Title>
<Authors>Uwe Laas</Authors>
<PackageProjectUrl>https://github.com/JezhikLaas/quartznet-RavenJobStore</PackageProjectUrl>
Expand All @@ -19,7 +19,10 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<RootNamespace>Domla.Quartz.Raven</RootNamespace>
<PackageIcon>crow.png</PackageIcon>
<PackageReleaseNotes>* 0.9
<PackageReleaseNotes>* 1.0 - beta.1
Fixes an orphaned job block bug which occured with self removing jobs. This is the first release candidate.

* 0.9
Fixes a bug which sets a trigger to be blocked unexpectedly.

* 0.8
Expand Down
6 changes: 5 additions & 1 deletion Quartz.Impl.RavenJobStore/RavenJobStore.Implementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,9 @@ internal async Task<bool> RemoveTriggerAsync(TriggerKey triggerKey, Cancellation

if (triggersForJob.Count == 1 && job.Durable == false)
{
session.Delete(BlockedJob.GetId(InstanceName, job.Id));
session.Delete(job.Id);

await Signaler.NotifySchedulerListenersJobDeleted(job.JobKey, token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -1600,6 +1602,8 @@ internal async Task<IReadOnlyCollection<TriggerFiredResult>> TriggersFiredAsync(
.LoadAsync<Trigger>(triggerKeys, token)
.ConfigureAwait(false);

var blockedJobs = await GetBlockedJobsAsync(session, token).ConfigureAwait(false);

foreach (var (_, storedTrigger) in storedTriggers)
{
if (storedTrigger?.State != InternalTriggerState.Acquired)
Expand All @@ -1608,7 +1612,7 @@ internal async Task<IReadOnlyCollection<TriggerFiredResult>> TriggersFiredAsync(
result.Add(new TriggerFiredResult(new RavenDbException("Trigger is not acquired")));
continue;
}
var isJobBlocked = await IsJobBlockedAsync(session, storedTrigger.JobId, token).ConfigureAwait(false);
var isJobBlocked = blockedJobs.Contains(storedTrigger.JobId);
if (isJobBlocked)
{
// This should force Quartz to release this
Expand Down
4 changes: 2 additions & 2 deletions Quartz.Impl.RavenJobStore/RavenJobStore.Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ private async Task<Trigger> CreateConfiguredTriggerAsync(
return trigger;
}

internal async Task<bool> IsJobBlockedAsync(IAsyncDocumentSession session, string jobId, CancellationToken token) =>
await session.Advanced.ExistsAsync(BlockedJob.GetId(InstanceName, jobId), token).ConfigureAwait(false);
internal Task<bool> IsJobBlockedAsync(IAsyncDocumentSession session, string jobId, CancellationToken token) =>
session.Advanced.ExistsAsync(BlockedJob.GetId(InstanceName, jobId), token);

internal async Task<IReadOnlyList<string>> GetBlockedJobsAsync
(
Expand Down

0 comments on commit 1d18fae

Please sign in to comment.