Skip to content

Commit

Permalink
Fix if condition issue in form reply sub process
Browse files Browse the repository at this point in the history
  • Loading branch information
yinan-symphony committed Oct 4, 2022
1 parent d37d1b7 commit 648d969
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 41 deletions.
20 changes: 14 additions & 6 deletions workflow-bot-app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ javadoc {
dependencies {
implementation project(':workflow-language')

implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0')
implementation platform('org.finos.symphony.bdk:symphony-bdk-bom:2.9.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
implementation platform('com.fasterxml.jackson:jackson-bom:2.13.4')
implementation platform('org.springframework.boot:spring-boot-dependencies:2.6.6')

implementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3'
implementation ('org.apache.httpcomponents.client5:httpclient5-fluent:5.1.3') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}

implementation 'org.finos.symphony.bdk:symphony-bdk-core-spring-boot-starter'
implementation 'org.finos.symphony.bdk.ext:symphony-group-extension'
Expand All @@ -28,8 +32,8 @@ dependencies {
runtimeOnly 'io.micrometer:micrometer-registry-prometheus'
runtimeOnly 'com.h2database:h2'

implementation 'org.slf4j:slf4j-api'
runtimeOnly 'ch.qos.logback:logback-classic'
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'ch.qos.logback:logback-classic:1.2.11'

implementation 'org.aspectj:aspectjrt:1.9.9.1'
implementation 'org.aspectj:aspectjweaver:1.9.9.1'
Expand Down Expand Up @@ -63,10 +67,14 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.34.0'
testImplementation ('com.github.tomakehurst:wiremock-jre8:2.34.0') {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
testImplementation 'io.rest-assured:rest-assured:4.5.1'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1")
testImplementation("org.camunda.community.mockito:camunda-platform-7-mockito:6.17.1") {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
}

bootBuildImage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,7 @@ private void computeEvents(int activityIndex, String activityId, List<Activity>
directGraph.readWorkflowNode(activityId)
.addIfCondition(eventNodeId, activity.getActivity().getIfCondition());
}

if (activityIndex == 0 || !directGraph.getChildren(eventNodeId)
.getChildren()
.contains(activities.get(activityIndex - 1).getActivity().getId())) {
computeActivity(activityIndex, activities, eventNodeId, event, onEvents, directGraph);
}

computeActivity(activityIndex, activities, eventNodeId, event, onEvents, directGraph);
computeSignal(directGraph, event, eventNodeId, activityIndex, activities);
} else if (event.getActivityExpired() != null) {
eventNodeId = computeExpiredActivity(event, activity.getActivity().getId(), directGraph);
Expand Down Expand Up @@ -223,7 +217,8 @@ private void computeActivity(int activityIndex, List<Activity> activities, Strin
if (activityIndex == 0) {
validateFirstActivity(activities.get(0).getActivity(), event, workflow.getId());
directGraph.addStartEvent(nodeId);
} else {
} else if (!directGraph.getParents(activities.get(activityIndex - 1).getActivity().getId())
.contains(nodeId)) { // the current event node is not a parent of previous activity
boolean isParallel = onEvents.isParallel();
String allOfEventParentId = onEvents.getParentId();
String parentId = isParallel && StringUtils.isNotEmpty(allOfEventParentId) ? allOfEventParentId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public Optional<String> toSignalName(Event event, Workflow workflow) {
}

@SuppressWarnings("unchecked")
public <T> void dispatch(RealTimeEvent<T> event)
throws PresentationMLParserException {

public <T> void dispatch(RealTimeEvent<T> event) throws PresentationMLParserException {
Map<String, Object> processVariables = new HashMap<>();
processVariables.put(ActivityExecutorContext.EVENT,
new EventHolder<>(event.getInitiator(), event.getSource(), new HashMap<>()));
Expand Down Expand Up @@ -283,6 +281,7 @@ private void messageSentToMessage(RealTimeEvent<V4MessageSent> event,
// Event's message cannot be null, this if statement is only added to fix Sonar warnings
if (event.getSource().getMessage() != null) {
log.debug("receive message [{}]", event.getSource().getMessage().getMessageId());
log.trace("receive message [{}]", event.getSource().getMessage().getMessage());
String presentationMl = event.getSource().getMessage().getMessage();
String receivedContent = PresentationMLParser.getTextContent(presentationMl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
@Component
public class CamundaBpmnBuilder {
public static final String DEPLOYMENT_RESOURCE_TOKEN_KEY = "WORKFLOW_TOKEN";
public static final String EXCLUSIVE_GATEWAY_SUFFIX = "_ex_g";
public static final String EXCLUSIVE_GATEWAY_SUFFIX = "_exclusive_gateway";
public static final String EVENT_GATEWAY_SUFFIX = "_event_gateway";
public static final String FORK_GATEWAY = "_fork_gateway";

Expand Down Expand Up @@ -145,7 +145,7 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
BuildProcessContext context) throws JsonProcessingException {
String currentNodeId = currentNode.getId();
NodeChildren currentNodeChildren = context.readChildren(currentNodeId);
if (currentNodeChildren != null) {
if (currentNodeChildren != null && !currentNodeChildren.isEmpty()) {
if (currentNodeChildren.getGateway() == WorkflowDirectGraph.Gateway.PARALLEL) {
builder = builder.parallelGateway(currentNodeId + FORK_GATEWAY);
} else {
Expand All @@ -168,16 +168,17 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?

private AbstractFlowNodeBuilder<?, ?> exclusiveSubTreeNodes(String currentNodeId, WorkflowNodeType currentNodeType,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context, NodeChildren currentNodeChildren) {
if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT || hasFormRepliedEvent(context,
currentNodeChildren)) {

if (currentNodeType == WorkflowNodeType.FORM_REPLIED_EVENT) {
log.trace("the node [{}] itself is a form replied event", currentNodeId);
boolean activities = hasActivitiesOnly(context, currentNodeChildren);
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional);

log.trace("the node [{}] itself or one of its children is a form replied event", currentNodeId);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}
if (hasFormRepliedEvent(context, currentNodeChildren)) {
log.trace("one of [{}] children is a form replied event", currentNodeId);
return builder;
}
// in case of conditional loop, add a default end event
Expand All @@ -198,17 +199,17 @@ private void computeChildren(WorkflowNode currentNode, AbstractFlowNodeBuilder<?
boolean conditional = hasConditionalString(context, currentNodeChildren, currentNodeId);
log.trace("are the children of the node [{}]'s all activities ? [{}], is there any condition in children ? [{}]",
currentNodeId, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional);
builder = addGateway(currentNodeId, builder, activities, conditional, currentNodeChildren.getChildren().size());
return builder;
}

private AbstractFlowNodeBuilder<?, ?> addGateway(String currentNodeId, AbstractFlowNodeBuilder<?, ?> builder,
boolean activities, boolean conditional) {
boolean activities, boolean conditional, int childrenSize) {
// determine the gateway type
if (activities && conditional) {
log.trace("an exclusive gateway is added follow the node [{}]", currentNodeId);
builder = builder.exclusiveGateway((currentNodeId.replace("/", "") + EXCLUSIVE_GATEWAY_SUFFIX));
} else if (!activities && conditional) {
} else if (!activities && (conditional || childrenSize > 1)) {
log.trace("an event gateway is added follow the node [{}]", currentNodeId);
builder = builder.eventBasedGateway().id(currentNodeId + EVENT_GATEWAY_SUFFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SignalNodeBuilder extends AbstractNodeBpmnBuilder {
builder = ((AbstractCatchEventBuilder<?, ?>) builder).camundaAsyncBefore()
.signal(element.getId())
.name(element.getId());
} else if (builder instanceof AbstractGatewayBuilder) {
} else {
builder = builder.intermediateCatchEvent().camundaAsyncBefore().signal(element.getId()).name(element.getId());
}
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ public class SendMessageExecutor extends OboExecutor<SendMessage, V4Message>

@Override
public void execute(ActivityExecutorContext<SendMessage> execution) throws IOException {
log.debug("Sending message...");
SendMessage activity = execution.getActivity();
List<String> streamIds = resolveStreamId(execution, activity, execution.bdk().streams());
log.debug("Sending message to rooms {}", streamIds);

Message messageToSend = this.buildMessage(execution);
log.trace("message content \n {}", messageToSend.getContent());

V4Message message;
if (streamIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -276,8 +277,7 @@ void sendFormOutputsArePreserved() throws Exception {
return true;
});

assertThat(workflow)
.executed("init", "check");
assertThat(workflow).executed("init", "check");
}

@Test
Expand Down Expand Up @@ -323,4 +323,56 @@ void sendMessageUpdateMessage() throws Exception {
assertThat(workflow).executed("init", "message-received_/hey", "update");
});
}

@Test
void formRepliedSendMessageOnConditionIf() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "create")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Create"));

Thread.sleep(1000);
assertThat(workflow).executed(workflow, "testForm", "resCreate");
}

@Test
void formRepliedSendMessageOnConditionElse() throws Exception {
Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/form/send-form-reply-conditional.swadl.yaml"));

when(messageService.send(anyString(), any(Message.class))).thenReturn(message("msgId"));

engine.deploy(workflow);

// trigger workflow execution
engine.onEvent(messageReceived("/test"));
verify(messageService, timeout(5000)).send(anyString(), contains("form"));
clearInvocations(messageService);

await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
engine.onEvent(form("msgId", "testForm", Collections.singletonMap("action", "menu")));
return true;
});
verify(messageService, timeout(5000)).send(anyString(), contains("Menu"));
clearInvocations(messageService);

sleepToTimeout(1000);
engine.onEvent(messageReceived("/continue"));
verify(messageService, timeout(5000)).send(anyString(), contains("DONE"));

assertThat(workflow).executed(workflow, "testForm", "resMenu", "finish");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public static RealTimeEvent<V4MessageSent> messageReceived(String content) {

V4MessageSent messageSent = new V4MessageSent();
V4Message message = new V4Message();
message.setMessageId("msgId");
message.setMessage("<presentationML>" + content + "</presentationML>");
messageSent.setMessage(message);
V4Stream stream = new V4Stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,13 @@ public WorkflowAssert isExecuted() {

public WorkflowAssert executed(String... activities) {
isNotNull();
assertExecuted(activities);
assertExecuted(Optional.empty(), activities);
return this;
}

public WorkflowAssert executed(Workflow workflow, String... activities) {
isNotNull();
assertExecuted(Optional.of(workflow), activities);
return this;
}

Expand Down Expand Up @@ -150,7 +156,7 @@ public static void assertExecuted(Workflow workflow) {
.map(Activity::getActivity)
.map(BaseActivity::getId)
.toArray(String[]::new);
assertExecuted(activityIds);
assertExecuted(Optional.empty(), activityIds);
}

public static void assertExecuted(Optional<String> process, List<String> activities) {
Expand All @@ -171,12 +177,14 @@ public static void assertExecuted(Optional<String> process, List<String> activit
}

// activityIds represent all successfully executed activities and not only a subset
private static void assertExecuted(String... activityIds) {
Assertions.assertThat(listExecutedActivities()).containsExactly(activityIds);
private static void assertExecuted(Optional<Workflow> optionalWorkflow, String... activityIds) {
Assertions.assertThat(listExecutedActivities(optionalWorkflow)).containsExactly(activityIds);
}

private static List<String> listExecutedActivities() {
String process = lastProcess().orElseThrow();
private static List<String> listExecutedActivities(Optional<Workflow> optionalWorkflow) {
final String process = optionalWorkflow.map(workflow -> lastProcess(workflow).orElseThrow())
.orElseGet(() -> lastProcess().orElseThrow());

await().atMost(20, SECONDS).until(() -> processIsCompleted(process));

List<HistoricActivityInstance> processes =
Expand All @@ -199,7 +207,7 @@ private static List<String> listExecutedActivities() {
private static void assertNotExecuted(String... activityIds) {

Assertions.assertThat(Arrays.stream(activityIds)
.anyMatch(listExecutedActivities()::contains))
.anyMatch(listExecutedActivities(Optional.empty())::contains))
.isFalse();
}

Expand Down
4 changes: 2 additions & 2 deletions workflow-bot-app/src/test/resources/application-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ logging:
org.camunda.bpm.engine.pvm: DEBUG
org.camunda.bpm.dmn.feel: DEBUG
org.camunda.bpm.engine.script: DEBUG
com.symphony.bdk.workflow.engine.camunda.bpmn.CamundaBpmnBuilder: DEBUG
com.symphony.bdk.workflow: DEBUG
com.symphony.bdk.workflow.engine.camunda.bpmn.CamundaBpmnBuilder: TRACE
com.symphony.bdk.workflow: TRACE

camunda:
bpm:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
id: form-reply-conditional-message
activities:
- send-message:
id: testForm
on:
message-received:
content: /test
content: |
<messageML>
<p><b>Test</b></p>
<form id="testForm">
Hi, what can I do for you?
<button name="create" type="action">Create</button>
<button name="menu" type="action">Menu</button>
</form>
</messageML>
- send-message:
id: resCreate
if: ${testForm.action=='create'}
on:
form-replied:
form-id: testForm
exclusive: true
content: Create
- send-message:
id: resMenu
else: {}
on:
form-replied:
form-id: testForm
exclusive: true
content: Menu
- send-message:
id: finish
on:
message-received:
content: /continue
content: DONE

0 comments on commit 648d969

Please sign in to comment.