import numbers
import six
import numpy as np
import scipy.sparse as sp
from scipy.linalg import lu, qr, svd
from sklearn import decomposition as skl_decomposition
from sklearn.utils import check_array, check_random_state
from sklearn.utils.extmath import svd_flip, safe_sparse_dot
from sklearn.utils.validation import check_is_fitted
import Orange.data
from Orange.statistics import util as ut
from Orange.data import Variable
from Orange.data.util import get_unique_names
from Orange.misc.wrapper_meta import WrapperMeta
from Orange.preprocess.score import LearnerScorer
from Orange.projection import SklProjector, DomainProjection
__all__ = ["PCA", "SparsePCA", "IncrementalPCA", "TruncatedSVD"]
def randomized_pca(A, n_components, n_oversamples=10, n_iter="auto",
flip_sign=True, random_state=0):
"""Compute the randomized PCA decomposition of a given matrix.
This method differs from the scikit-learn implementation in that it supports
and handles sparse matrices well.
"""
if n_iter == "auto":
# Checks if the number of iterations is explicitly specified
# Adjust n_iter. 7 was found a good compromise for PCA. See sklearn #5299
n_iter = 7 if n_components < .1 * min(A.shape) else 4
n_samples, n_features = A.shape
c = np.atleast_2d(ut.nanmean(A, axis=0))
if n_samples >= n_features:
Q = random_state.normal(size=(n_features, n_components + n_oversamples))
if A.dtype.kind == "f":
Q = Q.astype(A.dtype, copy=False)
Q = safe_sparse_dot(A, Q) - safe_sparse_dot(c, Q)
# Normalized power iterations
for _ in range(n_iter):
Q = safe_sparse_dot(A.T, Q) - safe_sparse_dot(c.T, Q.sum(axis=0)[None, :])
Q, _ = lu(Q, permute_l=True)
Q = safe_sparse_dot(A, Q) - safe_sparse_dot(c, Q)
Q, _ = lu(Q, permute_l=True)
Q, _ = qr(Q, mode="economic")
QA = safe_sparse_dot(A.T, Q) - safe_sparse_dot(c.T, Q.sum(axis=0)[None, :])
R, s, V = svd(QA.T, full_matrices=False)
U = Q.dot(R)
else: # n_features > n_samples
Q = random_state.normal(size=(n_samples, n_components + n_oversamples))
if A.dtype.kind == "f":
Q = Q.astype(A.dtype, copy=False)
Q = safe_sparse_dot(A.T, Q) - safe_sparse_dot(c.T, Q.sum(axis=0)[None, :])
# Normalized power iterations
for _ in range(n_iter):
Q = safe_sparse_dot(A, Q) - safe_sparse_dot(c, Q)
Q, _ = lu(Q, permute_l=True)
Q = safe_sparse_dot(A.T, Q) - safe_sparse_dot(c.T, Q.sum(axis=0)[None, :])
Q, _ = lu(Q, permute_l=True)
Q, _ = qr(Q, mode="economic")
QA = safe_sparse_dot(A, Q) - safe_sparse_dot(c, Q)
U, s, R = svd(QA, full_matrices=False)
V = R.dot(Q.T)
if flip_sign:
U, V = svd_flip(U, V)
return U[:, :n_components], s[:n_components], V[:n_components, :]
class ImprovedPCA(skl_decomposition.PCA):
"""Patch sklearn PCA learner to include randomized PCA for sparse matrices.
Scikit-learn does not currently support sparse matrices at all, even though
efficient methods exist for PCA. This class patches the default scikit-learn
implementation to properly handle sparse matrices.
Notes
-----
- This should be removed once scikit-learn releases a version which
implements this functionality.
"""
# pylint: disable=too-many-branches
def _fit(self, X):
"""Dispatch to the right submethod depending on the chosen solver."""
X = self._validate_data(
X,
dtype=[np.float64, np.float32],
reset=False,
accept_sparse=["csr", "csc"],
copy=self.copy
)
# Handle n_components==None
if self.n_components is None:
if self.svd_solver != "arpack":
n_components = min(X.shape)
else:
n_components = min(X.shape) - 1
else:
n_components = self.n_components
# Handle svd_solver
self._fit_svd_solver = self.svd_solver
if self._fit_svd_solver == "auto":
# Sparse data can only be handled with the randomized solver
if sp.issparse(X):
self._fit_svd_solver = "randomized"
# Small problem or n_components == 'mle', just call full PCA
elif max(X.shape) <= 500 or n_components == "mle":
self._fit_svd_solver = "full"
elif 1 <= n_components < .8 * min(X.shape):
self._fit_svd_solver = "randomized"
# This is also the case of n_components in (0,1)
else:
self._fit_svd_solver = "full"
# Ensure we don't try call arpack or full on a sparse matrix
if sp.issparse(X) and self._fit_svd_solver != "randomized":
raise ValueError("only the randomized solver supports sparse matrices")
# Call different fits for either full or truncated SVD
if self._fit_svd_solver == "full":
return self._fit_full(X, n_components)
elif self._fit_svd_solver in ["arpack", "randomized"]:
return self._fit_truncated(X, n_components, self._fit_svd_solver)
else:
raise ValueError(
"Unrecognized svd_solver='{0}'".format(self._fit_svd_solver)
)
def _fit_truncated(self, X, n_components, svd_solver):
"""Fit the model by computing truncated SVD (by ARPACK or randomized) on X"""
n_samples, n_features = X.shape
if isinstance(n_components, six.string_types):
raise ValueError(
"n_components=%r cannot be a string with svd_solver='%s'" %
(n_components, svd_solver)
)
if not 1 <= n_components <= min(n_samples, n_features):
raise ValueError(
"n_components=%r must be between 1 and min(n_samples, "
"n_features)=%r with svd_solver='%s'" % (
n_components, min(n_samples, n_features), svd_solver
)
)
if not isinstance(n_components, (numbers.Integral, np.integer)):
raise ValueError(
"n_components=%r must be of type int when greater than or "
"equal to 1, was of type=%r" % (n_components, type(n_components))
)
if svd_solver == "arpack" and n_components == min(n_samples, n_features):
raise ValueError(
"n_components=%r must be strictly less than min(n_samples, "
"n_features)=%r with svd_solver='%s'" % (
n_components, min(n_samples, n_features), svd_solver
)
)
random_state = check_random_state(self.random_state)
self.mean_ = X.mean(axis=0)
total_var = ut.var(X, axis=0, ddof=1)
if svd_solver == "arpack":
# Center data
X -= self.mean_
# random init solution, as ARPACK does it internally
v0 = random_state.uniform(-1, 1, size=min(X.shape))
U, S, V = sp.linalg.svds(X, k=n_components, tol=self.tol, v0=v0)
# svds doesn't abide by scipy.linalg.svd/randomized_svd
# conventions, so reverse its outputs.
S = S[::-1]
# flip eigenvectors' sign to enforce deterministic output
U, V = svd_flip(U[:, ::-1], V[::-1])
elif svd_solver == "randomized":
# sign flipping is done inside
U, S, V = randomized_pca(
X,
n_components=n_components,
n_iter=self.iterated_power,
flip_sign=True,
random_state=random_state,
)
self.n_samples_ = n_samples
self.components_ = V
self.n_components_ = n_components
# Get variance explained by singular values
self.explained_variance_ = (S ** 2) / (n_samples - 1)
self.explained_variance_ratio_ = self.explained_variance_ / total_var.sum()
self.singular_values_ = S.copy() # Store the singular values.
if self.n_components_ < min(n_features, n_samples):
self.noise_variance_ = (total_var.sum() - self.explained_variance_.sum())
self.noise_variance_ /= min(n_features, n_samples) - n_components
else:
self.noise_variance_ = 0
return U, S, V
def transform(self, X):
check_is_fitted(self, ["mean_", "components_"], all_or_any=all)
X = self._validate_data(
X,
accept_sparse=["csr", "csc"],
dtype=[np.float64, np.float32],
reset=False,
copy=self.copy
)
if self.mean_ is not None:
X = X - self.mean_
X_transformed = np.dot(X, self.components_.T)
if self.whiten:
X_transformed /= np.sqrt(self.explained_variance_)
return X_transformed
class _FeatureScorerMixin(LearnerScorer):
feature_type = Variable
component = 0
def score(self, data):
model = self(data)
return (
np.abs(model.components_[:self.component]) if self.component
else np.abs(model.components_),
model.orig_domain.attributes)
[docs]
class PCA(SklProjector, _FeatureScorerMixin):
__wraps__ = ImprovedPCA
name = 'PCA'
supports_sparse = True
def __init__(self, n_components=None, copy=True, whiten=False,
svd_solver='auto', tol=0.0, iterated_power='auto',
random_state=None, preprocessors=None):
super().__init__(preprocessors=preprocessors)
self.params = vars()
def fit(self, X, Y=None):
params = self.params.copy()
if params["n_components"] is not None:
params["n_components"] = min(min(X.shape), params["n_components"])
proj = self.__wraps__(**params)
proj = proj.fit(X, Y)
return PCAModel(proj, self.domain, len(proj.components_))
[docs]
class SparsePCA(SklProjector):
__wraps__ = skl_decomposition.SparsePCA
name = 'Sparse PCA'
supports_sparse = False
def __init__(self, n_components=None, alpha=1, ridge_alpha=0.01,
max_iter=1000, tol=1e-8, method='lars', n_jobs=1, U_init=None,
V_init=None, verbose=False, random_state=None, preprocessors=None):
super().__init__(preprocessors=preprocessors)
self.params = vars()
def fit(self, X, Y=None):
proj = self.__wraps__(**self.params)
proj = proj.fit(X, Y)
return PCAModel(proj, self.domain, len(proj.components_))
class PCAModel(DomainProjection, metaclass=WrapperMeta):
var_prefix = "PC"
def _get_var_names(self, n):
names = [f"{self.var_prefix}{postfix}" for postfix in range(1, n + 1)]
return get_unique_names(self.orig_domain, names)
[docs]
class IncrementalPCA(SklProjector):
__wraps__ = skl_decomposition.IncrementalPCA
name = 'Incremental PCA'
supports_sparse = False
def __init__(self, n_components=None, whiten=False, copy=True,
batch_size=None, preprocessors=None):
super().__init__(preprocessors=preprocessors)
self.params = vars()
def fit(self, X, Y=None):
proj = self.__wraps__(**self.params)
proj = proj.fit(X, Y)
return IncrementalPCAModel(proj, self.domain, len(proj.components_))
def partial_fit(self, data):
return self(data)
class IncrementalPCAModel(PCAModel):
def partial_fit(self, data):
if isinstance(data, Orange.data.Storage):
if data.domain != self.pre_domain:
data = data.from_table(self.pre_domain, data)
self.proj.partial_fit(data.X)
else:
self.proj.partial_fit(data)
self.__dict__.update(self.proj.__dict__)
return self
class TruncatedSVD(SklProjector, _FeatureScorerMixin):
__wraps__ = skl_decomposition.TruncatedSVD
name = 'Truncated SVD'
supports_sparse = True
def __init__(self, n_components=2, algorithm='randomized', n_iter=5,
random_state=None, tol=0.0, preprocessors=None):
super().__init__(preprocessors=preprocessors)
self.params = vars()
def fit(self, X, Y=None):
params = self.params.copy()
# strict requirement in scikit fit_transform:
# n_components must be < n_features
params["n_components"] = min(min(X.shape)-1, params["n_components"])
proj = self.__wraps__(**params)
proj = proj.fit(X, Y)
return PCAModel(proj, self.domain, len(proj.components_))