Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Added dedicated methods to serialize/deserialize input/output data #2919

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.protobuf.Any;

public class TaskModel {
Expand Down Expand Up @@ -69,8 +70,6 @@ public boolean isRetriable() {

private Status status;

private Map<String, Object> inputData = new HashMap<>();

private String referenceTaskName;

private int retryCount;
Expand Down Expand Up @@ -119,8 +118,6 @@ public boolean isRetriable() {

private String workerId;

private Map<String, Object> outputData = new HashMap<>();

private WorkflowTask workflowTask;

private String domain;
Expand Down Expand Up @@ -159,6 +156,10 @@ public boolean isRetriable() {

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> inputData = new HashMap<>();

@JsonIgnore private Map<String, Object> outputData = new HashMap<>();

public String getTaskType() {
return taskType;
}
Expand All @@ -175,17 +176,33 @@ public void setStatus(Status status) {
this.status = status;
}

@JsonIgnore
public Map<String, Object> getInputData() {
return externalInputPayloadStoragePath != null ? inputPayload : inputData;
}

@JsonIgnore
public void setInputData(Map<String, Object> inputData) {
if (inputData == null) {
inputData = new HashMap<>();
}
this.inputData = inputData;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("inputData")
@Deprecated
public void setRawInputData(Map<String, Object> inputData) {
setInputData(inputData);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("inputData")
@Deprecated
public Map<String, Object> getRawInputData() {
return inputData;
}

public String getReferenceTaskName() {
return referenceTaskName;
}
Expand Down Expand Up @@ -365,17 +382,33 @@ public void setWorkerId(String workerId) {
this.workerId = workerId;
}

@JsonIgnore
public Map<String, Object> getOutputData() {
return externalOutputPayloadStoragePath != null ? outputPayload : outputData;
}

@JsonIgnore
public void setOutputData(Map<String, Object> outputData) {
if (outputData == null) {
outputData = new HashMap<>();
}
this.outputData = outputData;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("outputData")
@Deprecated
public void setRawOutputData(Map<String, Object> inputData) {
setOutputData(inputData);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("outputData")
@Deprecated
public Map<String, Object> getRawOutputData() {
return outputData;
}

public WorkflowTask getWorkflowTask() {
return workflowTask;
}
Expand Down
49 changes: 41 additions & 8 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.conductor.common.run.Workflow;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

public class WorkflowModel {
Expand Down Expand Up @@ -63,14 +64,6 @@ public boolean isSuccessful() {

private List<TaskModel> tasks = new LinkedList<>();

private Map<String, Object> input = new HashMap<>();

private Map<String, Object> output = new HashMap<>();

@JsonIgnore private Map<String, Object> inputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

private String correlationId;

private String reRunFromWorkflowId;
Expand Down Expand Up @@ -110,6 +103,14 @@ public boolean isSuccessful() {

private Status previousStatus;

@JsonIgnore private Map<String, Object> input = new HashMap<>();

@JsonIgnore private Map<String, Object> output = new HashMap<>();

@JsonIgnore private Map<String, Object> inputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

public Status getPreviousStatus() {
return previousStatus;
}
Expand Down Expand Up @@ -170,28 +171,60 @@ public void setTasks(List<TaskModel> tasks) {
this.tasks = tasks;
}

@JsonIgnore
public Map<String, Object> getInput() {
return externalInputPayloadStoragePath != null ? inputPayload : input;
}

@JsonIgnore
public void setInput(Map<String, Object> input) {
if (input == null) {
input = new HashMap<>();
}
this.input = input;
}

@JsonIgnore
public Map<String, Object> getOutput() {
return externalOutputPayloadStoragePath != null ? outputPayload : output;
}

@JsonIgnore
public void setOutput(Map<String, Object> output) {
if (output == null) {
output = new HashMap<>();
}
this.output = output;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("input")
public Map<String, Object> getRawInput() {
return input;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("input")
public void setRawInput(Map<String, Object> input) {
setInput(input);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("output")
public Map<String, Object> getRawOutput() {
return output;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("output")
public void setRawOutput(Map<String, Object> output) {
setOutput(output);
}

public String getCorrelationId() {
return correlationId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.model

import com.netflix.conductor.common.config.ObjectMapperProvider

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
import spock.lang.Subject

class TaskModelSpec extends Specification {

@Subject
TaskModel taskModel

private static final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper()

def setup() {
taskModel = new TaskModel()
}

def "check inputData serialization"() {
given:
String path = "task/input/${UUID.randomUUID()}.json"
taskModel.addInput(['key1': 'value1', 'key2': 'value2'])
taskModel.externalizeInput(path)

when:
def json = objectMapper.writeValueAsString(taskModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("inputData").isEmpty()
node.path("externalInputPayloadStoragePath").isTextual()
}

def "check outputData serialization"() {
given:
String path = "task/output/${UUID.randomUUID()}.json"
taskModel.addOutput(['key1': 'value1', 'key2': 'value2'])
taskModel.externalizeOutput(path)

when:
def json = objectMapper.writeValueAsString(taskModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("outputData").isEmpty()
node.path("externalOutputPayloadStoragePath").isTextual()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.model

import com.netflix.conductor.common.config.ObjectMapperProvider
import com.netflix.conductor.common.metadata.workflow.WorkflowDef

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
import spock.lang.Subject

class WorkflowModelSpec extends Specification {

@Subject
WorkflowModel workflowModel

private static final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper()

def setup() {
def workflowDef = new WorkflowDef(name: "test def name", version: 1)
workflowModel = new WorkflowModel(workflowDefinition: workflowDef)
}

def "check input serialization"() {
given:
String path = "task/input/${UUID.randomUUID()}.json"
workflowModel.input = ['key1': 'value1', 'key2': 'value2']
workflowModel.externalizeInput(path)

when:
def json = objectMapper.writeValueAsString(workflowModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("input").isEmpty()
node.path("externalInputPayloadStoragePath").isTextual()
}

def "check output serialization"() {
given:
String path = "task/output/${UUID.randomUUID()}.json"
workflowModel.output = ['key1': 'value1', 'key2': 'value2']
workflowModel.externalizeOutput(path)

when:
def json = objectMapper.writeValueAsString(workflowModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("output").isEmpty()
node.path("externalOutputPayloadStoragePath").isTextual()
}
}