Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Support indexed table reads in lowered execution #9522

Merged
merged 3 commits into from
Oct 6, 2020

Conversation

tpoterba
Copy link
Contributor

No description provided.

@tpoterba tpoterba added the WIP label Sep 29, 2020
@tpoterba
Copy link
Contributor Author

a large diff, but not that complex. The changes to Emit/EmitStream are rote -- adding ctx to the EmitStreamContext and entry points.

The design here (serializing the unstaged closures to generate decoders and such) is pretty gross, and I intend to fix this when we have infrastructure to mix interpreted TableIRs and lowered TableStages, and we can remove the interpreted indexed reader.

Copy link
Collaborator

@patrick-schultz patrick-schultz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay. Just a couple nits.

val ctxMemo = ctxStruct.asBaseStruct.memoize(cb, "pnri_ctx_struct")
cb.assign(idxr, getIndexReader(ctxMemo
.loadField(cb, "indexPath")
.handle(cb, cb._fatal(""))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use IEmitCode.get when you don't have a specific error message.


spec.rowsSpec.readTableStage(ctx, spec.rowsComponent.absolutePath(params.path), requestedType.rowType, partitioner, filterIntervals).apply(globals)
//
// val rowsPath = spec.rowsComponent.absolutePath(params.path)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete commented out old code?

@tpoterba
Copy link
Contributor Author

tpoterba commented Oct 6, 2020

addressed, thanks for the comments.

@danking danking merged commit 2b0aded into hail-is:main Oct 6, 2020
@danking danking mentioned this pull request Nov 30, 2023
danking added a commit that referenced this pull request Nov 30, 2023
CHANGELOG: Fix #13998 which appeared in 0.2.58 and prevented reading
from a networked filesystem mounted within the filesystem of the worker
node for certain pipelines (those that did not trigger "lowering").

We use the IndexReader in `PartitionNativeIntervalReader`,
`PartitionNativeReaderIndexed`, and
`PartitionZippedIndexedNativeReader`.

1. `PartitionNativeIntervalReader` is only used by `query_table`.

2. `PartitionNativeReaderIndexed` is only used by
`IndexedRVDSpec2.readTableStage` which is used by `TableNativeReader`
when there is a new partitioner.

3. `PartitionZippedIndexedNativeReader` is only sued by
`AbstractRVDSpec.readZippedLowered` when there is a new partitioner.

Two is for tables, three is for matrix tables. In `readZippedLowered` we
explicitly [drop the file
protocol](https://github.com/hail-is/hail/blob/1dedf3c63f9aabf1b6ce538165360056f82f76e4/hail/src/main/scala/is/hail/rvd/AbstractRVDSpec.scala#L154-L155):

```
        val absPathLeft = removeFileProtocol(pathLeft)
        val absPathRight = removeFileProtocol(pathRight)
```

We have done this, by various names, since this lowered code path was
added. I added `removeFileProtocol` because stripping the protocol in
Query-on-Batch prevented the reading and writing of gs:// URIs, the only
URIs I could read in QoB. `uriPath` (the function whose use I replaced
with `removeFileProtocol`) was added by Cotton [a very long time
ago](92a9936).
It seems he added it so that he could use HDFS to generate a temporary
file path on the local filesystem but pass the file path to binary tools
that know nothing of HDFS and file:// URIs.

#9522 added the lowered code path and thus introduced this bug. It
attempted to mirror the extant code in
[`readIndexedPartitions`](https://github.com/hail-is/hail/blob/2b0aded9206849252b453dd80710cea8d2156793/hail/src/main/scala/is/hail/HailContext.scala#L421-L440)
which *does not* strip any protocols from the path.

This has gone undetected because we never try to read data through the
OS's filesystem. We always use gs://, Azure, or s3:// because we do not
test in environments that have a networked file system mounted in the
OS's filesystem. To replicate this bug (and add a test for it), we would
need a cluster with a lustre file system (or another networked
filesystem). This would be a fairly large lift. The fix is trivial: just
never intentionally strip the protocol!
hail-ci-robot pushed a commit that referenced this pull request Mar 28, 2024
CHANGELOG: Fixes a serious, but likely rare, bug in the
Table/MatrixTable reader, which has been present since Sep 2020. It
manifests as many (around half or more) of the rows being dropped. This
could only happen when 1) reading a (matrix)table whose partitioning
metadata allows rows with the same key to be split across neighboring
partitions, and 2) reading it with a different partitioning than it was
written. 1) would likely only happen by reading data keyed by locus and
alleles, and rekeying it to only locus before writing. 2) would likely
only happen by using the `_intervals` or `_n_partitions` arguments to
`read_(matrix)_table`, or possibly `repartition`. Please reach out to us
if you're concerned you may have been affected by this.

This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which
appears to have been around since this code was first added in #9522
almost four years ago. It was reported in this [zulip
thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning).
I want to do further work to better characterize exactly what it takes
to be affected by this bug, but I think you must have a table or
matrixtable on disk which has duplicate keys, and moreover keys which
span neighboring partitions, and then you must read the data with a
different partitioner.

The root of the issue is an invalid assumption made in the code. To read
data written with partitioner `p1` using new partitioner `p2`, it first
computes the "intersection", or common refinement, of the two. It then
assumes that each partition in the refinement overlaps exactly one
partition of `p1`. But this is only true if the partitions of `p1` are
themselves mutually disjoint, which is usually but not necessarily true.

For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner,
and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input
partitions are not disjoint, as the key `5` is allowed in both. The
common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each
partition in the refinement, we want to read in the corresponding range
from the appropriate input partition, then we want to group the
partitions in the refinement to match the new partitioner. The code
finds "the appropriate input partition" by taking the first input
partition which overlaps the refinement partition, using
`lowerBoundInterval`. That works if there is only one overlapping input
partition, but here fails, since the refinement partition `[5, 8]`
overlaps both input partitions. So the code mistakenly reads from the
input partition `[1, 5]` to produce the refinement partition `[5, 8]`,
and so completely drops all rows in the input `[5, 8]`.

In practice, I think the most likely way to run into this (and the way
it was found by a user) is to have a dataset keyed by `["locus",
"alleles"]`, which has split multi-allelics, so there are multiple rows
with the same locus. Then shorten the key to `["locus"]`, write the
dataset to disk, and read it back with a different partitioning, e.g. by
passing a `_n_partitions` argument to `read_table` or
`read_matrix_table`. For instance, if the partitioning was originally `[
[{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`,
then after shortening the key it would be `[ [1:1, 1:500], [1:500,
1:1000] ]`. Notice that even though the original partitioning had no
overlap, it does after shortening the key, because rows with locus
`1:500` with alleles less than `["G"]` are allowed in the first
partition, so we have to make the right endpoint inclusive after
shortening. You would then need to write this rekeyed dataset to disk
and read it back with different partitioning (note that `ds.repartition`
is enough to do this in the batch backend).

I still need to think through what holes in our testing allowed this to
remain undetected for so long, and attempt to plug them. We should also
plan for what to tell a user who is concerned they may have been
affected by this in the past.
chrisvittal pushed a commit to chrisvittal/hail that referenced this pull request Jul 10, 2024
CHANGELOG: Fixes a serious, but likely rare, bug in the
Table/MatrixTable reader, which has been present since Sep 2020. It
manifests as many (around half or more) of the rows being dropped. This
could only happen when 1) reading a (matrix)table whose partitioning
metadata allows rows with the same key to be split across neighboring
partitions, and 2) reading it with a different partitioning than it was
written. 1) would likely only happen by reading data keyed by locus and
alleles, and rekeying it to only locus before writing. 2) would likely
only happen by using the `_intervals` or `_n_partitions` arguments to
`read_(matrix)_table`, or possibly `repartition`. Please reach out to us
if you're concerned you may have been affected by this.

This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which
appears to have been around since this code was first added in hail-is#9522
almost four years ago. It was reported in this [zulip
thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning).
I want to do further work to better characterize exactly what it takes
to be affected by this bug, but I think you must have a table or
matrixtable on disk which has duplicate keys, and moreover keys which
span neighboring partitions, and then you must read the data with a
different partitioner.

The root of the issue is an invalid assumption made in the code. To read
data written with partitioner `p1` using new partitioner `p2`, it first
computes the "intersection", or common refinement, of the two. It then
assumes that each partition in the refinement overlaps exactly one
partition of `p1`. But this is only true if the partitions of `p1` are
themselves mutually disjoint, which is usually but not necessarily true.

For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner,
and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input
partitions are not disjoint, as the key `5` is allowed in both. The
common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each
partition in the refinement, we want to read in the corresponding range
from the appropriate input partition, then we want to group the
partitions in the refinement to match the new partitioner. The code
finds "the appropriate input partition" by taking the first input
partition which overlaps the refinement partition, using
`lowerBoundInterval`. That works if there is only one overlapping input
partition, but here fails, since the refinement partition `[5, 8]`
overlaps both input partitions. So the code mistakenly reads from the
input partition `[1, 5]` to produce the refinement partition `[5, 8]`,
and so completely drops all rows in the input `[5, 8]`.

In practice, I think the most likely way to run into this (and the way
it was found by a user) is to have a dataset keyed by `["locus",
"alleles"]`, which has split multi-allelics, so there are multiple rows
with the same locus. Then shorten the key to `["locus"]`, write the
dataset to disk, and read it back with a different partitioning, e.g. by
passing a `_n_partitions` argument to `read_table` or
`read_matrix_table`. For instance, if the partitioning was originally `[
[{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`,
then after shortening the key it would be `[ [1:1, 1:500], [1:500,
1:1000] ]`. Notice that even though the original partitioning had no
overlap, it does after shortening the key, because rows with locus
`1:500` with alleles less than `["G"]` are allowed in the first
partition, so we have to make the right endpoint inclusive after
shortening. You would then need to write this rekeyed dataset to disk
and read it back with different partitioning (note that `ds.repartition`
is enough to do this in the batch backend).

I still need to think through what holes in our testing allowed this to
remain undetected for so long, and attempt to plug them. We should also
plan for what to tell a user who is concerned they may have been
affected by this in the past.
chrisvittal pushed a commit that referenced this pull request Jul 11, 2024
CHANGELOG: Fixes a serious, but likely rare, bug in the
Table/MatrixTable reader, which has been present since Sep 2020. It
manifests as many (around half or more) of the rows being dropped. This
could only happen when 1) reading a (matrix)table whose partitioning
metadata allows rows with the same key to be split across neighboring
partitions, and 2) reading it with a different partitioning than it was
written. 1) would likely only happen by reading data keyed by locus and
alleles, and rekeying it to only locus before writing. 2) would likely
only happen by using the `_intervals` or `_n_partitions` arguments to
`read_(matrix)_table`, or possibly `repartition`. Please reach out to us
if you're concerned you may have been affected by this.

This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which
appears to have been around since this code was first added in #9522
almost four years ago. It was reported in this [zulip
thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning).
I want to do further work to better characterize exactly what it takes
to be affected by this bug, but I think you must have a table or
matrixtable on disk which has duplicate keys, and moreover keys which
span neighboring partitions, and then you must read the data with a
different partitioner.

The root of the issue is an invalid assumption made in the code. To read
data written with partitioner `p1` using new partitioner `p2`, it first
computes the "intersection", or common refinement, of the two. It then
assumes that each partition in the refinement overlaps exactly one
partition of `p1`. But this is only true if the partitions of `p1` are
themselves mutually disjoint, which is usually but not necessarily true.

For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner,
and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input
partitions are not disjoint, as the key `5` is allowed in both. The
common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each
partition in the refinement, we want to read in the corresponding range
from the appropriate input partition, then we want to group the
partitions in the refinement to match the new partitioner. The code
finds "the appropriate input partition" by taking the first input
partition which overlaps the refinement partition, using
`lowerBoundInterval`. That works if there is only one overlapping input
partition, but here fails, since the refinement partition `[5, 8]`
overlaps both input partitions. So the code mistakenly reads from the
input partition `[1, 5]` to produce the refinement partition `[5, 8]`,
and so completely drops all rows in the input `[5, 8]`.

In practice, I think the most likely way to run into this (and the way
it was found by a user) is to have a dataset keyed by `["locus",
"alleles"]`, which has split multi-allelics, so there are multiple rows
with the same locus. Then shorten the key to `["locus"]`, write the
dataset to disk, and read it back with a different partitioning, e.g. by
passing a `_n_partitions` argument to `read_table` or
`read_matrix_table`. For instance, if the partitioning was originally `[
[{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`,
then after shortening the key it would be `[ [1:1, 1:500], [1:500,
1:1000] ]`. Notice that even though the original partitioning had no
overlap, it does after shortening the key, because rows with locus
`1:500` with alleles less than `["G"]` are allowed in the first
partition, so we have to make the right endpoint inclusive after
shortening. You would then need to write this rekeyed dataset to disk
and read it back with different partitioning (note that `ds.repartition`
is enough to do this in the batch backend).

I still need to think through what holes in our testing allowed this to
remain undetected for so long, and attempt to plug them. We should also
plan for what to tell a user who is concerned they may have been
affected by this in the past.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants