Dataflow runner does not respect ParDo's lifecycle on case of exceptions #18592

Open
kennknowles opened this issue Jun 3, 2022 · 2 comments

Comments

@kennknowles
Member

The lifecycle of the DoFn is not respected in case of exception in any of the lifecycle methods after setup.
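
For illustration, the expected ordering can be modelled in plain Java without any Beam dependency. This is only a sketch of the contract as I understand it (once setup has succeeded, teardown should be attempted even if a later lifecycle method throws), not how any runner is implemented. `LifecycleSketch`, `LifecycleFn`, `RecordingFn`, `runInstance` and `trace` are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the expected DoFn lifecycle; none of these names are Beam APIs. */
final class LifecycleSketch {

  /** Hypothetical stand-in for a DoFn with the five lifecycle hooks. */
  interface LifecycleFn {
    void setup() throws Exception;
    void startBundle() throws Exception;
    void processElement(long element) throws Exception;
    void finishBundle() throws Exception;
    void teardown() throws Exception;
  }

  /** Drives one instance through its bundles; teardown runs once setup has succeeded. */
  static void runInstance(LifecycleFn fn, List<List<Long>> bundles) throws Exception {
    fn.setup(); // if setup itself throws, this model has nothing to tear down
    try {
      for (List<Long> bundle : bundles) {
        fn.startBundle();
        for (long element : bundle) {
          fn.processElement(element);
        }
        fn.finishBundle();
      }
    } finally {
      // Reached on normal completion and on any exception thrown after setup.
      fn.teardown();
    }
  }

  /** Records every call and optionally throws from the named method. */
  static final class RecordingFn implements LifecycleFn {
    final List<String> calls = new ArrayList<>();
    private final String failAt; // method name to fail in, or null for no failure

    RecordingFn(String failAt) {
      this.failAt = failAt;
    }

    private void record(String name) {
      calls.add(name);
      if (name.equals(failAt)) {
        throw new IllegalStateException("simulated failure in " + name);
      }
    }

    @Override public void setup() { record("setup"); }
    @Override public void startBundle() { record("startBundle"); }
    @Override public void processElement(long element) { record("processElement"); }
    @Override public void finishBundle() { record("finishBundle"); }
    @Override public void teardown() { record("teardown"); }
  }

  /** Runs a single two-element bundle and returns the observed call order. */
  static List<String> trace(String failAt) {
    RecordingFn fn = new RecordingFn(failAt);
    try {
      runInstance(fn, Arrays.asList(Arrays.asList(1L, 2L)));
    } catch (Exception expected) {
      // The simulated failure is swallowed; only the call order matters here.
    }
    return fn.calls;
  }
}
```

In this model, `LifecycleSketch.trace("processElement")` ends with a `teardown` entry right after the failing `processElement` call, which is the behaviour this issue reports as missing.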

Imported from Jira BEAM-3245. Original Jira may contain additional context.
Reported by: iemejia.

@Abacn
Contributor

Abacn commented Jul 23, 2024

Found that teardown is never called in Dataflow runner v1 or v2, even when the pipeline finishes normally.

Just use a simple DoFn:

    pipeline
        .apply(GenerateSequence.from(0).to(1000))
        .apply(ParDo.of(new SomeDoFn()));

where SomeDoFn is

    static class SomeDoFn extends DoFn<Long, Long> {

      // LOG is assumed to be a logger (e.g. SLF4J) declared in the enclosing class.

      // Random id assigned in setup, used to tell DoFn instances apart in the logs.
      protected transient Integer id;

      @Setup
      public void setup() {
        Integer idOld = id;
        id = ThreadLocalRandom.current().nextInt(0, 1000);
        LOG.info("@Setup {}, {} at {}", idOld, id, Integer.toHexString(System.identityHashCode(this)));
      }

      @StartBundle
      public void startBundle() {
        LOG.info("@StartBundle {} at {}", id, Integer.toHexString(System.identityHashCode(this)));
      }

      @ProcessElement
      public void process(@Element Long input, OutputReceiver<Long> receiver) {
        // LOG.info("@ProcessElement for {} at bundle {}", input, id);
        receiver.output(input);
      }

      @FinishBundle
      public void finishBundle() throws Exception {
        LOG.info("@FinishBundle {} at {}", id, Integer.toHexString(System.identityHashCode(this)));
      }

      @Teardown
      public void teardown() {
        LOG.info("@Teardown {} at {}", id, Integer.toHexString(System.identityHashCode(this)));
      }

      // Logs when the instance is garbage collected without teardown having run.
      @Override
      protected void finalize() throws Throwable {
        LOG.info("finalize");
      }
    }

Log output appears for setup, startBundle, and finishBundle, but not for teardown.

@Abacn
Contributor

Abacn commented Sep 20, 2024

Checked that on the Dataflow legacy runner, teardown is called after a DoFn throws an error in processElement or finishBundle, but on Dataflow runner v2 it is not.


3 participants