Skip to content

Commit

Permalink
Add data field to SendMessage activity (#159)
Browse files Browse the repository at this point in the history
- Add a new data field to SendMessage activity to support
   interative templates

 - Add errors field to activity API, show error message if the process
   fails due to un-handled exception
  • Loading branch information
yinan-symphony authored Oct 5, 2022
1 parent f2f561d commit 9a93761
Show file tree
Hide file tree
Showing 24 changed files with 282 additions and 57 deletions.
82 changes: 82 additions & 0 deletions docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ Key | Type | Required |
[to](#to) | Map | No |
[content](#send-message-content) | String/Object | Yes |
[attachments](#attachments) | List | No |
[data](#data) | String | No |

Output | Type |
----|----|
Expand Down Expand Up @@ -871,6 +872,87 @@ encoded urls are accepted.

Path to the file to be attached to the message. The path is relative to the workflows folder.

#### data

A [structured object](https://docs.developers.symphony.com/building-bots-on-symphony/messages/overview-of-messageml/entities/structured-objects)
can be sent as part of a message in this field. It must be a json string.


Example:

```yaml
activities:
- send-message:
id: echo
on:
message-received:
content: /echo
content:
${text(event.source.message.message)}
data: "{
\"object001\":
{
\"type\": \"org.symphonyoss.fin.security\",
\"version\": \"1.0\",
\"id\":
[
{
\"type\": \"org.symphonyoss.fin.security.id.ticker\",
\"value\": \"IBM\"
},
{
\"type\": \"org.symphonyoss.fin.security.id.isin\",
\"value\": \"US0378331005\"
},
{
\"type\": \"org.symphonyoss.fin.security.id.cusip\",
\"value\": \"037833100\"
}
]
}
}"
```
One could also use [Utility function](#utility-functions) to escape the data json string, the same example
above can be done like
Example
```yaml
variables:
data: {
"object001":
{
"type": "org.symphonyoss.fin.security",
"version": "1.0",
"id":
[
{
"type": "org.symphonyoss.fin.security.id.ticker",
"value": "IBM"
},
{
"type": "org.symphonyoss.fin.security.id.isin",
"value": "US0378331005"
},
{
"type": "org.symphonyoss.fin.security.id.cusip",
"value": "037833100"
}
]
}
}
activities:
- send-message:
id: echo
on:
message-received:
content: /echo
content:
${text(event.source.message.message)}
data: ${escape(variables.data)}
```
### update-message
Update an existing message into a stream. Returns the new updated message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class WorkflowActivitiesView {
private List<ActivityInstanceView> activities;
private VariableView globalVariables;
private Map<String, Object> error;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
@Builder
public class WorkflowDefinitionView {
private String workflowId;
private List<Map<String, String>> variables;
private Map<String, Object> variables;
private List<TaskDefinitionView> flowNodes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.cache.annotation.Cacheable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -44,6 +45,9 @@ public class WorkflowDirectGraph {
@Getter
private final List<String> startEvents = new ArrayList<>();

@Getter
private final Map<String, Object> variables = new HashMap<>();

public void addParent(String id, String parent) {
parents.computeIfAbsent(id, k -> new HashSet<>()).add(parent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public WorkflowDirectGraph build() {
String activityId = computeParallelJoinGateway(directGraph, activity);
computeEvents(i, activityId, activities, directGraph);
}
directGraph.getVariables().putAll(workflow.getVariables());
return directGraph;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,38 @@ public void execute(DelegateExecution execution) throws Exception {

// escape break line and new lin characters
String activityAsJsonString = ((String) execution.getVariable(ACTIVITY)).replaceAll("(\\r|\\n|\\r\\n)+", "\\\\n");
Object activity = OBJECT_MAPPER.readValue(activityAsJsonString, Class.forName(type.getTypeName()));
BaseActivity activity =
(BaseActivity) OBJECT_MAPPER.readValue(activityAsJsonString, Class.forName(type.getTypeName()));

EventHolder event = (EventHolder) execution.getVariable(ActivityExecutorContext.EVENT);

try {
setMdc(execution);
auditTrailLogger.execute(execution, activity.getClass().getSimpleName());
executor.execute(
new CamundaActivityExecutorContext(execution, (BaseActivity) activity, event, resourceLoader, bdk));
new CamundaActivityExecutorContext(execution, activity, event, resourceLoader, bdk));
} catch (Exception e) {
log.error(String.format("Activity %s from workflow %s failed",
execution.getActivityInstanceId(), execution.getProcessInstanceId()), e);
log.error(String.format("Activity from workflow %s failed", execution.getProcessDefinitionId()), e);
logErrorVariables(execution, activity, e);
throw new BpmnError("FAILURE", e);
} finally {
clearMdc();
}
}

private static void logErrorVariables(DelegateExecution execution, BaseActivity activity, Exception e) {
Map<String, Object> innerMap = new HashMap<>();
innerMap.put("message", e.getCause() == null ? e.getMessage() : e.getCause().getMessage());
innerMap.put("activityInstId", execution.getActivityInstanceId());
innerMap.put("activityId", activity.getId());
ObjectValue objectValue = Variables.objectValue(innerMap)
.serializationDataFormat(Variables.SerializationDataFormats.JSON)
.create();
execution.getProcessEngineServices()
.getRuntimeService()
.setVariable(execution.getId(), ActivityExecutorContext.ERROR, objectValue);
}

private void setMdc(DelegateExecution execution) {
MDC.put(MDC_PROCESS_ID, execution.getProcessInstanceId());
MDC.put(MDC_ACTIVITY_ID, execution.getActivityInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public VariableCmdaApiQueryRepository(RepositoryService repositoryService,
}

@Override
public VariablesDomain findGlobalVarsByWorkflowInstanceId(String id) {
public VariablesDomain findVarsByWorkflowInstanceIdAndVarName(String id, String name) {
HistoricVariableInstanceEntity variables =
(HistoricVariableInstanceEntity) historyService.createHistoricVariableInstanceQuery()
.processInstanceId(id)
.variableName("variables")
.variableName(name)
.singleResult();
VariablesDomain variablesDomain = new VariablesDomain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.symphony.bdk.workflow.swadl.v1.activity.message.SendMessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -141,6 +142,9 @@ private String createOrGetStreamId(List<Long> userIds, StreamService streamServi

private Message buildMessage(ActivityExecutorContext<SendMessage> execution) throws IOException {
Message.MessageBuilder builder = Message.builder().content(extractContent(execution));
if (StringUtils.isNotBlank(execution.getActivity().getData())) {
builder.data(execution.getActivity().getData());
}
if (execution.getActivity().getAttachments() != null) {
for (SendMessage.Attachment attachment : execution.getActivity().getAttachments()) {
this.handleFileAttachment(builder, attachment, execution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.List;

public interface VariableQueryRepository extends QueryRepository<VariablesDomain, String> {
VariablesDomain findGlobalVarsByWorkflowInstanceId(String id);
VariablesDomain findVarsByWorkflowInstanceIdAndVarName(String id, String name);

List<VariablesDomain> findGlobalVarsHistoryByWorkflowInstId(String id, Instant occurredBefore, Instant occurredAfter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.symphony.bdk.workflow.engine.WorkflowDirectGraph;
import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.camunda.WorkflowDirectGraphCachingService;
import com.symphony.bdk.workflow.engine.executor.ActivityExecutorContext;
import com.symphony.bdk.workflow.monitoring.repository.ActivityQueryRepository;
import com.symphony.bdk.workflow.monitoring.repository.VariableQueryRepository;
import com.symphony.bdk.workflow.monitoring.repository.WorkflowInstQueryRepository;
Expand All @@ -27,7 +28,6 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -86,19 +86,20 @@ public WorkflowActivitiesView listWorkflowInstanceActivities(String workflowId,
activity -> activity.setType(TaskTypeEnum.findByAbbr(activityIdToTypeMap.get(activity.getActivityId()))));
}

VariablesDomain globalVariables = this.variableQueryRepository.findGlobalVarsByWorkflowInstanceId(instanceId);
VariablesDomain globalVariables = this.variableQueryRepository.findVarsByWorkflowInstanceIdAndVarName(instanceId,
ActivityExecutorContext.VARIABLES);
VariablesDomain error =
this.variableQueryRepository.findVarsByWorkflowInstanceIdAndVarName(instanceId, ActivityExecutorContext.ERROR);
WorkflowActivitiesView result = new WorkflowActivitiesView();
result.setActivities(activities);
result.setGlobalVariables(new VariableView(globalVariables));
if (!error.getOutputs().isEmpty()) {
result.setError(error.getOutputs());
}
return result;
}

public WorkflowDefinitionView getWorkflowDefinition(String workflowId) {
WorkflowDefinitionView.WorkflowDefinitionViewBuilder builder = WorkflowDefinitionView.builder()
.workflowId(workflowId)
.flowNodes(new ArrayList<>())
.variables(Collections.emptyList());

WorkflowDirectGraph directGraph = this.workflowDirectGraphCachingService.getDirectGraph(workflowId);
ArrayList<TaskDefinitionView> activities = new ArrayList<>();

Expand Down Expand Up @@ -127,9 +128,10 @@ public WorkflowDefinitionView getWorkflowDefinition(String workflowId) {

activities.add(taskDefinitionViewBuilder.build());
});

builder.flowNodes(activities);
return builder.build();
return WorkflowDefinitionView.builder()
.workflowId(workflowId)
.flowNodes(activities)
.variables(directGraph.getVariables()).build();
}

public List<VariableView> listWorkflowInstanceGlobalVars(String workflowId, String instanceId, Instant updatedBefore,
Expand Down
12 changes: 8 additions & 4 deletions workflow-bot-app/src/main/resources/application-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ wdk:
workflows:
path: ./workflows
properties:
monitoring-token: ${wdk.monitoring.token:} # The default value is an empty String
monitoring-token: token # The default value is an empty String

# BDK configuration for local development
bdk:
host: xxx
host: develop.symphony.com
app:
appId: appsoufiane
privateKey:
path: /Users/yinan.liu/Projects/certs/appSoufianneprivatekey.pem
bot:
username: xxx
username: wdkbot
privateKey:
path: xxx
path: /Users/yinan.liu/Projects/certs/privatekey.pem

logging:
level:
Expand Down
2 changes: 1 addition & 1 deletion workflow-bot-app/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ spring:
username: sa
password:
driver-class-name: org.h2.Driver
url: jdbc:h2:mem:process_engine;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
url: jdbc:h2:file:~/.data/process_engine;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=TRUE
mvc:
pathmatch:
matching-strategy: ant_path_matcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -13,8 +15,11 @@
import com.symphony.bdk.workflow.swadl.SwadlParser;
import com.symphony.bdk.workflow.swadl.v1.Workflow;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;

class ErrorIntegrationTest extends IntegrationTest {

@Test
Expand Down Expand Up @@ -138,4 +143,22 @@ void onActivityFailedRetry() throws Exception {
assertThat(workflow).executed("fallback", "failing");
}

@Test
void sendMessageOnMessage_withException_errorVarNotEmpty() throws Exception {
final Workflow workflow =
SwadlParser.fromYaml(getClass().getResourceAsStream("/message/send-message-on-message.swadl.yaml"));

RuntimeException toBeThrown = mock(RuntimeException.class);
when(toBeThrown.getMessage()).thenReturn("Exception");
doThrow(toBeThrown).when(messageService).send(anyString(), any(Message.class));

engine.deploy(workflow);
engine.onEvent(messageReceived("/message"));

sleepToTimeout(500);
Map<String, Object> errors = getVariable(lastProcess().get(), "error");
Assertions.assertThat(errors.get("message")).isEqualTo("Exception");
Assertions.assertThat(errors.get("activityId")).isEqualTo("sendMessage1");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.camunda.bpm.engine.RepositoryService;
import org.camunda.bpm.engine.history.HistoricActivityInstance;
import org.camunda.bpm.engine.history.HistoricProcessInstance;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -58,6 +59,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -265,6 +267,18 @@ public static Boolean processIsCompleted(String processId) {
return false;
}

public static Map<String, Object> getVariable(String processId, String name) {
HistoricVariableInstance var = historyService.createHistoricVariableInstanceQuery()
.processInstanceId(processId)
.variableName(name)
.singleResult();
if (var == null) {
return Collections.emptyMap();
} else {
return new HashMap<>((Map<String, Object>) var.getValue());
}
}

public static Optional<String> lastProcess() {
List<HistoricProcessInstance> processes = historyService.createHistoricProcessInstanceQuery()
.orderByProcessInstanceStartTime().desc()
Expand Down
Loading

0 comments on commit 9a93761

Please sign in to comment.