Skip to content

Commit

Permalink
Merge pull request #155 from symphony-soufiane/fix-bug-form-replied-a…
Browse files Browse the repository at this point in the history
…nd-if-conditio

Fix bug form replied and if condition
  • Loading branch information
yinan-symphony authored Oct 6, 2022
2 parents 9a93761 + c8205c0 commit 16487fc
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 50 deletions.
20 changes: 14 additions & 6 deletions workflow-bot-app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ javadoc {
dependencies {
implementation project(':workflow-language')

implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0')
implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
implementation platform('com.fasterxml.jackson:jackson-bom:2.13.4')
implementation platform('org.springframework.boot:spring-boot-dependencies:2.6.6')

implementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3'
implementation ('org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}

implementation 'org.finos.symphony.bdk:symphony-bdk-core-spring-boot-starter'
implementation 'org.finos.symphony.bdk.ext:symphony-group-extension'
Expand All @@ -28,8 +32,8 @@ dependencies {
runtimeOnly 'io.micrometer:micrometer-registry-prometheus'
runtimeOnly 'com.h2database:h2'

implementation 'org.slf4j:slf4j-api'
runtimeOnly 'ch.qos.logback:logback-classic'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'ch.qos.logback:logback-classic:1.2.11'

implementation 'org.aspectj:aspectjrt:1.9.9.1'
implementation 'org.aspectj:aspectjweaver:1.9.9.1'
Expand Down Expand Up @@ -63,10 +67,14 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.34.0'
testImplementation ('com.github.tomakehurst:wiremock-jre8:2.34.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
testImplementation 'io.rest-assured:rest-assured:4.5.1'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1")
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1") {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
}

bootBuildImage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ private void computeEvents(int activityIndex, String activityId, List<Activity>
Optional<String> signalName = eventMapper.toSignalName(event, workflow);
if (signalName.isPresent()) {
eventNodeId = signalName.get();

if (activity.getActivity() != null && StringUtils.isNotBlank(activity.getActivity().getIfCondition())) {
directGraph.readWorkflowNode(activityId)
.addIfCondition(eventNodeId, activity.getActivity().getIfCondition());
}
computeActivity(activityIndex, activities, eventNodeId, event, onEvents, directGraph);
computeSignal(directGraph, event, eventNodeId, activityIndex, activities);
} else if (event.getActivityExpired() != null) {
Expand Down Expand Up @@ -213,7 +218,8 @@ private void computeActivity(int activityIndex, List<Activity> activities, Strin
if (activityIndex == 0) {
validateFirstActivity(activities.get(0).getActivity(), event, workflow.getId());
directGraph.addStartEvent(nodeId);
} else {
} else if (!directGraph.getParents(activities.get(activityIndex - 1).getActivity().getId())
.contains(nodeId)) { // the current event node is not a parent of previous activity
boolean isParallel = onEvents.isParallel();
String allOfEventParentId = onEvents.getParentId();
String parentId = isParallel && StringUtils.isNotEmpty(allOfEventParentId) ? allOfEventParentId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public Optional<String> toSignalName(Event event, Workflow workflow) {
}

@SuppressWarnings("unchecked")
public <T> void dispatch(RealTimeEvent<T> event)
throws PresentationMLParserException {

public <T> void dispatch(RealTimeEvent<T> event) throws PresentationMLParserException {
Map<String, Object> processVariables = new HashMap<>();
processVariables.put(ActivityExecutorContext.EVENT,
new EventHolder<>(event.getInitiator(), event.getSource(), new HashMap<>()));
Expand Down Expand Up @@ -283,6 +281,7 @@ private void messageSentToMessage(RealTimeEvent<V4MessageSent> event,
// Event's message cannot be null, this if statement is only added to fix Sonar warnings
if (event.getSource().getMessage() != null) {
log.debug("receive message [{}]", event.getSource().getMessage().getMessageId());
log.trace("receive message [{}]", event.getSource().getMessage().getMessage());
String presentationMl = event.getSource().getMessage().getMessage();
String receivedContent = PresentationMLParser.getTextContent(presentationMl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import com.symphony.bdk.workflow.engine.WorkflowDirectGraph;
import com.symphony.bdk.workflow.engine.WorkflowNodeType;

import lombok.experimental.UtilityClass;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;

/**
* Helper class on checks or common actions
*/
@UtilityClass
public class BpmnBuilderHelper {

public static AbstractFlowNodeBuilder<?, ?> endEventSubProcess(BuildProcessContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
BuildProcessContext context) throws JsonProcessingException {
String currentNodeId = currentNode.getId();
NodeChildren currentNodeChildren = context.readChildren(currentNodeId);
if (currentNodeChildren != null) {
if (currentNodeChildren != null && !currentNodeChildren.isEmpty()) {
if (currentNodeChildren.getGateway() == WorkflowDirectGraph.Gateway.PARALLEL) {
builder = builder.parallelGateway(currentNodeId + FORK_GATEWAY);
} else {
Expand All @@ -168,9 +168,17 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?

private AbstractFlowNodeBuilder<?, ?> exclusiveSubTreeNodes(String currentNodeId, WorkflowNodeType currentNodeType,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context, NodeChildren currentNodeChildren) {
if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT || hasFormRepliedEvent(context,
currentNodeChildren)) {
log.trace("the node [{}] itself or one of its children is a form replied event", currentNodeId);
if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT) {
log.trace("the node [{}] itself is a form replied event", currentNodeId);
boolean activities = hasActivitiesOnly(context, currentNodeChildren);
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}
if (hasFormRepliedEvent(context, currentNodeChildren)) {
log.trace("one of [{}] children is a form replied event", currentNodeId);
return builder;
}
// in case of conditional loop, add a default end event
Expand All @@ -191,28 +199,42 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}

private AbstractFlowNodeBuilder<?, ?> addGateway(String currentNodeId, AbstractFlowNodeBuilder<?, ?> builder,
boolean activities, boolean conditional) {
boolean activities, boolean conditional, int childrenSize) {
// determine the gateway type
if (activities && conditional) {
log.trace("an exclusive gateway is added follow the node [{}]", currentNodeId);
builder = builder.exclusiveGateway(currentNodeId + EXCLUSIVE_GATEWAY_SUFFIX);
} else if (!activities) {
builder = builder.exclusiveGateway((currentNodeId.replace("/", "") + EXCLUSIVE_GATEWAY_SUFFIX));
} else if (!activities && (conditional || childrenSize > 1)) {
log.trace("an event gateway is added follow the node [{}]", currentNodeId);
builder = builder.eventBasedGateway().id(currentNodeId + EVENT_GATEWAY_SUFFIX);
}
return builder;
}

@SuppressWarnings("checkstyle:JavadocTagContinuationIndentation")
private void leafNode(String currentNodeId, AbstractFlowNodeBuilder<?, ?> camundaBuilder,
BuildProcessContext context) {
camundaBuilder = camundaBuilder.endEvent();
context.addNodeBuilder(currentNodeId, camundaBuilder);
context.addLastNodeBuilder(camundaBuilder);
// if builder is an instance of sub process builder, the node should be already ended {@see SignalNodeBuilder#31},
// skip the ending
/*
* on:
* one-of:
* - form-replied:
* form-id: init
* exclusive: true
* - message-received:
* content: hey
*/
if (!(camundaBuilder instanceof SubProcessBuilder)) {
camundaBuilder = camundaBuilder.endEvent();
context.addNodeBuilder(currentNodeId, camundaBuilder);
context.addLastNodeBuilder(camundaBuilder);
}
}

private boolean hasFormRepliedEvent(BuildProcessContext context, NodeChildren currentNodeChildren) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import static com.symphony.bdk.workflow.engine.camunda.bpmn.CamundaBpmnBuilder.EXCLUSIVE_GATEWAY_SUFFIX;

import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BpmnBuilderHelper;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;

import java.util.List;
import java.util.stream.Collectors;

public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder {

Expand All @@ -34,6 +35,11 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder
// then connect the activity
if (context.hasEventSubProcess() && context.getParents(element.getId()).size() > 1) {
builder = endEventSubProcess(context, builder);
// since the sub process is ended here, and the child node is going to be connected by this ended sub process,
// therefore, all other branches remove their same child and are going to be ended inside the sub process.
List<String> parents =
context.getParents(element.getId()).stream().filter(k -> !k.equals(parentId)).collect(Collectors.toList());
parents.forEach(parent -> context.getChildren(parent).removeChild(element.getId()));
}
return build(element, parentId, builder, context);
}
Expand All @@ -58,22 +64,9 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder
}

protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?> builder) {
// if builder is an instance of sub process builder, the node should be already connected, skip the connection
/*
* on:
* one-of:
* - form-replied:
* form-id: init
* exclusive: true
* - message-received:
* content: hey
*/
if (!(builder instanceof SubProcessBuilder)) {
builder.connectTo(nodeId);
}
builder.connectTo(nodeId);
}

protected abstract AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SignalNodeBuilder extends AbstractNodeBpmnBuilder {
builder = ((AbstractCatchEventBuilder<?, ?>) builder).camundaAsyncBefore()
.signal(element.getId())
.name(element.getId());
} else if (builder instanceof AbstractGatewayBuilder) {
} else {
builder = builder.intermediateCatchEvent().camundaAsyncBefore().signal(element.getId()).name(element.getId());
}
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ public class SendMessageExecutor extends OboExecutor<SendMessage, V4Message>

@Override
public void execute(ActivityExecutorContext<SendMessage> execution) throws IOException {
log.debug("Sending message...");
SendMessage activity = execution.getActivity();
List<String> streamIds = resolveStreamId(execution, activity, execution.bdk().streams());
log.debug("Sending message to rooms {}", streamIds);

Message messageToSend = this.buildMessage(execution);
log.trace("message content \n {}", messageToSend.getContent());

V4Message message;
if (streamIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -22,6 +23,9 @@
import org.apache.commons.lang3.StringEscapeUtils;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -306,8 +310,7 @@ void sendFormOutputsArePreserved() throws Exception {
return true;
});

assertThat(workflow)
.executed("init", "check");
assertThat(workflow).executed("init", "check");
}

@Test
Expand Down Expand Up @@ -353,4 +356,80 @@ void sendMessageUpdateMessage() throws Exception {
assertThat(workflow).executed("init", "message-received_/hey", "update");
});
}

@Test
void formRepliedSendMessageOnConditionIf() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "create")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Create"));

Thread.sleep(1000);
assertThat(workflow).executed(workflow, "testForm", "resCreate");
}

@Test
void formRepliedSendMessageOnConditionElse() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "menu")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Menu"));
clearInvocations(messageService);

sleepToTimeout(1000);
engine.onEvent(messageReceived("/continue"));
verify(messageService, timeout(5000)).send(anyString(), contains("DONE"));

assertThat(workflow).executed(workflow, "testForm", "resMenu", "finish");
}

@ParameterizedTest
@CsvSource(value = {"GOOG,response0", "GOOGLE,response1"})
void formReplied_fork_condition_join_activity(String tickerValue, String expectedActivity) throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-join-activity.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/go"));
verify(messageService, timeout(2000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "sendForm", Collections.singletonMap("ticker", tickerValue)));
return true;
});
verify(messageService, timeout(2000)).send(anyString(), contains("END"));

assertThat(workflow).executed(workflow, "sendForm", expectedActivity, "response2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public static RealTimeEvent<V4MessageSent> messageReceived(String content) {

V4MessageSent messageSent = new V4MessageSent();
V4Message message = new V4Message();
message.setMessageId("msgId");
message.setMessage("<presentationML>" + content + "</presentationML>");
messageSent.setMessage(message);
V4Stream stream = new V4Stream();
Expand Down
Loading

0 comments on commit 16487fc

Please sign in to comment.