Skip to content

Commit

Permalink
[query] implement write_table_per_column
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel King committed Aug 18, 2022
1 parent 9d489e1 commit a449195
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 32 deletions.
120 changes: 119 additions & 1 deletion hail/python/hail/matrixtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
get_key_by_exprs, \
get_select_exprs, check_annotate_exprs, process_joins


class GroupedMatrixTable(ExprContainer):
"""Matrix table grouped by row or column that can be aggregated into a new matrix table."""

Expand Down Expand Up @@ -4164,6 +4163,125 @@ def fmt(f, col_key):

return t

@typecheck_method(output=str,
                  overwrite=bool,
                  stage_locally=bool,
                  _codec_spec=nullable(str),
                  fanout=int)
def write_table_per_column(self,
                           output,
                           *,
                           overwrite: bool = False,
                           stage_locally: bool = False,
                           _codec_spec: Optional[str] = None,
                           fanout: int = 1024):
    """Write one Hail Table for each column of this Matrix Table.

    The column key fields are each converted to a string and concatenated, delimited by "_", and
    suffixed by ".ht". For example, suppose the column key for the first column is
    ``hl.struct(famid=1, sampleid="NA0005")``. This column will be written to `output/1_NA0005.ht`.

    Example
    -------
    Split a dataset into one table per genome:

    >>> mt = hl.balding_nichols_model(3, 10, 10)
    >>> table_names = mt.write_table_per_column("sequences")
    >>> hl.read_table("sequences/0.ht").show()
    +---------+---------------+------------+
    | 0.ht.GT | locus         | alleles    |
    +---------+---------------+------------+
    | call    | locus<GRCh37> | array<str> |
    +---------+---------------+------------+
    | 0/0     | 1:1           | ["A","C"]  |
    | 1/1     | 1:2           | ["A","C"]  |
    | 1/1     | 1:3           | ["A","C"]  |
    | 1/1     | 1:4           | ["A","C"]  |
    | 1/1     | 1:5           | ["A","C"]  |
    | 1/1     | 1:6           | ["A","C"]  |
    | 0/0     | 1:7           | ["A","C"]  |
    | 1/1     | 1:8           | ["A","C"]  |
    | 1/1     | 1:9           | ["A","C"]  |
    | 0/1     | 1:10          | ["A","C"]  |
    +---------+---------------+------------+

    Parameters
    ----------
    output : str
        Directory into which to write the individual tables.
    overwrite : bool
        For every output table, replace it if it already exists. Also applied to the
        intermediate tables written under ``output/tmp``.
    stage_locally : bool
        If ``True``, major output will be written to temporary local storage
        before being copied to ``output``.
    fanout : int
        If there are more than `fanout` columns, then a tree export is performed. This method is
        called recursively, each time producing `fanout` tables containing roughly even numbers
        of columns until each column is present in its own table. The intermediate tables are
        automatically deleted when this method finishes. Must be at least 2 unless the matrix
        table has exactly one column.

    Returns
    -------
    list of str
        The names (relative to `output`) of the per-column tables that were written.

    Raises
    ------
    ValueError
        If the matrix table has no columns, if `fanout` cannot make progress (less than 1, or
        equal to 1 with more than one column), or if two columns would be written to the same
        table name.
    """
    def table_name(col_key):
        # One file name per column: stringified key fields joined by "_".
        return '_'.join(str(x) for x in col_key.values()) + '.ht'

    col_keys = self.col_key.collect()
    n_cols = len(col_keys)

    # Explicit checks rather than asserts: asserts vanish under ``python -O``.
    if n_cols == 0:
        raise ValueError("write_table_per_column: matrix table has no columns")
    if fanout < 1 or (fanout == 1 and n_cols > 1):
        # With a single group per layer the group never shrinks, so the tree
        # export would loop forever.
        raise ValueError(
            f"write_table_per_column: 'fanout' must be at least 2 to split {n_cols} columns, "
            f"found {fanout}")
    if len({table_name(key) for key in col_keys}) != n_cols:
        # Colliding names would silently drop all but one of the columns.
        raise ValueError(
            "write_table_per_column: column keys do not produce unique table names")

    # Only clean up 'tmp' if this call ever asked for an intermediate table.
    tmp_requested = False

    def fanout_table(group_keys, t, entries_field, layer, first_index):
        """Split the entries array of `t` into at most `fanout` pieces and write them.

        Pieces of exactly one column are written as final per-column tables; larger pieces are
        written as intermediate tables under ``tmp/``. Returns the names of the final tables
        and a list of ``(col_keys, tmp_name)`` pairs still to be split.
        """
        nonlocal tmp_requested
        base_group_size, excess_cols = divmod(len(group_keys), fanout)
        start = 0
        individuals = []
        groups = []
        for i in range(fanout):
            # The first `excess_cols` pieces absorb the remainder, one column each.
            size = base_group_size + (i < excess_cols)
            if size == 0:
                # Sizes never increase with i, so every later piece is empty too.
                break
            if size == 1:
                individuals.append(start)
            else:
                stop = start + size
                groups.append((slice(start, stop), group_keys[start:stop]))
            start += size

        individual_tables = {
            table_name(group_keys[i]): t[entries_field][i]
            for i in individuals
        }
        group_tables = {
            f'tmp/{layer:02}_{first_index + idx:08}': t[entries_field][sl]
            for idx, (sl, _) in enumerate(groups)
        }
        if group_tables:
            # Set before the write so a failed write still triggers cleanup.
            tmp_requested = True
        t.select(
            **individual_tables,
            **group_tables
        ).write_many(output,
                     fields=[*individual_tables, *group_tables],
                     overwrite=overwrite,
                     stage_locally=stage_locally,
                     _codec_spec=_codec_spec)
        remaining = [(keys, name) for (_, keys), name in zip(groups, group_tables)]
        return list(individual_tables), remaining

    t = self.localize_entries('entries', 'cols')

    try:
        layer = 0
        finished, work = fanout_table(col_keys, t, 'entries', layer, 0)
        while work:
            layer += 1
            next_work = []
            for group_keys, path in work:
                # The intermediate table keeps its entries under a field named after its path.
                done, more = fanout_table(group_keys,
                                          hl.read_table(output + "/" + path),
                                          path,
                                          layer,
                                          len(next_work))
                finished.extend(done)
                next_work.extend(more)
            work = next_work
        return finished
    finally:
        if tmp_requested:
            hl.current_backend().fs.rmtree(output + "/tmp")

@typecheck_method(rows=bool, cols=bool, entries=bool, handler=nullable(anyfunc))
def summarize(self, *, rows=True, cols=True, entries=True, handler=None):
"""Compute and print summary information about the fields in the matrix table.
Expand Down
64 changes: 33 additions & 31 deletions hail/python/hail/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,9 +1348,14 @@ def write_many(self,
_codec_spec: Optional[str] = None):
"""Write fields to distinct tables.
Each field is written to a Hail Table inside the `output` directory. The field name is also
used as the filename. Each table is keyed and partitioned the same way this table is.
Examples
--------
Split a table into three tables:
>>> t = hl.utils.range_table(10)
>>> t = t.annotate(a = t.idx, b = t.idx * t.idx, c = hl.str(t.idx))
>>> t.write_many('output', fields=('a', 'b', 'c'))
Expand Down Expand Up @@ -1382,34 +1387,6 @@ def write_many(self,
| 8 | 8 |
| 9 | 9 |
+-------+-------+
>>> hl.read_table('output/b').describe()
----------------------------------------
Global fields:
None
----------------------------------------
Row fields:
'b': int32
'idx': int32
----------------------------------------
Key: ['idx']
----------------------------------------
>>> hl.read_table('output/b').show()
+-------+-------+
| b | idx |
+-------+-------+
| int32 | int32 |
+-------+-------+
| 0 | 0 |
| 1 | 1 |
| 4 | 2 |
| 9 | 3 |
| 16 | 4 |
| 25 | 5 |
| 36 | 6 |
| 49 | 7 |
| 64 | 8 |
| 81 | 9 |
+-------+-------+
>>> hl.read_table('output/c').describe()
----------------------------------------
Global fields:
Expand Down Expand Up @@ -1439,6 +1416,30 @@ def write_many(self,
| "9" | 9 |
+-----+-------+
Split a table after renaming the fields to include the usual ".ht" extension:
>>> t = hl.utils.range_table(10)
>>> t = t.annotate(a = t.idx, b = t.idx * t.idx, c = hl.str(t.idx))
>>> t = t.rename({'a': 'a.ht', 'b': 'b.ht', 'c': 'c.ht'})
>>> t.write_many('output2', fields=('a.ht', 'b.ht', 'c.ht'))
>>> hl.read_table('output2/a.ht').show()
+-------+-------+
| a | idx |
+-------+-------+
| int32 | int32 |
+-------+-------+
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
| 6 | 6 |
| 7 | 7 |
| 8 | 8 |
| 9 | 9 |
+-------+-------+
.. include:: _templates/write_warning.rst
See Also
Expand All @@ -1448,14 +1449,15 @@ def write_many(self,
Parameters
----------
output : str
Path at which to write.
Directory into which to write the individual tables.
fields : list of str
The fields to write.
overwrite : bool
For every output table, replace it if it already exists.
stage_locally: bool
If ``True``, major output will be written to temporary local storage
before being copied to ``output``.
overwrite : bool
If ``True``, overwrite an existing file at the destination.
"""

Env.backend().execute(
Expand Down

0 comments on commit a449195

Please sign in to comment.