From 8ee4892b83b07c86fb49e7ad6141288615ce9c6e Mon Sep 17 00:00:00 2001 From: zero88 Date: Sat, 19 Aug 2023 19:29:15 +0700 Subject: [PATCH] feat(#69): Add TriggerCondition --- .../zero88/schedulerx/ExecutionContext.java | 21 ++- .../zero88/schedulerx/ExecutionResult.java | 4 + .../schedulerx/impl/AbstractScheduler.java | 109 +++++++----- .../schedulerx/impl/ExecutionContextImpl.java | 8 +- .../schedulerx/impl/ExecutionResultImpl.java | 38 ++-- .../impl/InternalTriggerContext.java | 27 --- .../impl/TriggerContextFactory.java | 164 ++++++++++++++++++ .../schedulerx/trigger/CronSchedulerImpl.java | 5 +- .../trigger/EventSchedulerImpl.java | 61 ++++--- .../trigger/EventTriggerPredicate.java | 2 + .../trigger/IntervalSchedulerImpl.java | 3 +- .../schedulerx/trigger/TriggerCondition.java | 72 ++++++++ .../schedulerx/trigger/TriggerContext.java | 39 +++-- .../zero88/schedulerx/SchedulerTest.java | 25 ++- .../schedulerx/trigger/CronSchedulerTest.java | 1 + .../trigger/EventSchedulerTest.java | 78 +++++++-- .../trigger/IntervalSchedulerTest.java | 30 +++- .../zero88/schedulerx/SchedulingAsserter.java | 20 ++- 18 files changed, 528 insertions(+), 179 deletions(-) delete mode 100644 core/src/main/java/io/github/zero88/schedulerx/impl/InternalTriggerContext.java create mode 100644 core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java create mode 100644 core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java diff --git a/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java b/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java index b778933..72a5c55 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java +++ b/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java @@ -1,6 +1,7 @@ package io.github.zero88.schedulerx; import java.time.Instant; +import java.util.Objects; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -31,12 +32,22 @@ public interface ExecutionContext { */ long round(); + /** + * Runtime trigger context + * + * @see TriggerContext + * @since 2.0.0 + */ + @NotNull TriggerContext triggerContext(); + /** * Trigger execution at time * * @return triggeredAt */ - @NotNull Instant triggeredAt(); + default @NotNull Instant triggeredAt() { + return Objects.requireNonNull(triggerContext().triggerAt()); + } /** * Executed at time @@ -45,14 +56,6 @@ public interface ExecutionContext { */ @NotNull Instant executedAt(); - /** - * Runtime trigger context - * - * @see TriggerContext - * @since 2.0.0 - */ - @NotNull TriggerContext triggerContext(); - /** * Check whether force stop execution or not * diff --git a/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java b/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java index da46ef5..104a3b2 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java +++ b/core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java @@ -5,6 +5,8 @@ import org.jetbrains.annotations.Nullable; +import io.github.zero88.schedulerx.trigger.TriggerContext; + /** * Represents for task result will be pass on each event of {@link SchedulingMonitor} * @@ -25,6 +27,8 @@ public interface ExecutionResult { */ @Nullable T externalId(); + TriggerContext triggerContext(); + /** * Only {@code not null} in {@link SchedulingMonitor#onUnableSchedule(ExecutionResult)} * diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java index aef6b6f..abbcd7d 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java @@ -1,6 +1,7 @@ package io.github.zero88.schedulerx.impl; import java.time.Instant; +import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -8,14 +9,15 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import io.github.zero88.schedulerx.ExecutionContext; +import io.github.zero88.schedulerx.ExecutionResult; import io.github.zero88.schedulerx.JobData; import io.github.zero88.schedulerx.Scheduler; -import io.github.zero88.schedulerx.Task; -import io.github.zero88.schedulerx.ExecutionContext; -import io.github.zero88.schedulerx.TaskExecutor; import io.github.zero88.schedulerx.SchedulingMonitor; -import io.github.zero88.schedulerx.ExecutionResult; +import io.github.zero88.schedulerx.Task; import io.github.zero88.schedulerx.trigger.Trigger; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; +import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus; import io.github.zero88.schedulerx.trigger.TriggerContext; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -36,7 +38,7 @@ public abstract class AbstractScheduler implements Scheduler { @SuppressWarnings("java:S3416") - protected static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class); private final @NotNull Vertx vertx; private final @NotNull SchedulerStateInternal state; @@ -129,7 +131,7 @@ public final void executeTask(@NotNull ExecutionContext executionContext) { public final void cancel() { if (!state.completed()) { trace(Instant.now(), "Canceling the task"); - doStop(state.timerId()); + doStop(state.timerId(), TriggerContextFactory.stop(trigger().type(), ReasonCode.STOP_BY_MANUAL)); } } @@ -139,9 +141,9 @@ protected final void doStart(WorkerExecutor workerExecutor) { .onFailure(this::onUnableSchedule); } - protected final void doStop(long timerId) { + protected final void doStop(long timerId, TriggerContext context) { unregisterTimer(timerId); - onCompleted(); + onCompleted(context); } protected abstract @NotNull Future registerTimer(@NotNull Promise promise, @@ -149,34 +151,53 @@ protected final void doStop(long timerId) { protected void unregisterTimer(long timerId) { vertx.cancelTimer(timerId); } - protected InternalTriggerContext shouldRun(@NotNull Instant triggerAt, @NotNull TriggerContext triggerContext) { - state.increaseTick(); + protected final TriggerContext shouldRun(@NotNull TriggerContext triggerContext) { + if (triggerContext.condition().status() != TriggerStatus.INITIALIZED) { + return triggerContext; + } + if (state.pending()) { + return TriggerContextFactory.skip(triggerContext, ReasonCode.NOT_YET_SCHEDULED); + } if (state.completed()) { - trace(triggerAt, "The task execution is already completed"); + return TriggerContextFactory.skip(triggerContext, ReasonCode.ALREADY_STOPPED); } if (state.executing()) { - onMisfire(triggerAt, "The task is still running"); + return TriggerContextFactory.skip(triggerContext, ReasonCode.TASK_IS_RUNNING); } - return InternalTriggerContext.create(state.idle() && trigger().shouldExecute(triggerAt), triggerContext); + return evaluateTrigger(triggerContext); } - protected final boolean shouldStop(@Nullable ExecutionContext executionContext, long round) { - return (executionContext != null && executionContext.isForceStop()) || trigger().shouldStop(round); + protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext, boolean isForceStop, long round) { + if (isForceStop) { + return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_TASK); + } + return trigger().shouldStop(round) + ? TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG) + : triggerContext; + } + + protected TriggerContext evaluateTrigger(@NotNull TriggerContext triggerContext) { + return trigger().shouldExecute(Objects.requireNonNull(triggerContext.triggerAt())) + ? TriggerContextFactory.ready(triggerContext) + : TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED); } protected final void run(WorkerExecutor workerExecutor, TriggerContext triggerContext) { - final Instant triggerAt = Instant.now(); - final InternalTriggerContext internalContext = shouldRun(triggerAt, triggerContext); - if (internalContext.shouldRun()) { - final TriggerContext triggerCtx = TriggerContext.create(internalContext.type(), internalContext.info()); + state.increaseTick(); + final TriggerContext transitionCtx = shouldRun(triggerContext); + if (transitionCtx.condition().isReady()) { final ExecutionContextInternal ctx = new ExecutionContextImpl<>(vertx, state.increaseRound(), - triggerAt, triggerCtx); - trace(triggerAt, "Trigger the task execution"); + transitionCtx); + trace(Objects.requireNonNull(transitionCtx.triggerAt()), "Trigger the task execution"); if (workerExecutor != null) { - workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult); + workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), + asyncResult -> onResult(transitionCtx, asyncResult)); } else { - vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult); + vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), + asyncResult -> onResult(transitionCtx, asyncResult)); } + } else { + onMisfire(transitionCtx); } } @@ -206,54 +227,61 @@ protected final void onSchedule(long timerId) { } protected final void onUnableSchedule(Throwable t) { + final TriggerContext triggerContext = TriggerContextFactory.failed(trigger.type(), + ReasonCode.FAILED_TO_SCHEDULE, t); monitor.onUnableSchedule(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setTick(state.tick()) .setRound(state.round()) .setUnscheduledAt(Instant.now()) - .setError(t) + .setTriggerContext(triggerContext) .build()); } - protected final void onMisfire(@NotNull Instant triggerAt, String reason) { - trace(triggerAt, "Skip the execution::" + reason); + protected final void onMisfire(@NotNull TriggerContext triggerContext) { + trace(Objects.requireNonNull(triggerContext.triggerAt()), + "Skip the execution::" + triggerContext.condition().reasonCode()); monitor.onMisfire(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setTick(state.tick()) .setAvailableAt(state.availableAt()) - .setTriggeredAt(triggerAt) + .setTriggeredAt(triggerContext.triggerAt()) + .setTriggerContext(triggerContext) .build()); } @SuppressWarnings("unchecked") - protected final void onResult(@NotNull AsyncResult asyncResult) { + protected final void onResult(@NotNull TriggerContext triggerContext, @NotNull AsyncResult asyncResult) { state.markIdle(); final Instant finishedAt = Instant.now(); - if (asyncResult.failed()) { - LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause()); - } - ExecutionContextInternal executionContext = (ExecutionContextInternal) asyncResult.result(); + TriggerContext transitionCtx; if (asyncResult.succeeded()) { trace(finishedAt, "Received the task result"); + final ExecutionContextInternal executionCtx = (ExecutionContextInternal) asyncResult.result(); monitor.onEach(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setAvailableAt(state.availableAt()) .setTick(state.tick()) - .setRound(executionContext.round()) - .setTriggeredAt(executionContext.triggeredAt()) - .setExecutedAt(executionContext.executedAt()) + .setRound(executionCtx.round()) + .setTriggerContext(triggerContext) + .setTriggeredAt(executionCtx.triggeredAt()) + .setExecutedAt(executionCtx.executedAt()) .setFinishedAt(finishedAt) - .setData(state.addData(executionContext.round(), executionContext.data())) - .setError(state.addError(executionContext.round(), executionContext.error())) + .setData(state.addData(executionCtx.round(), executionCtx.data())) + .setError(state.addError(executionCtx.round(), executionCtx.error())) .setCompleted(state.completed()) .build()); + transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), state.round()); + } else { + LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause()); + transitionCtx = shouldStop(triggerContext, false, state.round()); } - if (shouldStop(executionContext, state.round())) { - doStop(state.timerId()); + if (transitionCtx.condition().isStop()) { + doStop(state.timerId(), transitionCtx); } } - protected final void onCompleted() { + protected final void onCompleted(TriggerContext context) { state.markCompleted(); final Instant completedAt = Instant.now(); trace(completedAt, "The task execution is completed"); @@ -262,6 +290,7 @@ protected final void onCompleted() { .setAvailableAt(state.availableAt()) .setTick(state.tick()) .setRound(state.round()) + .setTriggerContext(context) .setCompleted(state.completed()) .setCompletedAt(completedAt) .setData(state.lastData()) diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java index 19d552b..90c3331 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java @@ -13,7 +13,6 @@ final class ExecutionContextImpl implements ExecutionContextInternal promise; @@ -21,10 +20,9 @@ final class ExecutionContextImpl implements ExecutionContextInternal implements ExecutionResult { @@ -21,29 +22,34 @@ final class ExecutionResultImpl implements ExecutionResult { private final long tick; private final long round; private final boolean completed; + private final TriggerContext triggerContext; private final Throwable error; private final OUTPUT data; ExecutionResultImpl(ExecutionResultBuilder builder) { - this.unscheduledAt = builder.unscheduledAt; - this.rescheduledAt = builder.rescheduledAt; - this.availableAt = builder.availableAt; - this.triggeredAt = builder.triggeredAt; - this.executedAt = builder.executedAt; - this.finishedAt = builder.finishedAt; - this.completedAt = builder.completedAt; - this.externalId = builder.externalId; - this.tick = builder.tick; - this.round = builder.round; - this.completed = builder.completed; - this.error = builder.error; - this.data = builder.data; + this.unscheduledAt = builder.unscheduledAt; + this.rescheduledAt = builder.rescheduledAt; + this.availableAt = builder.availableAt; + this.triggeredAt = builder.triggeredAt; + this.executedAt = builder.executedAt; + this.finishedAt = builder.finishedAt; + this.completedAt = builder.completedAt; + this.externalId = builder.externalId; + this.tick = builder.tick; + this.round = builder.round; + this.completed = builder.completed; + this.triggerContext = builder.triggerContext; + this.error = builder.error; + this.data = builder.data; } @Override @SuppressWarnings("unchecked") public @Nullable T externalId() { return (T) this.externalId; } + @Override + public TriggerContext triggerContext() { return this.triggerContext; } + public Instant unscheduledAt() { return this.unscheduledAt; } public Instant rescheduledAt() { return this.rescheduledAt; } @@ -81,6 +87,7 @@ static final class ExecutionResultBuilder { long tick; long round; boolean completed; + TriggerContext triggerContext; Throwable error; OUTPUT data; @@ -139,6 +146,11 @@ public ExecutionResultBuilder setCompleted(boolean completed) { return this; } + public ExecutionResultBuilder setTriggerContext(TriggerContext triggerContext) { + this.triggerContext = triggerContext; + return this; + } + public ExecutionResultBuilder setError(Throwable error) { this.error = error; return this; diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/InternalTriggerContext.java b/core/src/main/java/io/github/zero88/schedulerx/impl/InternalTriggerContext.java deleted file mode 100644 index 3db75be..0000000 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/InternalTriggerContext.java +++ /dev/null @@ -1,27 +0,0 @@ -package io.github.zero88.schedulerx.impl; - -import org.jetbrains.annotations.ApiStatus.Internal; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import io.github.zero88.schedulerx.trigger.TriggerContext; - -@Internal -public interface InternalTriggerContext extends TriggerContext { - - boolean shouldRun(); - - static InternalTriggerContext create(boolean shouldRun, TriggerContext ctx) { - return new InternalTriggerContext() { - @Override - public boolean shouldRun() { return shouldRun; } - - @Override - public @Nullable Object info() { return ctx.info(); } - - @Override - public @NotNull String type() { return ctx.type(); } - }; - } - -} diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java b/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java new file mode 100644 index 0000000..f4ff92a --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java @@ -0,0 +1,164 @@ +package io.github.zero88.schedulerx.impl; + +import java.time.Instant; +import java.util.Objects; + +import org.jetbrains.annotations.ApiStatus.Internal; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import io.github.zero88.schedulerx.trigger.TriggerCondition; +import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus; +import io.github.zero88.schedulerx.trigger.TriggerContext; + +/** + * The factory to create trigger context. + * + * @since 2.0.0 + */ +@Internal +public final class TriggerContextFactory { + + private TriggerContextFactory() { } + + /** + * Create trigger context in {@link TriggerStatus#INITIALIZED} state + * + * @param triggerType the trigger type + */ + public static @NotNull TriggerContext init(@NotNull String triggerType) { return init(triggerType, null); } + + /** + * Create trigger context in {@link TriggerStatus#INITIALIZED} state + * + * @param triggerType the trigger type + * @param info the trigger context info + */ + public static @NotNull TriggerContext init(@NotNull String triggerType, @Nullable T info) { + final Instant triggerAt = Instant.now(); + final TriggerCondition condition = () -> TriggerStatus.INITIALIZED; + return new TriggerContext() { + @Override + public @NotNull String type() { return triggerType; } + + public @NotNull Instant triggerAt() { return triggerAt; } + + @Override + public @NotNull TriggerCondition condition() { return condition; } + + @Override + public T info() { return info; } + }; + } + + /** + * Transition trigger context to {@link TriggerStatus#READY} state. + * + * @param ctx the current trigger context + */ + public static @NotNull TriggerContext ready(@NotNull TriggerContext ctx) { + return transition(ctx, TriggerStatus.READY, null, null); + } + + /** + * Transition trigger context to {@link TriggerStatus#SKIP} state. + * + * @param ctx the current trigger context + * @param reason the transition reason + */ + public static @NotNull TriggerContext skip(@NotNull TriggerContext ctx, @NotNull String reason) { + return transition(ctx, TriggerStatus.SKIP, reason, null); + } + + /** + * Transition trigger context to {@link TriggerStatus#SKIP} state. + * + * @param ctx the current trigger context + * @param reason the transition reason + * @param cause the transition cause + */ + public static @NotNull TriggerContext skip(@NotNull TriggerContext ctx, @NotNull String reason, + @NotNull Throwable cause) { + return transition(ctx, TriggerStatus.SKIP, reason, cause); + } + + /** + * Transition trigger context to {@link TriggerStatus#STOP} state. + * + * @param ctx the current trigger context + * @param reason the transition reason + */ + public static @NotNull TriggerContext stop(@NotNull TriggerContext ctx, @Nullable String reason) { + return transition(ctx, TriggerStatus.STOP, reason, null); + } + + /** + * Create a trigger context in {@link TriggerStatus#STOP} state. + * + * @param triggerType the trigger type + * @param reason the stopped reason + */ + public static @NotNull TriggerContext stop(String triggerType, String reason) { + return noop(triggerType, createCondition(TriggerStatus.STOP, reason, null)); + } + + /** + * Create a trigger context in {@link TriggerStatus#FAILED} state. + * + * @param triggerType the trigger type + * @param reason the transition reason + * @param cause the failed cause + */ + public static TriggerContext failed(String triggerType, String reason, @Nullable Throwable cause) { + return noop(triggerType, createCondition(TriggerStatus.FAILED, reason, cause)); + } + + static @NotNull TriggerContext transition(@NotNull TriggerContext ctx, @NotNull TriggerStatus status, + @Nullable String reason, @Nullable Throwable cause) { + final TriggerCondition condition = createCondition(status, reason, cause); + return new TriggerContext() { + + @Override + public @NotNull String type() { return ctx.type(); } + + @Override + public @NotNull Instant triggerAt() { return Objects.requireNonNull(ctx.triggerAt()); } + + @Override + public @NotNull TriggerCondition condition() { return condition; } + + @Override + public @Nullable Object info() { return ctx.info(); } + }; + } + + static @NotNull TriggerCondition createCondition(@NotNull TriggerStatus status, @Nullable String reason, + @Nullable Throwable cause) { + return new TriggerCondition() { + @Override + public @NotNull TriggerStatus status() { return status; } + + @Override + public @Nullable String reasonCode() { return reason; } + + @Override + public @Nullable Throwable cause() { return cause; } + }; + } + + @NotNull + static TriggerContext noop(String triggerType, TriggerCondition condition) { + return new TriggerContext() { + + @Override + public @NotNull String type() { return triggerType; } + + @Override + public @Nullable Instant triggerAt() { return null; } + + @Override + public @NotNull TriggerCondition condition() { return condition; } + }; + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java index 13668fd..cc64034 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java @@ -5,10 +5,11 @@ import org.jetbrains.annotations.NotNull; import io.github.zero88.schedulerx.JobData; -import io.github.zero88.schedulerx.Task; import io.github.zero88.schedulerx.SchedulingMonitor; +import io.github.zero88.schedulerx.Task; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; +import io.github.zero88.schedulerx.impl.TriggerContextFactory; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -27,7 +28,7 @@ final class CronSchedulerImpl extends AbstractScheduler { - run(workerExecutor, TriggerContext.empty(trigger().type())); + run(workerExecutor, TriggerContextFactory.init(trigger().type())); doStart(workerExecutor); })); } catch (Exception ex) { diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java index ad0845b..9ac70fe 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java @@ -1,16 +1,16 @@ package io.github.zero88.schedulerx.trigger; -import java.time.Instant; import java.util.Objects; import org.jetbrains.annotations.NotNull; import io.github.zero88.schedulerx.JobData; -import io.github.zero88.schedulerx.Task; import io.github.zero88.schedulerx.SchedulingMonitor; +import io.github.zero88.schedulerx.Task; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; -import io.github.zero88.schedulerx.impl.InternalTriggerContext; +import io.github.zero88.schedulerx.impl.TriggerContextFactory; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -34,16 +34,14 @@ final class EventSchedulerImpl extends AbstractScheduler run(workerExecutor, TriggerContext.create(trigger().type(), msg))) - .completionHandler(event -> { - if (event.failed()) { - promise.fail( - new IllegalStateException("Unable to register a subscriber on address[" + address + "]", - event.cause())); - } else { - promise.complete((long) consumer.hashCode()); - } - }); + consumer.handler(msg -> run(workerExecutor, createTriggerContext(msg))).completionHandler(event -> { + if (event.failed()) { + promise.fail(new IllegalStateException("Unable to register a subscriber on address[" + address + "]", + event.cause())); + } else { + promise.complete((long) consumer.hashCode()); + } + }); return promise.future(); } @@ -56,20 +54,33 @@ protected void unregisterTimer(long timerId) { @Override @SuppressWarnings("unchecked") - protected InternalTriggerContext shouldRun(@NotNull Instant triggerAt, @NotNull TriggerContext triggerContext) { - final InternalTriggerContext internalContext = super.shouldRun(triggerAt, triggerContext); - if (internalContext.shouldRun()) { - final EventTriggerPredicate predicate = trigger().getPredicate(); - final Message msg = (Message) internalContext.info(); - final T info = predicate.convert(msg.headers(), msg.body()); - final boolean shouldRun = predicate.test(info); - if (!shouldRun) { - onMisfire(triggerAt, "The event trigger info is not matched"); + protected TriggerContext evaluateTrigger(@NotNull TriggerContext triggerContext) { + final TriggerContext ctx = super.evaluateTrigger(triggerContext); + try { + if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY && + !trigger().getPredicate().test((T) triggerContext.info())) { + return TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED); } - final TriggerContext ctx = TriggerContext.create(internalContext.type(), info); - return InternalTriggerContext.create(shouldRun, ctx); + } catch (Exception ex) { + return handleException(ctx, ex); + } + return ctx; + } + + private TriggerContext createTriggerContext(Message msg) { + try { + T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body()); + return TriggerContextFactory.init(trigger().type(), eventMsg); + } catch (Exception ex) { + return handleException(TriggerContextFactory.init(trigger().type(), msg), ex); } - return internalContext; + } + + private TriggerContext handleException(TriggerContext context, Exception cause) { + String reason = cause instanceof ClassCastException + ? ReasonCode.CONDITION_IS_NOT_MATCHED + : ReasonCode.UNEXPECTED_ERROR; + return TriggerContextFactory.skip(context, reason, cause); } // @formatter:off diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventTriggerPredicate.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventTriggerPredicate.java index 81724fc..73fae32 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventTriggerPredicate.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventTriggerPredicate.java @@ -3,6 +3,7 @@ import java.util.Optional; import java.util.function.Predicate; +import org.jetbrains.annotations.ApiStatus.Internal; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -66,6 +67,7 @@ static EventTriggerPredicate ignoreType(Predicate predicate) { return ignoreType(predicate, null); } + @Internal static EventTriggerPredicate ignoreType(@NotNull Predicate predicate, @Nullable String toString) { return new EventTriggerPredicate() { @Override diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java index ac3d277..7672355 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java @@ -10,6 +10,7 @@ import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.impl.AbstractScheduler; import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; +import io.github.zero88.schedulerx.impl.TriggerContextFactory; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -27,7 +28,7 @@ final class IntervalSchedulerImpl extends AbstractScheduler vertx().setPeriodic(trigger().intervalInMilliseconds(), timerId -> run(workerExecutor, - TriggerContext.empty(trigger().type()))); + TriggerContextFactory.init(trigger().type()))); if (trigger().noDelay()) { promise.complete(supplier.getAsLong()); } else { diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java new file mode 100644 index 0000000..553cca1 --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerCondition.java @@ -0,0 +1,72 @@ +package io.github.zero88.schedulerx.trigger; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Represents the trigger condition to describe more insights when the trigger is ready, skipped, or stopped. + * + * @since 2.0.0 + */ +public interface TriggerCondition { + + /** + * The trigger status + * + * @see TriggerStatus + */ + @NotNull TriggerStatus status(); + + /** + * The reason code that makes the trigger has specific status, should be in {@code camelCase} format. + */ + default @Nullable String reasonCode() { return null; } + + /** + * The cause that makes the trigger has specific status. + */ + default @Nullable Throwable cause() { return null; } + + /** + * Check whether the trigger is failed or not. + */ + default boolean isFailed() { return TriggerStatus.FAILED == status(); } + + /** + * Check whether the trigger is ready or not. + */ + default boolean isReady() { return TriggerStatus.READY == status(); } + + /** + * Check whether the trigger is skipped or not. + */ + default boolean isSkip() { return TriggerStatus.SKIP == status(); } + + /** + * Check whether the trigger is stopped or not. + */ + default boolean isStop() { return TriggerStatus.STOP == status(); } + + enum TriggerStatus { + + INITIALIZED, FAILED, READY, STOP, SKIP + } + + + class ReasonCode { + + public static final String FAILED_TO_SCHEDULE = "TriggerIsFailedToSchedule"; + public static final String NOT_YET_SCHEDULED = "TriggerIsNotYetScheduled"; + public static final String ALREADY_STOPPED = "TriggerIsAlreadyStopped"; + public static final String CONDITION_IS_NOT_MATCHED = "ConditionIsNotMatched"; + public static final String STOP_BY_TASK = "ForceStopByTask"; + public static final String STOP_BY_CONFIG = "StopByTriggerConfig"; + public static final String STOP_BY_MANUAL = "StopManually"; + public static final String TASK_IS_RUNNING = "TaskIsRunning"; + public static final String UNEXPECTED_ERROR = "UnexpectedError"; + + private ReasonCode() { } + + } + +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerContext.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerContext.java index 71e9d76..431600f 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerContext.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/TriggerContext.java @@ -1,8 +1,12 @@ package io.github.zero88.schedulerx.trigger; +import java.time.Instant; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import io.github.zero88.schedulerx.Scheduler; + /** * A runtime trigger information * @@ -10,20 +14,29 @@ */ public interface TriggerContext extends HasTriggerType { - @Nullable Object info(); - - static TriggerContext empty(@NotNull String type) { - return create(type, null); - } + /** + * The time that the trigger is fired. + * + * @apiNote In most case this value is {@code not null}, only when the trigger is cancel manually via + * {@link Scheduler#cancel()}, it will be {@code null} + */ + @Nullable Instant triggerAt(); - static TriggerContext create(@NotNull String type, T info) { - return new TriggerContext() { - @Override - public @NotNull String type() { return type; } + /** + * The current trigger condition + * + * @see TriggerCondition + */ + @NotNull TriggerCondition condition(); - @Override - public T info() { return info; } - }; - } + /** + * The trigger context info + *

+ *

    + *
  • In case of the timing-based trigger, this value is {@code null}
  • + *
  • In case of the event-based trigger, this value is an event message
  • + *
+ */ + default @Nullable Object info() { return null; } } diff --git a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java index fe56c67..7c49dcb 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java @@ -18,6 +18,7 @@ import io.github.zero88.schedulerx.trigger.CronTrigger; import io.github.zero88.schedulerx.trigger.IntervalScheduler; import io.github.zero88.schedulerx.trigger.IntervalTrigger; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; import io.vertx.core.json.JsonArray; @@ -41,16 +42,16 @@ void test_scheduler_should_start_only_once_time(Vertx vertx, VertxTestContext te .setTestContext(testCtx) .setCompleted(completed) .build(); - final CronScheduler executor = CronScheduler.builder() - .setVertx(vertx) - .setTrigger(trigger) - .setTask(NoopTask.create(2)) - .setMonitor(asserter) - .build(); + final CronScheduler scheduler = CronScheduler.builder() + .setVertx(vertx) + .setTrigger(trigger) + .setTask(NoopTask.create(2)) + .setMonitor(asserter) + .build(); final int nbOfThreads = 5; final int nbOfFailed = nbOfThreads - 1; - final List store = TestUtils.simulateRunActionInParallel(testCtx, executor::start, nbOfThreads); + final List store = TestUtils.simulateRunActionInParallel(testCtx, scheduler::start, nbOfThreads); Assertions.assertTrue(testCtx.awaitCompletion(timeout, TimeUnit.SECONDS)); Assertions.assertEquals(nbOfFailed, store.size()); @@ -64,9 +65,7 @@ void test_scheduler_should_start_only_once_time(Vertx vertx, VertxTestContext te void test_scheduler_should_run_task_in_dedicated_thread(Vertx vertx, VertxTestContext testContext) { final String threadName = "HELLO"; final WorkerExecutor worker = vertx.createSharedWorkerExecutor(threadName, 1); - final SchedulingAsserter asserter = SchedulingAsserter.builder() - .setTestContext(testContext) - .build(); + final SchedulingAsserter asserter = SchedulingAsserter.builder().setTestContext(testContext).build(); final Task task = (d, ec) -> Assertions.assertEquals(threadName + "-0", Thread.currentThread().getName()); final IntervalTrigger trigger = IntervalTrigger.builder().interval(2).repeat(1).build(); @@ -111,12 +110,10 @@ void test_scheduler_should_maintain_external_id_from_jobData_to_task_result(Obje @Test void test_scheduler_should_able_to_force_stop(Vertx vertx, VertxTestContext testContext) { final Consumer> completed = result -> { - Assertions.assertNotNull(result.availableAt()); - Assertions.assertNotNull(result.completedAt()); Assertions.assertEquals(3, result.round()); Assertions.assertEquals(3, result.tick()); - Assertions.assertTrue(result.isCompleted()); - Assertions.assertFalse(result.isError()); + Assertions.assertTrue(result.triggerContext().condition().isStop()); + Assertions.assertEquals(ReasonCode.STOP_BY_TASK, result.triggerContext().condition().reasonCode()); }; final SchedulingAsserter asserter = SchedulingAsserter.builder() .setTestContext(testContext) diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/CronSchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/CronSchedulerTest.java index 6bee60b..6fab5b3 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/CronSchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/CronSchedulerTest.java @@ -52,6 +52,7 @@ void test_run_task_by_cron(Vertx vertx, VertxTestContext testContext) { Assertions.assertNull(result.error()); Assertions.assertEquals("OK", result.data()); } + Assertions.assertNull(result.triggerContext().info()); }; final Consumer> onCompleted = result -> { Assertions.assertEquals(4, result.round()); diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/EventSchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/EventSchedulerTest.java index 46eff2f..2dc2de1 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/EventSchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/EventSchedulerTest.java @@ -8,6 +8,8 @@ import java.util.function.Consumer; import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -21,6 +23,8 @@ import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.ExecutionResult; import io.github.zero88.schedulerx.TestUtils; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; +import io.vertx.core.MultiMap; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.junit5.VertxExtension; @@ -30,27 +34,57 @@ class EventSchedulerTest { private static Stream provide_invalid_interval() { - return Stream.of(arguments(EventTriggerPredicate.any(), Arrays.asList(1, "COMPLETED"), - (Consumer>) (r) -> { - Throwable e = r.error(); - Assertions.assertTrue(r.isError()); - Assertions.assertNotNull(e); - Assertions.assertInstanceOf(ClassCastException.class, e); - Assertions.assertTrue(e.getMessage() - .contains( - "java.lang.Integer cannot be cast to java.lang" + - ".String")); - }), arguments(EventTriggerPredicate.create("COMPLETED"::equals), - Arrays.asList("Hello", "COMPLETED"), - (Consumer>) (r) -> { })); + final Arguments arg1 = arguments(EventTriggerPredicate.create(o -> o.startsWith("CO")), + Arrays.asList(1, "COMPLETED"), (Consumer>) (r) -> { + final TriggerCondition condition = r.triggerContext().condition(); + final Throwable err = condition.cause(); + Assertions.assertEquals(ReasonCode.CONDITION_IS_NOT_MATCHED, condition.reasonCode()); + Assertions.assertNotNull(err); + Assertions.assertInstanceOf(ClassCastException.class, err); + Assertions.assertTrue( + err.getMessage().contains("java.lang.Integer cannot be cast to java.lang.String")); + }); + final Arguments arg2 = arguments(EventTriggerPredicate.create("COMPLETED"::equals), + Arrays.asList("Hello", "COMPLETED"), + (Consumer>) (r) -> Assertions.assertEquals( + ReasonCode.CONDITION_IS_NOT_MATCHED, + r.triggerContext().condition().reasonCode())); + final Arguments arg3 = arguments(new EventTriggerPredicate() { + @Override + public String convert(@NotNull MultiMap headers, @Nullable Object body) { + if ("1".equals(body)) + throw new RuntimeException("failed in convert"); + return (String) body; + } + + @Override + public boolean test(@Nullable String eventMessage) { return true; } + }, Arrays.asList("1", "COMPLETED"), (Consumer>) (r) -> { + final TriggerCondition condition = r.triggerContext().condition(); + final Throwable err = condition.cause(); + Assertions.assertEquals(ReasonCode.UNEXPECTED_ERROR, condition.reasonCode()); + Assertions.assertNotNull(err); + Assertions.assertInstanceOf(RuntimeException.class, err); + }); + final Arguments arg4 = arguments(EventTriggerPredicate.create(o -> { + if ("1".equals(o)) + throw new IllegalArgumentException("Throw in test"); + return true; + }), Arrays.asList("1", "COMPLETED"), (Consumer>) (r) -> { + final TriggerCondition condition = r.triggerContext().condition(); + final Throwable err = condition.cause(); + Assertions.assertEquals(ReasonCode.UNEXPECTED_ERROR, condition.reasonCode()); + Assertions.assertNotNull(err); + Assertions.assertInstanceOf(IllegalArgumentException.class, err); + }); + return Stream.of(arg1, arg2, arg3, arg4); } - @SuppressWarnings({ "rawtypes", "unchecked" }) @ParameterizedTest @MethodSource("provide_invalid_interval") - void test_event_trigger_misfire_when_event_info_is_not_match(EventTriggerPredicate predicate, List sendData, - Consumer> validator, Vertx vertx, - VertxTestContext testContext) { + void test_event_trigger_misfire(EventTriggerPredicate predicate, List data, + Consumer> validator, Vertx vertx, + VertxTestContext testContext) { final String address = "schedulerx.event.1"; final EventTrigger trigger = EventTrigger.builder() .localOnly(true) @@ -59,20 +93,26 @@ void test_event_trigger_misfire_when_event_info_is_not_match(EventTriggerPredica .build(); final Consumer> onMisfire = result -> { Assertions.assertEquals(1, result.tick()); + Assertions.assertEquals(0, result.round()); validator.accept(result); }; + final Consumer> onCompleted = result -> { + Assertions.assertEquals(2, result.tick()); + Assertions.assertEquals(1, result.round()); + }; final SchedulingMonitor asserter = SchedulingAsserter.builder() .setTestContext(testContext) .setMisfire(onMisfire) + .setCompleted(onCompleted) .build(); EventScheduler.builder() .setVertx(vertx) .setMonitor(asserter) .setTrigger(trigger) - .setTask(NoopTask.create(sendData.size() - 1)) + .setTask(NoopTask.create(data.size() - 1)) .build() .start(); - sendData.forEach(d -> { + data.forEach(d -> { vertx.eventBus().publish(address, d); TestUtils.sleep(1000, testContext); }); diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java index 8426268..47aada2 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerTest.java @@ -17,6 +17,7 @@ import io.github.zero88.schedulerx.SchedulingMonitor; import io.github.zero88.schedulerx.ExecutionResult; import io.github.zero88.schedulerx.TestUtils; +import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode; import io.vertx.core.Vertx; import io.vertx.junit5.Checkpoint; import io.vertx.junit5.VertxExtension; @@ -86,15 +87,15 @@ void test_run_blocking_task_in_the_end(Vertx vertx, VertxTestContext testContext .setTestContext(testContext) .setCompleted(onComplete) .build(); - final IntervalTrigger trigger = IntervalTrigger.builder().interval(2).repeat(3).build(); + final IntervalTrigger trigger = IntervalTrigger.builder().interval(1).repeat(3).build(); IntervalScheduler.builder() .setVertx(vertx) .setMonitor(asserter) .setTrigger(trigger) .setTask((jobData, ctx) -> { - TestUtils.sleep(3000, testContext); - checkpoint.flag(); - }) + TestUtils.sleep(3000, testContext); + checkpoint.flag(); + }) .build() .start(); } @@ -113,6 +114,7 @@ void test_task_should_be_executed_in_interval_trigger(Vertx vertx, VertxTestCont Assertions.assertNull(result.error()); Assertions.assertEquals("OK", result.data()); } + Assertions.assertNull(result.triggerContext().info()); }; final SchedulingAsserter asserter = SchedulingAsserter.builder() .setTestContext(context) @@ -140,4 +142,24 @@ void test_task_should_be_executed_in_interval_trigger(Vertx vertx, VertxTestCont .start(); } + @Test + void test_scheduler_should_be_stopped_when_reach_to_target_round(Vertx vertx, VertxTestContext context) { + final Consumer> onCompleted = result -> { + Assertions.assertTrue(result.triggerContext().condition().isStop()); + Assertions.assertEquals(ReasonCode.STOP_BY_CONFIG, result.triggerContext().condition().reasonCode()); + }; + final SchedulingAsserter asserter = SchedulingAsserter.builder() + .setTestContext(context) + .setCompleted(onCompleted) + .build(); + final IntervalTrigger trigger = IntervalTrigger.builder().interval(1).repeat(3).build(); + IntervalScheduler.builder() + .setVertx(vertx) + .setMonitor(asserter) + .setTrigger(trigger) + .setTask(NoopTask.create()) + .build() + .start(); + } + } diff --git a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java index 7a10f65..38038aa 100644 --- a/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java +++ b/core/src/testFixtures/java/io/github/zero88/schedulerx/SchedulingAsserter.java @@ -8,6 +8,7 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Assertions; +import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus; import io.vertx.junit5.VertxTestContext; /** @@ -50,6 +51,8 @@ public void onUnableSchedule(@NotNull ExecutionResult result) { verify(result, r -> { Assertions.assertNotNull(result.externalId()); Assertions.assertNotNull(result.unscheduledAt()); + Assertions.assertNotNull(result.triggerContext()); + Assertions.assertTrue(result.triggerContext().condition().isFailed()); Assertions.assertNull(result.availableAt()); Assertions.assertNull(result.rescheduledAt()); Assertions.assertNull(result.triggeredAt()); @@ -82,6 +85,9 @@ public void onMisfire(@NotNull ExecutionResult result) { Assertions.assertNotNull(result.externalId()); Assertions.assertNotNull(result.availableAt()); Assertions.assertNotNull(result.triggeredAt()); + Assertions.assertNotNull(result.triggerContext()); + Assertions.assertNotNull(result.triggerContext().condition()); + Assertions.assertTrue(result.triggerContext().condition().isSkip()); Assertions.assertNull(result.rescheduledAt()); Assertions.assertNull(result.executedAt()); Assertions.assertNull(result.finishedAt()); @@ -97,6 +103,8 @@ public void onEach(@NotNull ExecutionResult result) { Assertions.assertNotNull(result.externalId()); Assertions.assertNotNull(result.availableAt()); Assertions.assertNotNull(result.triggeredAt()); + Assertions.assertNotNull(result.triggerContext()); + Assertions.assertTrue(result.triggerContext().condition().isReady()); Assertions.assertNotNull(result.executedAt()); Assertions.assertNotNull(result.finishedAt()); Assertions.assertNull(result.rescheduledAt()); @@ -111,6 +119,8 @@ public void onCompleted(@NotNull ExecutionResult result) { Assertions.assertNotNull(result.externalId()); Assertions.assertNotNull(result.availableAt()); Assertions.assertNotNull(result.completedAt()); + Assertions.assertNotNull(result.triggerContext()); + Assertions.assertTrue(result.triggerContext().condition().isStop()); Assertions.assertTrue(result.isCompleted()); Assertions.assertNull(result.triggeredAt()); Assertions.assertNull(result.executedAt()); @@ -155,13 +165,11 @@ public static SchedulingMonitor unableScheduleAsserter(@NotNull Vertx @Nullable String errorMsg) { return SchedulingAsserter.builder().setTestContext(testContext).setUnableSchedule(result -> { testContext.verify(() -> { - Assertions.assertNull(result.availableAt()); - Assertions.assertNotNull(result.externalId()); - Assertions.assertNotNull(result.unscheduledAt()); - Assertions.assertNotNull(result.error()); - Assertions.assertInstanceOf(errorClazz, result.error()); + final Throwable cause = result.triggerContext().condition().cause(); + Assertions.assertNotNull(cause); + Assertions.assertInstanceOf(errorClazz, cause); if (errorMsg != null) { - Assertions.assertEquals(errorMsg, result.error().getMessage()); + Assertions.assertEquals(errorMsg, cause.getMessage()); } }); testContext.completeNow();