Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix activity variable size limit bug #274

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class CamundaExecutor implements JavaDelegate {

public static final String EXECUTOR = "executor";
public static final String ACTIVITY = "activity";

public static final String SERIALISED_ACTIVITY = "serialisedActivity";
public static final ObjectMapper OBJECT_MAPPER;

// set MDC entries so that executors can produce log that we can contextualize
Expand All @@ -61,12 +61,9 @@ public class CamundaExecutor implements JavaDelegate {
SimpleModule module = new SimpleModule();
module.addDeserializer(List.class, new EscapedJsonVariableDeserializer<>(List.class));
module.addDeserializer(Map.class, new EscapedJsonVariableDeserializer<>(Map.class));
OBJECT_MAPPER = JsonMapper.builder()
.addModule(module)
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
OBJECT_MAPPER = JsonMapper.builder().addModule(module).configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
// to escape # or $ in message received content and still serialize it to JSON
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
.build();
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true).build();
OBJECT_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
// serialized properties must be annotated explicitly with @JsonProperty
OBJECT_MAPPER.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);
Expand Down Expand Up @@ -106,13 +103,14 @@ public void execute(DelegateExecution execution) throws Exception {
} catch (NoSuchBeanDefinitionException noSuchBeanDefinitionException) {
executor = (ActivityExecutor<?>) implClass.getDeclaredConstructor().newInstance();
}


Type type =
((ParameterizedType) (implClass.getGenericInterfaces()[0])).getActualTypeArguments()[0];
Type type = ((ParameterizedType) (implClass.getGenericInterfaces()[0])).getActualTypeArguments()[0];

// escape break line and new line characters
String activityAsJsonString = ((String) execution.getVariable(ACTIVITY)).replaceAll("(\\r|\\n|\\r\\n)+", "\\\\n");
String activityAsJsonString =
((Map) execution.getVariable(SERIALISED_ACTIVITY)).get(execution.getVariable(ACTIVITY))
.toString()
.replaceAll("(\\r|\\n|\\r\\n)+", "\\\\n");

BaseActivity activity =
(BaseActivity) OBJECT_MAPPER.readValue(activityAsJsonString, Class.forName(type.getTypeName()));

Expand All @@ -138,9 +136,8 @@ private static void logErrorVariables(DelegateExecution execution, BaseActivity
innerMap.put("message", e.getCause() == null ? e.getMessage() : e.getCause().getMessage());
innerMap.put("activityInstId", execution.getActivityInstanceId());
innerMap.put("activityId", activity.getId());
ObjectValue objectValue = Variables.objectValue(innerMap)
.serializationDataFormat(Variables.SerializationDataFormats.JSON)
.create();
ObjectValue objectValue =
Variables.objectValue(innerMap).serializationDataFormat(Variables.SerializationDataFormats.JSON).create();
execution.getProcessEngineServices()
.getRuntimeService()
.setVariable(execution.getId(), ActivityExecutorContext.ERROR, objectValue);
Expand Down Expand Up @@ -183,9 +180,8 @@ public void setOutputVariables(Map<String, Object> variables) {

Map<String, Object> outer = new HashMap<>();
outer.put(ActivityExecutorContext.OUTPUTS, innerMap);
ObjectValue objectValue = Variables.objectValue(outer)
.serializationDataFormat(Variables.SerializationDataFormats.JSON)
.create();
ObjectValue objectValue =
Variables.objectValue(outer).serializationDataFormat(Variables.SerializationDataFormats.JSON).create();

// flatten outputs for message correlation
Map<String, Object> flattenOutputs = new HashMap<>();
Expand Down Expand Up @@ -267,6 +263,5 @@ public File getResourceFile(Path resourcePath) throws IOException {
public Path saveResource(Path resourcePath, byte[] content) throws IOException {
return resourceLoader.saveResource(resourcePath, content);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import com.symphony.bdk.workflow.engine.WorkflowDirectedGraph.NodeChildren;
import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.CamundaExecutor;
import com.symphony.bdk.workflow.engine.camunda.CamundaTranslatedWorkflowContext;
import com.symphony.bdk.workflow.engine.camunda.WorkflowDirectedGraphService;
import com.symphony.bdk.workflow.engine.camunda.bpmn.builder.WorkflowNodeBpmnBuilderRegistry;
import com.symphony.bdk.workflow.engine.camunda.variable.VariablesListener;
import com.symphony.bdk.workflow.swadl.v1.Activity;
import com.symphony.bdk.workflow.swadl.v1.Workflow;
import com.symphony.bdk.workflow.swadl.v1.activity.BaseActivity;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
Expand All @@ -29,10 +32,18 @@
import org.camunda.bpm.model.bpmn.builder.ExclusiveGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.ProcessBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaEntry;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaInputOutput;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaInputParameter;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaMap;
import org.camunda.bpm.model.xml.ModelValidationException;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Events are created with async before to make sure they are not blocking the dispatch of events (starting or
Expand Down Expand Up @@ -79,7 +90,8 @@ public Deployment deployWorkflow(CamundaTranslatedWorkflowContext context) {

private DeploymentBuilder setWorkflowTokenIfExists(DeploymentBuilder deploymentBuilder, Workflow workflow) {
workflow.getActivities().forEach(activity -> {
Optional<String> token = activity.getEvents().getEvents()
Optional<String> token = activity.getEvents()
.getEvents()
.stream()
.filter(event -> event.getRequestReceived() != null && event.getRequestReceived().getToken() != null)
.map(event -> event.getRequestReceived().getToken())
Expand All @@ -101,11 +113,61 @@ private CamundaTranslatedWorkflowContext workflowToBpmn(Workflow workflow) throw
BuildProcessContext context = new BuildProcessContext(workflowDirectedGraph, process);
buildWorkflowInDfs(new NodeChildren(context.getStartEvents()), "", context);
AbstractFlowNodeBuilder<?, ?> builder = closeUpSubProcessesIfAny(context, context.getLastNodeBuilder());

BpmnModelInstance instance = builder.done();
process.addExtensionElement(VariablesListener.create(instance, workflow.getVariables()));
injectActivityDefAsInput(instance, workflow.getActivities());
return new CamundaTranslatedWorkflowContext(workflow, workflowDirectedGraph, instance);
}

/**
* Fix bug where the activity definition contains a variable, which could be resolved, during the workflow process,
* with a big size exceeding the Camunda DB text size limit. The fix is to make the activity definition an map object
* as the input parameter, instead of a simple string previously. An object parameter variable is stored in bytearray table
* as BLOB type, while a string has a limit of 4000 characters.
*
* @param instance the bpmn model instance being built
* @param activities the swadl activity list
* @throws JsonProcessingException json serialisation exception
*/
private void injectActivityDefAsInput(BpmnModelInstance instance, List<Activity> activities)
throws JsonProcessingException {
Map<String, BaseActivity> activityMap =
activities.stream().collect(Collectors.toMap(a -> a.getActivity().getId(), Activity::getActivity));

Collection<CamundaInputOutput> activityInputOutputElements =
instance.getModelElementsByType(CamundaInputOutput.class);
for (CamundaInputOutput inputOutput : activityInputOutputElements) {
CamundaInputParameter activityNameInputParam = extractActivityNameInputParam(inputOutput);
addSerialisedActivityInputParam(instance, inputOutput, activityNameInputParam, activityMap);
}
}

private void addSerialisedActivityInputParam(BpmnModelInstance instance, CamundaInputOutput inputOutput,
CamundaInputParameter activityNameInputParam, Map<String, BaseActivity> activityMap)
throws JsonProcessingException {
CamundaMap map = instance.newInstance(CamundaMap.class);
CamundaEntry entry = instance.newInstance(CamundaEntry.class);
CamundaInputParameter inputParameter = instance.newInstance(CamundaInputParameter.class);

String activityName = activityNameInputParam.getTextContent();
entry.setCamundaKey(activityName);
entry.setTextContent(CamundaExecutor.OBJECT_MAPPER.writeValueAsString(activityMap.get(activityName)));
map.getCamundaEntries().add(entry);

inputParameter.setCamundaName(CamundaExecutor.SERIALISED_ACTIVITY);
inputParameter.setValue(map);
inputOutput.addChildElement(inputParameter);
}

private static CamundaInputParameter extractActivityNameInputParam(CamundaInputOutput inputOutput) {
return inputOutput.getChildElementsByType(CamundaInputParameter.class)
.stream()
.filter(input -> CamundaExecutor.ACTIVITY.equals(input.getCamundaName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Activity missing its name"));
}

private AbstractFlowNodeBuilder<?, ?> closeUpSubProcessesIfAny(BuildProcessContext context,
AbstractFlowNodeBuilder<?, ?> builder) {
while (context.hasEventSubProcess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;
Expand All @@ -15,7 +14,7 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder

@Override
public AbstractFlowNodeBuilder<?, ?> connect(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
String nodeId = element.getId();
if (context.isAlreadyBuilt(nodeId)) {
if (element.isConditional()) {
Expand Down Expand Up @@ -63,5 +62,5 @@ protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?
}

protected abstract AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException;
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;
import com.symphony.bdk.workflow.swadl.v1.EventWithTimeout;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractCatchEventBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
Expand All @@ -17,7 +16,7 @@ public class ActivityExpiredNodeBuilder extends ActivityNodeBuilder {

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
if (hasNoExclusiveFormReplyParent(element, context)) {
if (context.hasTimeoutSubProcess()) {
builder = context.removeLastSubProcessTimeoutBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractActivityBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -14,17 +13,13 @@ public class ActivityFailedNodeBuilder extends ActivityNodeBuilder {

@Override
protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?> builder) {
((AbstractActivityBuilder<?, ?>) builder).boundaryEvent()
.name("error_" + nodeId)
.error().connectTo(nodeId);
((AbstractActivityBuilder<?, ?>) builder).boundaryEvent().name("error_" + nodeId).error().connectTo(nodeId);
}

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
builder = ((AbstractActivityBuilder<?, ?>) builder).boundaryEvent()
.name("error_" + element.getId())
.error();
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
builder = ((AbstractActivityBuilder<?, ?>) builder).boundaryEvent().name("error_" + element.getId()).error();
return addTask(builder, element.getActivity());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.symphony.bdk.workflow.swadl.v1.activity.BaseActivity;
import com.symphony.bdk.workflow.swadl.v1.activity.ExecuteScript;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -19,7 +18,7 @@ public class ActivityNodeBuilder extends AbstractNodeBpmnBuilder {

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
return addTask(builder, element.getActivity());
}

Expand All @@ -28,8 +27,7 @@ public WorkflowNodeType type() {
return WorkflowNodeType.ACTIVITY;
}

protected AbstractFlowNodeBuilder<?, ?> addTask(AbstractFlowNodeBuilder<?, ?> eventBuilder, BaseActivity activity)
throws JsonProcessingException {
protected AbstractFlowNodeBuilder<?, ?> addTask(AbstractFlowNodeBuilder<?, ?> eventBuilder, BaseActivity activity) {
// hardcoded so we can rely on Camunda's script task instead of a service task
if (activity instanceof ExecuteScript) {
return addScriptTask(eventBuilder, (ExecuteScript) activity);
Expand All @@ -48,15 +46,14 @@ public WorkflowNodeType type() {
.camundaExecutionListenerClass(ExecutionListener.EVENTNAME_START, ScriptTaskAuditListener.class);
}

private AbstractFlowNodeBuilder<?, ?> addServiceTask(AbstractFlowNodeBuilder<?, ?> builder, BaseActivity activity)
throws JsonProcessingException {
private AbstractFlowNodeBuilder<?, ?> addServiceTask(AbstractFlowNodeBuilder<?, ?> builder, BaseActivity activity) {
return builder.serviceTask()
.id(activity.getId())
.name(activity.getId())
.camundaAsyncAfter()
.camundaClass(CamundaExecutor.class)
.camundaInputParameter(CamundaExecutor.EXECUTOR,
ActivityRegistry.getActivityExecutors().get(activity.getClass()).getName())
.camundaInputParameter(CamundaExecutor.ACTIVITY, CamundaExecutor.OBJECT_MAPPER.writeValueAsString(activity));
.camundaInputParameter(CamundaExecutor.ACTIVITY, activity.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;
import com.symphony.bdk.workflow.engine.camunda.variable.FormVariableListener;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -14,7 +13,7 @@
public class JoinActivityNodeBuilder extends AbstractNodeBpmnBuilder {
@Override
protected AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
return builder.parallelGateway(element.getId())
.camundaExecutionListenerClass(ExecutionListener.EVENTNAME_START, FormVariableListener.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;

public interface WorkflowNodeBpmnBuilder {
String ERROR_CODE = "408";
String DEFAULT_FORM_REPLIED_EVENT_TIMEOUT = "PT24H";

AbstractFlowNodeBuilder<?, ?> connect(WorkflowNode element, String parentId, AbstractFlowNodeBuilder<?, ?> builder,
BuildProcessContext context) throws JsonProcessingException;
BuildProcessContext context);

WorkflowNodeType type();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.symphony.bdk.workflow.engine.executor.request;

import static com.symphony.bdk.workflow.engine.camunda.CamundaExecutor.OBJECT_MAPPER;

import com.symphony.bdk.workflow.engine.executor.ActivityExecutor;
import com.symphony.bdk.workflow.engine.executor.ActivityExecutorContext;
import com.symphony.bdk.workflow.engine.executor.request.client.HttpClient;
Expand Down Expand Up @@ -45,12 +43,10 @@ public void execute(ActivityExecutorContext<ExecuteRequest> execution) throws IO
headersToString(activity.getHeaders()));

log.info("Received response {}", response.getCode());
String valueAsString = OBJECT_MAPPER.writeValueAsString(response.getContent());

Map<String, Object> outputs = new HashMap<>();
outputs.put(OUTPUT_STATUS_KEY, response.getCode());
outputs.put(OUTPUT_BODY_KEY,
valueAsString.length() > 1000 ? valueAsString.substring(0, 1000) : response.getContent());
outputs.put(OUTPUT_BODY_KEY, response.getContent());
execution.setOutputVariables(outputs);
}

Expand Down