Skip to content

Commit

Permalink
feat(#103): add SchedulingMonitorInternal
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Jan 15, 2024
1 parent 3df5ede commit 17cafa1
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.github.zero88.schedulerx.JobExecutorConfig;
import io.github.zero88.schedulerx.Scheduler;
import io.github.zero88.schedulerx.SchedulerConfig;
import io.github.zero88.schedulerx.SchedulingLogMonitor;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.TimeClock;
import io.github.zero88.schedulerx.TimeoutBlock;
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger>

private final @NotNull Vertx vertx;
private final @NotNull SchedulerStateInternal<OUT> state;
private final @NotNull SchedulingMonitor<OUT> monitor;
private final @NotNull SchedulingMonitorInternal<OUT> monitor;
private final @NotNull JobData<IN> jobData;
private final @NotNull Job<IN, OUT> job;
private final @NotNull T trigger;
Expand All @@ -74,13 +73,13 @@ protected AbstractScheduler(Vertx vertx, TimeClock clock, SchedulingMonitor<OUT>
JobData<IN> jobData, TimeoutPolicy timeoutPolicy, T trigger,
TriggerEvaluator evaluator) {
this.vertx = Objects.requireNonNull(vertx, "Vertx instance is required");
this.clock = Optional.ofNullable(clock).orElseGet(TimeClockImpl::new);
this.monitor = Optional.ofNullable(monitor).orElseGet(SchedulingLogMonitor::create);
this.state = new SchedulerStateImpl<>(this.clock);
this.job = Objects.requireNonNull(job, "Job is required");
this.trigger = Objects.requireNonNull(trigger, "Trigger is required");
this.clock = Optional.ofNullable(clock).orElseGet(TimeClockImpl::new);
this.jobData = Optional.ofNullable(jobData).orElseGet(JobData::empty);
this.timeoutPolicy = Optional.ofNullable(timeoutPolicy).orElseGet(TimeoutPolicy::byDefault);
this.trigger = Objects.requireNonNull(trigger, "Trigger is required");
this.monitor = new SchedulingMonitorImpl<>(vertx, monitor);
this.state = new SchedulerStateImpl<>(this.clock);
this.evaluator = new InternalTriggerEvaluator(this, evaluator);
}

Expand All @@ -91,7 +90,7 @@ protected AbstractScheduler(Vertx vertx, TimeClock clock, SchedulingMonitor<OUT>
public final @NotNull TimeClock clock() { return clock; }

@Override
public final @NotNull SchedulingMonitor<OUT> monitor() { return monitor; }
public final @NotNull SchedulingMonitor<OUT> monitor() { return monitor.unwrap(); }

@Override
public final @NotNull Job<IN, OUT> job() { return job; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.github.zero88.schedulerx.impl;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.util.Objects;
import java.util.function.Consumer;

import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.vertx.core.impl.logging.Logger;

abstract class SchedulingMonitorAbstract<OUT> implements SchedulingMonitor<OUT> {

protected abstract Logger logger();

protected final void dispatch(Class<? extends SchedulingMonitor> monitorCls,
Consumer<ExecutionResult<OUT>> dispatcher, ExecutionResult<OUT> result) {
try {
if (Objects.nonNull(dispatcher)) {
dispatcher.accept(result);
}
} catch (Throwable ex) {
logger().warn(
"Unexpected error in " + brackets(monitorCls.getName()) + "when dispatching the execution result", ex);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.github.zero88.schedulerx.impl;

import java.util.Optional;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.SchedulingLogMonitor;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.WorkerExecutorFactory;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;

final class SchedulingMonitorImpl<OUT> extends SchedulingMonitorAbstract<OUT>
implements SchedulingMonitorInternal<OUT> {

private final WorkerExecutor executor;
private final SchedulingMonitor<OUT> monitor;

public SchedulingMonitorImpl(@NotNull Vertx vertx, @Nullable SchedulingMonitor<OUT> monitor) {
this.executor = WorkerExecutorFactory.createMonitorWorker(vertx);
this.monitor = Optional.ofNullable(monitor).orElseGet(SchedulingLogMonitor::create);
}

@Override
protected Logger logger() {
return LoggerFactory.getLogger(SchedulingMonitorInternal.class);
}

@Override
public void onUnableSchedule(@NotNull ExecutionResult<OUT> result) {
executor.executeBlocking(event -> dispatch(monitor.getClass(), monitor::onUnableSchedule, result));
}

@Override
public void onSchedule(@NotNull ExecutionResult<OUT> result) {
executor.executeBlocking(event -> dispatch(monitor.getClass(), monitor::onSchedule, result));
}

@Override
public void onMisfire(@NotNull ExecutionResult<OUT> result) {
executor.executeBlocking(event -> dispatch(monitor.getClass(), monitor::onMisfire, result));
}

@Override
public void onEach(@NotNull ExecutionResult<OUT> result) {
executor.executeBlocking(event -> dispatch(monitor.getClass(), monitor::onEach, result));
}

@Override
public void onCompleted(@NotNull ExecutionResult<OUT> result) {
executor.executeBlocking(event -> dispatch(monitor.getClass(), monitor::onCompleted, result));
}

public SchedulingMonitor<OUT> unwrap() {
return this.monitor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.github.zero88.schedulerx.impl;

import io.github.zero88.schedulerx.SchedulingMonitor;

/**
* An internal scheduling monitor to ensure the monitor operation is run on the dedicated thread
*
* @param <OUT> Type of job result data
* @since 2.0.0
*/
interface SchedulingMonitorInternal<OUT> extends SchedulingMonitor<OUT> {

SchedulingMonitor<OUT> unwrap();

}
19 changes: 19 additions & 0 deletions core/src/test/java/io/github/zero88/schedulerx/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,25 @@ void test_scheduler_should_evaluate_trigger_in_dedicated_thread(WorkerThreadChec
vertx.eventBus().publish(address, "test");
}

@Test
void test_scheduler_should_monitor_result_in_dedicated_thread(Vertx vertx, VertxTestContext testContext) {
final WorkerThreadChecker c0 = WorkerThreadChecker.create(v -> null, "scheduler.x-monitor-thread-2s");
final SchedulingMonitor<Object> asserter = SchedulingAsserter.builder()
.setTestContext(testContext)
.setSchedule(r -> c0.doAssert())
.setEach(r -> c0.doAssert())
.setCompleted(r -> c0.doAssert())
.build();
final IntervalTrigger trigger = IntervalTrigger.builder().interval(Duration.ofSeconds(2)).repeat(2).build();
IntervalScheduler.builder()
.setVertx(vertx)
.setMonitor(asserter)
.setTrigger(trigger)
.setJob(NoopJob.create())
.build()
.start();
}

@Test
void test_scheduler_should_be_timeout_in_execution(Vertx vertx, VertxTestContext testContext) {
final Duration timeout = Duration.ofSeconds(2);
Expand Down

0 comments on commit 17cafa1

Please sign in to comment.