This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit e6947b6

Merge branch 'dev'
kishorebanala committed Jul 14, 2018
Parents: bdd7061 + 3710e09
Showing 29 changed files with 958 additions and 560 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -10,7 +10,7 @@ buildscript {
}
}
plugins {
- id 'nebula.netflixoss' version '5.0.0'
+ id 'nebula.netflixoss' version '5.1.1'
}

// Establish version and status
71 changes: 60 additions & 11 deletions client/python/conductor/conductor.py
@@ -44,7 +44,7 @@ def post(self, resPath, queryParams, body, headers=None):
theUrl = "{}/{}".format(self.baseURL, resPath)
theHeader = self.headers
if headers is not None:
- theHeader = dict(self.headers.items() + headers.items())
+ theHeader = self.mergeTwoDicts(self.headers, headers)
if body is not None:
jsonBody = json.dumps(body, ensure_ascii=False)
resp = requests.post(theUrl, params=queryParams, data=jsonBody, headers=theHeader)
@@ -58,7 +58,7 @@ def put(self, resPath, queryParams=None, body=None, headers=None):
theUrl = "{}/{}".format(self.baseURL, resPath)
theHeader = self.headers
if headers is not None:
- theHeader = dict(self.headers.items() + headers.items())
+ theHeader = self.mergeTwoDicts(self.headers, headers)

if body is not None:
jsonBody = json.dumps(body, ensure_ascii=False)
@@ -75,10 +75,20 @@ def delete(self, resPath, queryParams):
self.__print(resp)
self.__checkForSuccess(resp)

-    def makeUrl(self, urlformat, *argv):
-        url = self.baseResource + '/' + urlformat.format(*argv)
+    def makeUrl(self, urlformat=None, *argv):
+        url = self.baseResource + '/'
+        if urlformat:
+            url += urlformat.format(*argv)
return url

+    def makeParams(self, **kwargs):
+        return dict((k, v) for k, v in kwargs.items() if v is not None) or None
+
+    def mergeTwoDicts(self, x, y):
+        z = x.copy()
+        z.update(y)
+        return z
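These two helpers exist for Python 3 compatibility and for the new optional query parameters: dict(x.items() + y.items()) only works on Python 2, since Python 3's dict.items() returns a view that does not support +. A standalone sketch of the same behavior (names mirror the methods above):

# Standalone sketch of the new helpers (mirrors mergeTwoDicts/makeParams above).
# On Python 3, dict.items() returns a view, so the old idiom fails:
#     dict(x.items() + y.items())  # TypeError: unsupported operand type(s)

def merge_two_dicts(x, y):
    # Copy x, then overlay y; y's values win on key collisions.
    z = x.copy()
    z.update(y)
    return z

def make_params(**kwargs):
    # Drop None-valued arguments; return None when nothing remains,
    # matching what requests expects for "no query parameters".
    return dict((k, v) for k, v in kwargs.items() if v is not None) or None

assert merge_two_dicts({'Accept': 'a'}, {'Accept': 'b', 'X': 'y'}) == {'Accept': 'b', 'X': 'y'}
assert make_params(version=1, reason=None) == {'version': 1}
assert make_params(reason=None) is None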

def __print(self, resp):
if self.printUrl:
print(resp.url)
@@ -110,9 +120,7 @@ def __init__(self, baseURL):

def getWorkflowDef(self, wfname, version=1):
url = self.makeUrl('workflow/{}', wfname)
-        params = {}
-        params['version'] = version
-        return self.get(url, params)
+        return self.get(url, self.makeParams(version=version))

def createWorkflowDef(self, wfdObj):
url = self.makeUrl('workflow')
@@ -138,9 +146,9 @@ def registerTaskDef(self, taskDefObj):
url = self.makeUrl('taskdefs')
self.put(url, None, taskDefObj)

-    def unRegisterTaskDef(self, tdName):
+    def unRegisterTaskDef(self, tdName, reason=None):
url = self.makeUrl('taskdefs/{}', tdName)
-        self.delete(url)
+        self.delete(url, self.makeParams(reason=reason))

def getAllTaskDefs(self):
url = self.makeUrl('taskdefs')
@@ -202,9 +210,11 @@ def getTasksInQueue(self, taskName):
url = self.makeUrl('queue/{}', taskName)
return self.get(url)

-    def removeTaskFromQueue(self, taskId):
+    def removeTaskFromQueue(self, taskId, reason=None):
url = self.makeUrl('queue/{}', taskId)
-        self.delete(url)
+        params = {}
+        params['reason'] = reason
+        self.delete(url, params)

def getTaskQueueSizes(self, listOfTaskName):
url = self.makeUrl('queue/sizes')
@@ -245,6 +255,10 @@ def terminateWorkflow(self, wfId, reason=None):
params['reason'] = reason
self.delete(url, params)

+    def removeWorkflow(self, wfId, archiveWorkflow, reason=None):
+        url = self.makeUrl('{}/remove', wfId)
+        self.delete(url, self.makeParams(archiveWorkflow=archiveWorkflow, reason=reason))

def pauseWorkflow(self, wfId):
url = self.makeUrl('{}/pause', wfId)
self.put(url)
@@ -267,6 +281,41 @@ def restartWorkflow(self, wfId, taskRefName, fromTaskRef):
params['from'] = fromTaskRef
self.post(url, params, None)

+class EventServicesClient(BaseClient):
+    BASE_RESOURCE = 'event'
+
+    def __init__(self, baseURL):
+        BaseClient.__init__(self, baseURL, self.BASE_RESOURCE)
+
+    def getEventHandlerDef(self, event, activeOnly=True):
+        url = self.makeUrl('{}', event)
+        params = {}
+        params['activeOnly'] = activeOnly
+        return self.get(url, params)
+
+    def getEventHandlerDefs(self):
+        url = self.makeUrl()
+        return self.get(url)
+
+    def createEventHandlerDef(self, ehObj):
+        url = self.makeUrl()
+        return self.post(url, None, ehObj)
+
+    def updateEventHandlerDef(self, ehObj):
+        url = self.makeUrl()
+        return self.put(url, None, ehObj)
+
+    def removeEventHandler(self, ehName):
+        url = self.makeUrl('{}', ehName)
+        self.delete(url, {})
+
+    def getEventHandlerQueues(self):
+        url = self.makeUrl('queues')
+        return self.get(url)
+
+    def getEventHandlerQueuesProviders(self):
+        url = self.makeUrl('queues/providers')
+        return self.get(url)

class WFClientMgr:
def __init__(self, server_url='http://localhost:8080/api/'):
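Taken together, the client changes add optional audit reasons on the delete paths, a removeWorkflow call, and a new event-handler client. Below is a hypothetical usage sketch; it assumes a Conductor server at http://localhost:8080/api/, placeholder ids and task names, and the MetadataClient and WorkflowClient classes defined elsewhere in this module.

# Hypothetical usage of the client APIs added in this commit.
# Assumes a running Conductor server; ids and task names are placeholders.
from conductor.conductor import MetadataClient, WorkflowClient, EventServicesClient

base_url = 'http://localhost:8080/api/'

events = EventServicesClient(base_url)
print(events.getEventHandlerQueuesProviders())  # available queue providers
print(events.getEventHandlerDefs())             # all registered event handlers

metadata = MetadataClient(base_url)
metadata.unRegisterTaskDef('encode_task', reason='superseded')  # reason is now optional

workflow = WorkflowClient(base_url)
# Delete a workflow execution, archiving it first; reason is sent as a query param.
workflow.removeWorkflow('wf-placeholder-id', archiveWorkflow=True, reason='test cleanup')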
WorkflowExecutor.java (core)
@@ -306,7 +306,6 @@ public void retry(String workflowId) throws Exception {
executionDAO.updateTasks(workflow.getTasks());

decide(workflowId);

}

public Task getPendingTaskByWorkflow(String taskReferenceName, String workflowId) {
@@ -339,11 +338,20 @@ void completeWorkflow(Workflow wf) throws Exception {
logger.debug("Completed workflow execution for {}", wf.getWorkflowId());
executionDAO.updateTasks(wf.getTasks());

- // If the following task, for some reason fails, the sweep will take
- // care of this again!
+ // If the following task fails for some reason, the sweep will take care of this again!
if (workflow.getParentWorkflowId() != null) {
Workflow parent = executionDAO.getWorkflow(workflow.getParentWorkflowId(), false);
+            WorkflowDef parentDef = metadataDAO.get(parent.getWorkflowType(), parent.getVersion());
+            logger.debug("Completed sub-workflow {}, deciding parent workflow {}", wf.getWorkflowId(), wf.getParentWorkflowId());
+
+            Task parentWorkflowTask = executionDAO.getTask(workflow.getParentWorkflowTaskId());
+            // If parent is FAILED and the sub workflow task in parent is FAILED, we want to resume them
+            if (StringUtils.isBlank(parentDef.getFailureWorkflow()) && parent.getStatus() == WorkflowStatus.FAILED && parentWorkflowTask.getStatus() == FAILED) {
+                parentWorkflowTask.setStatus(IN_PROGRESS);
+                executionDAO.updateTask(parentWorkflowTask);
+                parent.setStatus(WorkflowStatus.RUNNING);
+                executionDAO.updateWorkflow(parent);
+            }
decide(parent.getWorkflowId());
}
Monitors.recordWorkflowCompletion(workflow.getWorkflowType(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp());
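The added block changes what happens when a sub-workflow completes under a failed parent: instead of leaving the parent dead, the executor flips the parent's sub-workflow task back to IN_PROGRESS and the parent back to RUNNING before re-deciding, but only when the parent defines no failure workflow. As pseudo-Python, the guard reads:

# Pseudo-Python restatement of the resume condition above (illustrative only;
# attribute names are stand-ins for the Java getters).
def should_resume_parent(parent_def, parent, parent_task):
    # Resume only when the parent has no failure workflow configured,
    # the parent itself is FAILED, and its sub-workflow task is FAILED.
    return (not parent_def.failure_workflow
            and parent.status == 'FAILED'
            and parent_task.status == 'FAILED')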
@@ -420,7 +428,7 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo

public void updateTask(TaskResult taskResult) throws Exception {
if (taskResult == null) {
logger.info("null task given for update..." + taskResult);
logger.info("null task given for update");
throw new ApplicationException(Code.INVALID_INPUT, "Task object is null");
}

ElasticSearchDAO.java (es2-persistence)
@@ -104,7 +104,7 @@ public class ElasticSearchDAO implements IndexDAO {

private static final String TASK_DOC_TYPE = "task";

- private static final String LOG_DOC_TYPE = "task";
+ private static final String LOG_DOC_TYPE = "task_log";

private static final String EVENT_DOC_TYPE = "event";

@@ -182,17 +182,17 @@ private void updateIndexName(Configuration config) {
*/
private void initIndex() throws Exception {

-        //0. Add the index template
-        GetIndexTemplatesResponse result = elasticSearchClient.admin().indices().prepareGetTemplates("wfe_template").execute().actionGet();
+        //0. Add the tasklog template
+        GetIndexTemplatesResponse result = elasticSearchClient.admin().indices().prepareGetTemplates("tasklog_template").execute().actionGet();
if (result.getIndexTemplates().isEmpty()) {
logger.info("Creating the index template 'wfe_template'");
InputStream stream = ElasticSearchDAO.class.getResourceAsStream("/template.json");
logger.info("Creating the index template 'tasklog_template'");
InputStream stream = ElasticSearchDAO.class.getResourceAsStream("/template_tasklog.json");
byte[] templateSource = IOUtils.toByteArray(stream);

try {
-                elasticSearchClient.admin().indices().preparePutTemplate("wfe_template").setSource(templateSource).execute().actionGet();
+                elasticSearchClient.admin().indices().preparePutTemplate("tasklog_template").setSource(templateSource).execute().actionGet();
} catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                logger.error("Failed to init tasklog_template", e);
}
}

@@ -206,11 +206,11 @@ private void initIndex() throws Exception {
}
}

-        //2. Mapping for the workflow document type
-        GetMappingsResponse response = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(WORKFLOW_DOC_TYPE).execute().actionGet();
-        if (response.mappings().isEmpty()) {
+        //2. Add Mappings for the workflow document type
+        GetMappingsResponse getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(WORKFLOW_DOC_TYPE).execute().actionGet();
+        if (getMappingsResponse.mappings().isEmpty()) {
logger.info("Adding the workflow type mappings");
-            InputStream stream = ElasticSearchDAO.class.getResourceAsStream("/wfe_type.json");
+            InputStream stream = ElasticSearchDAO.class.getResourceAsStream("/mappings_docType_workflow.json");
byte[] bytes = IOUtils.toByteArray(stream);
String source = new String(bytes);
try {
@@ -219,6 +219,20 @@
logger.error(e.getMessage(), e);
}
}

+        //3. Add Mappings for task document type
+        getMappingsResponse = elasticSearchClient.admin().indices().prepareGetMappings(indexName).addTypes(TASK_DOC_TYPE).execute().actionGet();
+        if (getMappingsResponse.mappings().isEmpty()) {
+            logger.info("Adding the task type mappings");
+            InputStream stream = ElasticSearchDAO.class.getResourceAsStream("/mappings_docType_task.json");
+            byte[] bytes = IOUtils.toByteArray(stream);
+            String source = new String(bytes);
+            try {
+                elasticSearchClient.admin().indices().preparePutMapping(indexName).setType(TASK_DOC_TYPE).setSource(source).execute().actionGet();
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
}

@Override
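For orientation, the init sequence above now has three steps: ensure the tasklog_template index template, ensure the index, and ensure per-type mappings for both workflow and task documents. A rough equivalent using the elasticsearch-py 2.x client is sketched below; the index name 'conductor' and the local file paths are assumptions (the Java code reads the index name from configuration and the JSON files from the classpath).

# Rough Python analogue of initIndex() above (elasticsearch-py 2.x).
# Illustrative only: 'conductor' as index name and local file paths are assumptions.
import json
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])
index_name = 'conductor'

# 0. Add the tasklog template if it is missing.
if not es.indices.exists_template(name='tasklog_template'):
    with open('template_tasklog.json') as f:
        es.indices.put_template(name='tasklog_template', body=json.load(f))

# 1. Create the index if it does not exist yet.
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name)

# 2./3. Add workflow and task mappings when the doc type has none yet
# (get_mapping returns an empty dict in that case).
for doc_type, mapping_file in [('workflow', 'mappings_docType_workflow.json'),
                               ('task', 'mappings_docType_task.json')]:
    if not es.indices.get_mapping(index=index_name, doc_type=doc_type):
        with open(mapping_file) as f:
            es.indices.put_mapping(index=index_name, doc_type=doc_type,
                                   body=json.load(f))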
68 changes: 68 additions & 0 deletions es2-persistence/src/main/resources/mappings_docType_task.json
@@ -0,0 +1,68 @@
{
"task": {
"properties": {
"correlationId": {
"type": "string",
"index": "not_analyzed"
},
"endTime": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"executionTime": {
"type": "long"
},
"input": {
"type": "string",
"index": "analyzed"
},
"output": {
"type": "string",
"index": "analyzed"
},
"queueWaitTime": {
"type": "long"
},
"reasonForIncompletion": {
"type": "string",
"index": "not_analyzed"
},
"scheduledTime": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"startTime": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"status": {
"type": "string",
"index": "not_analyzed"
},
"taskDefName": {
"type": "string",
"index": "not_analyzed"
},
"taskId": {
"type": "string",
"index": "not_analyzed"
},
"taskType": {
"type": "string",
"index": "not_analyzed"
},
"updateTime": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"workflowId": {
"type": "string",
"index": "not_analyzed"
},
"workflowType": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
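Nearly every field above is a not_analyzed string, so values such as status, taskDefName, and workflowId are indexed as exact keywords and can be filtered with term queries, while input and output remain analyzed for free-text search. A sketch of the kind of query this mapping enables (elasticsearch-py; the 'conductor' index name is an assumption):

# Term filters work because status/taskDefName/etc. are not_analyzed.
# Sketch only; the 'conductor' index name is an assumption.
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])

query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'status': 'FAILED'}},               # exact keyword match
                {'range': {'updateTime': {'gte': 'now-1d'}}}  # date math on updateTime
            ]
        }
    }
}
result = es.search(index='conductor', doc_type='task', body=query)
print(result['hits']['total'], 'failed tasks in the last day')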
es2-persistence/src/main/resources/mappings_docType_workflow.json
@@ -1,9 +1,6 @@
{
"workflow": {
"properties": {
"archived": {
"type": "boolean"
},
"correlationId": {
"type": "string",
"index": "not_analyzed",
@@ -69,6 +66,10 @@
"type": "string",
"index": "no",
"include_in_all": false
},
"event": {
"index": "not_analyzed",
"type": "string"
}
}
}
