Skip to content

Commit

Permalink
feat: add DefaultStep and DefaultMetadataStep classes
Browse files Browse the repository at this point in the history
  • Loading branch information
juliengalet committed Nov 25, 2021
1 parent fb65c6b commit 85a5878
Show file tree
Hide file tree
Showing 9 changed files with 1,445 additions and 155 deletions.
1,274 changes: 1,122 additions & 152 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final RetryableFlow<T> build() {

public interface Delay<T extends FlowContext> {
/**
* Define the delay between failure and the next retry.
* Define the delay in milliseconds between failure and the next retry.
*
* @param delay The delay
* @return {@link RetryableFlowBuilder.Build} builder step
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.github.juliengalet.reactorflow.flow;

import io.github.juliengalet.reactorflow.builder.StepFlowBuilder;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Metadata;
import io.github.juliengalet.reactorflow.report.Report;
import reactor.core.publisher.Mono;

/**
* Class that should be extended, in order to be able to create {@link StepFlow}, with injecting services possibility
*
* @param <T> The context type
* @param <M> The metadata type
*/
public class DefaultMetadataStep<T extends FlowContext, M> {
private static final String DEFAULT_NAME = "Default";

/**
* Overridable method that should return the name of your step.
*
* @return The name
*/
protected String getName() {
return DEFAULT_NAME;
}

/**
* Overridable method that should implement the logic of the step.
*
* @param context The current {@link T} flow context
* @param metadata The {@link M} metadata instance
* @return A {@link Report} inside a Mono
*/
protected Mono<Report<T>> getExecution(T context, Metadata<M> metadata) {
return Mono.just(Report.success(context));
}

/**
* This method build the step. You should call it to plug your step as a {@link StepFlow} inside flows.
*
* @return The built {@link StepFlow}
*/
public final StepFlow<T, M> getStep() {
return StepFlowBuilder
.<T, M>defaultBuilder()
.named(getName())
.execution(this::getExecution)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.github.juliengalet.reactorflow.flow;

import io.github.juliengalet.reactorflow.builder.StepFlowBuilder;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Metadata;
import io.github.juliengalet.reactorflow.report.Report;
import reactor.core.publisher.Mono;

/**
* Class that should be extended, in order to be able to create {@link StepFlow}, with injecting services possibility
* (see {@link DefaultMetadataStep} for the version allowing metadata type customization).
*
* @param <T> The context type
*/
public class DefaultStep<T extends FlowContext> {
private static final String DEFAULT_NAME = "Default";

/**
* Overridable method that should return the name of your step.
*
* @return The name
*/
protected String getName() {
return DEFAULT_NAME;
}

/**
* Overridable method that should implement the logic of the step.
*
* @param context The current {@link T} flow context
* @param metadata The metadata instance
* @return A {@link Report} inside a Mono
*/
protected Mono<Report<T>> getExecution(T context, Metadata<Object> metadata) {
return Mono.just(Report.success(context));
}

/**
* This method build the step. You should call it to plug your step as a {@link StepFlow} inside flows.
*
* @return The built {@link StepFlow}
*/
public final StepFlow<T, Object> getStep() {
return StepFlowBuilder
.<T, Object>defaultBuilder()
.named(getName())
.execution(this::getExecution)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@
import io.github.juliengalet.reactorflow.builder.SequentialFlowBuilder;
import io.github.juliengalet.reactorflow.builder.SwitchFlowBuilder;
import io.github.juliengalet.reactorflow.exception.RecoverableFlowException;
import io.github.juliengalet.reactorflow.flow.Flow;
import io.github.juliengalet.reactorflow.flow.SequentialFlow;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Status;
import io.github.juliengalet.reactorflow.testutils.ErrorStepFlow;
import io.github.juliengalet.reactorflow.testutils.SuccessFinallyStepFlow;
import io.github.juliengalet.reactorflow.testutils.SuccessStepFlow;
import io.github.juliengalet.reactorflow.testutils.SuccessWithIntegerMetadataStepFlow;
import io.github.juliengalet.reactorflow.testutils.SuccessWithStringMetadataStepFlow;
import io.github.juliengalet.reactorflow.testutils.WarningStepFlow;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static io.github.juliengalet.reactorflow.testutils.TestUtils.assertAndLog;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -75,4 +81,76 @@ final void complexCase() {
.assertNext(assertAndLog(globalReport -> assertThat(globalReport.getStatus()).isEqualTo(Status.WARNING)))
.verifyComplete();
}

@Test
@SuppressWarnings("unchecked")
void complexCaseWithMetadata() {
String STRING_LIST_KEY = "STRING_LIST";
String INTEGER_LIST_KEY = "INTEGER_LIST";
String INTEGER_LIST_2_KEY = "INTEGER_LIST_2";

Flow<FlowContext> flowToTest = ParallelFlowBuilder
.builderForMetadataType(String.class)
.named("Parallel")
.parallelizeFromArray(flowContext -> (List<String>) flowContext.get(STRING_LIST_KEY))
.parallelizedFlow(
SequentialFlowBuilder
.defaultBuilder()
.named("Parallelized sequential")
.then(SuccessWithStringMetadataStepFlow.flowNamed("String metadata step"))
.then(ParallelFlowBuilder
.builderForMetadataType(Integer.class)
.named("Nested parallel")
.parallelizeFromArray(flowContext -> (List<Integer>) flowContext.get(INTEGER_LIST_KEY))
.parallelizedFlow(SuccessWithIntegerMetadataStepFlow.flowNamed("Integer metadata step"))
.mergeStrategy(ParallelFlowBuilder.defaultMergeStrategy())
.build()
)
.then(ParallelFlowBuilder
.builderForMetadataType(Integer.class)
.named("Nested parallel 2")
.parallelizeFromArray(flowContext -> (List<Integer>) flowContext.get(INTEGER_LIST_2_KEY))
.parallelizedFlow(SequentialFlowBuilder
.defaultBuilder()
.named("Nested sequential")
.then(SuccessWithIntegerMetadataStepFlow.flowNamed("Integer metadata step 2"))
.then(ErrorStepFlow.flowNamed("Error"))
.doFinally(SuccessFinallyStepFlow.flowNamed("Nested finally"))
.build()
)
.mergeStrategy(ParallelFlowBuilder.defaultMergeStrategy())
.build()
)
.doFinally(SuccessFinallyStepFlow.flowNamed("Finally"))
.build()
)
.mergeStrategy(ParallelFlowBuilder.defaultMergeStrategy())
.build();

FlowContext initialContext = FlowContext.createFrom(Map.of(
STRING_LIST_KEY, List.of("Item 1", "Item 2", "Item 3"),
INTEGER_LIST_KEY, List.of(1, 2, 3),
INTEGER_LIST_2_KEY, List.of(4, 5, 6)
));

StepVerifier
.create(flowToTest.run(initialContext))
.assertNext(assertAndLog(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.ERROR);
Set<Map.Entry<String, Object>> entrySet = globalReport.getContext().getEntrySet();

long finallyEntries = entrySet.stream().filter(entry -> entry.getKey().startsWith("Finally |")).count();
long nestedFinallyEntries = entrySet.stream().filter(entry -> entry.getKey().startsWith("Nested finally |")).count();
long integerMetadataEntries = entrySet.stream().filter(entry -> entry.getKey().startsWith("Integer metadata step |")).count();
long integerMetadata2Entries = entrySet.stream().filter(entry -> entry.getKey().startsWith("Integer metadata step 2 |")).count();
long stringMetadataEntries = entrySet.stream().filter(entry -> entry.getKey().startsWith("String metadata step |")).count();

assertThat(finallyEntries).isEqualTo(3);
assertThat(nestedFinallyEntries).isEqualTo(9);
assertThat(integerMetadataEntries).isEqualTo(9);
assertThat(integerMetadata2Entries).isEqualTo(9);
assertThat(stringMetadataEntries).isEqualTo(3);
}))
.verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.github.juliengalet.reactorflow.report.Report;
import io.github.juliengalet.reactorflow.report.Status;
import io.github.juliengalet.reactorflow.testutils.CustomContext;
import io.github.juliengalet.reactorflow.testutils.TestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
Expand All @@ -23,6 +22,7 @@
import java.util.List;
import java.util.Map;

import static io.github.juliengalet.reactorflow.testutils.TestUtils.assertAndLog;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

Expand Down Expand Up @@ -140,6 +140,25 @@ void givenSuccessClassicExecution_stepFlow_shouldSuccess() {
.verifyComplete();
}

@Test
void givenSuccessClassicExecutionExtendingDefaultMetadataStep_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> step = new DefaultStep<>().getStep();

StepVerifier
.create(step.run(FlowContext.create()))
.assertNext(assertAndLog(globalReport -> assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS)))
.verifyComplete();
}

@Test
void givenSuccessClassicExecutionExtendingDefaultStep_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> step = new DefaultMetadataStep<>().getStep();

StepVerifier
.create(step.run(FlowContext.create()))
.assertNext(assertAndLog(globalReport -> assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS)))
.verifyComplete();
}

@Test
void givenSuccessWithMonoContext_stepFlow_shouldSuccess() {
Expand Down Expand Up @@ -330,7 +349,7 @@ void givenTypeErrorInMetadataStep_stepFlow_shouldError() {

StepVerifier
.create(testWithMetadata.run(FlowContext.createFrom(Map.of("data", List.of(new Date())))))
.assertNext(TestUtils.assertAndLog(globalReport -> {
.assertNext(assertAndLog(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.ERROR);
Assertions.assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
Assertions.assertThat(globalReport.getAllErrors()).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.github.juliengalet.reactorflow.testutils;

import io.github.juliengalet.reactorflow.builder.StepFlowBuilder;
import io.github.juliengalet.reactorflow.exception.FlowException;
import io.github.juliengalet.reactorflow.flow.StepFlow;
import io.github.juliengalet.reactorflow.flow.StepWithMetadata;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Metadata;
import io.github.juliengalet.reactorflow.report.Report;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.stream.Collectors;

public final class SuccessFinallyStepFlow<T extends FlowContext, M> implements StepWithMetadata<T, M> {
private final String name;

public static <T extends FlowContext, M> StepFlow<T, M> flowNamed(String name) {
return StepFlowBuilder
.<T, M>defaultBuilder()
.named(name)
.execution(new SuccessFinallyStepFlow<>(name))
.build();
}

public static <T extends FlowContext, M> SuccessFinallyStepFlow<T, M> named(String name) {
return new SuccessFinallyStepFlow<>(name);
}

private SuccessFinallyStepFlow(String name) {
this.name = name;
}

@Override
public Mono<Report<T>> apply(T context, Metadata<M> metadata) {
String errorEntry = String.format(
"%s | %s (%s %s) | %s",
this.name,
metadata.getErrors().stream().map(FlowException::getMessage).collect(Collectors.joining(", ")),
metadata.getData().getClass().getSimpleName(),
metadata.getData(),
UUID.randomUUID().toString()
);
context.put(errorEntry, errorEntry);
return Mono.just(Report.success(context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.juliengalet.reactorflow.testutils;

import io.github.juliengalet.reactorflow.builder.StepFlowBuilder;
import io.github.juliengalet.reactorflow.flow.StepFlow;
import io.github.juliengalet.reactorflow.flow.StepWithMetadata;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Metadata;
import io.github.juliengalet.reactorflow.report.Report;
import reactor.core.publisher.Mono;

import java.util.UUID;

public final class SuccessWithIntegerMetadataStepFlow<T extends FlowContext> implements StepWithMetadata<T, Integer> {
private final String name;

public static <T extends FlowContext> StepFlow<T, Integer> flowNamed(String name) {
return StepFlowBuilder
.<T, Integer>defaultBuilder()
.named(name)
.execution(new SuccessWithIntegerMetadataStepFlow<>(name))
.build();
}

public static <T extends FlowContext> SuccessWithIntegerMetadataStepFlow<T> named(String name) {
return new SuccessWithIntegerMetadataStepFlow<>(name);
}

private SuccessWithIntegerMetadataStepFlow(String name) {
this.name = name;
}

@Override
public Mono<Report<T>> apply(T context, Metadata<Integer> metadata) {
String metadataEntry = String.format("%s | %s | %s", this.name, metadata.getData(), UUID.randomUUID().toString());
context.put(metadataEntry, metadataEntry);
return Mono.just(Report.success(context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.juliengalet.reactorflow.testutils;

import io.github.juliengalet.reactorflow.builder.StepFlowBuilder;
import io.github.juliengalet.reactorflow.flow.StepFlow;
import io.github.juliengalet.reactorflow.flow.StepWithMetadata;
import io.github.juliengalet.reactorflow.report.FlowContext;
import io.github.juliengalet.reactorflow.report.Metadata;
import io.github.juliengalet.reactorflow.report.Report;
import reactor.core.publisher.Mono;

import java.util.UUID;

public final class SuccessWithStringMetadataStepFlow<T extends FlowContext> implements StepWithMetadata<T, String> {
private final String name;

public static <T extends FlowContext> StepFlow<T, String> flowNamed(String name) {
return StepFlowBuilder
.<T, String>defaultBuilder()
.named(name)
.execution(new SuccessWithStringMetadataStepFlow<>(name))
.build();
}

public static <T extends FlowContext> SuccessWithStringMetadataStepFlow<T> named(String name) {
return new SuccessWithStringMetadataStepFlow<>(name);
}

private SuccessWithStringMetadataStepFlow(String name) {
this.name = name;
}

@Override
public Mono<Report<T>> apply(T context, Metadata<String> metadata) {
String metadataEntry = String.format("%s | %s | %s", this.name, metadata.getData(), UUID.randomUUID().toString());
context.put(metadataEntry, metadataEntry);
return Mono.just(Report.success(context));
}
}

0 comments on commit 85a5878

Please sign in to comment.