diff --git a/build/lib/control/action.py b/build/lib/control/action.py deleted file mode 100644 index 606bfd6..0000000 --- a/build/lib/control/action.py +++ /dev/null @@ -1,713 +0,0 @@ -str_about = ''' - The action module provides functionality to run individual - plugins as well as "pipelines" of plugins. - - This module is the contact "surface" between dypi and a CUBE - instance. Control/manipulation of the ChRIS instance is effected - by a set of CLI scripts that this module creates and then executes. - - NOTE: This module is "fragily" dependent on python-chrisclient and - caw! Changes in those modules could break things here rather - completely. -''' - -from . import jobber -from state import data -import os -os.environ['XDG_CONFIG_HOME'] = '/tmp' -import re -import pudb -import json -from argparse import ArgumentParser, Namespace -from chrisclient import client -import time - -class PluginRun: - ''' - A class wrapper about the CLI tool "chrispl-run" that POSTs a pl-shexec - to CUBE. - ''' - def __init__(self, *args, **kwargs): - self.env = None - self.plugin = '' - self.shell : jobber.Jobber = jobber.Jobber({ - 'verbosity' : 1, - 'noJobLogging': True - }) - self.attachToPluginID : str = '' - self.options : Namespace = None - for k, v in kwargs.items(): - if k == 'attachToPluginID' : self.attachToPluginID = v - if k == 'env' : self.env = v - if k == 'options' : self.options = v - - self.l_runCMDresp : list = [] - self.l_branchInstanceID : list = [] - - def PLpfdorun_args(self, str_input : str) -> dict: - ''' - Return the argument string pertinent to the pl-pfdorun plugin - ''' - # pudb.set_trace() - str_filter : str = "" - # Remove any '*' and/or '/' chars from pattern search. This will - # transform a string of '**/*dcm" to just 'dcm', suitable for pl-shexec - str_ff : str = re.subn(r'[*/]', '', self.options.pattern)[0] - if not self.options.inNode: - str_filter = "--fileFilter=%s" % str_input - else: - str_filter = "--dirFilter=%s" % str_input - if len(str_ff): str_filter += ";--fileFilter=%s" % str_ff - - str_args : str = """ - %s; - --exec=cp %%inputWorkingDir/%%inputWorkingFile %%outputWorkingDir/%%inputWorkingFile; - --noJobLogging; - --verbose=5; - --pftelDB=%s; - --title=%s; - --previous_id=%s - """ % (str_filter, self.options.pftelDB, str_input, self.env.CUBE.parentPluginInstanceID) - - str_args = re.sub(r';\n.*--', ';--', str_args) - str_args = str_args.strip() - return { - 'args': str_args - } - - def chrispl_onCUBEargs(self): - ''' - Return a string specifying the CUBE instance - ''' - return { - 'onCUBE': json.dumps(self.env.CUBE.onCUBE()) - } - - def chrispl_run_cmd(self, str_inputData : str) -> dict: - ''' - Return the CLI for the chrispl_run - ''' - str_cmd = """chrispl-run --plugin name=pl-shexec --args="%s" --onCUBE %s""" % ( - self.PLpfdorun_args(str_inputData)['args'], - json.dumps(self.chrispl_onCUBEargs()['onCUBE'], indent = 4) - ) - str_cmd = str_cmd.strip().replace('\n', '') - return { - 'cmd' : str_cmd - } - - def __call__(self, str_input : str, **kwargs) ->dict: - ''' - Copy the to the output using pl-pfdorun. If the in-node - self.options.inNode is true, perform a bulk copy of all files in the - passed directory that conform to the filter. - ''' - # Remove the '/incoming/' from the str_input - str_inputTarget : str = str_input.split('/')[-1] - d_PLCmd : dict = self.chrispl_run_cmd(str_inputTarget) - str_PLCmd : str = d_PLCmd['cmd'] - str_PLCmdfile : str = '/tmp/%s.sh' % str_inputTarget - branchID : int = -1 - b_status : bool = False - - str_append : str = "" - for k,v in kwargs.items(): - if k == 'append' : str_append = v - - str_PLCmd += " " + str_append - if self.options: - str_scriptDir : str = '%s/%s' % (self.options.outputdir, str_inputTarget) - os.makedirs(str_scriptDir, exist_ok = True) - str_PLCmdfile = '%s/%s/copy.sh' % (self.options.outputdir, str_inputTarget) - - with open(str_PLCmdfile, 'w') as f: - f.write('#!/bin/bash\n') - f.write(str_PLCmd) - os.chmod(str_PLCmdfile, 0o755) - d_runCMDresp : dict = self.shell.job_run(str_PLCmdfile) - if not d_runCMDresp['returncode']: - b_status = True - self.l_runCMDresp.append(d_runCMDresp) - branchID : int = d_runCMDresp['stdout'].split()[2] - self.l_branchInstanceID.append(branchID) - else: - b_status = False - - return { - 'status' : b_status, - 'run' : d_runCMDresp, - 'input' : str_input, - 'branchInstanceID' : branchID - } - -class LLDcomputeflow: - ''' - A class to create / manage the LLD compute flow - ''' - - def __init__(self, *args, **kwargs): - self.env : data.env = None - self.options : Namespace = None - - for k, v in kwargs.items(): - if k == 'env' : self.env = v - if k == 'options' : self.options = v - - self.cl : client.Client = None - self.cl = client.Client( - self.env.CUBE('url'), - self.env.CUBE('username'), - self.env.CUBE('password') - ) - self.d_pipelines : dict = self.cl.get_pipelines() - self.pltopo : int = self.cl.get_plugins({'name': 'pl-topologicalcopy'}) - self.newTreeID : int = -1 - self.ld_workflowhist : list = [] - self.ld_topologicalNode : dict = {'data': []} - - def pluginInstanceID_findWithTitle(self, - d_workflowDetail : dict, - node_title : str - ) -> int: - """ - Determine the plugin instance id in the `d_workflowDetail` that has - title substring . If the d_workflowDetail is simply a plugin - instance, return its id provided it has the . - - Args: - d_workflowDetail (dict): workflow detail data structure - node_title (str): the node to find - - Returns: - int: id of the found node, or -1 - """ - def plugin_hasTitle(d_plinfo : dict, title : str) -> bool: - """ - Does this node (`d_plinfo`) have this `title`? - - Args: - d_plinfo (dict): the plugin data description - title (str): the name of this node - - Returns: - bool: yay or nay - """ - - nonlocal pluginIDwithTitle - if title.lower() in d_plinfo['title'].lower(): - pluginIDwithTitle = d_plinfo['id'] - return True - else: - return False - - pluginIDwithTitle : int = -1 - d_plinfo : dict = {} - if 'data' in d_workflowDetail: - for d_plinfo in d_workflowDetail['data']: - if plugin_hasTitle(d_plinfo, node_title): break - else: - plugin_hasTitle(d_workflowDetail, node_title) - - return pluginIDwithTitle - - def waitForNodeInWorkflow(self, - d_workflowDetail : dict, - node_title : str, - **kwargs - ) -> dict: - """ - Wait for a node in a workflow to transition to a finishedState - - Args: - d_workflowDetail (dict): the workflow in which the node - exists - node_title (str): the title of the node to find - - kwargs: - waitPoll = the polling interval in seconds - totalPolls = total number of polling before abandoning; - if this is 0, then poll forever. - - Future: expand to wait on list of node_titles - - Returns: - dict: _description_ - """ - waitPoll : int = 5 - totalPolls : int = 100 - pollCount : int = 0 - b_finished : bool = False - waitOnPluginID : int = self.pluginInstanceID_findWithTitle( - d_workflowDetail, node_title - ) - str_pluginStatus: str = 'unknown' - d_plinfo : dict = {} - - for k,v in kwargs.items(): - if k == 'waitPoll': waitPoll = v - if k == 'totalPolls': totalPolls = v - - if waitOnPluginID >= 0: - while 'finished' not in str_pluginStatus.lower() and \ - pollCount <= totalPolls : - d_plinfo = self.cl.get_plugin_instance_by_id(waitOnPluginID) - str_pluginStatus = d_plinfo['status'] - time.sleep(waitPoll) - if totalPolls: pollCount += 1 - if 'finished' in d_plinfo['status']: - b_finished = d_plinfo['status'] == 'finishedSuccessfully' - return { - 'finished' : b_finished, - 'status' : str_pluginStatus, - 'workflow' : d_workflowDetail, - 'plinst' : d_plinfo, - 'polls' : pollCount, - 'plid' : waitOnPluginID - } - - def pluginParameters_setInNodes(self, - d_piping : dict, - d_pluginParameters : dict - ) -> dict: - """ - Override default parameters in the `d_piping` - - Args: - d_piping (dict): the current default parameters for the - plugins in a pipeline - d_pluginParameters (dict): a list of plugins and parameters to - set in the response - - Returns: - dict: a new piping structure with changes to some parameter values - if required. If no d_pluginParameters is passed, simply - return the piping unchanged. - - """ - for pluginTitle,d_parameters in d_pluginParameters.items(): - for piping in d_piping: - if pluginTitle in piping.get('title'): - for k,v in d_parameters.items(): - for d_default in piping.get('plugin_parameter_defaults'): - if k in d_default.get('name'): - d_default['default'] = v - return d_piping - - def pipelineWithName_getNodes( - self, - str_pipelineName : str, - d_pluginParameters : dict = {} - ) -> dict : - """ - Find a pipeline that contains the passed name - and if found, return a nodes dictionary. Optionally set relevant - plugin parameters to values described in - - - Args: - str_pipelineName (str): the name of the pipeline to find - d_pluginParameters (dict): a set of optional plugin parameter - overrides - - Returns: - dict: node dictionary (name, compute env, default parameters) - and id of the pipeline - """ - # pudb.set_trace() - id_pipeline : int = -1 - ld_node : list = [] - d_pipeline : dict = self.cl.get_pipelines({'name': str_pipelineName}) - if 'data' in d_pipeline: - id_pipeline : int = d_pipeline['data'][0]['id'] - d_response : dict = self.cl.get_pipeline_default_parameters( - id_pipeline, {'limit': 1000} - ) - if 'data' in d_response: - ld_node = self.pluginParameters_setInNodes( - self.cl.compute_workflow_nodes_info(d_response['data'], True), - d_pluginParameters) - for piping in ld_node: - if piping.get('compute_resource_name'): - del piping['compute_resource_name'] - return { - 'nodes' : ld_node, - 'id' : id_pipeline - } - - def workflow_schedule(self, - inputDataNodeID : str, - str_pipelineName : str, - d_pluginParameters : dict = {} - ) -> dict: - """ - Schedule a workflow that has name off a given node id - of . - - Args: - inputDataNodeID (str): id of parent node - str_pipelineName (str): substring of workflow name to connect - d_pluginParameters (dict): optional structure of default parameter - overrides - - Returns: - dict: result from calling the client `get_workflow_plugin_instances` - """ - d_pipeline : dict = self.pipelineWithName_getNodes( - str_pipelineName, d_pluginParameters - ) - d_workflow : dict = self.cl.create_workflow( - d_pipeline['id'], - { - 'previous_plugin_inst_id' : inputDataNodeID, - 'nodes_info' : json.dumps(d_pipeline['nodes']) - }) - d_workflowInst : dict = self.cl.get_workflow_plugin_instances( - d_workflow['id'], {'limit': 1000} - ) - self.ld_workflowhist.append({ - 'name' : str_pipelineName, - 'pipeline' : d_pipeline, - 'previous_plugin_inst_id' : inputDataNodeID, - 'pipeline_plugins' : d_workflowInst - }) - return d_workflowInst - - def topologicalNode_run(self, - str_nodeTitle : str, - l_nodes : list, - str_filterArgs - ) -> dict: - """ - Perform a toplogical join between nodes - - Args: - str_nodeTitle (str): title of this join node - l_nodes (list): list of node ids to join - (logical parent is node[0]) - str_filterArgs (_type_): CLI filter arguments - - Returns: - dict: the plugin instance creation data structure - """ - idTopo : int = self.pltopo['data'][0]['id'] - d_plInstTopo : dict = self.cl.create_plugin_instance( - idTopo, - { - 'filter' : str_filterArgs, - 'plugininstances' : ','.join(map(str, l_nodes)), - 'title' : str_nodeTitle, - 'previous_id' : l_nodes[0] - } - ) - self.ld_topologicalNode['data'].append(d_plInstTopo) - return d_plInstTopo - - def nodes_join(self, str_title : str, l_nodes : list, str_joinArgs : str): - d_topological_run : dict = self.topologicalNode_run( - str_title, l_nodes, str_joinArgs - ) - d_topological_done : dict = self.waitForNodeInWorkflow( - d_topological_run, - str_title - ) - return d_topological_done - - def parentNode_isFinished(self, *args) -> bool: - """ - Check if the parent node is finished at this instance. Return - appropriate bool. - - Returns: - bool: is parent done? True or False - - """ - d_parent : dict = None - b_finished : bool = False - if len(args) : d_parent = args[0] - if not d_parent : b_finished = True - else : b_finished = d_parent['finished'] - return b_finished - - def parentNode_IDappend(self, l_nodes : list, *args) -> list: - """ - Append the node ID of the parent in the *args to l_nodes and - return the new list - - Args: - l_nodes (list): a list of node IDs - - Returns: - list: the parent id appended to the end of the list - """ - d_parent : dict = None - if len(args): - d_parent = args[0] - l_nodes.append(self.parentNode_IDget(*args)) - return l_nodes - - def parentNode_IDget(self, *args) -> int: - """ - - Simply get the plugin instance of the passed parent node - - Returns: - int: parent plugin instance id of passed `d_parent` structure - """ - id : int = -1 - d_parent : dict = None - if len(args): - d_parent = args[0] - id = d_parent['plinst']['id'] - return id - - def pluginID_findInWorkflowDesc(self, tp_workflowAndNode : tuple) -> int : - """ - Given a tuple of (, ) substrings, - return the corresponding plugin instance id of node - in workflow . Handle the special case - when the "workflow" is a "topological" node. - - Args: - tp_workflowAndNode (tuple): two tuple element containing substrings - describing a workflow and a node within - that workflow - - Returns: - int: the plugin instance ID of the found node, otherwise -1 - """ - pluginID : int = -1 - l_hit : list = [] - workflow = None - if type(tp_workflowAndNode) == int: - return tp_workflowAndNode - str_workflow, str_node = tp_workflowAndNode - - # pudb.set_trace() - if str_workflow.lower() == 'topological': - workflow = self.ld_topologicalNode - else: - filterHit = filter(lambda p: str_workflow in p['name'], self.ld_workflowhist) - workflow = list(filterHit)[0]['pipeline_plugins'] - if workflow: - pluginID = self.pluginInstanceID_findWithTitle( - workflow, str_node - ) - return pluginID - - def nodeIDs_verify(self, l_nodeID : list) -> list[int]: - """ - - Verify that a list of contains only int - types. This will map any 'distalNodeIDs' that are string - tuples of (, ) to the corrsponding - plugin instance id - - - Args: - l_nodeID (list): node list to verify - - Returns: - list: list containing only node IDs - """ - l_nodeID: list[int] = [self.pluginID_findInWorkflowDesc(x) for x in l_nodeID] - return l_nodeID - - def flow_executeAndBlockUntilNodeComplete( - self, - *args, - **kwargs, - ) -> dict: - """ - Execute a workflow identified by a (sub string) in its - by anchoring it to in the - feed/compute tree. This can be supplied in the - kwargs, or if omitted, then the "parent" node passed in args[0] - is assumed to be the connector. - - Once attached to a node, the whole workflow is scheduled. This - workflow will have N>=1 compute nodes, each identified by a - title. This method will only "return" to a caller when one of - these nodes with 'waitForNodeWithTitle' enters the finished - state. Note that this state can be 'finishedSuccessfully' or - 'finishedWithError'. - - Possible future extension: block until node _list_ complete - """ - d_prior : dict = None - str_workflowTitle : str = "no workflow title" - attachToNodeID : int = -1 - str_blockNodeTitle : str = "no node title" - b_canFlow : bool = False - d_pluginParameters : dict = {} - d_ret : dict = {} - - for k, v in kwargs.items(): - if k == 'workflowTitle' : str_workflowTitle = v - if k == 'attachToNodeID' : attachToNodeID = v - if k == 'waitForNodeWithTitle' : str_blockNodeTitle = v - if k == 'pluginParameters' : d_pluginParameters = v - - if self.parentNode_isFinished(*args): - if attachToNodeID == -1: - attachToNodeID = self.parentNode_IDget(*args) - d_ret = self.waitForNodeInWorkflow( - self.workflow_schedule( - attachToNodeID, - str_workflowTitle, - d_pluginParameters - ), - str_blockNodeTitle, - **kwargs - ) - if len(args): - d_ret['prior'] = args[0] - else: - d_ret['prior'] = None - return d_ret - - def flows_connect( - self, - *args, - **kwargs) -> dict: - """ - Perform a toplogical join by using the args[0] as logical - parent and connect this parent to a list of distalNodeIDs - - - Returns: - dict: data structure on the nodes_join operation - """ - d_prior : dict = None - str_joinNodeTitle : str = "no title specified for topo node" - l_nodeID : list = [] - str_topoJoinArgs : str = "" - b_canFlow : bool = False - d_ret : dict = {} - b_invertOrder : bool = False - - # pudb.set_trace() - for k, v in kwargs.items(): - if k == 'connectionNodeTitle' : str_joinNodeTitle = v - if k == 'distalNodeIDs' : l_nodeID = v - if k == 'invertIDorder' : b_invert = v - if k == 'topoJoinArgs' : str_topoJoinArgs = v - - if self.parentNode_isFinished(*args): - l_nodeID = self.parentNode_IDappend(l_nodeID, *args) - l_nodeID = self.nodeIDs_verify(l_nodeID) - if b_invertOrder: l_nodeID.reverse() - d_ret = self.nodes_join( - str_joinNodeTitle, - l_nodeID, - str_topoJoinArgs - ) - if len(args): - d_ret['prior'] = args[0] - else: - d_ret['prior'] = None - return d_ret - - def computeFlow_build(self) -> dict: - """The main controller for the compute flow logic - - Somewhat pedantically, this method demonstrates how to inject - override parameters for certain plugin parameters in the - workflow. - - Returns: - dict: a composite structure of the last call executed. - """ - - self.env.set_trace() - totalPolls:int = 100 if not self.options.notimeout else 0 - - d_ret : dict = \ - self.flow_executeAndBlockUntilNodeComplete( - self.flows_connect( - self.flow_executeAndBlockUntilNodeComplete( - self.flows_connect( - self.flow_executeAndBlockUntilNodeComplete( - self.flows_connect( - self.flow_executeAndBlockUntilNodeComplete( - attachToNodeID = self.newTreeID, - workflowTitle = 'Leg Length Discrepency inference on DICOM inputs v20230324-1 using CPU', - waitForNodeWithTitle = 'heatmaps', - totalPolls = totalPolls, - pluginParameters = { - 'dcm-to-mha' : { - 'imageName' : 'composite.png', - 'rotate' : '90', - 'pftelDB' : self.options.pftelDB - }, - 'generate-landmark-heatmaps' : { - 'heatmapThreshold' : '0.5', - 'imageType' : 'jpg', - 'compositeWeight' : '0.3,0.7', - 'pftelDB' : self.options.pftelDB - } - } - ), - connectionNodeTitle = 'mergeDICOMSwithInference', - distalNodeIDs = [self.newTreeID], - topoJoinArgs = '\.dcm$,\.csv$' - ), - workflowTitle = 'Leg Length Discrepency prediction formatter v20230324', - waitForNodeWithTitle = 'landmarks-to-json', - totalPolls = totalPolls, - pluginParameters = { - 'landmarks-to-json' : { - 'pftelDB' : self.options.pftelDB - } - } - ), - connectionNodeTitle = 'mergeJPGSwithInference', - distalNodeIDs = [('Leg Length Discrepency inference', 'heatmaps')], - topoJoinArgs = '\.jpg$,\.json$' - ), - workflowTitle = 'Leg Length Discrepency measurements on image v20230324', - waitForNodeWithTitle = 'measure-leg-segments', - totalPolls = 0, - pluginParameters = { - 'measure-leg-segments' : { - 'pftelDB' : self.options.pftelDB - } - } - ), - connectionNodeTitle = 'mergeMarkedJPGSwithDICOMS', - distalNodeIDs = [('Topological', 'mergeDICOMSwithInference')], - topoJoinArgs = '\.dcm$,\.*$' - ), - workflowTitle = 'PNG-to-DICOM and push to PACS v20230324', - waitForNodeWithTitle = 'pacs-push', - totalPolls = totalPolls, - pluginParameters = { - 'image-to-DICOM' : { - 'pftelDB' : self.options.pftelDB - }, - 'pacs-push' : { - 'pftelDB' : self.options.pftelDB, - 'orthancUrl' : self.env.orthanc('url'), - 'username' : self.env.orthanc('username'), - 'password' : self.env.orthanc('password'), - 'pushToRemote' : self.env.orthanc('remote') - } - } - ) - # pudb.set_trace() - return d_ret - - def __call__(self, filteredCopyInstanceID : int) -> dict: - """ Execute/manage the LLD compute flow - - - Args: - filteredCopyInstanceID (int): the plugin instance ID in the feed tree - from which to grow the compute flow - - Returns: - dict: the compute flow data structure - """ - self.newTreeID : str = int(filteredCopyInstanceID) - d_computeFlow : dict = self.computeFlow_build() - return d_computeFlow - diff --git a/build/lib/control/filter.py b/build/lib/control/filter.py deleted file mode 100644 index 5c13dbe..0000000 --- a/build/lib/control/filter.py +++ /dev/null @@ -1,80 +0,0 @@ -str_about = ''' - A simple class that provides rudimentary filtering on some input - directory for files conforming to some pattern. The primary purpose - of this class is to provide an alternative to the built-in chris_plugin - 'PathMapper' object. -''' - - -from argparse import ArgumentParser, Namespace -from pathlib import Path -import pfmisc -import glob -import os - -class PathFilter: - ''' - A simple filter class that operates on directories to catalog - some filtered subset of the input filesystem space. - ''' - - def __init__(self, inputdir, outputdir, *args, **kwargs): - """Main constructor - """ - - self.inputdir : Path = inputdir - self.outputdir : Path = outputdir - self.glob : str = '*' - self.l_files : list = [] - self.LOG : pfmisc.debug = None - self.b_filesOnly : bool = False - - for k,v in kwargs.items(): - if k == 'glob' : self.glob = v - if k == 'logger' : self.LOG = v - if k == 'only_files' : self.b_filesOnly = True - - self.inputdir_filter(self.inputdir) - - def __iter__(self): - return PathIterator(self) - - def log(self, message, **kwargs): - if self.LOG: self.LOG(message) - - def inputdir_filter(self, input: Path) -> list: - ''' - Filter the files in Path according to the passed options.pattern -- - mostly for debugging - ''' - - self.LOG("Parent directory contains at root level") - l_ls = [self.LOG(f) for f in os.listdir(str(input))] - self.LOG("Filtering files in %s containing '%s'" % (str(input), self.glob)) - str_glob : str = '%s/%s' % (str(self.inputdir), self.glob) - self.LOG("glob = %s" % str_glob) - - self.l_files = glob.glob(str_glob) - - l_glob = [self.LOG(f) for f in self.l_files] - return self.l_files - -class PathIterator: - ''' - An iterator over the PathFilter class - ''' - - def __init__(self, pathfilter): - - self._pathfilter = pathfilter - self._index = 0 - - def __next__(self): - ''' - Iterate over the PathFilter self.files list - ''' - if self._index < len(self._pathfilter.l_files): - result = (self._pathfilter.l_files[self._index]) - self._index += 1 - return (result, self._pathfilter.outputdir) - raise StopIteration diff --git a/build/lib/control/jobber.py b/build/lib/control/jobber.py deleted file mode 100644 index c064725..0000000 --- a/build/lib/control/jobber.py +++ /dev/null @@ -1,182 +0,0 @@ -str_about = ''' - This module provides the Jobber class -- an object designed to simplify/ - abstract running CLI. The actual command to run is specified as a string, - and the Jobber class executes the command, returning to the caller a - dictionary structure containing misc info such as , , and - . -''' - -import subprocess -import os -os.environ['XDG_CONFIG_HOME'] = '/tmp' -import pudb -import json - -class Jobber: - - def __init__(self, d_args : dict): - """Constructor for the jobber class. - - Args: - d_args (dict): a dictionary of "arguments" (parameters) for the - object. - """ - self.args = d_args.copy() - if not 'verbosity' in self.args.keys(): self.args['verbosity'] = 0 - if not 'noJobLogging' in self.args.keys(): self.args['noJobLogging'] = False - - def dict2JSONcli(self, d_dict : dict) -> str: - """Convert a dictionary into a CLI conformant JSON string. - - An input dictionary of - - { - 'key1': 'value1', - 'key2': 'value2' - } - - is converted to a string: - - "{\"key1\":\"value1\",\"key2\":\"value2\"}" - - Args: - d_dict (dict): a python dictionary to convert - - Returns: - str: CLI equivalent string. - """ - - str_JSON = json.dumps(d_dict) - str_JSON = str_JSON.replace('"', r'\"') - return str_JSON - - def dict2cli(self, d_dict : dict) -> str: - """Convert a dictionary into a CLI conformant JSON string. - - An input dictionary of - - { - 'key1': 'value1', - 'key2': 'value2', - 'key3': true, - 'key4': false - } - - is converted to a string: - - "--key1 value1 --key2 value2 --key3" - - Args: - d_dict (dict): a python dictionary to convert - - Returns: - str: CLI equivalent string. - """ - str_cli : str = "" - for k,v in d_dict.items(): - if type(v) == bool: - if v: - str_cli += '--%s ' % k - elif len(v): - str_cli += '--%s %s ' % (k, v) - return str_cli - - def job_run(self, str_cmd): - """ - Running some CLI process via python is cumbersome. The typical/easy - path of - - os.system(str_cmd) - - is deprecated and prone to hidden complexity. The preferred - method is via subprocess, which has a cumbersome processing - syntax. Still, this method runs the `str_cmd` and returns the - stderr and stdout strings as well as a returncode. - Providing readtime output of both stdout and stderr seems - problematic. The approach here is to provide realtime - output on stdout and only provide stderr on process completion. - """ - d_ret : dict = { - 'stdout': "", - 'stderr': "", - 'cmd': "", - 'cwd': "", - 'returncode': 0 - } - str_stdoutLine : str = "" - str_stdout : str = "" - - p = subprocess.Popen( - str_cmd.split(), - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - ) - - # Realtime output on stdout - while True: - stdout = p.stdout.readline() - if p.poll() is not None: - break - if stdout: - str_stdoutLine = stdout.decode() - if int(self.args['verbosity']): - print(str_stdoutLine, end = '') - str_stdout += str_stdoutLine - d_ret['cmd'] = str_cmd - d_ret['cwd'] = os.getcwd() - d_ret['stdout'] = str_stdout - d_ret['stderr'] = p.stderr.read().decode() - d_ret['returncode'] = p.returncode - with open('/tmp/job.json', 'w') as f: - json.dump(d_ret, f, indent=4) - if int(self.args['verbosity']) and len(d_ret['stderr']): - print('\nstderr: \n%s' % d_ret['stderr']) - return d_ret - - def job_runbg(self, str_cmd : str) -> dict: - """Run a job in the background - - Args: - str_cmd (str): CLI string to run - - Returns: - dict: a dictionary of exec state - """ - d_ret : dict = { - 'uid' : "", - 'cmd' : "", - 'cwd' : "" - } - # str_stdoutLine : str = "" - # str_stdout : str = "" - - p = subprocess.Popen( - str_cmd.split(), - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - ) - - d_ret['uid'] = str(os.getuid()) - d_ret['cmd'] = str_cmd - d_ret['cwd'] = os.getcwd() - # d_ret['stdout'] = str_stdout - # d_ret['stderr'] = p.stderr.read().decode() - # d_ret['returncode'] = p.returncode - # if int(self.args['verbosity']) and len(d_ret['stderr']): - # print('\nstderr: \n%s' % d_ret['stderr']) - return d_ret - - def job_stdwrite(self, d_job, str_outputDir, str_prefix = ""): - """ - Capture the d_job entries to respective files. - """ - if not self.args['noJobLogging']: - for key in d_job.keys(): - with open( - '%s/%s%s' % (str_outputDir, str_prefix, key), "w" - ) as f: - f.write(str(d_job[key])) - f.close() - return { - 'status': True - } diff --git a/build/lib/dylld.py b/build/lib/dylld.py deleted file mode 100644 index a24b1b5..0000000 --- a/build/lib/dylld.py +++ /dev/null @@ -1,380 +0,0 @@ -#!/usr/bin/env python -from collections.abc import Iterator -from pathlib import Path -from argparse import ArgumentParser, Namespace, ArgumentDefaultsHelpFormatter - -from chris_plugin import chris_plugin, PathMapper - -from pathlib import Path - -from io import TextIOWrapper -import os, sys - -os.environ["XDG_CONFIG_HOME"] = "/tmp" -import pudb -from pudb.remote import set_trace - -from loguru import logger -from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor -from threading import current_thread, get_native_id - -from typing import Callable, Any - -from datetime import datetime, timezone -import json - -from state import data -from logic import behavior -from control import action -from control.filter import PathFilter -from pftag import pftag -from pflog import pflog - -LOG = logger.debug - -logger_format = ( - "{time:YYYY-MM-DD HH:mm:ss} │ " - "{level: <5} │ " - "{name: >28}::" - "{function: <30} @" - "{line: <4} ║ " - "{message}" -) -logger.remove() -logger.add(sys.stderr, format=logger_format) - -pluginInputDir: Path -pluginOutputDir: Path -ld_forestResult: list = [] - -__version__ = "4.4.40" - -DISPLAY_TITLE = r""" - _ _ _ _ _ - | | | | | | | | | - _ __ | |______ __| |_ _| | | __| | -| '_ \| |______/ _` | | | | | |/ _` | -| |_) | | | (_| | |_| | | | (_| | -| .__/|_| \__,_|\__, |_|_|\__,_| -| | __/ | -|_| |___/ -""" - - -parser: ArgumentParser = ArgumentParser( - description=""" -A ChRIS plugin that dynamically builds a workflow to compute length -discrepencies from extremity X-Rays -""", - formatter_class=ArgumentDefaultsHelpFormatter, -) - - -parser.add_argument( - "-V", "--version", action="version", version=f"%(prog)s {__version__}" -) - -parser.add_argument( - "--pattern", - default="**/*dcm", - help=""" - pattern for file names to include (you should quote this!) - (this flag triggers the PathMapper on the inputdir).""", -) -parser.add_argument( - "--pluginInstanceID", - default="", - help="plugin instance ID from which to start analysis", -) -parser.add_argument( - "--CUBEurl", default="http://localhost:8000/api/v1/", help="CUBE URL" -) -parser.add_argument("--CUBEuser", default="chris", help="CUBE/ChRIS username") -parser.add_argument("--CUBEpassword", default="chris1234", help="CUBE/ChRIS password") -parser.add_argument( - "--orthancURL", - default="https://orthanc-chris-public.apps.ocp-prod.massopen.cloud/", - help="IP of the orthanc to receive analysis results", -) -parser.add_argument("--orthancuser", default="fnndsc", help="Orthanc username") -parser.add_argument( - "--orthancpassword", default="Lerkyacyids5", help="Orthanc password" -) -parser.add_argument("--orthancremote", default="", help="remote orthanc modality") -parser.add_argument("--verbosity", default="0", help="verbosity level of app") -parser.add_argument( - "--thread", - help="use threading to branch in parallel", - dest="thread", - action="store_true", - default=False, -) -parser.add_argument( - "--pftelDB", - help="an optional pftel telemetry logger, of form '/api/v1///'", - default="", -) -parser.add_argument( - "--inNode", - help="perform in-node implicit parallelization in conjunction with --thread", - dest="inNode", - action="store_true", - default=False, -) -parser.add_argument( - "--notimeout", - help="if specified, then controller never timesout while waiting on nodes to complete", - dest="notimeout", - action="store_true", - default=False, -) -parser.add_argument( - "--debug", - help="if true, toggle telnet pudb debugging", - dest="debug", - action="store_true", - default=False, -) -parser.add_argument( - "--debugTermSize", - help="the terminal 'cols,rows' size for debugging", - default="253,62", -) -parser.add_argument("--debugPort", help="the debugging telnet port", default="7900") -parser.add_argument("--debugHost", help="the debugging telnet host", default="0.0.0.0") - - -def Env_setup( - options: Namespace, inputdir: Path, outputdir: Path, debugPortOffset: int = 0 -) -> data.env: - """ - Setup the environment - - Args: - options (Namespace): options passed from the CLI caller - inputdir (Path): plugin global input directory - outputdir (Path): plugin global output directory - debugPortOffset (int, optional): offset added to debug port -- useful for multithreading. Defaults to 0. - - Returns: - data.env: an instantiated environment object. Note in multithreaded - runs, each thread gets its own object. - """ - Env: data.env = data.env() - Env.CUBE.set(inputdir=str(inputdir)) - Env.CUBE.set(outputdir=str(outputdir)) - Env.CUBE.set(url=str(options.CUBEurl)) - Env.CUBE.set(username=str(options.CUBEuser)) - Env.CUBE.set(password=str(options.CUBEpassword)) - Env.orthanc.set(url=str(options.orthancURL)) - Env.orthanc.set(username=str(options.orthancuser)) - Env.orthanc.set(password=str(options.orthancpassword)) - Env.orthanc.set(remote=str(options.orthancremote)) - Env.set(inputdir=inputdir) - Env.set(outputdir=outputdir) - Env.debug_setup( - debug=options.debug, - termsize=options.debugTermSize, - port=int(options.debugPort) + debugPortOffset, - host=options.debugHost, - ) - return Env - - -def preamble(options: Namespace) -> str: - """ - Just show some preamble "noise" in the output terminal and also process - the --pftelDB if provided. - - Args: - options (Namespace): CLI options namespace - - Returns: - str: the parsed string - """ - - print(DISPLAY_TITLE) - pftelDB: str = "" - - if options.pftelDB: - tagger: pftag.Pftag = pftag.Pftag({}) - pftelDB = tagger(options.pftelDB)["result"] - - LOG("plugin arguments...") - for k, v in options.__dict__.items(): - LOG("%25s: [%s]" % (k, v)) - LOG("") - - LOG("base environment...") - for k, v in os.environ.items(): - LOG("%25s: [%s]" % (k, v)) - LOG("") - - LOG("Starting growth cycle...") - return pftelDB - - -def ground_prep(options: Namespace, Env: data.env) -> action.PluginRun: - """ - Do some per-tree setup -- prepare the ground! - - Args: - options (Namespace): options namespace - Env (data.env): the environment for this tree - - Returns: - action.PluginRun: A filter specific to this tree that will - filter a study of interest in the parent - space -- analogously akin to choosing a - seed. - """ - - LOG("Prepping ground for tree in thread %s..." % get_native_id()) - LOG("Constructing object to filter parent field") - PLinputFilter: action.PluginRun = action.PluginRun(env=Env, options=options) - - if len(options.pluginInstanceID): - Env.CUBE.parentPluginInstanceID = options.pluginInstanceID - else: - Env.CUBE.parentPluginInstanceID = Env.CUBE.parentPluginInstanceID_discover()[ - "parentPluginInstanceID" - ] - return PLinputFilter - - -def replantSeed_catchError(PLseed: action.PluginRun, input: Path) -> dict: - """ - Re-run a failed filter (pl-shexec) with explicit error catching - - Args: - PLseed (action.Pluginrun): the plugin run object to re-execute - input (Path): the input on which the seed failed - - Returns: - dict: the detailed error log from the failed run - """ - global LOG - LOG("Some error was returned when planting the seed!") - LOG("Replanting seed with error catching on...") - d_seedreplant: dict = PLseed(str(input), append="--jsonReturn") - return d_seedreplant - - -def tree_grow(options: Namespace, input: Path, output: Path = None) -> dict: - """ - Based on some conditional applied to the file space, direct the - dynamic "growth" of this feed tree from the parent node of *this* plugin. - - Args: - options (Namespace): CLI options - input (Path): input path returned by mapper - output (Path, optional): ouptut path returned by mapper. Defaults to None. - - Returns: - dict: resulant object dictionary of this (threaded) growth - """ - global pluginInputDir, pluginOutputDir, LOG, ld_forestResult - - # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900) - - Env: data.env = Env_setup(options, pluginInputDir, pluginOutputDir, get_native_id()) - Env.set_telnet_trace_if_specified() - - timenow: Callable[[], str] = ( - lambda: datetime.now(timezone.utc).astimezone().isoformat() - ) - conditional: behavior.Filter = behavior.Filter() - conditional.obj_pass = behavior.unconditionalPass - PLinputFilter: action.PluginRun = ground_prep(options, Env) - LLD: action.LLDcomputeflow = action.LLDcomputeflow(env=Env, options=options) - str_threadName: str = current_thread().getName() - d_seedGet: dict = {"status": False, "message": "unable to plant seed"} - d_treeGrow: dict = {"status": False, "message": "unable to grow tree"} - d_ret: dict = {"seed": {}, "tree": {}} - - Path("%s/start-%s.touch" % (Env.outputdir.touch(), str_threadName)) - LOG("Growing a new tree in thread %s..." % str_threadName) - str_heartbeat: str = str( - Env.outputdir.joinpath("heartbeat-%s.log" % str_threadName) - ) - fl: TextIOWrapper = open(str_heartbeat, "w") - fl.write("Start time: {}\n".format(timenow())) - if conditional.obj_pass(str(input)): - LOG("Planting seed off %s" % str(input)) - d_seedGet = PLinputFilter(str(input)) - if d_seedGet["status"]: - d_treeGrow = LLD(d_seedGet["branchInstanceID"]) - else: - d_seedGet["failed"] = replantSeed_catchError(PLinputFilter, input) - fl.write("End time: {}\n".format(timenow())) - fl.close() - d_ret["seed"] = d_seedGet - d_ret["tree"] = d_treeGrow - ld_forestResult.append(d_ret) - return d_ret - - -def treeGrowth_savelog(outputdir: Path) -> None: - """ - Write the global log file on the tree growth to the passed - - - Args: - outputdir (Path): the plugin base output directory - """ - global ld_forestResult - - with open(str(outputdir.joinpath("treeLog.json")), "w") as f: - f.write(json.dumps(ld_forestResult, indent=4)) - f.close() - - -# documentation: https://fnndsc.github.io/chris_plugin/chris_plugin.html#chris_plugin -@chris_plugin( - parser=parser, - title="Leg-Length Discrepency - Dynamic Compute Flow", - category="", # ref. https://chrisstore.co/plugins - min_memory_limit="100Mi", # supported units: Mi, Gi - min_cpu_limit="1000m", # millicores, e.g. "1000m" = 1 CPU core - min_gpu_limit=0, # set min_gpu_limit=1 to enable GPU -) -@pflog.tel_logTime( - event="dylld", log="Leg Length Discepency Dynamic Workflow controller" -) -def main(options: Namespace, inputdir: Path, outputdir: Path): - """ - :param options: non-positional arguments parsed by the parser given to @chris_plugin - :param inputdir: directory containing input files (read-only) - :param outputdir: directory where to write output files - """ - # set_trace(term_size=(253, 62), host = '0.0.0.0', port = 7900) - global pluginInputDir, pluginOutputDir - pluginInputDir = inputdir - pluginOutputDir = outputdir - - options.pftelDB = preamble(options) - - output: Path - if not options.inNode: - mapper: PathMapper = PathMapper.file_mapper( - inputdir, outputdir, glob=options.pattern - ) - else: - mapper: PathMapper = PathMapper.dir_mapper_deep(inputdir, outputdir) - if int(options.thread): - with ThreadPoolExecutor(max_workers=len(os.sched_getaffinity(0))) as pool: - results: Iterator = pool.map(lambda t: tree_grow(options, *t), mapper) - - # raise any Exceptions which happened in threads - for _ in results: - pass - else: - for input, output in mapper: - d_results: dict = tree_grow(options, input, output) - - LOG("Ending growth cycle...") - treeGrowth_savelog(outputdir) - - -if __name__ == "__main__": - main() diff --git a/build/lib/logic/behavior.py b/build/lib/logic/behavior.py deleted file mode 100644 index 8901d1a..0000000 --- a/build/lib/logic/behavior.py +++ /dev/null @@ -1,30 +0,0 @@ -str_about = ''' - The behavior module codifies the operational/dynamic behavior of this - plugin. - - Primary behaviors include some conditional logic over the space of - input objects, and triggering a resultant operation -- usually this means - copying the object into a child plugin, and then running a pipeline - on the result of that copy-to-child. - - For now, this class is mostly dummy filler. -''' - -def unconditionalPass(str_object: str) -> bool: - ''' - A dummy fall through function that always returns True. - ''' - return True - -class Filter: - ''' - An abstraction for evaluating a "condition" on some "object". - ''' - - def __init__(self, *args, **kwargs): - # point this to a function with signature - # ( : str) : bool - self.filterOp = None - - def obj_pass(self, str_object: str) -> bool: - return self.filterOp(str_object) \ No newline at end of file diff --git a/build/lib/state/data.py b/build/lib/state/data.py deleted file mode 100644 index 7008373..0000000 --- a/build/lib/state/data.py +++ /dev/null @@ -1,311 +0,0 @@ -str_about = ''' - This module is responsible for handling some state related information - which is mostly information about the ChRIS/CUBE instance. - - Core data includes information on the ChRIS/CUBE instances as well as - information relevant to the pipeline to be scheduled. -''' -import os -os.environ['XDG_CONFIG_HOME'] = '/tmp' -from pudb.remote import set_trace -from curses import meta -from pathlib import Path -import pudb -import json -from urllib.parse import urlparse -import logging -logging.basicConfig(level=logging.CRITICAL) - -class env: - ''' - A class that contains environmental data -- mostly information about CUBE - as well as data pertaining to the orthanc instance - ''' - - def __init__(self, *args, **kwargs): - ''' - Constructor - ''' - self.inputdir : Path = None - self.outputdir : Path = None - self.CUBE : CUBEinstance = CUBEinstance() - self.orthanc : Orthancinstance = Orthancinstance() - self.debug : dict = { - 'do' : False, - 'termsize' : (80,25), - 'port' : 7900, - 'host' : '0.0.0.0' - } - - def debug_setup(self, **kwargs) -> dict: - """ - Setup the debugging structure based on - - Returns: - dict: the debug structure - """ - str_termsize : str = "" - str_port : str = "" - str_host : str = "0.0.0.0" - b_debug : bool = False - for k,v in kwargs.items(): - if k == 'debug' : b_debug = v - if k == 'termsize' : str_termsize = v - if k == 'port' : str_port = v - if k == 'host' : str_host = v - - cols, rows = str_termsize.split(',') - self.debug['do'] = b_debug - self.debug['termsize'] = (int(cols), int(rows)) - self.debug['port'] = int(str_port) - self.debug['host'] = str_host - return self.debug - - def set_telnet_trace_if_specified(self): - """ - If specified in the env, pause for a telnet debug. - - If you are debugging, just "step" to return to the location - in your code where you specified to break! - """ - if self.debug['do']: - set_trace( - term_size = self.debug['termsize'], - host = self.debug['host'], - port = self.debug['port'] - ) - - def set_trace(self): - """ - Simple "override" for setting a trace. If the Env is configured - for debugging, then this set_trace will be called. Otherwise it - will be skipped. - - This is useful for leaving debugging set_traces in the code, and - being able to at runtime choose to debug or not. - - If you are debugging, just "step" to return to the location - in your code where you specified to break! - - Returns: - _type_: _description_ - """ - if self.debug['do']: - pudb.set_trace() - - def __call__(self, str_key) -> str | None: - ''' - get a value for a str_key - ''' - if str_key in ['inputdir', 'outputdir']: - return getattr(self, str_key) - else: - return None - - def set(self, **kwargs) -> bool: - """ - A custom setter -- the normal python getter/setter approach suffers - IMHO from implicitly adding members directly to the object. - - Returns: - bool: True or False on setting - """ - b_status:bool = False - for k,v in kwargs.items(): - if k == 'inputdir' : self.inputdir = v ; b_status = True - if k == 'outputdir' : self.outputdir = v ; b_status = True - return b_status - -class CUBEinstance: - ''' - A class that contains data pertinent to a specific CUBE instance - ''' - - def __init__(self, *args, **kwargs): - self.d_CUBE = { - 'username' : 'chris', - 'password' : 'chris1234', - 'address' : '192.168.1.200', - 'port' : '8000', - 'route' : '/api/v1/', - 'protocol' : 'http', - 'url' : '' - } - self.parentPluginInstanceID : str = '' - self.str_inputdir : str = None - self.str_outputdir : str = None - - def setCUBE(self, str_key, str_val): - ''' - set str_key to str_val - ''' - if str_key in self.d_CUBE.keys(): - self.d_CUBE[str_key] = str_val - - def __call__(self, str_key) -> str | None: - ''' - get a value for a str_key - ''' - if str_key in self.d_CUBE.keys(): - return self.d_CUBE[str_key] - else: - if str_key in ['inputdir', 'outputdir']: - return getattr(self, str_key) - else: - return None - - def set(self, **kwargs) -> bool: - """ - A custom setter -- the normal python getter/setter approach suffers - IMHO from implicitly adding members directly to the object. - - Returns: - bool: True or False on setting - """ - b_status:bool = False - for k,v in kwargs.items(): - if k == 'inputdir' : self.str_inputdir = v ; b_status = True - if k == 'outputdir' : self.str_outputdir = v ; b_status = True - if k == 'parentPluginInstanceID' : - self.parentPluginInstanceID = v ; b_status = True - if k == 'username' : self.setCUBE(k, v) ; b_status = True - if k == 'password' : self.setCUBE(k, v) ; b_status = True - if k == 'addreses' : self.setCUBE(k, v) ; b_status = True - if k == 'port' : self.setCUBE(k, v) ; b_status = True - if k == 'route' : self.setCUBE(k, v) ; b_status = True - if k == 'protocol' : self.setCUBE(k, v) ; b_status = True - if k == 'url' : - self.setCUBE(k, v) - self.url_decompose() - b_status = True - return b_status - - def onCUBE(self) -> dict: - ''' - Return a dictionary that is a subset of self.d_CUBE - suitable for using in calls to the CLI tool 'chrispl-run' - ''' - return { - 'protocol': self('protocol'), - 'port': self('port'), - 'address': self('address'), - 'user': self('username'), - 'password': self('password') - } - - def parentPluginInstanceID_discover(self) -> dict: - ''' - Determine the pluginInstanceID of the parent plugin. CUBE provides - several environment variables: - - CHRIS_JID - CHRIS_PLG_INST_ID - CHRIS_PREV_JID - CHRIS_PREV_PLG_INST_ID - - ''' - - self.parentPluginInstanceID = os.environ['CHRIS_PREV_PLG_INST_ID'] - - return { - 'parentPluginInstanceID': self.parentPluginInstanceID - } - - def url_decompose(self, *args): - ''' - Decompose the internal URL into constituent parts - ''' - - o = urlparse(self.d_CUBE['url']) - self.d_CUBE['protocol'] = o.scheme - self.d_CUBE['address'] = o.hostname - self.d_CUBE['port'] = o.port - if not self.d_CUBE['port']: - self.d_CUBE['port'] = '' - self.d_CUBE['route'] = o.path - return self.d_CUBE['url'] - -class Orthancinstance: - ''' - A class that contains data pertinent to a specific Orthanc instance - ''' - - def __init__(self, *args, **kwargs) -> None: - self.d_orthanc: dict[str, str] = { - 'username' : 'orthanc', - 'password' : 'orthanc', - 'IP' : '192.168.1.200', - 'port' : '4242', - 'remote' : '', - 'protocol' : 'http', - 'route' : '', - 'url' : '' - } - - def setOrthanc(self, str_key, str_val) -> bool: - ''' - set str_key to str_val - ''' - b_status:bool = False - if str_key in self.d_orthanc.keys(): - b_status = True - self.d_orthanc[str_key] = str_val - return b_status - - def __call__(self, str_key) -> str | None: - ''' - get a value for a str_key - ''' - if str_key in self.d_orthanc.keys(): - return self.d_orthanc[str_key] - else: - return None - - def set(self, **kwargs) -> bool: - """ - A custom setter -- the normal python getter/setter approach suffers - IMHO from implicitly adding members directly to the object. - - Returns: - bool: True or False on setting - """ - b_status:bool = False - for k,v in kwargs.items(): - if k == 'username' : self.setOrthanc(k, v) ; b_status = True - if k == 'password' : self.setOrthanc(k, v) ; b_status = True - if k == 'IP' : self.setOrthanc(k, v) ; b_status = True - if k == 'port' : self.setOrthanc(k, v) ; b_status = True - if k == 'remote' : self.setOrthanc(k, v) ; b_status = True - if k == 'protocol' : self.setOrthanc(k, v) ; b_status = True - if k == 'route' : self.setOrthanc(k, v) ; b_status = True - if k == 'url' : - self.setOrthanc(k, v) - self.url_decompose() - b_status = True - return b_status - - def url_decompose(self, *args): - ''' - Decompose the internal URL into constituent parts - ''' - o = urlparse(self.d_orthanc['url']) - - self.d_orthanc['protocol'] = o.scheme - self.d_orthanc['IP'] = o.hostname - self.d_orthanc['port'] = o.port - if not self.d_orthanc['port']: - self.d_orthanc['port'] = '' - self.d_orthanc['route'] = o.path - return self.d_orthanc['url'] - -class Pipeline: - ''' - Information pertinent to the pipline being scheduled. This is - encapsulated with a class object to allow for possible future - expansion. - ''' - - def __init__(self, *args, **kwargs): - - self.str_pipelineName = '' -