Skip to content

Commit

Permalink
feat(#93): TriggerEvaluator#beforeRun
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Dec 19, 2023
1 parent 5253452 commit 57e759c
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Vertx;

/**
Expand All @@ -21,10 +22,14 @@ public interface SchedulerBuilder<IN, OUT, TRIGGER extends Trigger, SCHEDULER ex
SELF extends SchedulerBuilder<IN, OUT, TRIGGER, SCHEDULER, SELF>>
extends JobExecutorContext<IN, OUT>, SchedulerContext<TRIGGER, OUT> {

@NotNull TriggerEvaluator triggerEvaluator();

@NotNull SELF setVertx(@NotNull Vertx vertx);

@NotNull SELF setTrigger(@NotNull TRIGGER trigger);

@NotNull SELF setTriggerEvaluator(@NotNull TriggerEvaluator evaluator);

@NotNull SELF setMonitor(@NotNull SchedulingMonitor<OUT> monitor);

@NotNull SELF setJob(@NotNull Job<IN, OUT> job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.github.zero88.schedulerx.trigger.rule.TriggerRule;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -56,21 +58,23 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
private final @NotNull JobData<IN> jobData;
private final @NotNull Job<IN, OUT> job;
private final @NotNull T trigger;
private final @NotNull TriggerEvaluator evaluator;
private final @NotNull TimeoutPolicy timeoutPolicy;
private final Lock lock = new ReentrantLock();
private boolean didStart = false;
private boolean didTriggerValidation = false;
private IllegalArgumentException invalidTrigger;

protected AbstractScheduler(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor,
@NotNull JobData<IN> jobData, @NotNull Job<IN, OUT> job, @NotNull T trigger,
@NotNull TimeoutPolicy timeoutPolicy) {
protected AbstractScheduler(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData,
@NotNull TimeoutPolicy timeoutPolicy, @NotNull SchedulingMonitor<OUT> monitor,
@NotNull T trigger, @NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
this.job = job;
this.jobData = jobData;
this.timeoutPolicy = timeoutPolicy;
this.vertx = vertx;
this.trigger = trigger;
this.monitor = monitor;
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
this.state = new SchedulerStateImpl<>();
}

Expand Down Expand Up @@ -170,23 +174,6 @@ protected final void doStop(long timerId, TriggerContext context) {
*/
protected abstract void unregisterTimer(long timerId);

/**
* Check a trigger kickoff context whether to be able to run new execution or not
*/
protected final TriggerContext shouldRun(@NotNull TriggerContext kickOffContext) {
log(Instant.now(), "On evaluate");
if (state.pending()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.NOT_YET_SCHEDULED);
}
if (state.completed()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.ALREADY_STOPPED);
}
if (state.executing()) {
return TriggerContextFactory.skip(kickOffContext, ReasonCode.JOB_IS_RUNNING);
}
return evaluateTriggerRule(kickOffContext);
}

/**
* Check a trigger context whether to be able to stop by configuration or force stop
*/
Expand All @@ -199,23 +186,6 @@ protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext
: triggerContext;
}

/**
* Evaluate a trigger kickoff context on trigger rule
*/
protected TriggerContext evaluateTriggerRule(@NotNull TriggerContext triggerContext) {
if (!triggerContext.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
final Instant firedAt = Objects.requireNonNull(triggerContext.firedAt(),
"Kickoff context is missing a fired at time");
if (trigger().rule().isExceeded(firedAt)) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG);
}
return trigger().shouldExecute(firedAt)
? TriggerContextFactory.ready(triggerContext)
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

/**
* Register a timer id in internal state and increase tick time when the system timer fires
*
Expand All @@ -232,9 +202,9 @@ protected final long onFire(long timerId) {
protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext ctx) {
log(Objects.requireNonNull(ctx.firedAt()), "On fire");
final Duration timeout = timeoutPolicy().evaluationTimeout();
this.<TriggerContext>executeBlocking(workerExecutor,
p -> wrapTimeout(timeoutPolicy().evaluationTimeout(), p).complete(
shouldRun(ctx)))
this.<TriggerContext>executeBlocking(workerExecutor, p -> this.wrapTimeout(timeout, p)
.handle(evaluator.beforeRun(trigger, ctx,
jobData.externalId())))
.onSuccess(context -> onTrigger(workerExecutor, context))
.onFailure(t -> onMisfire(TriggerContextFactory.skip(ctx, t instanceof TimeoutException
? ReasonCode.EVALUATION_TIMEOUT
Expand Down Expand Up @@ -387,4 +357,44 @@ private <R> Promise<R> wrapTimeout(Duration timeout, Promise<R> promise) {
return new TimeoutBlock(vertx, timeout).wrap(promise);
}

@SuppressWarnings("rawtypes")
private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator {

private final AbstractScheduler scheduler;

private InternalTriggerEvaluator(AbstractScheduler scheduler) { this.scheduler = scheduler; }

@Override
protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
if (!ctx.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
return Future.succeededFuture(doCheck(ctx));
}

@NotNull
private TriggerContext doCheck(TriggerContext ctx) {
scheduler.log(Instant.now(), "On evaluate");
if (scheduler.state.pending()) {
return TriggerContextFactory.skip(ctx, ReasonCode.NOT_YET_SCHEDULED);
}
if (scheduler.state.completed()) {
return TriggerContextFactory.skip(ctx, ReasonCode.ALREADY_STOPPED);
}
if (scheduler.state.executing()) {
return TriggerContextFactory.skip(ctx, ReasonCode.JOB_IS_RUNNING);
}
final Instant firedAt = Objects.requireNonNull(ctx.firedAt());
final TriggerRule rule = scheduler.trigger().rule();
if (rule.isExceeded(firedAt)) {
return TriggerContextFactory.stop(ctx, ReasonCode.STOP_BY_CONFIG);
}
return rule.satisfy(firedAt)
? TriggerContextFactory.ready(ctx)
: TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Vertx;

/**
Expand All @@ -30,6 +31,7 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
private JobData<IN> jobData;
private Job<IN, OUT> job;
private T trigger;
private TriggerEvaluator evaluator;
private TimeoutPolicy timeoutPolicy;

@Override
Expand All @@ -45,6 +47,11 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
@Override
public @NotNull T trigger() { return Objects.requireNonNull(trigger, "Trigger is required"); }

@Override
public @NotNull TriggerEvaluator triggerEvaluator() {
return Optional.ofNullable(evaluator).orElseGet(AbstractTriggerEvaluator::noop);
}

@Override
public @NotNull Job<IN, OUT> job() { return Objects.requireNonNull(job, "Job is required"); }

Expand All @@ -71,6 +78,12 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
return (B) this;
}

@Override
public @NotNull B setTriggerEvaluator(@NotNull TriggerEvaluator evaluator) {
this.evaluator = evaluator;
return (B) this;
}

public @NotNull B setMonitor(@NotNull SchedulingMonitor<OUT> monitor) {
this.monitor = monitor;
return (B) this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.github.zero88.schedulerx.impl;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
import io.vertx.core.Future;

@Internal
public abstract class AbstractTriggerEvaluator implements TriggerEvaluator {

static TriggerEvaluator noop() {
return new AbstractTriggerEvaluator() {
@Override
protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId) {
return Future.succeededFuture(triggerContext);
}
};
}

private TriggerEvaluator next;

@Override
public @NotNull Future<TriggerContext> beforeRun(@NotNull Trigger trigger, @NotNull TriggerContext triggerContext,
@Nullable Object externalId) {
return this.internalCheck(trigger, triggerContext, externalId)
.flatMap(ctx -> next == null ? Future.succeededFuture(ctx) : next.beforeRun(trigger, ctx, externalId));
}

@Override
public @NotNull TriggerEvaluator andThen(@Nullable TriggerEvaluator another) {
this.next = another;
return this;
}

protected abstract Future<TriggerContext> internalCheck(@NotNull Trigger trigger,
@NotNull TriggerContext triggerContext,
@Nullable Object externalId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr

private long nextTimerId;

CronSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
@NotNull Job<IN, OUT> job, @NotNull CronTrigger trigger, @NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
CronSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
@NotNull SchedulingMonitor<OUT> monitor, @NotNull CronTrigger trigger,
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
super(job, jobData, timeoutPolicy, monitor, trigger, evaluator, vertx);
}

@Override
Expand Down Expand Up @@ -56,7 +57,8 @@ static final class CronSchedulerBuilderImpl<IN, OUT>
implements CronSchedulerBuilder<IN, OUT> {

public @NotNull CronScheduler<IN, OUT> build() {
return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
return new CronSchedulerImpl<>(job(), jobData(), timeoutPolicy(), monitor(), trigger(), triggerEvaluator(),
vertx());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.Job;
import io.github.zero88.schedulerx.JobData;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.impl.AbstractScheduler;
import io.github.zero88.schedulerx.impl.AbstractSchedulerBuilder;
import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator;
import io.github.zero88.schedulerx.impl.TriggerContextFactory;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.predicate.EventTriggerPredicate.EventTriggerPredicateException;
Expand All @@ -28,10 +30,10 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev

private MessageConsumer<Object> consumer;

EventSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
@NotNull Job<IN, OUT> job, @NotNull EventTrigger<T> trigger,
@NotNull TimeoutPolicy timeoutPolicy) {
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
EventSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
@NotNull SchedulingMonitor<OUT> monitor, @NotNull EventTrigger<T> trigger,
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
super(job, jobData, timeoutPolicy, monitor, trigger, new EventTriggerEvaluator<>().andThen(evaluator), vertx);
}

@Override
Expand Down Expand Up @@ -65,21 +67,6 @@ protected void unregisterTimer(long timerId) {
}
}

@Override
@SuppressWarnings("unchecked")
protected TriggerContext evaluateTriggerRule(@NotNull TriggerContext triggerContext) {
final TriggerContext ctx = super.evaluateTriggerRule(triggerContext);
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!trigger().getPredicate().test((T) triggerContext.info())) {
return TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED);
}
} catch (Exception ex) {
return handleException(ctx, ex);
}
return ctx;
}

private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
try {
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
Expand All @@ -89,7 +76,7 @@ private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
}
}

private TriggerContext handleException(TriggerContext context, Exception cause) {
static TriggerContext handleException(TriggerContext context, Exception cause) {
String reason = cause instanceof ClassCastException || cause instanceof EventTriggerPredicateException
? ReasonCode.CONDITION_IS_NOT_MATCHED
: ReasonCode.UNEXPECTED_ERROR;
Expand All @@ -103,7 +90,28 @@ static final class EventSchedulerBuilderImpl<IN, OUT, T>
// @formatter:on

public @NotNull EventScheduler<IN, OUT, T> build() {
return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
return new EventSchedulerImpl<>(job(), jobData(), timeoutPolicy(), monitor(), trigger(), triggerEvaluator(),
vertx());
}

}


static final class EventTriggerEvaluator<T> extends AbstractTriggerEvaluator {

@Override
@SuppressWarnings("unchecked")
protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNull TriggerContext ctx,
@Nullable Object externalId) {
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!((EventTrigger<T>) trigger).getPredicate().test((T) ctx.info())) {
return Future.succeededFuture(TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED));
}
} catch (Exception ex) {
return Future.succeededFuture(handleException(ctx, ex));
}
return Future.succeededFuture(ctx);
}

}
Expand Down
Loading

0 comments on commit 57e759c

Please sign in to comment.