Skip to content

Commit

Permalink
Add step decorator usage (#1)
Browse files Browse the repository at this point in the history
* Add .python-version to .gitignore

* Make step function usable as a decorator with warning

* Update pipeline step error message

* Update tests

* Update README
  • Loading branch information
freddiev4 authored Mar 16, 2023
1 parent afdc77b commit f54ca09
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 31 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ __pycache__/
*.DS_Store
*.egg-info

*.env
*.env

.python-version
21 changes: 5 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ $ pip install tinypipeline

`tinypipeline` exposes two main objects:
- `pipeline`: a decorator for defining your pipeline. Returns a `Pipeline` instance.
- `step`: a function that is used to define individual pipeline steps. Returns a `Step` instance.
- `step`: a decorator that is used to define individual pipeline steps. Returns a `Step` instance.

Each object requires you provide a `name`, `version`, and `description` to explicitly define what pipeline you're creating.

Expand All @@ -27,11 +27,12 @@ If you'd like to use this package, you can follow the `example.py` below:
```python
from tinypipeline import pipeline, step


def step_fn_one():
@step(name='step_one', version='0.0.1', description='first step')
def step_one():
print("Step function one")

def step_fn_two():
@step(name='step_two', version='0.0.1', description='second step')
def step_two():
print("Step function two")

@pipeline(
Expand All @@ -40,18 +41,6 @@ def step_fn_two():
description='a test tinypipeline',
)
def pipe():
step_one = step(
callable=step_fn_one,
name='step_one',
version='0.0.1',
description='first step',
)
step_two = step(
name='step_two',
version='0.0.1',
description='second step',
callable=step_fn_two,
)
return [step_one, step_two]

pipe = pipe()
Expand Down
58 changes: 56 additions & 2 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,58 @@ def test_pipeline():
mock_step_2.assert_called_once()
mock_step_3.assert_called_once()

def test_pipeline_completion_using_step_decorator():
"""
Test that the pipeline runs all the steps in the correct order,
using the step decorator instead of the step function.
"""
mock_step_1 = Mock()
mock_step_2 = Mock()
mock_step_3 = Mock()

mock_step_1.return_value = "step 1"
mock_step_2.return_value = "step 2"
mock_step_3.return_value = "step 3"

@step(
name="step_one",
description="Step one",
version="1.0.0",
)
def step_one():
mock_step_1()

@step(
name="step_two",
description="Step two",
version="1.0.0",
)
def step_two():
mock_step_2()

@step(
name="step_three",
description="Step three",
version="1.0.0",
)
def step_three():
mock_step_3()

@pipeline(
name="test_pipeline",
description="Test pipeline",
version="1.0.0",
)
def test_pipeline():
return [step_one, step_two, step_three]

pipe = test_pipeline()
pipe.run()

assert len(pipe.steps) == 3
mock_step_1.assert_called_once()
mock_step_2.assert_called_once()
mock_step_3.assert_called_once()

def test_pipeline_failure_no_function_passed():
"""
Expand Down Expand Up @@ -97,7 +149,8 @@ def test_pipeline():
pipe.run()

assert (
"Not a valid step. Consider using the step() method to create steps for your pipeline."
"Not a valid step. Consider using the step decorator to "
"create steps for your pipeline."
== str(context.value)
)

Expand Down Expand Up @@ -145,7 +198,8 @@ def test_pipeline():

def test_pipeline_failure_exception_in_step():
"""
Test that the pipeline fails with an Exception if there is an exception in one of the steps.
Test that the pipeline fails with an Exception if there is an exception
in one of the steps.
Also check that the error message is correct.
"""
Expand Down
8 changes: 5 additions & 3 deletions tinypipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def _get_steps(self):

if not all(isinstance(s, Step) for s in _steps):
raise TypeError(
"Not a valid step. Consider using the step() method to create steps for your pipeline."
"Not a valid step. Consider using the step decorator "
"to create steps for your pipeline."
)

return _steps
Expand Down Expand Up @@ -86,7 +87,7 @@ def run(self) -> None:

completion_time = (end - start).total_seconds()
print(f"Step [{step.name}] completed in {completion_time} seconds\n")
except Exception as e:
except Exception:
print(f"Pipeline failed due to an exception in step [{step.name}]")
raise
return None
Expand Down Expand Up @@ -119,7 +120,8 @@ def wrapper():
"""
if not isinstance(func, Callable):
raise TypeError(
f"The pipeline decorator only accepts functions. Passed {type(func)}"
"The pipeline decorator only accepts functions. "
f"Passed {type(func)}"
)

_pipeline = Pipeline(
Expand Down
35 changes: 26 additions & 9 deletions tinypipeline/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def run(self):


def step(
callable: Callable,
name: str,
version: str,
description: str,
callable: Callable = None,
):
"""
Create a step for a pipeline.
Create a step for a pipeline. Can be used as a decorator or a function.
Params
------
Expand All @@ -56,10 +56,27 @@ def step(
description: str
A description of the step.
"""
_step = Step(
callable=callable,
name=name,
version=version,
description=description,
)
return _step
if callable is not None:
print(
f"WARNING: step() is being used as a function for {name}. "
"This is deprecated and will be removed in a future version. "
"Please use step() as a decorator instead."
)

_step = Step(
callable=callable,
name=name,
version=version,
description=description,
)
return _step

def decorator(callable):
_step = Step(
callable=callable,
name=name,
version=version,
description=description,
)
return _step
return decorator

0 comments on commit f54ca09

Please sign in to comment.