title | tag
---|---
AlgFlow | pipeline, algorithm
If you are part of Roche, please report any vulnerability in this software to ISOSO [https://opensource.roche.com/contact]. Otherwise, send an email to a contributor or file a GitHub issue. We will try to address it as soon as possible.
Most pipeline frameworks are task-, process-, or container-oriented and therefore fail to model the way a data scientist works. This framework proposes a data-science-centric pipeline model based on Algorithms. Data scientists implement an algorithm with named inputs and outputs and publish the algorithm. Pipelines are then stitched together automatically based on the requested outputs.
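To make the stitching idea concrete, here is a minimal sketch (not the real AlgFlow internals) of how a resolver could walk backwards from requested outputs, matching each algorithm's declared outputs to other algorithms' inputs. The registry contents and function names here are hypothetical.

```python
ALGORITHMS = {
    # hypothetical registry: algorithm name -> (input names, output names)
    'BaseCalling': ({'raw_signal'}, {'seq'}),
    'BWAAlignment': ({'seq', 'template'}, {'aligned_seq'}),
}

def stitch(requested, available):
    """Return the algorithms needed to produce `requested` outputs,
    in dependency order, given externally `available` data names."""
    plan, have, pending = [], set(available), set(requested)
    while pending - have:
        progress = False
        for name, (ins, outs) in ALGORITHMS.items():
            if name in plan or not outs & (pending - have):
                continue
            if ins <= have:                 # all inputs satisfiable now
                plan.append(name)
                have |= outs
                progress = True
            elif ins - pending:             # turn missing inputs into new goals
                pending |= ins
                progress = True
        if not progress:
            raise ValueError(f'cannot produce: {pending - have}')
    return plan

print(stitch({'aligned_seq'}, {'raw_signal', 'template'}))
# -> ['BaseCalling', 'BWAAlignment']
```

Because the resolver works purely on names, an algorithm never has to know which other algorithm produced its inputs.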
Algorithms are described in terms of tensors, vectors, and other commonly available NumPy or Python data types, making the algorithm implementation completely independent of the data source. The input/output handlers provide the actual interfacing with the input file, database, or cloud source to fetch the data, prepare it, and advertise it as tensors. This makes algorithms completely independent of the data format, with very little overhead to get started.
The algorithm model is the central construct of AlgFlow. All application logic is defined in terms of algorithms. The basic anatomy of an algorithm is Input and Output. We have augmented this anatomy with other constructs that help an algorithm integrate easily into the workflow. All the constructs needed to specify AlgFlow algorithms are described below.
```python
import numpy as np

from algflow import Algorithm
from algflow.asset import Asset
from algflow.data import Float, Array

class BWAAlignment(Algorithm):
    '''
    Description of the algorithm:
    - Feature 1
    - Feature 2
    - What it does
    - Performance, etc.
    '''
    group = 'Alignment'
    description = 'XYZ'

    class Params:
        k1_rate = Float(3.5)
        k1_exp_rate = Float(2.3, private=True)
        qscore_model = Asset(id='qscore-models/1', project='RSS-Data-Science')

    class Input:
        seq = Array(shape=(2, 3))
        template = Array(dtype=np.str_, shape=(2, 3))

    class Output:
        aligned_seq = Array(shape=(2, 2, 3))
        bwa_stats_report = Asset(path='bwa/bwa_stats.html', template='bwa_stats.html')

    def run(self, inputs, outputs):
        seq = inputs.seq
        template = inputs.template
        ...
        outputs.aligned_seq = ...
        outputs.bwa_stats_report.render(data={...})

    def verify(self, outputs):
        raise NotImplementedError
```
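Because `run` only sees named inputs and outputs, an algorithm body can be exercised with simple stand-in namespaces, no pipeline required. The sketch below uses `types.SimpleNamespace` as a stand-in for AlgFlow's input/output carriers; the stacking logic is purely illustrative.

```python
from types import SimpleNamespace
import numpy as np

def run(inputs, outputs):
    # Illustrative body: stack the two (2, 3) inputs into a (2, 2, 3) output,
    # matching the shapes declared in the BWAAlignment example above.
    outputs.aligned_seq = np.stack([inputs.seq, inputs.template])

ins = SimpleNamespace(seq=np.zeros((2, 3)), template=np.ones((2, 3)))
outs = SimpleNamespace()
run(ins, outs)
assert outs.aligned_seq.shape == (2, 2, 3)
```

This separation is what makes algorithms easy to unit test in isolation from any data source.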
All the inputs are listed under the `class Input:` construct. The inputs are strongly typed and follow a typing system very similar to the traits library, with some additional metadata.
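As a rough illustration of the traits-like idea, a typed input can be modeled as an object that carries both a type check and its metadata. The `String` class below is a minimal stand-in written for this document, not the real `algflow.data` implementation.

```python
import re

class String:
    # Illustrative only: a minimal stand-in showing how traits-like
    # metadata (required, null, label, regex, default_value) can drive
    # validation of a declared input.
    def __init__(self, default_value=None, required=True, null=False,
                 label=None, regex=None):
        self.default_value = default_value
        self.required = required
        self.null = null
        self.label = label
        self.regex = regex

    def validate(self, value):
        if value is None:
            if not self.null:
                raise ValueError(f'{self.label or "value"} may not be None')
            return None
        if not isinstance(value, str):
            raise TypeError(f'expected str, got {type(value).__name__}')
        if self.regex and not re.fullmatch(self.regex, value):
            raise ValueError(f'{value!r} does not match {self.regex!r}')
        return value

run_id = String(label='Run ID', regex=r'R\d+')
run_id.validate('R12323')      # passes the pattern check
```

The metadata doubles as documentation: a GUI can render `label`, and the framework can reject bad values before the algorithm ever runs.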
The following types are supported:

- Array
- String
- Float
- Bool
- List
- Either
- Enum
- Set
- Dict
- Tuple
- Instance
These types support the following common metadata:

- `required` - The value must be provided. The default is `True`.
- `null` - Whether the value can be set to `None`. The default is `False`.
- `label` - Used in a GUI to collect the parameters. The default is `None`.
- `regex` - Applicable to `String`; ensures that the input string matches a pattern.
- `default_value` - The default value of the variable.
This construct requires all the inputs needed by the algorithm to be specified in advance. The explicit listing of the inputs tells AlgFlow which algorithms must also run to generate each input. Another added benefit is that an algorithm doesn't need to specify the next step of the pipeline, keeping algorithms pipeline agnostic.
This construct requires all the outputs that the algorithm will generate to be specified. The explicit listing of the outputs enables AlgFlow to stitch pipelines automatically by matching each algorithm's declared outputs against the inputs required elsewhere.

An input declaration combining several of the metadata options looks like:

```python
seq = String(required=True, default_value='', null=True)
```
The `ComputationGroup` construct allows us to achieve the following goals:

- A series of algorithms can be grouped under one category.
- All the algorithms in the same computation group can be skipped based on certain criteria.
- A common set of parameters can be abstracted out of the algorithms, defined at the group level, and shared among the group.
- It helps visualization by making the computation graph hierarchical.
```python
import numpy as np

from algflow import ComputationGroup
from algflow.data import Float, Array

class AlignmentGroup(ComputationGroup):
    '''
    Description of the computation group:
    - Feature 1
    - Feature 2
    - What it does
    - Performance, etc.
    '''
    name = 'Alignment'

    class Params:
        k1_rate = Float(3.5)
        k1_exp_rate = Float(2.3, private=True)

    class Input:
        seq = Array(shape=(2, 3))
        template = Array(dtype=np.str_, shape=(2, 3))

    def should_skip(self, inputs, params):
        ...
        return True  # or False
```
In AlgFlow, a pipeline is constructed automatically from the outputs expected of it. Pipelines are therefore very similar to algorithms, except that a pipeline carries no logic of its own to produce the output. An example pipeline is given below:
```python
from algflow import Pipeline
from algflow.data import Float, String

class RnnPipeline(Pipeline):
    class Input:
        runId = String(required=True)
        model_name = String(default_value='AOC_B0')

    class Params:
        p1 = Float(algo='Alg1', default_value=0.37)
        p2 = String(algo='Alg2', required=True)

    class Output:
        include = [
            'alignment',
            'base_calling',
            'normalization',
        ]
        exclude = [
            'annotation1',
            'annotation2',
        ]
```
A pipeline can be invoked by a job, which is specified as shown below:
```yaml
job:
  tag: 'parameter-tuning'
  image: acap-latest
  pipeline: RnnPipeline
  input:
    runId: 'R12323'
    model_name: 'AOC_B0_small'
  params:
    p2: 'norm-2'
```
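Conceptually, the job's `input` and `params` sections override the defaults declared on the pipeline. The sketch below mirrors the YAML above with plain dictionaries to show that merge; the `PIPELINE_DEFAULTS` structure and `resolve` helper are hypothetical, not part of the AlgFlow API.

```python
job = {
    'tag': 'parameter-tuning',
    'image': 'acap-latest',
    'pipeline': 'RnnPipeline',
    'input': {'runId': 'R12323', 'model_name': 'AOC_B0_small'},
    'params': {'p2': 'norm-2'},
}

# Assumed defaults, taken from the RnnPipeline declaration above.
PIPELINE_DEFAULTS = {
    'input': {'model_name': 'AOC_B0'},
    'params': {'p1': 0.37},
}

def resolve(job, defaults):
    """Merge job overrides on top of the pipeline's declared defaults."""
    return {
        section: {**defaults.get(section, {}), **job.get(section, {})}
        for section in ('input', 'params')
    }

resolved = resolve(job, PIPELINE_DEFAULTS)
# resolved['input']['model_name'] == 'AOC_B0_small'  (job overrides default)
# resolved['params'] == {'p1': 0.37, 'p2': 'norm-2'}
```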
A pipeline can also have a run method, which is useful for postprocessing the output and for composing the pipeline from other pipelines. An example of a composable pipeline is given below:
```python
from algflow import Job, Pipeline
from algflow.data import Float, String

class CompareRuns(Pipeline):
    class Input:
        runId1 = String(required=True)
        runId2 = String(required=True)
        model_name = String(default_value='AOC_B0')

    class Params:
        p1 = Float(algo='Alg1', default_value=0.37)
        p2 = String(algo='Alg2', required=True)

    class Output:
        include = ['**']  # '**' as in the original spec, presumably all outputs

    def run(self, inputs, outputs):
        runId1 = inputs.runId1
        runId2 = inputs.runId2
        job1 = Job(name='gold', inputs={'runId': runId1}, pipeline='HmmPipeline')
        job2 = Job(name='current', inputs={'runId': runId2}, pipeline='RnnPipeline')
```