Source code for Orange.classification.outlier_detection

# pylint: disable=unused-argument
from typing import Callable

import numpy as np

from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

from Orange.base import SklLearner, SklModel
from Orange.data import Table, Domain, DiscreteVariable, ContinuousVariable
from Orange.data.util import get_unique_names, SharedComputeValue
from Orange.preprocess import AdaptiveNormalize
from Orange.util import dummy_callback

__all__ = ["LocalOutlierFactorLearner", "IsolationForestLearner",
           "EllipticEnvelopeLearner", "OneClassSVMLearner"]


class _CachedTransform:
    # to be used with SharedComputeValue
    def __init__(self, model):
        self.model = model

    def __call__(self, data):
        return self.model.data_to_model_domain(data)


class _OutlierModel(SklModel):
    def __init__(self, skl_model):
        super().__init__(skl_model)
        self.outlier_var = None
        self.cached_transform = _CachedTransform(self)

    def predict(self, X: np.ndarray) -> np.ndarray:
        pred = self.skl_model.predict(X)
        # scikit-learn marks outliers with -1 and inliers with 1; remap -1
        # to 0 so the predictions index the ("Yes", "No") outlier variable
        pred[pred == -1] = 0
        return pred[:, None]

    def new_domain(self, data: Table) -> Domain:
        assert self.outlier_var is not None
        return Domain(data.domain.attributes, data.domain.class_vars,
                      data.domain.metas + (self.outlier_var,))

    def __call__(self, data: Table, progress_callback: Callable = None) \
            -> Table:
        assert isinstance(data, Table)

        domain = self.new_domain(data)
        if progress_callback is None:
            progress_callback = dummy_callback
        progress_callback(0, "Predicting...")
        new_table = data.transform(domain)
        progress_callback(1)
        return new_table


class _OutlierLearner(SklLearner):
    __returns__ = _OutlierModel
    supports_multiclass = True

    def _fit_model(self, data: Table) -> _OutlierModel:
        domain = data.domain
        model = super()._fit_model(data.transform(Domain(domain.attributes)))

        transformer = _Transformer(model)
        names = [v.name for v in domain.variables + domain.metas]
        variable = DiscreteVariable(
            get_unique_names(names, "Outlier"),
            values=("Yes", "No"),
            compute_value=transformer
        )

        model.outlier_var = variable
        return model


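# _Transformer is a SharedComputeValue: every variable derived from the same
# model shares the model's single _CachedTransform, so incoming data is
# transformed to the model's domain only once and the cached result is
# passed to compute() as ``shared_data``.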
class _Transformer(SharedComputeValue):
    def __init__(self, model: _OutlierModel):
        super().__init__(model.cached_transform)
        self._model = model

    def compute(self, data: Table, shared_data: Table) -> np.ndarray:
        return self._model.predict(shared_data.X)[:, 0]


class OneClassSVMLearner(_OutlierLearner):
    name = "One class SVM"
    __wraps__ = OneClassSVM
    preprocessors = SklLearner.preprocessors + [AdaptiveNormalize()]
    supports_weights = True

    def __init__(self, kernel='rbf', degree=3, gamma="auto", coef0=0.0,
                 tol=0.001, nu=0.5, shrinking=True, cache_size=200,
                 max_iter=-1, preprocessors=None):
        super().__init__(preprocessors=preprocessors)
        self.params = vars()


class LocalOutlierFactorLearner(_OutlierLearner):
    __wraps__ = LocalOutlierFactor
    name = "Local Outlier Factor"
    supports_weights = False

    def __init__(self, n_neighbors=20, algorithm="auto", leaf_size=30,
                 metric="minkowski", p=2, metric_params=None,
                 contamination="auto", novelty=True, n_jobs=None,
                 preprocessors=None):
        super().__init__(preprocessors=preprocessors)
        self.params = vars()


class IsolationForestLearner(_OutlierLearner):
    __wraps__ = IsolationForest
    name = "Isolation Forest"
    supports_weights = True

    def __init__(self, n_estimators=100, max_samples='auto',
                 contamination='auto', max_features=1.0, bootstrap=False,
                 n_jobs=None, behaviour='deprecated', random_state=None,
                 verbose=0, warm_start=False, preprocessors=None):
        super().__init__(preprocessors=preprocessors)
        self.params = vars()


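# Usage sketch (illustrative, not part of the module), assuming the standard
# SklLearner call convention: learner(data) fits and returns the model, and
# model(data) appends the discrete "Outlier" meta column ("Yes"/"No"),
# computed lazily through the variable's compute_value.
#
#     from Orange.data import Table
#     from Orange.classification.outlier_detection import (
#         LocalOutlierFactorLearner)
#
#     data = Table("iris")
#     detector = LocalOutlierFactorLearner(contamination=0.1)
#     model = detector(data)
#     annotated = model(data)
#     print(annotated.domain.metas[-1])   # DiscreteVariable "Outlier"

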
class EllipticEnvelopeClassifier(_OutlierModel):
    def __init__(self, skl_model):
        super().__init__(skl_model)
        self.mahal_var = None

    def mahalanobis(self, observations: np.ndarray) -> np.ndarray:
        """Computes squared Mahalanobis distances of given observations.

        Parameters
        ----------
        observations : ndarray (n_samples, n_features)

        Returns
        -------
        distances : ndarray (n_samples, 1)
            Squared Mahalanobis distances given observations.
        """
        return self.skl_model.mahalanobis(observations)[:, None]

    def new_domain(self, data: Table) -> Domain:
        assert self.mahal_var is not None
        domain = super().new_domain(data)
        return Domain(domain.attributes, domain.class_vars,
                      domain.metas + (self.mahal_var,))


class _TransformerMahalanobis(_Transformer):
    def compute(self, data: Table, shared_data: Table) -> np.ndarray:
        return self._model.mahalanobis(shared_data.X)[:, 0]


class EllipticEnvelopeLearner(_OutlierLearner):
    __wraps__ = EllipticEnvelope
    __returns__ = EllipticEnvelopeClassifier
    name = "Covariance Estimator"
    supports_weights = False

    def __init__(self, store_precision=True, assume_centered=False,
                 support_fraction=None, contamination=0.1,
                 random_state=None, preprocessors=None):
        super().__init__(preprocessors=preprocessors)
        self.params = vars()

    def _fit_model(self, data: Table) -> EllipticEnvelopeClassifier:
        domain = data.domain
        model = super()._fit_model(data.transform(Domain(domain.attributes)))

        transformer = _TransformerMahalanobis(model)
        names = [v.name for v in domain.variables + domain.metas]
        variable = ContinuousVariable(
            get_unique_names(names, "Mahalanobis"),
            compute_value=transformer
        )

        model.mahal_var = variable
        return model
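

# Usage sketch (illustrative, not part of the module): in addition to the
# "Outlier" column, EllipticEnvelopeLearner's model appends a continuous
# "Mahalanobis" meta column holding each row's squared Mahalanobis distance
# under the fitted covariance estimate.
#
#     from Orange.data import Table
#     from Orange.classification.outlier_detection import (
#         EllipticEnvelopeLearner)
#
#     data = Table("iris")
#     model = EllipticEnvelopeLearner(contamination=0.1)(data)
#     annotated = model(data)
#     print([m.name for m in annotated.domain.metas[-2:]])
#     # -> ['Outlier', 'Mahalanobis']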