diff --git a/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java b/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java index dc6b625..0df408b 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java +++ b/core/src/main/java/io/github/zero88/schedulerx/ExecutionContext.java @@ -1,7 +1,6 @@ package io.github.zero88.schedulerx; import java.time.Instant; -import java.util.Objects; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -62,12 +61,12 @@ public interface ExecutionContext { boolean isForceStop(); /** - * Mark a flag stop/cancel to cancel executor + * Mark a force stop flag to unregister the trigger out of the system timer */ void forceStopExecution(); /** - * Completed execution with result data per each round + * Notify finish an execution per each round with its result data * * @param data object data * @apiNote if task is {@code async} then it should be invoked in handling async result stage @@ -75,7 +74,7 @@ public interface ExecutionContext { void complete(@Nullable OUT data); /** - * Failed execution with error per each round + * Notify finish an execution per each round with its error data * * @param throwable execution error * @apiNote if task is {@code async} then it should be invoked in handling async result stage diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java index 17e2db1..f46632b 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java @@ -4,6 +4,7 @@ import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import org.jetbrains.annotations.ApiStatus.Internal; import org.jetbrains.annotations.NotNull; @@ -117,7 +118,7 @@ public final void start(WorkerExecutor workerExecutor) { @Override public final void executeTask(@NotNull ExecutionContext executionContext) { try { - log(executionContext.executedAt(), "Start to execute the task"); + log(executionContext.executedAt(), "On execute"); task.execute(jobData(), executionContext); if (!task.isAsync()) { ((ExecutionContextInternal) executionContext).internalComplete(); @@ -136,25 +137,22 @@ public final void cancel() { } protected final void doStart(WorkerExecutor workerExecutor) { - this.registerTimer(Promise.promise(), workerExecutor) - .onSuccess(this::onSchedule) - .onFailure(this::onUnableSchedule); + this.registerTimer(workerExecutor).onSuccess(this::onSchedule).onFailure(this::onUnableSchedule); } protected final void doStop(long timerId, TriggerContext context) { unregisterTimer(timerId); - onCompleted(context); + onComplete(context); } - protected abstract @NotNull Future registerTimer(@NotNull Promise promise, - @Nullable WorkerExecutor workerExecutor); + protected abstract @NotNull Future registerTimer(@Nullable WorkerExecutor workerExecutor); protected abstract void unregisterTimer(long timerId); protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionContext triggerContext) { - log(Instant.now(), "Evaluating the trigger condition..."); - if (triggerContext.condition().status() != TriggerStatus.KICKOFF) { - return triggerContext; + log(Instant.now(), "On evaluate"); + if (!triggerContext.isKickoff()) { + throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF); } if (state.pending()) { return TriggerContextFactory.skip(triggerContext, ReasonCode.NOT_YET_SCHEDULED); @@ -188,47 +186,51 @@ protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionCon : TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED); } - protected final void run(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) { - state.increaseTick(); - final TriggerTransitionContext transitionCtx = shouldRun(triggerContext); - if (transitionCtx.isReady()) { - final ExecutionContextInternal ctx = new ExecutionContextImpl<>(vertx, state.increaseRound(), - transitionCtx); - log(ctx.triggeredAt(), "Triggering the task execution..."); - if (workerExecutor != null) { - workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), - asyncResult -> onResult(transitionCtx, asyncResult)); - } else { - vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), - asyncResult -> onResult(transitionCtx, asyncResult)); - } - } else { - onMisfire(transitionCtx); - } + /** + * Register a timer id and increase tick time when the system timer fires + * + * @param timerId the system timer id + * @return the current number times of the tick counter + */ + protected final long onFire(long timerId) { + return state.timerId(timerId).increaseTick(); } - protected final void log(@NotNull Instant at, @NotNull String event) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(genMsg(state.tick(), state.round(), at, event)); + protected final void onRun(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) { + log(Objects.requireNonNull(kickoffContext.firedAt()), "On fire"); + this.executeBlocking(workerExecutor, p -> p.complete(shouldRun(kickoffContext))) + .onSuccess(context -> onTrigger(workerExecutor, context)) + .onFailure(t -> onMisfire(TriggerContextFactory.skip(kickoffContext, ReasonCode.UNEXPECTED_ERROR, t))); + } + + protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) { + if (!triggerContext.isReady()) { + onMisfire(triggerContext); + return; } + final ExecutionContextInternal executionContext = new ExecutionContextImpl<>(vertx, triggerContext, + state.increaseRound()); + log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round()); + this.executeBlocking(workerExecutor, p -> executeTask(executionContext.setup(p))) + .onComplete(ar -> onResult(triggerContext, ar)); } protected final void onSchedule(long timerId) { - final TriggerContext ctx = TriggerContextFactory.scheduled(trigger().type()); + final TriggerContext context = TriggerContextFactory.scheduled(trigger().type()); ExecutionResult result; if (state.pending()) { result = ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) - .setAvailableAt(state.timerId(timerId).markAvailable()) - .setTriggerContext(ctx) + .setAvailableAt(state.markAvailable()) + .setTriggerContext(context) .build(); } else { result = ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setAvailableAt(state.availableAt()) + .setTriggerContext(context) .setRescheduledAt(Instant.now()) - .setTriggerContext(ctx) - .setTick(state.timerId(timerId).tick()) + .setTick(state.tick()) .setRound(state.round()) .build(); } @@ -239,46 +241,49 @@ protected final void onUnableSchedule(Throwable cause) { final TriggerContext ctx = TriggerContextFactory.error(trigger.type(), ReasonCode.FAILED_TO_SCHEDULE, cause); monitor.onUnableSchedule(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) + .setTriggerContext(ctx) + .setUnscheduledAt(Instant.now()) .setTick(state.tick()) .setRound(state.round()) - .setUnscheduledAt(Instant.now()) - .setTriggerContext(ctx) .build()); } protected final void onMisfire(@NotNull TriggerTransitionContext triggerContext) { - log(Objects.requireNonNull(triggerContext.firedAt()), - "Skip the execution::" + triggerContext.condition().reasonCode()); + final Instant finishedAt = state.markFinished(triggerContext.tick()); + log(finishedAt, "On misfire::" + triggerContext.condition().reasonCode(), triggerContext.tick()); monitor.onMisfire(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) - .setTick(state.tick()) .setAvailableAt(state.availableAt()) - .setFiredAt(triggerContext.firedAt()) .setTriggerContext(triggerContext) + .setTick(triggerContext.tick()) + .setFiredAt(triggerContext.firedAt()) + .setRound(state.round()) + .setFinishedAt(finishedAt) .build()); } @SuppressWarnings("unchecked") protected final void onResult(@NotNull TriggerTransitionContext triggerContext, @NotNull AsyncResult asyncResult) { - final Instant finishedAt = state.markIdle(); + final Instant finishedAt = state.markFinished(triggerContext.tick()); TriggerTransitionContext transitionCtx; if (asyncResult.succeeded()) { - log(finishedAt, "Received the task result"); final ExecutionContextInternal executionCtx = (ExecutionContextInternal) asyncResult.result(); + log(finishedAt, "On result", triggerContext.tick(), executionCtx.round()); monitor.onEach(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setAvailableAt(state.availableAt()) - .setTick(state.tick()) - .setRound(executionCtx.round()) .setTriggerContext(triggerContext) + .setTick(triggerContext.tick()) + .setFiredAt(triggerContext.firedAt()) + .setRound(executionCtx.round()) .setTriggeredAt(executionCtx.triggeredAt()) .setExecutedAt(executionCtx.executedAt()) .setFinishedAt(finishedAt) .setData(state.addData(executionCtx.round(), executionCtx.data())) .setError(state.addError(executionCtx.round(), executionCtx.error())) .build()); - transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), state.round()); + transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), executionCtx.round()); } else { LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause()); transitionCtx = shouldStop(triggerContext, false, state.round()); @@ -288,9 +293,9 @@ protected final void onResult(@NotNull TriggerTransitionContext triggerContext, } } - protected final void onCompleted(TriggerContext context) { + protected final void onComplete(TriggerContext context) { final Instant completedAt = state.markCompleted(); - log(completedAt, "The task execution is completed"); + log(completedAt, "On complete"); monitor.onCompleted(ExecutionResultImpl.builder() .setExternalId(jobData.externalId()) .setAvailableAt(state.availableAt()) @@ -303,13 +308,31 @@ protected final void onCompleted(TriggerContext context) { .build()); } - private ExecutionContextInternal onExecute(@NotNull Promise promise, - @NotNull ExecutionContextInternal executionContext) { - return executionContext.setup(promise, state.markExecuting()); + protected final void log(@NotNull Instant at, @NotNull String event) { + log(at, event, state.tick(), state.round()); + } + + protected final void log(@NotNull Instant at, @NotNull String event, long tick) { + log(at, event, tick, state.round()); + } + + protected final void log(@NotNull Instant at, @NotNull String event, long tick, long round) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(genMsg(tick, round, at, event)); + } } private String genMsg(long tick, long round, @NotNull Instant at, @NotNull String event) { - return "Scheduling[" + tick + "][" + round + "][" + at + "]::[" + jobData.externalId() + "] - " + event; + String tId = state.pending() ? "-" : String.valueOf(state.timerId()); + String tickAndRound = state.pending() ? "-/-" : (tick + "/" + round); + return "Scheduling[" + tId + "::" + trigger.type() + "::" + jobData.externalId() + "][" + tickAndRound + "]" + + "::[" + at + "] - " + event; + } + + private Future executeBlocking(WorkerExecutor workerExecutor, Consumer> operation) { + return workerExecutor == null + ? vertx.executeBlocking(operation::accept, false) + : workerExecutor.executeBlocking(operation::accept, false); } } diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java index 9106c60..da9ac8d 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java @@ -21,7 +21,7 @@ final class ExecutionContextImpl implements ExecutionContextInternal implements ExecutionContextInternal setup(@NotNull Promise promise, - @NotNull Instant executedAt) { + public @NotNull ExecutionContextInternal setup(@NotNull Promise promise) { if (Objects.nonNull(this.promise)) { throw new IllegalStateException("TaskExecutionContext is already setup"); } this.promise = promise; - this.executedAt = executedAt; + this.executedAt = Instant.now(); return this; } - @Override - public @NotNull Vertx vertx() { return vertx; } + public @NotNull Vertx vertx() { return this.vertx; } - @Override - public @NotNull TriggerContext triggerContext() { return triggerContext; } + public @NotNull TriggerContext triggerContext() { return this.triggerContext; } @Override - public @NotNull Instant triggeredAt() { return triggeredAt; } + public @NotNull Instant triggeredAt() { return this.triggeredAt; } @Override - public @NotNull Instant executedAt() { return executedAt; } + public @NotNull Instant executedAt() { return this.executedAt; } @Override - public long round() { return round; } + public long round() { return this.round; } @Override - public OUTPUT data() { return data; } + public OUTPUT data() { return this.data; } @Override - public Throwable error() { return error; } + public Throwable error() { return this.error; } @Override - public boolean isForceStop() { return forceStop; } + public boolean isForceStop() { return this.forceStop; } @Override - public void forceStopExecution() { forceStop = true; } + public void forceStopExecution() { this.forceStop = true; } @Override public void complete(OUTPUT data) { diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextInternal.java b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextInternal.java index 8f2c286..77a3344 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextInternal.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextInternal.java @@ -1,7 +1,5 @@ package io.github.zero88.schedulerx.impl; -import java.time.Instant; - import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -11,16 +9,13 @@ interface ExecutionContextInternal extends ExecutionContext { /** - * Setup task execution context + * Prepare to execute task * - * @param promise promise - * @param executedAt execution at time + * @param promise promise * @return a reference to this for fluent API - * @apiNote It will be invoked by system. In any attempts invoking, {@link IllegalStateException} will be - * thrown - * @see Promise + * @apiNote It will be invoked by system. In any attempts invoking, {@link IllegalStateException} will be thrown */ - @NotNull ExecutionContextInternal setup(@NotNull Promise promise, @NotNull Instant executedAt); + @NotNull ExecutionContextInternal setup(@NotNull Promise promise); void internalComplete(); diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateImpl.java b/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateImpl.java index 6e0490a..5e685f7 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateImpl.java @@ -20,7 +20,6 @@ final class SchedulerStateImpl implements SchedulerStateInternal private final AtomicLong tick = new AtomicLong(0); private final AtomicLong round = new AtomicLong(0); private final AtomicBoolean completed = new AtomicBoolean(false); - private final AtomicBoolean executing = new AtomicBoolean(false); private final ConcurrentMap inProgress = new ConcurrentHashMap<>(); private final AtomicBoolean pending = new AtomicBoolean(true); private final AtomicReference> data = new AtomicReference<>(new SimpleEntry<>(0L, null)); @@ -53,7 +52,11 @@ final class SchedulerStateImpl implements SchedulerStateInternal @Override public long increaseTick() { - return tick.incrementAndGet(); + final long current = this.tick.incrementAndGet(); + if (!executing()) { + inProgress.put(current, true); + } + return current; } @Override @@ -75,14 +78,8 @@ public long increaseTick() { } @Override - public @NotNull Instant markExecuting() { - executing.set(true); - return Instant.now(); - } - - @Override - public @NotNull Instant markIdle() { - executing.set(false); + public @NotNull Instant markFinished(long tick) { + inProgress.remove(tick); return Instant.now(); } diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateInternal.java b/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateInternal.java index 3e87ad7..520e2ce 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateInternal.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateInternal.java @@ -26,19 +26,12 @@ interface SchedulerStateInternal extends SchedulerState { @NotNull Instant markAvailable(); /** - * Mark task is executing - * - * @return this for fluent api - * @see #executing() - */ - @NotNull Instant markExecuting(); - - /** - * Mark the current trigger tick is already handled. + * Mark trigger tick is already handled * + * @param tick the trigger tick * @return finished at time */ - @NotNull Instant markIdle(); + @NotNull Instant markFinished(long tick); /** * Mark the current trigger is completed, no more fire by the system timer. @@ -48,7 +41,7 @@ interface SchedulerStateInternal extends SchedulerState { @NotNull Instant markCompleted(); /** - * Increase the tick counter and marks trigger tick is in progress + * Increase the tick counter and mark the current tick is in progress * * @return next tick */ diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java b/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java index 332a7d1..b3a8e2b 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java @@ -34,18 +34,21 @@ private TriggerContextFactory() { } * Create trigger context in {@link TriggerStatus#KICKOFF} state * * @param triggerType the trigger type + * @param tick the tick */ - public static @NotNull TriggerTransitionContext kickoff(@NotNull String triggerType) { - return kickoff(triggerType, null); + public static @NotNull TriggerTransitionContext kickoff(@NotNull String triggerType, long tick) { + return kickoff(triggerType, tick, null); } /** * Create trigger context in {@link TriggerStatus#KICKOFF} state * * @param triggerType the trigger type + * @param tick the tick * @param info the trigger context info */ - public static @NotNull TriggerTransitionContext kickoff(@NotNull String triggerType, @Nullable T info) { + public static @NotNull TriggerTransitionContext kickoff(@NotNull String triggerType, long tick, + @Nullable T info) { final Instant firedAt = Instant.now(); final TriggerCondition condition = createCondition(TriggerStatus.KICKOFF, null, null); return new TriggerTransitionContext() { @@ -53,7 +56,7 @@ private TriggerContextFactory() { } public @NotNull String type() { return triggerType; } @Override - public long tick() { return 0; } + public long tick() { return tick; } public @NotNull Instant firedAt() { return firedAt; } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java index 5f9f2da..1adc74d 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java @@ -1,6 +1,7 @@ package io.github.zero88.schedulerx.trigger; import java.time.Instant; +import java.time.temporal.ChronoUnit; import org.jetbrains.annotations.NotNull; @@ -11,7 +12,6 @@ import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder; import io.github.zero88.schedulerx.impl.TriggerContextFactory; import io.vertx.core.Future; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; @@ -26,19 +26,20 @@ final class CronSchedulerImpl extends AbstractScheduler registerTimer(@NotNull Promise promise, WorkerExecutor workerExecutor) { + protected @NotNull Future registerTimer(WorkerExecutor workerExecutor) { try { - final long nextTriggerAfter = trigger().nextTriggerAfter(Instant.now()); - nextTimerId = vertx().setTimer(nextTriggerAfter, timerId -> { - promise.complete(timerId); - run(workerExecutor, TriggerContextFactory.kickoff(trigger().type())); + final Instant now = Instant.now(); + final long nextTriggerAfter = trigger().nextTriggerAfter(now); + final Instant nextTriggerTime = now.plus(nextTriggerAfter, ChronoUnit.MILLIS); + nextTimerId = vertx().setTimer(nextTriggerAfter, tId -> { + onRun(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId))); doStart(workerExecutor); }); - log(Instant.now(), "Next schedule after " + nextTriggerAfter + "ms with timerId[" + nextTimerId + "]"); + log(now, "Next schedule at [" + nextTriggerTime + "] by timerId[" + nextTimerId + "]"); + return Future.succeededFuture(nextTimerId); } catch (Exception ex) { - promise.fail(ex); + return Future.failedFuture(ex); } - return promise.future(); } @Override diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java index 9dceecd..85aabff 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java @@ -32,19 +32,23 @@ final class EventSchedulerImpl extends AbstractScheduler registerTimer(@NotNull Promise promise, WorkerExecutor workerExecutor) { + protected @NotNull Future registerTimer(WorkerExecutor workerExecutor) { + final Promise promise = Promise.promise(); + final long timerId = trigger().hashCode(); final String address = trigger().getAddress(); consumer = trigger().isLocalOnly() ? vertx().eventBus().localConsumer(address) : vertx().eventBus().consumer(address); - consumer.handler(msg -> run(workerExecutor, createTriggerContext(msg))).completionHandler(event -> { - if (event.failed()) { - promise.fail(new IllegalStateException("Unable to register a subscriber on address[" + address + "]", - event.cause())); - } else { - promise.complete((long) consumer.hashCode()); - } - }); + consumer.handler(msg -> onRun(workerExecutor, createKickoffContext(msg, onFire(timerId)))) + .completionHandler(event -> { + if (event.failed()) { + promise.fail( + new IllegalStateException("Unable to register a subscriber on address[" + address + "]", + event.cause())); + } else { + promise.complete(timerId); + } + }); return promise.future(); } @@ -73,12 +77,12 @@ protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionCon return ctx; } - private TriggerTransitionContext createTriggerContext(Message msg) { + private TriggerTransitionContext createKickoffContext(Message msg, long tick) { try { T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body()); - return TriggerContextFactory.kickoff(trigger().type(), eventMsg); + return TriggerContextFactory.kickoff(trigger().type(), tick, eventMsg); } catch (Exception ex) { - return handleException(TriggerContextFactory.kickoff(trigger().type()), ex); + return handleException(TriggerContextFactory.kickoff(trigger().type(), tick, msg), ex); } } diff --git a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java index a871a51..75c6673 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java +++ b/core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java @@ -24,22 +24,23 @@ final class IntervalSchedulerImpl extends AbstractScheduler registerTimer(@NotNull Promise promise, WorkerExecutor workerExecutor) { + protected @NotNull Future registerTimer(WorkerExecutor workerExecutor) { try { - LongSupplier supplier = () -> vertx().setPeriodic(trigger().intervalInMilliseconds(), - tId -> run(workerExecutor, - TriggerContextFactory.kickoff(trigger().type()))); + final long interval = trigger().intervalInMilliseconds(); + final String type = trigger().type(); + final LongSupplier timer = () -> vertx().setPeriodic(interval, tId -> onRun(workerExecutor, + TriggerContextFactory.kickoff(type, onFire(tId)))); if (trigger().noDelay()) { - promise.complete(supplier.getAsLong()); - } else { - final long delay = trigger().delayInMilliseconds(); - log(Instant.now(), "Delay [" + delay + "ms] then register the task in the scheduler"); - vertx().setTimer(delay, ignore -> promise.complete(supplier.getAsLong())); + return Future.succeededFuture(timer.getAsLong()); } + final Promise promise = Promise.promise(); + final long delay = trigger().delayInMilliseconds(); + log(Instant.now(), "Delay [" + delay + "ms] then register the trigger in the scheduler"); + vertx().setTimer(delay, ignore -> promise.complete(timer.getAsLong())); + return promise.future(); } catch (Exception e) { - promise.fail(e); + return Future.failedFuture(e); } - return promise.future(); } @Override