Source code for qunfold.transformers

import numpy as np
import warnings
from abc import ABC, abstractmethod
from functools import partial
from scipy.sparse import csr_matrix
from scipy.spatial.distance import cdist

def class_prevalences(y, n_classes=None):
  """Determine the prevalence of each class.

  Args:
      y: An array of labels, shape (n_samples,).
      n_classes (optional): The number of classes. Defaults to `None`, which corresponds to `np.max(y)+1`.

  Returns:
      An array of class prevalences that sums to one, shape (n_classes,).
  """
  if n_classes is None:
    n_classes = np.max(y)+1
  n_samples_per_class = np.zeros(n_classes, dtype=int)
  i, n = np.unique(y, return_counts=True)
  n_samples_per_class[i] = n # non-existing classes maintain a zero entry
  return n_samples_per_class / n_samples_per_class.sum() # normalize to prevalences

# helper function for ensuring sane labels
def _check_y(y, n_classes=None):
  if n_classes is not None:
    if n_classes != np.max(y)+1:
      warnings.warn(f"Classes are missing: n_classes != np.max(y)+1 = {np.max(y)+1}")

class AbstractTransformer(ABC):
  """Abstract base class for transformers."""
  @abstractmethod
  def fit_transform(self, X, y, average=True, n_classes=None):
    """This abstract method has to fit the transformer and to return the transformation of the input data.

    Note:
        Implementations of this abstract method should check the sanity of labels by calling `_check_y(y, n_classes)` and they must set the property `self.p_trn = class_prevalences(y, n_classes)`.

    Args:
        X: The feature matrix to which this transformer will be fitted.
        y: The labels to which this transformer will be fitted.
        average (optional): Whether to return a transfer matrix `M` or a transformation `(f(X), y)`. Defaults to `True`.
        n_classes (optional): The number of expected classes. Defaults to `None`.

    Returns:
        A transfer matrix `M` if `average==True` or a transformation `(f(X), y)` if `average==False`.
    """
    pass
  @abstractmethod
  def transform(self, X, average=True):
    """This abstract method has to transform the data `X`.

    Args:
        X: The feature matrix that will be transformed.
        average (optional): Whether to return a vector `q` or a transformation `f(X)`. Defaults to `True`.

    Returns:
        A vector `q = f(X).mean(axis=0)` if `average==True` or a transformation `f(X)` if `average==False`.
    """
    pass

# helper function for ClassTransformer(..., is_probabilistic=False)
def _onehot_encoding(y, n_classes):
  return np.eye(n_classes)[y] # https://stackoverflow.com/a/42874726/20580159

[docs] class ClassTransformer(AbstractTransformer): """A classification-based feature transformation. This transformation can either be probabilistic (using the posterior predictions of a classifier) or crisp (using the class predictions of a classifier). It is used in ACC, PACC, CC, PCC, and SLD. Args: classifier: A classifier that implements the API of scikit-learn. is_probabilistic (optional): Whether probabilistic or crisp predictions of the `classifier` are used to transform the data. Defaults to `False`. fit_classifier (optional): Whether to fit the `classifier` when this quantifier is fitted. Defaults to `True`. """ def __init__(self, classifier, is_probabilistic=False, fit_classifier=True): self.classifier = classifier self.is_probabilistic = is_probabilistic self.fit_classifier = fit_classifier
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if not hasattr(self.classifier, "oob_score") or not self.classifier.oob_score: raise ValueError( "The ClassTransformer either requires a bagging classifier with oob_score=True", "or an instance of qunfold.sklearn.CVClassifier" ) _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore if self.fit_classifier: self.classifier.fit(X, y) fX = np.zeros((X.shape[0], n_classes)) fX[:, self.classifier.classes_] = self.classifier.oob_decision_function_ is_finite = np.all(np.isfinite(fX), axis=1) fX = fX[is_finite,:] # drop instances that never became OOB y = y[is_finite] if not self.is_probabilistic: fX = _onehot_encoding(np.argmax(fX, axis=1), n_classes) if average: M = np.zeros((n_classes, n_classes)) for c in range(n_classes): if np.sum(y==c) > 0: M[:,c] = fX[y==c].mean(axis=0) return M return fX, y
[docs] def transform(self, X, average=True): n_classes = len(self.p_trn) fX = np.zeros((X.shape[0], n_classes)) fX[:, self.classifier.classes_] = self.classifier.predict_proba(X) if not self.is_probabilistic: fX = _onehot_encoding(np.argmax(fX, axis=1), n_classes) if average: return fX.mean(axis=0) # = q return fX
[docs] class DistanceTransformer(AbstractTransformer): """A distance-based feature transformation, as it is used in `EDx` and `EDy`. Args: metric (optional): The metric with which the distance between data items is measured. Can take any value that is accepted by `scipy.spatial.distance.cdist`. Defaults to `"euclidean"`. preprocessor (optional): Another `AbstractTransformer` that is called before this transformer. Defaults to `None`. """ def __init__(self, metric="euclidean", preprocessor=None): self.metric = metric self.preprocessor = preprocessor
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if self.preprocessor is not None: X, y = self.preprocessor.fit_transform(X, y, average=False, n_classes=n_classes) self.p_trn = self.preprocessor.p_trn # copy from preprocessor else: _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.X_trn = X self.y_trn = y if average: M = np.zeros((n_classes, n_classes)) for c in range(n_classes): if np.sum(y==c) > 0: M[:,c] = self._transform_after_preprocessor(X[y==c]) return M else: return self._transform_after_preprocessor(X, average=False), y
[docs] def transform(self, X, average=True): if self.preprocessor is not None: X = self.preprocessor.transform(X, average=False) return self._transform_after_preprocessor(X, average=average)
def _transform_after_preprocessor(self, X, average=True): n_classes = len(self.p_trn) fX = np.zeros((X.shape[0], n_classes)) for c in range(n_classes): if np.sum(self.y_trn==c) > 0: fX[:,c] = cdist(X, self.X_trn[self.y_trn==c], metric=self.metric).mean(axis=1) if average: return fX.mean(axis=0) # = q return fX
[docs] class HistogramTransformer(AbstractTransformer): """A histogram-based feature transformation, as it is used in `HDx` and `HDy`. Args: n_bins: The number of bins in each feature. preprocessor (optional): Another `AbstractTransformer` that is called before this transformer. Defaults to `None`. unit_scale (optional): Whether or not to scale each output to a sum of one. A value of `False` indicates that the sum of each output is the number of features. Defaults to `True`. """ def __init__(self, n_bins, preprocessor=None, unit_scale=True): self.n_bins = n_bins self.preprocessor = preprocessor self.unit_scale = unit_scale
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if self.preprocessor is not None: X, y = self.preprocessor.fit_transform(X, y, average=False, n_classes=n_classes) self.p_trn = self.preprocessor.p_trn # copy from preprocessor else: _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.edges = [] for x in X.T: # iterate over columns = features e = np.histogram_bin_edges(x, bins=self.n_bins) self.edges.append(e) if average: M = np.zeros((X.shape[1] * self.n_bins, n_classes)) for c in range(n_classes): if np.sum(y==c) > 0: M[:,c] = self._transform_after_preprocessor(X[y==c]) return M return self._transform_after_preprocessor(X, average=average), y
[docs] def transform(self, X, average=True): if self.preprocessor is not None: X = self.preprocessor.transform(X, average=False) return self._transform_after_preprocessor(X, average=average)
def _transform_after_preprocessor(self, X, average=True): if not average: fX = [] for j in range(X.shape[1]): # feature index e = self.edges[j][1:] i_row = np.arange(X.shape[0]) i_col = np.clip(np.ceil((X[:,j] - e[0]) / (e[1]-e[0])).astype(int), 0, self.n_bins-1) fX_j = csr_matrix( (np.ones(X.shape[0], dtype=int), (i_row, i_col)), shape = (X.shape[0], self.n_bins), ) fX.append(fX_j.toarray()) fX = np.stack(fX).swapaxes(0, 1).reshape((X.shape[0], -1)) if self.unit_scale: fX = fX / fX.sum(axis=1, keepdims=True) return fX else: # a concatenation of numpy histograms is faster to compute histograms = [] for j in range(X.shape[1]): # feature index e = np.copy(self.edges[j]) e[0] = -np.inf # always use exactly self.n_bins and never omit any items e[-1] = np.inf hist, _ = np.histogram(X[:, j], bins=e) if self.unit_scale: hist = hist / X.shape[1] histograms.append(hist / X.shape[0]) return np.concatenate(histograms) # = q
[docs] class EnergyKernelTransformer(AbstractTransformer): """A kernel-based feature transformation, as it is used in `KMM`, that uses the `energy` kernel: k(x_1, x_2) = ||x_1|| + ||x_2|| - ||x_1 - x_2|| Note: The methods of this transformer do not support setting `average=False`. Args: preprocessor (optional): Another `AbstractTransformer` that is called before this transformer. Defaults to `None`. """ def __init__(self, preprocessor=None): self.preprocessor = preprocessor
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if not average: raise ValueError("EnergyKernelTransformer does not support average=False") if self.preprocessor is not None: X, y = self.preprocessor.fit_transform(X, y, average=False, n_classes=n_classes) self.p_trn = self.preprocessor.p_trn # copy from preprocessor else: _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.X_trn = X self.y_trn = y self.norms = np.zeros(n_classes) # = ||x|| for c in range(len(self.norms)): if np.sum(y==c) > 0: self.norms[c] = np.linalg.norm(X[y==c], axis=1).mean() M = np.zeros((n_classes, n_classes)) for c in range(n_classes): if np.sum(y==c) > 0: M[:,c] = self._transform_after_preprocessor(X[y==c], self.norms[c]) return M
[docs] def transform(self, X, average=True): if not average: raise ValueError("EnergyKernelTransformer does not support average=False") if self.preprocessor is not None: X = self.preprocessor.transform(X, average=False) norm = np.linalg.norm(X, axis=1).mean() return self._transform_after_preprocessor(X, norm)
def _transform_after_preprocessor(self, X, norm): n_classes = len(self.p_trn) dists = np.zeros(n_classes) # = ||x_1 - x_2|| for all x_2 = X_trn[y_trn == i] for c in range(n_classes): if np.sum(self.y_trn==c) > 0: dists[c] = cdist(X, self.X_trn[self.y_trn==c], metric="euclidean").mean() return norm + self.norms - dists # = ||x_1|| + ||x_2|| - ||x_1 - x_2|| for all x_2
[docs] class GaussianKernelTransformer(AbstractTransformer): """A kernel-based feature transformation, as it is used in `KMM`, that uses the `gaussian` kernel: k(x, y) = exp(-||x - y||^2 / (2σ^2)) Args: sigma (optional): A smoothing parameter of the kernel function. Defaults to `1`. preprocessor (optional): Another `AbstractTransformer` that is called before this transformer. Defaults to `None`. """ def __init__(self, sigma=1, preprocessor=None): self.sigma = sigma self.preprocessor = preprocessor
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if not average: raise ValueError("GaussianKernelTransformer does not support average=False") if self.preprocessor is not None: X, y = self.preprocessor.fit_transform(X, y, average=False, n_classes=n_classes) self.p_trn = self.preprocessor.p_trn # copy from preprocessor else: _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.X_trn = X self.y_trn = y M = np.zeros((n_classes, n_classes)) for c in range(n_classes): M[:,c] = self._transform_after_preprocessor(X[y==c]) return M
[docs] def transform(self, X, average=True): if not average: raise ValueError("GaussianKernelTransformer does not support average=False") if self.preprocessor is not None: X = self.preprocessor.transform(X, average=False) return self._transform_after_preprocessor(X)
def _transform_after_preprocessor(self, X): n_classes = len(self.p_trn) res = np.zeros(n_classes) for i in range(n_classes): norm_fac = X.shape[0] * self.X_trn[self.y_trn==i].shape[0] sq_dists = cdist(X, self.X_trn[self.y_trn == i], metric="euclidean")**2 res[i] = np.exp(-sq_dists / 2*self.sigma**2).sum() / norm_fac return res
[docs] class KernelTransformer(AbstractTransformer): """A general kernel-based feature transformation, as it is used in `KMM`. If you intend to use a Gaussian kernel or energy kernel, prefer their dedicated and more efficient implementations over this class. Note: The methods of this transformer do not support setting `average=False`. Args: kernel: A callable that will be used as the kernel. Must follow the signature `(X[y==i], X[y==j]) -> scalar`. """ def __init__(self, kernel): self.kernel = kernel
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if not average: raise ValueError("KernelTransformer does not support average=False") _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.X_trn = X self.y_trn = y M = np.zeros((n_classes, n_classes)) for i in range(n_classes): for j in range(i, n_classes): if np.sum(y==i) > 0 and np.sum(y==j): M[i, j] = self.kernel(X[y==i], X[y==j]) if i != j: M[j, i] = M[i, j] return M
[docs] def transform(self, X, average=True): if not average: raise ValueError("KernelTransformer does not support average=False") n_classes = len(self.p_trn) q = np.zeros(n_classes) for c in range(n_classes): if np.sum(self.y_trn==c) > 0: q[c] = self.kernel(self.X_trn[self.y_trn==c], X) return q
# kernel function for the LaplacianKernelTransformer def _laplacianKernel(X, Y, sigma): nx = X.shape[0] ny = Y.shape[0] X = X.reshape((X.shape[0], 1, X.shape[1])) Y = Y.reshape((1, Y.shape[0], Y.shape[1])) D_lk = np.abs(((X - Y))).sum(-1) K_ij = np.exp((-sigma * D_lk)).sum(0).sum(0) / (nx * ny) return K_ij
[docs] class LaplacianKernelTransformer(KernelTransformer): """A kernel-based feature transformation, as it is used in `KMM`, that uses the `laplacian` kernel. Args: sigma (optional): A smoothing parameter of the kernel function. Defaults to `1`. """ def __init__(self, sigma=1): self.sigma = sigma @property # implement self.kernel as a property to allow for hyper-parameter tuning of sigma def kernel(self): return partial(_laplacianKernel, sigma=self.sigma)
[docs] class GaussianRFFKernelTransformer(AbstractTransformer): """An efficient approximation of the `GaussianKernelTransformer`, as it is used in `KMM`, using random Fourier features. Args: sigma (optional): A smoothing parameter of the kernel function. Defaults to `1`. n_rff (optional): The number of random Fourier features. Defaults to `1000`. preprocessor (optional): Another `AbstractTransformer` that is called before this transformer. Defaults to `None`. seed (optional): Controls the randomness of the random Fourier features. Defaults to `None`. """ def __init__(self, sigma=1, n_rff=1000, preprocessor=None, seed=None): self.sigma = sigma self.n_rff = n_rff self.preprocessor = preprocessor self.seed = seed
[docs] def fit_transform(self, X, y, average=True, n_classes=None): if not average: raise ValueError("GaussianRFFKernelTransformer does not support average=False") if self.preprocessor is not None: X, y = self.preprocessor.fit_transform(X, y, average=False, n_classes=n_classes) self.p_trn = self.preprocessor.p_trn # copy from preprocessor else: _check_y(y, n_classes) self.p_trn = class_prevalences(y, n_classes) n_classes = len(self.p_trn) # not None anymore self.X_trn = X self.y_trn = y self.w = np.random.default_rng(self.seed).normal( loc = 0, scale = (1. / self.sigma), size = (self.n_rff // 2, X.shape[1]), ).astype(np.float32) self.mu = np.stack( [ self._transform_after_preprocessor(X[y==c]) for c in range(n_classes) ], axis = 1 ) self.M = self.mu.T @ self.mu return self.M
[docs] def transform(self, X, average=True): if not average: raise ValueError("GaussianRFFKernelTransformer does not support average=False") if self.preprocessor is not None: X = self.preprocessor.transform(X, average=False) return self._transform_after_preprocessor(X) @ self.mu
def _transform_after_preprocessor(self, X): Xw = X @ self.w.T C = np.concatenate((np.cos(Xw), np.sin(Xw)), axis=1) return np.sqrt(2 / self.n_rff) * np.mean(C, axis=0)