A collection of awesome examples and tips regarding Kubeflow Pipelines.
You can specify additional packages via `packages_to_install`. This lets you use them without building a new Docker image.
```python
slack_post_op = func_to_container_op(slack_post, packages_to_install=["slack_sdk"])
```
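As a fuller sketch (the `slack_post` body and its parameters are illustrative assumptions, not part of the original example), a lightweight component posting to Slack might look like this:

```python
from kfp.components import func_to_container_op

# Hypothetical lightweight component. slack_sdk is installed into the
# base image at runtime via packages_to_install, so no custom image is needed.
def slack_post(message: str, token: str, channel: str):
    from slack_sdk import WebClient
    client = WebClient(token=token)
    client.chat_postMessage(channel=channel, text=message)

slack_post_op = func_to_container_op(slack_post, packages_to_install=["slack_sdk"])
```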
You can use `dsl.Condition` like an if statement in a pipeline function.
```python
with dsl.Condition(flip_task.output == "heads"):
    # Executed only when flip_task.output is "heads".
    print_op("You got heads!!")
```
The exit op passed to `ExitHandler` will be executed whether the pipeline succeeds or fails.
```python
exit_op = ContainerOp(...)
with ExitHandler(exit_op):
    op1 = ContainerOp(...)
    op2 = ContainerOp(...)
```
Note: `ExitHandler` requires a `ContainerOp` as its argument, not a `ResourceOp` or anything else. If you want to create/delete/apply k8s resources with `ExitHandler`, please see this example.
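A runnable sketch using lightweight components instead of raw `ContainerOp`s (`notify` and `work` are illustrative names):

```python
from kfp import dsl
from kfp.components import func_to_container_op

def notify():
    print("Pipeline finished, successfully or not.")

def work():
    print("Doing the actual work.")

notify_op = func_to_container_op(notify)
work_op = func_to_container_op(work)

@dsl.pipeline(name="exit-handler-example")
def exit_handler_pipeline():
    # notify_op runs at the end, whether work_op succeeded or failed.
    with dsl.ExitHandler(notify_op()):
        work_op()
```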
`ParallelFor` represents a parallel for loop over a static set of items.
A simple literal example. In this case, op1 is executed twice: once with `args=['echo apple:2']` and once with `args=['echo apple:3']`.
```python
with dsl.ParallelFor([{'apple': 2, 'banana': 4}, {'apple': 3, 'banana': 20}]) as item:
    op1 = ContainerOp(..., args=['echo apple:{}'.format(item.apple)])
    op2 = ContainerOp(..., args=['echo banana:{}'.format(item.banana)])
```
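The same loop as a runnable sketch with a lightweight echo component (`echo_op` is an assumed helper, not part of the original snippet):

```python
from kfp import dsl
from kfp.components import func_to_container_op

def echo(text: str):
    print(text)

echo_op = func_to_container_op(echo)

@dsl.pipeline(name="parallelfor-literal-example")
def loop_pipeline():
    # The loop body is expanded once per item in the static list.
    with dsl.ParallelFor([{'apple': 2, 'banana': 4},
                          {'apple': 3, 'banana': 20}]) as item:
        echo_op("apple:{}".format(item.apple))
        echo_op("banana:{}".format(item.banana))
```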
In this example, the previous task's output is used as the loop arguments.
```python
list_task = list_generator_op(parallelism)
parallel_tasks = dsl.ParallelFor(list_task.output)
with parallel_tasks as msg:
    print_op(msg)
```
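A sketch of what the upstream component could look like; `list_generator` here is a hypothetical lightweight function that returns a list, one message per parallel task:

```python
from kfp.components import func_to_container_op

# Hypothetical generator: returns one message per requested parallel task.
# KFP serializes the returned list as JSON, which ParallelFor can consume.
def list_generator(parallelism: int) -> list:
    return ["message {}".format(i) for i in range(parallelism)]

list_generator_op = func_to_container_op(list_generator)
```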
You can set the number of parallel tasks inside the pipeline function.
```python
def pipeline(parallelism: int):
    # Limit the number of tasks that run at the same time.
    default_conf = kfp.dsl.get_pipeline_conf()
    default_conf.set_parallelism(2)
```
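Note that `set_parallelism` is baked into the compiled workflow spec, so it takes a concrete int (2 above) rather than a runtime parameter. Compiling follows the usual v1 SDK flow:

```python
import kfp

# Compile the pipeline function into a workflow spec that carries parallelism=2.
kfp.compiler.Compiler().compile(pipeline, "pipeline.yaml")
```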
Please refer to the official documentation or our example.
We recommend reading the official Kubeflow documentation about caching.
For disabling:
```shell
# Make sure the mutatingwebhookconfiguration exists in your cluster.
export NAMESPACE=<Namespace where KFP is installed>
kubectl get mutatingwebhookconfiguration cache-webhook-${NAMESPACE}

kubectl patch mutatingwebhookconfiguration cache-webhook-${NAMESPACE} --type='json' -p='[{"op":"replace", "path": "/webhooks/0/rules/0/operations/0", "value": "DELETE"}]'
```
For enabling:
```shell
kubectl patch mutatingwebhookconfiguration cache-webhook-${NAMESPACE} --type='json' -p='[{"op":"replace", "path": "/webhooks/0/rules/0/operations/0", "value": "CREATE"}]'
```
To control the maximum staleness of reused cached data, you can set the step's `max_cache_staleness` parameter, which is in RFC 3339 duration format (e.g. 30 days = "P30D").
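For example, to allow reuse of cached results up to 30 days old:

```python
task = dsl.ContainerOp(...)
task.execution_options.caching_strategy.max_cache_staleness = "P30D"
```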
For disabling the cache of a `ContainerOp`:

```python
task = dsl.ContainerOp(...)
task.execution_options.caching_strategy.max_cache_staleness = "P0D"
```
Because `ResourceOp` doesn't have an `execution_options` attribute, you can use `add_pod_annotation` instead:
```python
rop = dsl.ResourceOp(...)
rop.add_pod_annotation("pipelines.kubeflow.org/max_cache_staleness", "P0D")
```
If you're using the Kubeflow Pipelines SDK v2, you can use `set_caching_options` for both `ContainerOp` and `ResourceOp`:
```python
task = dsl.ContainerOp(...)
task.set_caching_options(False)

rop = dsl.ResourceOp(...)
rop.set_caching_options(False)
```
See our example for more information.
Interested in contributing? Awesome😎
Fork the repository and run `make init` to set up.
Then create a PR and let us review!
See Google's best practices, which we follow in this repository.