Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug form replied and if condition #155

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions workflow-bot-app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ javadoc {
dependencies {
implementation project(':workflow-language')

implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0')
implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
implementation platform('com.fasterxml.jackson:jackson-bom:2.13.4')
implementation platform('org.springframework.boot:spring-boot-dependencies:2.6.6')

implementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3'
implementation ('org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}

implementation 'org.finos.symphony.bdk:symphony-bdk-core-spring-boot-starter'
implementation 'org.finos.symphony.bdk.ext:symphony-group-extension'
Expand All @@ -28,8 +32,8 @@ dependencies {
runtimeOnly 'io.micrometer:micrometer-registry-prometheus'
runtimeOnly 'com.h2database:h2'

implementation 'org.slf4j:slf4j-api'
runtimeOnly 'ch.qos.logback:logback-classic'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'ch.qos.logback:logback-classic:1.2.11'

implementation 'org.aspectj:aspectjrt:1.9.9.1'
implementation 'org.aspectj:aspectjweaver:1.9.9.1'
Expand Down Expand Up @@ -63,10 +67,14 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.34.0'
testImplementation ('com.github.tomakehurst:wiremock-jre8:2.34.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
testImplementation 'io.rest-assured:rest-assured:4.5.1'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1")
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1") {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
}

bootBuildImage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ private void computeEvents(int activityIndex, String activityId, List<Activity>
Optional<String> signalName = eventMapper.toSignalName(event, workflow);
if (signalName.isPresent()) {
eventNodeId = signalName.get();

if (activity.getActivity() != null && StringUtils.isNotBlank(activity.getActivity().getIfCondition())) {
directGraph.readWorkflowNode(activityId)
.addIfCondition(eventNodeId, activity.getActivity().getIfCondition());
}
computeActivity(activityIndex, activities, eventNodeId, event, onEvents, directGraph);
computeSignal(directGraph, event, eventNodeId, activityIndex, activities);
} else if (event.getActivityExpired() != null) {
Expand Down Expand Up @@ -213,7 +218,8 @@ private void computeActivity(int activityIndex, List<Activity> activities, Strin
if (activityIndex == 0) {
validateFirstActivity(activities.get(0).getActivity(), event, workflow.getId());
directGraph.addStartEvent(nodeId);
} else {
} else if (!directGraph.getParents(activities.get(activityIndex - 1).getActivity().getId())
.contains(nodeId)) { // the current event node is not a parent of previous activity
boolean isParallel = onEvents.isParallel();
String allOfEventParentId = onEvents.getParentId();
String parentId = isParallel && StringUtils.isNotEmpty(allOfEventParentId) ? allOfEventParentId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public Optional<String> toSignalName(Event event, Workflow workflow) {
}

@SuppressWarnings("unchecked")
public <T> void dispatch(RealTimeEvent<T> event)
throws PresentationMLParserException {

public <T> void dispatch(RealTimeEvent<T> event) throws PresentationMLParserException {
Map<String, Object> processVariables = new HashMap<>();
processVariables.put(ActivityExecutorContext.EVENT,
new EventHolder<>(event.getInitiator(), event.getSource(), new HashMap<>()));
Expand Down Expand Up @@ -283,6 +281,7 @@ private void messageSentToMessage(RealTimeEvent<V4MessageSent> event,
// Event's message cannot be null, this if statement is only added to fix Sonar warnings
if (event.getSource().getMessage() != null) {
log.debug("receive message [{}]", event.getSource().getMessage().getMessageId());
log.trace("receive message [{}]", event.getSource().getMessage().getMessage());
String presentationMl = event.getSource().getMessage().getMessage();
String receivedContent = PresentationMLParser.getTextContent(presentationMl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import com.symphony.bdk.workflow.engine.WorkflowDirectGraph;
import com.symphony.bdk.workflow.engine.WorkflowNodeType;

import lombok.experimental.UtilityClass;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;

/**
* Helper class on checks or common actions
*/
@UtilityClass
public class BpmnBuilderHelper {

public static AbstractFlowNodeBuilder<?, ?> endEventSubProcess(BuildProcessContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
BuildProcessContext context) throws JsonProcessingException {
String currentNodeId = currentNode.getId();
NodeChildren currentNodeChildren = context.readChildren(currentNodeId);
if (currentNodeChildren != null) {
if (currentNodeChildren != null && !currentNodeChildren.isEmpty()) {
if (currentNodeChildren.getGateway() == WorkflowDirectGraph.Gateway.PARALLEL) {
builder = builder.parallelGateway(currentNodeId + FORK_GATEWAY);
} else {
Expand All @@ -168,9 +168,17 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?

private AbstractFlowNodeBuilder<?, ?> exclusiveSubTreeNodes(String currentNodeId, WorkflowNodeType currentNodeType,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context, NodeChildren currentNodeChildren) {
if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT || hasFormRepliedEvent(context,
currentNodeChildren)) {
log.trace("the node [{}] itself or one of its children is a form replied event", currentNodeId);
if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT) {
log.trace("the node [{}] itself is a form replied event", currentNodeId);
boolean activities = hasActivitiesOnly(context, currentNodeChildren);
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}
if (hasFormRepliedEvent(context, currentNodeChildren)) {
log.trace("one of [{}] children is a form replied event", currentNodeId);
return builder;
}
// in case of conditional loop, add a default end event
Expand All @@ -191,28 +199,42 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}

private AbstractFlowNodeBuilder<?, ?> addGateway(String currentNodeId, AbstractFlowNodeBuilder<?, ?> builder,
boolean activities, boolean conditional) {
boolean activities, boolean conditional, int childrenSize) {
// determine the gateway type
if (activities && conditional) {
log.trace("an exclusive gateway is added follow the node [{}]", currentNodeId);
builder = builder.exclusiveGateway(currentNodeId + EXCLUSIVE_GATEWAY_SUFFIX);
} else if (!activities) {
builder = builder.exclusiveGateway((currentNodeId.replace("/", "") + EXCLUSIVE_GATEWAY_SUFFIX));
} else if (!activities && (conditional || childrenSize > 1)) {
log.trace("an event gateway is added follow the node [{}]", currentNodeId);
builder = builder.eventBasedGateway().id(currentNodeId + EVENT_GATEWAY_SUFFIX);
}
return builder;
}

@SuppressWarnings("checkstyle:JavadocTagContinuationIndentation")
private void leafNode(String currentNodeId, AbstractFlowNodeBuilder<?, ?> camundaBuilder,
BuildProcessContext context) {
camundaBuilder = camundaBuilder.endEvent();
context.addNodeBuilder(currentNodeId, camundaBuilder);
context.addLastNodeBuilder(camundaBuilder);
// if builder is an instance of sub process builder, the node should be already ended {@see SignalNodeBuilder#31},
// skip the ending
/*
* on:
* one-of:
* - form-replied:
* form-id: init
* exclusive: true
* - message-received:
* content: hey
*/
if (!(camundaBuilder instanceof SubProcessBuilder)) {
camundaBuilder = camundaBuilder.endEvent();
context.addNodeBuilder(currentNodeId, camundaBuilder);
context.addLastNodeBuilder(camundaBuilder);
}
}

private boolean hasFormRepliedEvent(BuildProcessContext context, NodeChildren currentNodeChildren) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import static com.symphony.bdk.workflow.engine.camunda.bpmn.CamundaBpmnBuilder.EXCLUSIVE_GATEWAY_SUFFIX;

import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BpmnBuilderHelper;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;

import java.util.List;
import java.util.stream.Collectors;

public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder {

Expand All @@ -34,6 +35,11 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder
// then connect the activity
if (context.hasEventSubProcess() && context.getParents(element.getId()).size() > 1) {
builder = endEventSubProcess(context, builder);
// since the sub process is ended here, and the child node is going to be connected by this ended sub process,
// therefore, all other branches remove their same child and are going to be ended inside the sub process.
List<String> parents =
context.getParents(element.getId()).stream().filter(k -> !k.equals(parentId)).collect(Collectors.toList());
parents.forEach(parent -> context.getChildren(parent).removeChild(element.getId()));
}
return build(element, parentId, builder, context);
}
Expand All @@ -58,22 +64,9 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder
}

protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?> builder) {
// if builder is an instance of sub process builder, the node should be already connected, skip the connection
/*
* on:
* one-of:
* - form-replied:
* form-id: init
* exclusive: true
* - message-received:
* content: hey
*/
if (!(builder instanceof SubProcessBuilder)) {
builder.connectTo(nodeId);
}
builder.connectTo(nodeId);
}

protected abstract AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SignalNodeBuilder extends AbstractNodeBpmnBuilder {
builder = ((AbstractCatchEventBuilder<?, ?>) builder).camundaAsyncBefore()
.signal(element.getId())
.name(element.getId());
} else if (builder instanceof AbstractGatewayBuilder) {
} else {
builder = builder.intermediateCatchEvent().camundaAsyncBefore().signal(element.getId()).name(element.getId());
}
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ public class SendMessageExecutor extends OboExecutor<SendMessage, V4Message>

@Override
public void execute(ActivityExecutorContext<SendMessage> execution) throws IOException {
log.debug("Sending message...");
SendMessage activity = execution.getActivity();
List<String> streamIds = resolveStreamId(execution, activity, execution.bdk().streams());
log.debug("Sending message to rooms {}", streamIds);

Message messageToSend = this.buildMessage(execution);
log.trace("message content \n {}", messageToSend.getContent());

V4Message message;
if (streamIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -22,6 +23,9 @@
import org.apache.commons.lang3.StringEscapeUtils;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -306,8 +310,7 @@ void sendFormOutputsArePreserved() throws Exception {
return true;
});

assertThat(workflow)
.executed("init", "check");
assertThat(workflow).executed("init", "check");
}

@Test
Expand Down Expand Up @@ -353,4 +356,80 @@ void sendMessageUpdateMessage() throws Exception {
assertThat(workflow).executed("init", "message-received_/hey", "update");
});
}

@Test
void formRepliedSendMessageOnConditionIf() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "create")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Create"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this verifies what you think.
In line 337, we verify that we sent the form (content contains the word form).
In line 343, we want to verify that we sent a the message "Create". I don't know why Mockito.verify does not get updated, and it gets applied on the first one. So both Mockito.verify applies on the first messageService.send(..), which contains both "form" and "Create" words.

If you change the activity "resCreate" in swadl to send something else than "Create", let's say "CreateABC", and you change the test with (contains("CreateABC")), the test will fail.. One way to force Mockito.verify to apply to the new sendMessage call, is to change the to.streamId="" to a different one and replace anyString() in verify(..).

verify(messageService, timeout(5000)).send("ANOTHER_STREAM_ID", contains("SOMETHING_DIFF_FROM_CREATE"));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i see your point, i add then the Mockito.clearInvocations(T mock) to clear the invocations on messageService mock after each verify, so that we are sure we have the right verification.


Thread.sleep(1000);
assertThat(workflow).executed(workflow, "testForm", "resCreate");
}

@Test
void formRepliedSendMessageOnConditionElse() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "menu")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Menu"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

clearInvocations(messageService);

sleepToTimeout(1000);
engine.onEvent(messageReceived("/continue"));
verify(messageService, timeout(5000)).send(anyString(), contains("DONE"));

assertThat(workflow).executed(workflow, "testForm", "resMenu", "finish");
}

@ParameterizedTest
@CsvSource(value = {"GOOG,response0", "GOOGLE,response1"})
void formReplied_fork_condition_join_activity(String tickerValue, String expectedActivity) throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-join-activity.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/go"));
verify(messageService, timeout(2000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "sendForm", Collections.singletonMap("ticker", tickerValue)));
return true;
});
verify(messageService, timeout(2000)).send(anyString(), contains("END"));

assertThat(workflow).executed(workflow, "sendForm", expectedActivity, "response2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public static RealTimeEvent<V4MessageSent> messageReceived(String content) {

V4MessageSent messageSent = new V4MessageSent();
V4Message message = new V4Message();
message.setMessageId("msgId");
message.setMessage("<presentationML>" + content + "</presentationML>");
messageSent.setMessage(message);
V4Stream stream = new V4Stream();
Expand Down
Loading