Skip to content

Commit

Permalink
feature(#100): TimeClock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Dec 28, 2023
1 parent 9cd0420 commit b57714f
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 30 deletions.
23 changes: 23 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/TimeClock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.zero88.schedulerx;

import java.time.Instant;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;

/**
* Represents for time clock
*
* @since 2.0.0
*/
@Internal
public interface TimeClock {

/**
* Obtains the current instant from the system clock.
*
* @return the current instant using the system clock, not null
*/
@NotNull Instant now();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.github.zero88.schedulerx.JobExecutor;
import io.github.zero88.schedulerx.Scheduler;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeClock;
import io.github.zero88.schedulerx.TimeoutBlock;
import io.github.zero88.schedulerx.TimeoutPolicy;
import io.github.zero88.schedulerx.WorkerExecutorFactory;
Expand Down Expand Up @@ -59,6 +60,7 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
private final @NotNull T trigger;
private final @NotNull TriggerEvaluator evaluator;
private final @NotNull TimeoutPolicy timeoutPolicy;
private final @NotNull TimeClock clock;
private final Lock lock = new ReentrantLock();
private boolean didStart = false;
private boolean didTriggerValidation = false;
Expand All @@ -74,12 +76,15 @@ protected AbstractScheduler(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobD
this.trigger = trigger;
this.monitor = monitor;
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
this.state = new SchedulerStateImpl<>();
this.clock = new TimeClockImpl();
this.state = new SchedulerStateImpl<>(clock);
}

@Override
public final @NotNull Vertx vertx() { return this.vertx; }

public final @NotNull TimeClock clock() { return this.clock; }

@Override
public final @NotNull SchedulingMonitor<OUT> monitor() { return this.monitor; }

Expand Down Expand Up @@ -149,7 +154,7 @@ public final void executeJob(@NotNull ExecutionContext<OUT> executionContext) {
@Override
public final void cancel() {
if (!state.completed()) {
log(Instant.now(), "On cancel");
log(clock.now(), "On cancel");
doStop(state.timerId(), TriggerContextFactory.cancel(trigger().type(), state.tick()));
}
}
Expand Down Expand Up @@ -203,8 +208,9 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext tri
return;
}
final long round = state.increaseRound();
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext, round);
final Duration timeout = timeoutPolicy().executionTimeout();
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, clock, triggerContext,
round);
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), round);
Future.join(onEvaluationAfterTrigger(workerExecutor, triggerContext, round),
executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p)))))
Expand All @@ -228,7 +234,7 @@ protected final void onSchedule(long timerId) {
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTriggerContext(context)
.setRescheduledAt(Instant.now())
.setRescheduledAt(clock.now())
.setTick(context.tick())
.setRound(state.round())
.build();
Expand All @@ -241,15 +247,15 @@ protected final void onUnableSchedule(Throwable cause) {
monitor.onUnableSchedule(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTriggerContext(ctx)
.setUnscheduledAt(Instant.now())
.setUnscheduledAt(clock.now())
.setTick(ctx.tick())
.setRound(ctx.tick())
.build());
}

protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor worker, TriggerContext ctx) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On before trigger");
log(clock.now(), "On before trigger");
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
evaluator.beforeTrigger(trigger, ctx, jobData.externalId()));
});
Expand All @@ -258,11 +264,11 @@ protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor
protected final Future<TriggerContext> onEvaluationAfterTrigger(WorkerExecutor worker, TriggerContext ctx,
long round) {
return executeBlocking(worker, p -> {
log(Instant.now(), "On after trigger");
log(clock.now(), "On after trigger");
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
evaluator.afterTrigger(trigger(), ctx, jobData.externalId(), round)
.onSuccess(c -> doStop(state.timerId(), c))
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, Instant.now(), "After evaluate"), t)));
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, clock.now(), "After evaluate"), t)));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -14,18 +15,20 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
private final Vertx vertx;
private final long round;
private final TriggerContext triggerContext;
private final TimeClock clock;
private final Instant triggeredAt;
private Instant executedAt;
private Promise<Object> promise;
private OUTPUT data;
private Throwable error;
private boolean forceStop = false;

ExecutionContextImpl(Vertx vertx, TriggerContext triggerContext, long round) {
ExecutionContextImpl(Vertx vertx, TimeClock clock, TriggerContext triggerContext, long round) {
this.vertx = vertx;
this.round = round;
this.triggerContext = triggerContext;
this.triggeredAt = Instant.now();
this.clock = clock;
this.triggeredAt = this.clock.now();
}

@Override
Expand All @@ -34,7 +37,7 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
throw new IllegalStateException("ExecutionContext is already setup");
}
this.promise = promise;
this.executedAt = Instant.now();
this.executedAt = this.clock.now();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;

final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT> {

private final AtomicReference<Instant> availableAt = new AtomicReference<>();
Expand All @@ -24,8 +26,11 @@ final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT>
private final AtomicBoolean pending = new AtomicBoolean(true);
private final AtomicReference<Entry<Long, OUTPUT>> data = new AtomicReference<>(new SimpleEntry<>(0L, null));
private final AtomicReference<Entry<Long, Throwable>> error = new AtomicReference<>(new SimpleEntry<>(0L, null));
private final TimeClock clock;
private long timerId;

SchedulerStateImpl(TimeClock clock) { this.clock = clock; }

@Override
public Instant availableAt() { return availableAt.get(); }

Expand Down Expand Up @@ -73,20 +78,20 @@ public long increaseTick() {
@Override
public @NotNull Instant markAvailable() {
pending.set(false);
availableAt.set(Instant.now());
availableAt.set(clock.now());
return availableAt();
}

@Override
public @NotNull Instant markFinished(long tick) {
inProgress.remove(tick);
return Instant.now();
return clock.now();
}

@Override
public @NotNull Instant markCompleted() {
completed.set(true);
return Instant.now();
return clock.now();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.github.zero88.schedulerx.impl;

import java.time.Instant;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.TimeClock;

@Internal
class TimeClockImpl implements TimeClock {

@Override
public @NotNull Instant now() {
return Instant.now();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,23 @@ public static TriggerContext error(String triggerType, String reason, @Nullable
* Create trigger context in {@link TriggerStatus#KICKOFF} state
*
* @param triggerType the trigger type
* @param firedAt the fired at
* @param tick the tick
*/
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, long tick) {
return kickoff(triggerType, tick, null);
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick) {
return kickoff(triggerType, firedAt, tick, null);
}

/**
* Create trigger context in {@link TriggerStatus#KICKOFF} state
*
* @param triggerType the trigger type
* @param firedAt the fired at
* @param tick the tick
* @param info the trigger context info
*/
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, long tick, @Nullable T info) {
final Instant firedAt = Instant.now();
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick,
@Nullable T info) {
final TriggerCondition condition = createCondition(TriggerStatus.KICKOFF, null, null);
return new TriggerContext() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
@Override
protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
try {
final Instant now = Instant.now();
final Instant now = clock().now();
final long nextTriggerAfter = trigger().nextTriggerAfter(now);
final Instant nextTriggerTime = now.plus(nextTriggerAfter, ChronoUnit.MILLIS);
nextTimerId = vertx().setTimer(nextTriggerAfter, tId -> {
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId)));
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), clock().now(), onFire(tId)));
doStart(workerExecutor);
});
log(now, "Next schedule at" + brackets(nextTriggerTime) + " by timerId" + brackets(nextTimerId));
Expand All @@ -49,7 +49,7 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(nextTimerId);
log(Instant.now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
log(clock().now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
}

static final class CronSchedulerBuilderImpl<IN, OUT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Instant;
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -61,7 +60,7 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
protected void unregisterTimer(long timerId) {
if (Objects.nonNull(consumer)) {
consumer.unregister()
.onComplete(r -> log(Instant.now(),
.onComplete(r -> log(clock().now(),
"Unregistered EventBus subscriber on address" + brackets(consumer.address()) +
brackets(r.succeeded()) + brackets(r.cause())));
}
Expand All @@ -70,9 +69,9 @@ protected void unregisterTimer(long timerId) {
private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
try {
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
return TriggerContextFactory.kickoff(trigger().type(), tick, eventMsg);
return TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, eventMsg);
} catch (Exception ex) {
return handleException(TriggerContextFactory.kickoff(trigger().type(), tick, msg), ex);
return handleException(TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, msg), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static io.github.zero88.schedulerx.trigger.IntervalTrigger.REPEAT_INDEFINITELY;

import java.time.Duration;
import java.time.Instant;

import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -37,7 +36,7 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
}
final Promise<Long> promise = Promise.promise();
final long delay = trigger().initialDelay().toMillis();
log(Instant.now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
log(clock().now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
vertx().setTimer(delay, ignore -> promise.complete(createPeriodicTimer(workerExecutor)));
return promise.future();
} catch (Exception e) {
Expand All @@ -48,13 +47,15 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(timerId);
log(Instant.now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
log(clock().now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
}

private long createPeriodicTimer(WorkerExecutor executor) {
final long millis = trigger().interval().toMillis();
return this.vertx()
.setPeriodic(trigger().interval().toMillis(),
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
.setPeriodic(millis, id -> onProcess(executor,
TriggerContextFactory.kickoff(trigger().type(), clock().now(),
onFire(id))));
}

// @formatter:off
Expand Down

0 comments on commit b57714f

Please sign in to comment.