Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Dask Implementation of PCA Functions #259

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions allel/stats/decomposition.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


import numpy as np

import dask.array as da

from allel.stats.preprocessing import get_scaler

Expand Down Expand Up @@ -80,8 +79,6 @@ def fit_transform(self, gn):
return u

def _fit(self, gn):
import scipy.linalg

# apply scaling
gn = self.scaler_.fit(gn).transform(gn)

Expand All @@ -91,7 +88,12 @@ def _fit(self, gn):
n_samples, n_features = x.shape

# singular value decomposition
u, s, v = scipy.linalg.svd(x, full_matrices=False)
if type(x) is da.Array:
from dask.array.linalg import svd as dask_svd
u, s, v = dask_svd(x)
else:
import scipy.linalg
u, s, v = scipy.linalg.svd(x, full_matrices=False)

# calculate explained variance
explained_variance_ = (s ** 2) / n_samples
Expand Down Expand Up @@ -209,7 +211,6 @@ def fit_transform(self, gn):

def _fit(self, gn):
from sklearn.utils.validation import check_random_state
from sklearn.utils.extmath import randomized_svd

# apply scaling
gn = self.scaler_.fit(gn).transform(gn)
Expand All @@ -224,9 +225,15 @@ def _fit(self, gn):
n_samples, n_features = x.shape

# singular value decomposition
u, s, v = randomized_svd(x, n_components,
n_iter=self.iterated_power,
random_state=random_state)
if type(x) is da.Array:
from dask.array.linalg import svd_compressed
u, s, v = svd_compressed(x, n_components,
n_power_iter=self.iterated_power)
else:
from sklearn.utils.extmath import randomized_svd
u, s, v = randomized_svd(x, n_components,
n_iter=self.iterated_power,
random_state=random_state)

# calculate explained variance
self.explained_variance_ = exp_var = (s ** 2) / n_samples
Expand Down
30 changes: 21 additions & 9 deletions allel/stats/preprocessing.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


import numpy as np
import dask.array as da



from allel.compat import text_type
Expand Down Expand Up @@ -37,11 +38,13 @@ def fit(self, gn):
# check input
gn = asarray_ndim(gn, 2)

# find mean
self.mean_ = np.mean(gn, axis=1, keepdims=True)

# find scaling factor
self.std_ = np.std(gn, axis=1, keepdims=True)
# find mean and scaling factor
if type(gn) is da.Array:
self.mean_ = da.mean(gn, axis=1, keepdims=True)
self.std_ = da.std(gn, axis=1, keepdims=True)
else:
self.mean_ = np.mean(gn, axis=1, keepdims=True)
self.std_ = np.std(gn, axis=1, keepdims=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just use gn.mean(axis=1, keepdims=True) (and gn.std(...)) — the method form dispatches correctly on both NumPy and Dask arrays, so the type(gn) is da.Array branch becomes unnecessary.


return self

Expand Down Expand Up @@ -79,7 +82,10 @@ def fit(self, gn):
gn = asarray_ndim(gn, 2)

# find mean
self.mean_ = np.mean(gn, axis=1, keepdims=True)
if type(gn) is da.Array:
self.mean_ = da.mean(gn, axis=1, keepdims=True)
else:
self.mean_ = np.mean(gn, axis=1, keepdims=True)

return self

Expand Down Expand Up @@ -115,11 +121,17 @@ def fit(self, gn):
gn = asarray_ndim(gn, 2)

# find mean
self.mean_ = np.mean(gn, axis=1, keepdims=True)
if type(gn) is da.Array:
self.mean_ = da.mean(gn, axis=1, keepdims=True)
else:
self.mean_ = np.mean(gn, axis=1, keepdims=True)

# find scaling factor
p = self.mean_ / self.ploidy
self.std_ = np.sqrt(p * (1 - p))
if type(gn) is da.Array:
self.std_ = da.sqrt(p * (1 - p))
else:
self.std_ = np.sqrt(p * (1 - p))

return self

Expand Down
8 changes: 7 additions & 1 deletion allel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import numpy as np

import dask.array as da

from allel.compat import string_types

Expand Down Expand Up @@ -49,7 +50,12 @@ def asarray_ndim(a, *ndims, **kwargs):
kwargs.setdefault('copy', False)
if a is None and allow_none:
return None
a = np.array(a, **kwargs)
if type(a) is da.Array:
# Remove copy kwarg if it exists (Dask does not support this parameter)
kwargs.pop('copy', False)
a = da.array(a, **kwargs)
else:
a = np.array(a, **kwargs)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would open an issue with Dask asking them to support the `copy` keyword in `da.array`, rather than silently dropping it here — callers that rely on copy semantics would get different behavior for Dask inputs.

if a.ndim not in ndims:
if len(ndims) > 1:
expect_str = 'one of %s' % str(ndims)
Expand Down