
fix(backend): randomizing output uri path to avoid overwriting. Fixes #10186 #11243

Merged

Conversation

@b4sus (Contributor) commented Sep 24, 2024

In the driver, a random string is added when URI paths for output artifacts are generated. This ensures that when a component with a given name is executed in parallel (either with ParallelFor or simply by calling it multiple times in a @pipeline), its outputs are always stored at different paths.

Signed-off-by: b4sus <jurob19@gmail.com>
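
The change itself lives in the KFP Go driver, but the idea can be sketched in a few lines of Python. This is a minimal illustration only; generate_output_uri and the path layout here are hypothetical, and the driver's actual function names and bucket layout may differ:

import uuid

def generate_output_uri(root: str, task_name: str, artifact_name: str) -> str:
    # Salt the path with a random string so that two executions of the same
    # task name (e.g. ParallelFor iterations) can never write to the same URI.
    salt = uuid.uuid4().hex[:8]
    return f"{root}/{task_name}-{salt}/{artifact_name}"

# Two parallel executions of component1 now write to distinct URIs, e.g.:
#   .../component1-1f9a1c2e/output_df
#   .../component1-9b04d7aa/output_df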
google-oss-prow bot commented:

Hi @b4sus. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.


@hbelmiro (Contributor) left a review:

/lgtm
/ok-to-test

@HumairAK (Collaborator) commented:

Hey @b4sus , thanks for the contribution!

Can you provide a sample pipeline that illustrates the issue this PR is aiming to resolve?

At least in the case of a component being re-used, I believe the task name will have a -# suffix (see the illustration below), so it should already be distinguished from repeated earlier calls. For ParallelFor, I'd be interested in how it interacts with #10798.

cc @gmfrasca
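
(For context, the -# suffix refers to how the DSL de-duplicates task names when the same component is invoked more than once. A rough illustration, reusing the component1 from the snippets in this thread; the exact suffixing is the compiler's behavior, sketched here from memory:)

@dsl.pipeline
def repeated_calls_pipeline():
    # First call keeps the component name as its task name: "component1"
    t1 = component1(date_to_process="2024-01-01")
    # A second call to the same component gets an index suffix: "component1-2"
    t2 = component1(date_to_process="2024-01-02")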

@gmfrasca (Member) commented:

@HumairAK - This appears to only impact output artifacts, and only changes the driver behavior when in CONTAINER driver mode, so I don't believe this should have any effect on #10798 in terms of sub-DAG naming schemes, etc.

With that said, I did see that ParallelFor outputs were being stored at the same URI, which is exactly the problem this PR addresses by adding UUID salts.
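
To make the collision concrete, here is a hypothetical before/after of the artifact URIs (the real bucket layout may differ):

# before the fix: every iteration of component1 resolved to the same path
shared_uri = "minio://mlpipeline/v2/artifacts/main-pipeline/component1/output_df"

# after the fix: the driver salts each execution's path with a random string
salted_uri = "minio://mlpipeline/v2/artifacts/main-pipeline/component1-1f9a1c2e/output_df"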

@gmfrasca (Member) left a review:

Tested this out using a ParallelFor task and confirmed that each iteration's output artifacts are given unique URIs, which are referenced properly in the KFP UI.

/lgtm

@b4sus (Contributor, Author) commented Sep 30, 2024

Hey @HumairAK,

We noticed the problem when we started other pipelines from one pipeline (pipeline-as-component) using ParallelFor. This is roughly the code:

from kfp import dsl

# component1, component2, and prepare_dates_component are defined elsewhere

@dsl.pipeline
def inner_pipeline(date_to_process: str):
    comp1_task = component1(date_to_process=date_to_process)
    comp2_task = component2(dataset_in=comp1_task.outputs["output_df"])

@dsl.pipeline
def main_pipeline(from_date: str, to_date: str):
    prepare_dates_task = prepare_dates_component(from_date=from_date, to_date=to_date)

    with dsl.ParallelFor(items=prepare_dates_task.output, parallelism=4) as date_to_process:
        inner_ppln_task = inner_pipeline(date_to_process=date_to_process)

In this case, many inner pipelines were started (more than 4 at a time, as parallelism is not yet supported), and the problem was that the outputs of component1 were written to the same MinIO location, overwriting each other. Subsequently, several component2 tasks got the same input regardless of the argument (date_to_process), producing the same final output (not visible in the code here, as it is stored directly in the component).

@HumairAK (Collaborator) commented Oct 3, 2024

Perfect, thanks guys.

Tested, and it works with the following pipeline as well:

pipeline.py
from typing import List

from kfp import dsl, compiler
from kfp.dsl import Dataset
from kfp.dsl import Output, InputPath

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component1(date_to_process: str, output_df: Output[Dataset]):
    with open(output_df.path, 'w') as f:
        f.write(date_to_process)

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component2(dataset_in: InputPath('Dataset')):
    with open(dataset_in, 'r') as input_file:
        dataset_one_contents = input_file.read()
    print(dataset_one_contents)

@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def prepare_dates_component() -> List[str]:
    return ["1", "2", "3", "4", "5", "6"]

@dsl.pipeline
def inner_pipeline(date_to_process: str):
    comp1_task = component1(date_to_process = date_to_process).set_caching_options(enable_caching=False)
    comp2_task = component2(dataset_in = comp1_task.outputs["output_df"]).set_caching_options(enable_caching=False)

@dsl.pipeline
def main_pipeline():
    prepare_dates_task = prepare_dates_component().set_caching_options(enable_caching=False)

    with dsl.ParallelFor(items = prepare_dates_task.output, parallelism=4) as date_to_process:
        inner_ppln_task = inner_pipeline(date_to_process = date_to_process)


if __name__ == '__main__':
    compiler.Compiler().compile(main_pipeline, __file__ + '.yaml')

before:
[screenshot: ParallelFor iterations' output artifacts all share the same URI]

after:
[screenshot: each iteration's output artifact gets its own unique URI]

/lgtm
/approve

google-oss-prow bot commented:

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: HumairAK


google-oss-prow bot merged commit 219725d into kubeflow:master on Oct 3, 2024
13 checks passed
@b4sus deleted the fix_backend_overwriting_artifacts branch on October 4, 2024 at 07:06