Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-deterministic map operator option in data loading #980

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

MartinGleize
Copy link
Contributor

What does this PR do? Please describe:

Add the "deterministic" boolean parameter to the map operator in data_pipeline. It follows similar semantics as the tensorflow equivalent: https://www.tensorflow.org/api_docs/python/tf/data/Dataset#map , i.e. it trades the matching of output and input order with some execution speed.

Does your PR introduce any breaking changes? If yes, please list them:
No

Check list:

  • Was the content of this PR discussed and approved via a GitHub issue? (no need for typos or documentation improvements)
  • Did you read the contributor guideline?
  • Did you make sure that your PR does only one thing instead of bundling different changes together?
  • Did you make sure to update the documentation with your changes? (if necessary)
  • Did you write any new necessary tests?
  • Did you verify new and existing tests pass locally with your changes?
  • Did you update the CHANGELOG? (no need for typos, documentation, or minor internal changes)

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jan 20, 2025
@MartinGleize
Copy link
Contributor Author

Tested on toy example:

num_parallel_calls = 10

def fn(d: int) -> int:
    time.sleep(d)
    return d

pipeline = (
    read_sequence(list(range(num_parallel_calls, 0, -1)))
        .map(fn, num_parallel_calls=num_parallel_calls, deterministic=False)
        .and_return()
)

"=True" returns 10, 9, 8, ... in a quick burst after a 10s wait, while "=False" returns 1, 2, 3, ..., one every second.

}

task = std::move(tasks_.front());
tasks_.pop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we track somehow the submission time here ? (to make sure that among finished tasks we return tasks according to their submission order).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not, that's a good point (they will come out sorted by completion order). Is the option of preserving submission order as much as feasible still desirable in the deterministic=False setting?

}

bool
map_data_source::fill_buffer_async()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm right to say that the max buffer size will be num_parallel_calls in any case ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This was intended, I wondered about providing the option to have it higher, would that be interesting? Maybe for slightly better throughput.

Comment on lines +352 to +353
def fn(d: int) -> int:
return d
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be important to add a test where different tasks have different sleep(time) and to check the return order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the sleep test locally, but it would be nice to have some stronger CI test you're right. This feels non-trivial to mock, and actually thinking about it we probably shouldn't assume order even with long processing times, right?

Like:

def fn(d: int) -> int:
time.sleep(d)
return d

In practice in a local test the d's are indeed produced in order, but in theory there should be no such guarantee.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants