Skip to content

Commit

Permalink
refactor(scheduler): logging and internal AbstractScheduler method
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Oct 28, 2023
1 parent 61f7bb2 commit a51e550
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.github.zero88.schedulerx.impl;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.text.MessageFormat;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -131,7 +134,7 @@ public final void executeTask(@NotNull ExecutionContext<OUT> executionContext) {
@Override
public final void cancel() {
if (!state.completed()) {
log(Instant.now(), "Canceling the task");
log(Instant.now(), "On cancel");
doStop(state.timerId(), TriggerContextFactory.stop(trigger().type(), ReasonCode.STOP_BY_MANUAL));
}
}
Expand All @@ -145,27 +148,36 @@ protected final void doStop(long timerId, TriggerContext context) {
onComplete(context);
}

/**
* Register a new timer in the system based on the trigger configuration
*/
protected abstract @NotNull Future<Long> registerTimer(@Nullable WorkerExecutor workerExecutor);

/**
* Unregister current timer id out of the system
*/
protected abstract void unregisterTimer(long timerId);

protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionContext triggerContext) {
/**
* Check a trigger kickoff context whether to be able to run new execution or not
*/
protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionContext kickOffContext) {
log(Instant.now(), "On evaluate");
if (!triggerContext.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
if (state.pending()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.NOT_YET_SCHEDULED);
return TriggerContextFactory.skip(kickOffContext, ReasonCode.NOT_YET_SCHEDULED);
}
if (state.completed()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.ALREADY_STOPPED);
return TriggerContextFactory.skip(kickOffContext, ReasonCode.ALREADY_STOPPED);
}
if (state.executing()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.TASK_IS_RUNNING);
return TriggerContextFactory.skip(kickOffContext, ReasonCode.TASK_IS_RUNNING);
}
return evaluateTrigger(triggerContext);
return evaluateTriggerRule(kickOffContext);
}

/**
* Check a trigger context whether to be able to stop by configuration or force stop
*/
protected final TriggerTransitionContext shouldStop(@NotNull TriggerTransitionContext triggerContext,
boolean isForceStop, long round) {
if (isForceStop) {
Expand All @@ -176,8 +188,15 @@ protected final TriggerTransitionContext shouldStop(@NotNull TriggerTransitionCo
: triggerContext;
}

protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionContext triggerContext) {
final Instant firedAt = Objects.requireNonNull(triggerContext.firedAt());
/**
* Evaluate a trigger kickoff context on trigger rule
*/
protected TriggerTransitionContext evaluateTriggerRule(@NotNull TriggerTransitionContext triggerContext) {
if (!triggerContext.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
final Instant firedAt = Objects.requireNonNull(triggerContext.firedAt(),
"Kickoff context is missing a fired at time");
if (trigger().rule().isExceeded(firedAt)) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG);
}
Expand All @@ -187,7 +206,7 @@ protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionCon
}

/**
* Register a timer id and increase tick time when the system timer fires
* Register a timer id in internal state and increase tick time when the system timer fires
*
* @param timerId the system timer id
* @return the current number times of the tick counter
Expand All @@ -196,7 +215,10 @@ protected final long onFire(long timerId) {
return state.timerId(timerId).increaseTick();
}

protected final void onRun(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) {
/**
* Processing the trigger when the system timer fires
*/
protected final void onProcess(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) {
log(Objects.requireNonNull(kickoffContext.firedAt()), "On fire");
this.<TriggerTransitionContext>executeBlocking(workerExecutor, p -> p.complete(shouldRun(kickoffContext)))
.onSuccess(context -> onTrigger(workerExecutor, context))
Expand Down Expand Up @@ -243,8 +265,8 @@ protected final void onUnableSchedule(Throwable cause) {
.setExternalId(jobData.externalId())
.setTriggerContext(ctx)
.setUnscheduledAt(Instant.now())
.setTick(state.tick())
.setRound(state.round())
.setTick(-1)
.setRound(-1)
.build());
}

Expand Down Expand Up @@ -325,8 +347,9 @@ protected final void log(@NotNull Instant at, @NotNull String event, long tick,
private String genMsg(long tick, long round, @NotNull Instant at, @NotNull String event) {
String tId = state.pending() ? "-" : String.valueOf(state.timerId());
String tickAndRound = state.pending() ? "-/-" : (tick + "/" + round);
return "Scheduling[" + tId + "::" + trigger.type() + "::" + jobData.externalId() + "][" + tickAndRound + "]" +
"::[" + at + "] - " + event;
return MessageFormat.format("Scheduling{0}{1}::{2} - {3}",
brackets(tId + "::" + trigger.type() + "::" + jobData.externalId()),
brackets(tickAndRound), brackets(at), event);
}

private <R> Future<R> executeBlocking(WorkerExecutor workerExecutor, Consumer<Promise<R>> operation) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.zero88.schedulerx.trigger;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

Expand Down Expand Up @@ -32,10 +34,10 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
final long nextTriggerAfter = trigger().nextTriggerAfter(now);
final Instant nextTriggerTime = now.plus(nextTriggerAfter, ChronoUnit.MILLIS);
nextTimerId = vertx().setTimer(nextTriggerAfter, tId -> {
onRun(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId)));
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId)));
doStart(workerExecutor);
});
log(now, "Next schedule at [" + nextTriggerTime + "] by timerId[" + nextTimerId + "]");
log(now, "Next schedule at" + brackets(nextTriggerTime) + " by timerId" + brackets(nextTimerId));
return Future.succeededFuture(nextTimerId);
} catch (Exception ex) {
return Future.failedFuture(ex);
Expand All @@ -45,7 +47,7 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(nextTimerId);
log(Instant.now(), "Unregistered timerId[" + nextTimerId + "][" + result + "]");
log(Instant.now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
}

static final class CronSchedulerBuilderImpl<IN, OUT>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.zero88.schedulerx.trigger;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Instant;
import java.util.Objects;

Expand Down Expand Up @@ -39,11 +41,11 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
consumer = trigger().isLocalOnly()
? vertx().eventBus().localConsumer(address)
: vertx().eventBus().consumer(address);
consumer.handler(msg -> onRun(workerExecutor, createKickoffContext(msg, onFire(timerId))))
consumer.handler(msg -> onProcess(workerExecutor, createKickoffContext(msg, onFire(timerId))))
.completionHandler(event -> {
if (event.failed()) {
promise.fail(
new IllegalStateException("Unable to register a subscriber on address[" + address + "]",
new IllegalStateException("Unable to register a subscriber on address" + brackets(address),
event.cause()));
} else {
promise.complete(timerId);
Expand All @@ -57,15 +59,15 @@ protected void unregisterTimer(long timerId) {
if (Objects.nonNull(consumer)) {
consumer.unregister()
.onComplete(r -> log(Instant.now(),
"Unregistered EventBus subscriber on address[" + consumer.address() + "]" +
"[" + r.succeeded() + "][" + r.cause() + "]"));
"Unregistered EventBus subscriber on address" + brackets(consumer.address()) +
brackets(r.succeeded()) + brackets(r.cause())));
}
}

@Override
@SuppressWarnings("unchecked")
protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionContext triggerContext) {
final TriggerTransitionContext ctx = super.evaluateTrigger(triggerContext);
protected TriggerTransitionContext evaluateTriggerRule(@NotNull TriggerTransitionContext triggerContext) {
final TriggerTransitionContext ctx = super.evaluateTriggerRule(triggerContext);
try {
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
!trigger().getPredicate().test((T) triggerContext.info())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.github.zero88.schedulerx.trigger;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Instant;
import java.util.function.LongSupplier;

import org.jetbrains.annotations.NotNull;

Expand All @@ -26,17 +27,13 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In

protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
try {
final long interval = trigger().intervalInMilliseconds();
final String type = trigger().type();
final LongSupplier timer = () -> vertx().setPeriodic(interval, tId -> onRun(workerExecutor,
TriggerContextFactory.kickoff(type, onFire(tId))));
if (trigger().noDelay()) {
return Future.succeededFuture(timer.getAsLong());
return Future.succeededFuture(createPeriodicTimer(workerExecutor));
}
final Promise<Long> promise = Promise.promise();
final long delay = trigger().delayInMilliseconds();
log(Instant.now(), "Delay [" + delay + "ms] then register the trigger in the scheduler");
vertx().setTimer(delay, ignore -> promise.complete(timer.getAsLong()));
log(Instant.now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
vertx().setTimer(delay, ignore -> promise.complete(createPeriodicTimer(workerExecutor)));
return promise.future();
} catch (Exception e) {
return Future.failedFuture(e);
Expand All @@ -46,7 +43,12 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
@Override
protected void unregisterTimer(long timerId) {
boolean result = vertx().cancelTimer(timerId);
log(Instant.now(), "Unregistered timerId[" + timerId + "][" + result + "]");
log(Instant.now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
}

private long createPeriodicTimer(WorkerExecutor executor) {
return vertx().setPeriodic(trigger().intervalInMilliseconds(),
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
}

// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.zero88.schedulerx.trigger.rule;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.DateTimeException;
Expand Down Expand Up @@ -47,7 +49,7 @@ Timeframe create(String type, Object from, Object to, Map<String, Object> other)
final Class<?> cls = Class.forName(Objects.requireNonNull(type, "Timeframe type is required"));
final Constructor<? extends Timeframe> constructor = this.timeframeMapper.get(cls);
if (constructor == null) {
throw new UnsupportedOperationException("Unrecognized a timeframe with type[" + type + "]");
throw new UnsupportedOperationException("Unrecognized a timeframe with type" + brackets(type));
}
final Timeframe instance = constructor.newInstance();
if (instance instanceof BaseTimeframe) {
Expand All @@ -57,7 +59,7 @@ Timeframe create(String type, Object from, Object to, Map<String, Object> other)
} catch (InstantiationException | IllegalAccessException | InvocationTargetException ex) {
throw new IllegalArgumentException("Unable to init new timeframe instance", ex);
} catch (ClassNotFoundException ex) {
throw new UnsupportedOperationException("Unrecognized a timeframe with type[" + type + "]", ex);
throw new UnsupportedOperationException("Unrecognized a timeframe with type" + brackets(type), ex);
} catch (DateTimeException | NullPointerException ex) {
throw new IllegalArgumentException("Unable to parse a timeframe. Cause: " + ex.getMessage(), ex);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.zero88.schedulerx.trigger.rule;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.DateTimeException;
import java.util.Objects;

Expand Down Expand Up @@ -45,7 +47,7 @@ default <T> T normalize(@NotNull TimeParser<T> parser, Object rawValue) {
try {
return parser.parse(rawValue);
} catch (ClassCastException ex) {
throw new DateTimeException("Unsupported input type[" + rawValue.getClass().getName() + "]");
throw new DateTimeException("Unsupported input type" + brackets(rawValue.getClass().getName()));
}
}

Expand Down
10 changes: 6 additions & 4 deletions core/src/test/java/io/github/zero88/schedulerx/TestUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.zero88.schedulerx;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -20,9 +22,9 @@ private TestUtils() { }
@SuppressWarnings("java:S2925")
public static void block(Duration duration, VertxTestContext testContext) {
try {
LOGGER.info("Doing a mock stuff in [" + duration + "]...");
LOGGER.info("Doing a mock stuff in " + brackets(duration) + "...");
TimeUnit.MILLISECONDS.sleep(duration.toMillis());
LOGGER.info("Wake up after [" + duration + "]!!!");
LOGGER.info("Wake up after " + brackets(duration) + "!!!");
} catch (InterruptedException e) {
testContext.failNow(e);
}
Expand Down Expand Up @@ -61,9 +63,9 @@ public TestWorker(String name, Runnable runnable, CountDownLatch latch, List<Exc
@Override
public void run() {
try {
LOGGER.info("[" + getName() + "] created, blocked by the latch...");
LOGGER.info(brackets(getName()) + " created, blocked by the latch...");
latch.await();
LOGGER.info("[" + getName() + "] starts at: " + Instant.now());
LOGGER.info(brackets(getName()) + " started at" + brackets(Instant.now()));
runnable.run();
} catch (InterruptedException e) {
testContext.failNow(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ public void onUnableSchedule(@NotNull ExecutionResult<OUT> result) {
Assertions.assertNotNull(result.unscheduledAt());
Assertions.assertNotNull(result.triggerContext());
Assertions.assertTrue(result.triggerContext().isError());
Assertions.assertEquals(-1, result.tick());
Assertions.assertEquals(-1, result.round());
Assertions.assertNull(result.availableAt());
Assertions.assertNull(result.rescheduledAt());
Assertions.assertNull(result.triggeredAt());
Assertions.assertNull(result.executedAt());
Assertions.assertNull(result.finishedAt());
Assertions.assertNull(result.completedAt());
verify(result, unableSchedule);
});
verify(result, unableSchedule);
}

@Override
Expand Down

0 comments on commit a51e550

Please sign in to comment.