Skip to content

Commit

Permalink
feat(#93): TriggerEvaluator#afterTrigger
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Dec 20, 2023
1 parent 60b7cfc commit 1d38a84
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ protected final void doStart(WorkerExecutor workerExecutor) {
}

protected final void doStop(long timerId, TriggerContext context) {
unregisterTimer(timerId);
onComplete(context);
if (context.isStopped()) {
unregisterTimer(timerId);
onComplete(context);
}
}

/**
Expand All @@ -174,18 +176,6 @@ protected final void doStop(long timerId, TriggerContext context) {
*/
protected abstract void unregisterTimer(long timerId);

/**
* Check a trigger context whether to be able to stop by configuration or force stop
*/
protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext, boolean isForceStop, long round) {
if (isForceStop) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_JOB);
}
return trigger().shouldStop(round)
? TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG)
: triggerContext;
}

/**
* Register a timer id in internal state and increase tick time when the system timer fires
*
Expand All @@ -199,29 +189,27 @@ protected final long onFire(long timerId) {
/**
* Processing the trigger right away after the system timer fires
*/
protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext ctx) {
log(Objects.requireNonNull(ctx.firedAt()), "On fire");
final Duration timeout = timeoutPolicy().evaluationTimeout();
this.<TriggerContext>executeBlocking(workerExecutor, p -> this.wrapTimeout(timeout, p)
.handle(evaluator.beforeRun(trigger, ctx,
jobData.externalId())))
.onSuccess(context -> onTrigger(workerExecutor, context))
.onFailure(t -> onMisfire(TriggerContextFactory.skip(ctx, t instanceof TimeoutException
? ReasonCode.EVALUATION_TIMEOUT
: ReasonCode.UNEXPECTED_ERROR, t)));
protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext triggerContext) {
log(Objects.requireNonNull(triggerContext.firedAt()), "On fire");
this.onEvaluationBeforeTrigger(workerExecutor, triggerContext)
.onSuccess(ctx -> onTrigger(workerExecutor, ctx))
.onFailure(t -> onMisfire(TriggerContextFactory.skip(triggerContext, t instanceof TimeoutException
? ReasonCode.EVALUATION_TIMEOUT
: ReasonCode.UNEXPECTED_ERROR, t)));
}

protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext triggerContext) {
if (!triggerContext.isReady()) {
onMisfire(triggerContext);
return;
}
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext,
state.increaseRound());
final long round = state.increaseRound();
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext, round);
final Duration timeout = timeoutPolicy().executionTimeout();
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round());
this.executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p))))
.onComplete(ar -> onResult(executionContext, ar.cause()));
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), round);
Future.join(onEvaluationAfterTrigger(workerExecutor, triggerContext, round),
executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p)))))
.onComplete(ar -> onResult(executionContext, ar.result().cause(1)));
}

protected final void onSchedule(long timerId) {
Expand Down Expand Up @@ -260,6 +248,25 @@ protected final void onUnableSchedule(Throwable cause) {
.build());
}

protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor worker, TriggerContext ctx) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On before trigger");
this.wrapTimeout(timeoutPolicy().evaluationTimeout(), p)
.handle(evaluator.beforeTrigger(trigger, ctx, jobData.externalId()));
});
}

protected final Future<TriggerContext> onEvaluationAfterTrigger(WorkerExecutor worker, TriggerContext ctx,
long round) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On after trigger");
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
evaluator.afterTrigger(trigger(), ctx, jobData.externalId(), round)
.onSuccess(c -> doStop(state.timerId(), c))
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, Instant.now(), "After evaluate"), t)));
});
}

protected final void onMisfire(@NotNull TriggerContext triggerCtx) {
final Instant finishedAt = state.markFinished(triggerCtx.tick());
final String reasonCode = triggerCtx.condition().reasonCode();
Expand Down Expand Up @@ -288,7 +295,7 @@ protected final void onResult(@NotNull ExecutionContext<OUT> executionContext, @
if (asyncCause instanceof TimeoutException) {
LOGGER.warn(genMsg(triggerContext.tick(), ctx.round(), finishedAt, asyncCause.getMessage()));
} else if (asyncCause != null) {
LOGGER.error(genMsg(triggerContext.tick(), ctx.round(), finishedAt, "System error"), asyncCause);
LOGGER.error(genMsg(triggerContext.tick(), ctx.round(), finishedAt, "On result::System error"), asyncCause);
}
monitor.onEach(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
Expand All @@ -304,9 +311,8 @@ protected final void onResult(@NotNull ExecutionContext<OUT> executionContext, @
.setError(state.addError(ctx.round(),
Optional.ofNullable(ctx.error()).orElse(asyncCause)))
.build());
final TriggerContext transitionCtx = shouldStop(triggerContext, ctx.isForceStop(), ctx.round());
if (transitionCtx.isStopped()) {
doStop(state.timerId(), transitionCtx);
if (ctx.isForceStop()) {
doStop(state.timerId(), TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_JOB));
}
}

Expand Down Expand Up @@ -358,15 +364,15 @@ private <R> Promise<R> wrapTimeout(Duration timeout, Promise<R> promise) {
}

@SuppressWarnings("rawtypes")
private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator {
private static class InternalTriggerEvaluator extends DefaultTriggerEvaluator {

private final AbstractScheduler scheduler;

private InternalTriggerEvaluator(AbstractScheduler scheduler) { this.scheduler = scheduler; }

@Override
protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
protected Future<TriggerContext> internalBeforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
if (!ctx.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
Expand All @@ -375,7 +381,6 @@ protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNul

@NotNull
private TriggerContext doCheck(TriggerContext ctx) {
scheduler.log(Instant.now(), "On evaluate");
if (scheduler.state.pending()) {
return TriggerContextFactory.skip(ctx, ReasonCode.NOT_YET_SCHEDULED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext

@Override
public @NotNull TriggerEvaluator triggerEvaluator() {
return Optional.ofNullable(evaluator).orElseGet(AbstractTriggerEvaluator::noop);
return Optional.ofNullable(evaluator).orElseGet(DefaultTriggerEvaluator::noop);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.github.zero88.schedulerx.impl;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Future;

@Internal
public class DefaultTriggerEvaluator implements TriggerEvaluator {

static TriggerEvaluator noop() {
return new DefaultTriggerEvaluator();
}

private TriggerEvaluator next;

@Override
public final @NotNull Future<TriggerContext> beforeTrigger(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId) {
return this.internalBeforeTrigger(trigger, triggerContext, externalId)
.flatMap(c -> next == null ? Future.succeededFuture(c) : next.beforeTrigger(trigger, c, externalId));
}

@Override
public final @NotNull Future<TriggerContext> afterTrigger(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId, long round) {
// @formatter:off
return this.internalAfterTrigger(trigger, triggerContext, externalId, round )
.flatMap(c -> next == null ? Future.succeededFuture(c) : next.afterTrigger(trigger, c, externalId, round));
// @formatter:on
}

@Override
public final @NotNull TriggerEvaluator andThen(@Nullable TriggerEvaluator another) {
this.next = another;
return this;
}

protected Future<TriggerContext> internalBeforeTrigger(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId) {
return Future.succeededFuture(triggerContext);
}

protected Future<TriggerContext> internalAfterTrigger(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId, long round) {
return Future.succeededFuture(triggerContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.impl.AbstractScheduler;
import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder;
import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator;
import io.github.zero88.schedulerx.impl.DefaultTriggerEvaluator;
import io.github.zero88.schedulerx.impl.TriggerContextFactory;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.predicate.EventTriggerPredicate.EventTriggerPredicateException;
Expand Down Expand Up @@ -97,12 +97,12 @@ static final class EventSchedulerBuilderImpl<IN, OUT, T>
}


static final class EventTriggerEvaluator<T> extends AbstractTriggerEvaluator {
static final class EventTriggerEvaluator<T> extends DefaultTriggerEvaluator {

@Override
@SuppressWarnings("unchecked")
protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
protected Future<TriggerContext> internalBeforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!((EventTrigger<T>) trigger).getPredicate().test((T) ctx.info())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import java.time.Instant;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.Job;
import io.github.zero88.schedulerx.JobData;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.impl.AbstractScheduler;
import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder;
import io.github.zero88.schedulerx.impl.DefaultTriggerEvaluator;
import io.github.zero88.schedulerx.impl.TriggerContextFactory;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -24,7 +27,7 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
IntervalSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
@NotNull SchedulingMonitor<OUT> monitor, @NotNull IntervalTrigger trigger,
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
super(job, jobData, timeoutPolicy, monitor, trigger, evaluator, vertx);
super(job, jobData, timeoutPolicy, monitor, trigger, new IntervalTriggerEvaluator().andThen(evaluator), vertx);
}

protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
Expand Down Expand Up @@ -67,4 +70,20 @@ static final class IntervalSchedulerBuilderImpl<IN, OUT>

}


static final class IntervalTriggerEvaluator extends DefaultTriggerEvaluator {

@Override
protected Future<TriggerContext> internalAfterTrigger(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId, long round) {
IntervalTrigger interval = (IntervalTrigger) trigger;
if (interval.noRepeatIndefinitely() && round >= interval.getRepeat()) {
return Future.succeededFuture(TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG));
}
return Future.succeededFuture(triggerContext);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ default long delayInMilliseconds() {
@Override
@NotNull IntervalTrigger validate();

@Override
default boolean shouldStop(long round) {
return noRepeatIndefinitely() && round >= getRepeat();
}

@Override
default JsonObject toJson() {
JsonObject self = JsonObject.of("repeat", getRepeat(), "initialDelay", getInitialDelay(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,6 @@ public interface Trigger extends HasTriggerType, TriggerRepresentation {
*/
@NotNull Trigger validate();

/**
* Verify the execution should be stopped after the current execution round is out of the trigger rule.
* <p/>
* This method will be invoked right away after each execution round is finished regardless of the execution result
* is success or error.
*
* @param round the current execution round
* @since 2.0.0
*/
default boolean shouldStop(long round) { return false; }

/**
* Simulate the next trigger times based on default preview parameter({@link PreviewParameter#byDefault()})
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@
public interface TriggerEvaluator {

/**
* Check whether the trigger is able to run
* Verify if the trigger can run before each execution round is started.
*
* @param trigger the trigger
* @param context the trigger context
* @param externalId the job external id
* @param trigger the trigger
* @param triggerContext the trigger context
* @param externalId the job external id
* @return a future of the trigger context that is evaluated
*/
@NotNull Future<TriggerContext> beforeRun(@NotNull Trigger trigger, @NotNull TriggerContext context,
@Nullable Object externalId);
@NotNull Future<TriggerContext> beforeTrigger(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext,
@Nullable Object externalId);

/**
* Verify if the trigger should stop executing immediately after one round of execution begins.
*
* @param round the current execution round
* @since 2.0.0
*/
@NotNull Future<TriggerContext> afterTrigger(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext,
@Nullable Object externalId, long round);

/**
* Chain another evaluator
Expand Down
Loading

0 comments on commit 1d38a84

Please sign in to comment.