Source code for quapy.method._kdey

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.neighbors import KernelDensity

import quapy as qp
from quapy.method.aggregative import AggregativeSoftQuantifier
import quapy.functional as F

from sklearn.metrics.pairwise import rbf_kernel


class KDEBase:
    """
    Common ancestor for KDE-based methods. Implements some common routines.
    """

    BANDWIDTH_METHOD = ['scott', 'silverman']

    @classmethod
    def _check_bandwidth(cls, bandwidth):
        """
        Checks that the bandwidth parameter is correct

        :param bandwidth: either a string (see BANDWIDTH_METHOD) or a float
        :return: the bandwidth if the check is passed, or raises an exception for invalid values
        """
        assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \
            f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values'
        if isinstance(bandwidth, float):
            assert 0 < bandwidth < 1, \
                "the bandwidth for KDEy should be in (0,1), since this method models the unit simplex"
        return bandwidth
    def get_kde_function(self, X, bandwidth):
        """
        Wraps the KDE function from scikit-learn.

        :param X: data for which the density function is to be estimated
        :param bandwidth: the bandwidth of the kernel
        :return: a scikit-learn's KernelDensity object
        """
        return KernelDensity(bandwidth=bandwidth).fit(X)
    def pdf(self, kde, X):
        """
        Wraps the density evaluation of scikit-learn's KDE. Scikit-learn returns log-scores (s), so this
        function returns :math:`e^{s}`

        :param kde: a previously fit KDE function
        :param X: the data for which the density is to be estimated
        :return: np.ndarray with the densities
        """
        return np.exp(kde.score_samples(X))
    def get_mixture_components(self, X, y, classes, bandwidth):
        """
        Returns an array containing the mixture components, i.e., the KDE functions for each class.

        :param X: the data containing the covariates
        :param y: the class labels
        :param classes: the classes for which the class-conditional KDEs are to be fit
        :param bandwidth: float, the bandwidth of the kernel
        :return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates
        """
        class_cond_X = []
        for cat in classes:
            selX = X[y == cat]
            if selX.size == 0:
                selX = [F.uniform_prevalence(len(classes))]
            class_cond_X.append(selX)
        return [self.get_kde_function(X_cond_yi, bandwidth) for X_cond_yi in class_cond_X]
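# Illustrative sketch (not part of the original module): fitting the per-class mixture components
# of KDEBase on synthetic 3-class "posterior probabilities" and evaluating their densities with `pdf`.
# The toy data and the helper name `_demo_kde_mixture_components` are assumptions made purely for
# demonstration; they mirror how KDEy subclasses use these routines internally.
def _demo_kde_mixture_components():
    rng = np.random.RandomState(0)
    n_classes = 3
    # synthetic posterior vectors: random points on the unit simplex
    X = rng.dirichlet(alpha=np.ones(n_classes), size=300)
    y = rng.randint(0, n_classes, size=300)

    base = KDEBase()
    kdes = base.get_mixture_components(X, y, classes=np.arange(n_classes), bandwidth=0.1)
    # class-conditional densities evaluated on all points, shape (n_classes, n_examples)
    densities = np.asarray([base.pdf(kde_i, X) for kde_i in kdes])
    return densities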
class KDEyML(AggregativeSoftQuantifier, KDEBase):
    """
    Kernel Density Estimation model for quantification (KDEy) relying on the Kullback-Leibler divergence (KLD) as
    the divergence measure to be minimized. This method was first proposed in the paper
    `Kernel Density Estimation for Multiclass Quantification <https://arxiv.org/abs/2401.00490>`_, in which
    the authors show that minimizing the distribution matching criterion for KLD is akin to performing
    maximum likelihood (ML).

    The distribution matching optimization problem comes down to solving:

    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`

    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
    :math:`\\alpha` defined by

    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`

    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
    KDE function that uses the datapoints in X as the kernel centers.

    In KDEy-ML, the divergence is taken to be the Kullback-Leibler Divergence. This is equivalent to solving:

    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} -
    \\mathbb{E}_{q_{\\widetilde{U}}} \\left[ \\log \\boldsymbol{p}_{\\alpha}(\\widetilde{x}) \\right]`

    which corresponds to the maximum likelihood estimate.

    :param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param fit_classifier: whether to train the learner (default is True). Set to False if the
        learner has been trained outside the quantifier.
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
        be extracted from the training set; or as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        for `k`); or as a tuple (X,y) defining the specific set of data to use for validation.
    :param bandwidth: float, the bandwidth of the Kernel
    :param random_state: a seed to be set before fitting any base quantifier (default None)
    """

    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1,
                 random_state=None):
        super().__init__(classifier, fit_classifier, val_split)
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.random_state = random_state
    def aggregation_fit(self, classif_predictions, labels):
        self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth)
        return self
    def aggregate(self, posteriors: np.ndarray):
        """
        Searches for the mixture model parameter (the sought prevalence values) that maximizes the likelihood
        of the data (i.e., that minimizes the negative log-likelihood)

        :param posteriors: instances in the sample converted into posterior probabilities
        :return: a vector of class prevalence estimates
        """
        with qp.util.temp_seed(self.random_state):
            epsilon = 1e-10
            n_classes = len(self.mix_densities)
            test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.mix_densities]

            def neg_loglikelihood(prev):
                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                return -np.sum(test_loglikelihood)

            return F.optim_minimize(neg_loglikelihood, n_classes)
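# Standalone sketch (not part of the original module) of the KDEy-ML objective: given the
# class-conditional densities evaluated on the test points, the estimated prevalences are the
# mixture weights that minimize the negative log-likelihood over the probability simplex.
# It mirrors `aggregate` above, but calls scipy directly instead of F.optim_minimize; the helper
# name and the SLSQP setup are illustrative assumptions, not the library's own optimizer.
def _demo_kdey_ml_objective(test_densities, epsilon=1e-10):
    # test_densities: np.ndarray of shape (n_classes, n_test) with p_{L_i}(x) for each test point x
    from scipy.optimize import minimize
    n_classes = len(test_densities)

    def neg_loglikelihood(prev):
        mixture = prev @ test_densities               # p_alpha(x) for every test point
        return -np.sum(np.log(mixture + epsilon))

    uniform = np.full(n_classes, 1 / n_classes)
    constraints = ({'type': 'eq', 'fun': lambda p: p.sum() - 1},)   # weights sum to 1
    bounds = [(0, 1)] * n_classes                                   # weights in [0,1]
    sol = minimize(neg_loglikelihood, uniform, method='SLSQP', bounds=bounds, constraints=constraints)
    return sol.x  # estimated prevalence vector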
class KDEyHD(AggregativeSoftQuantifier, KDEBase):
    """
    Kernel Density Estimation model for quantification (KDEy) relying on the squared Hellinger Distance (HD) as
    the divergence measure to be minimized. This method was first proposed in the paper
    `Kernel Density Estimation for Multiclass Quantification <https://arxiv.org/abs/2401.00490>`_, in which
    the authors proposed a Monte Carlo approach for minimizing the divergence.

    The distribution matching optimization problem comes down to solving:

    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`

    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
    :math:`\\alpha` defined by

    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`

    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
    KDE function that uses the datapoints in X as the kernel centers.

    In KDEy-HD, the divergence is taken to be the squared Hellinger Distance, an f-divergence with corresponding
    f-generator function given by:

    :math:`f(u)=(\\sqrt{u}-1)^2`

    The authors proposed a Monte Carlo solution that relies on importance sampling:

    :math:`\\hat{D}_f(p||q)= \\frac{1}{t} \\sum_{i=1}^t f\\left(\\frac{p(x_i)}{q(x_i)}\\right) \\frac{q(x_i)}{r(x_i)}`

    where the datapoints (trials) :math:`x_1,\\ldots,x_t\\sim_{\\mathrm{iid}} r` with :math:`r` the
    uniform distribution.

    :param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param fit_classifier: whether to train the learner (default is True). Set to False if the
        learner has been trained outside the quantifier.
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
        be extracted from the training set; or as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        for `k`); or as a tuple (X,y) defining the specific set of data to use for validation.
    :param divergence: a string indicating the divergence to minimize (currently, only 'HD' is implemented)
    :param bandwidth: float, the bandwidth of the Kernel
    :param random_state: a seed to be set before fitting any base quantifier (default None)
    :param montecarlo_trials: number of Monte Carlo trials (default 10000)
    """

    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, divergence: str='HD',
                 bandwidth=0.1, random_state=None, montecarlo_trials=10000):
        super().__init__(classifier, fit_classifier, val_split)
        self.divergence = divergence
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.random_state = random_state
        self.montecarlo_trials = montecarlo_trials
    def aggregation_fit(self, classif_predictions, labels):
        self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth)

        N = self.montecarlo_trials
        rs = self.random_state
        n = len(self.classes_)
        self.reference_samples = np.vstack([kde_i.sample(N//n, random_state=rs) for kde_i in self.mix_densities])
        self.reference_classwise_densities = np.asarray(
            [self.pdf(kde_j, self.reference_samples) for kde_j in self.mix_densities])
        self.reference_density = np.mean(self.reference_classwise_densities, axis=0)  # equiv. to (uniform @ self.reference_classwise_densities)

        return self
    def aggregate(self, posteriors: np.ndarray):
        # we retain all n*N examples (sampled from a mixture with uniform parameter), and then
        # apply importance sampling (IS). In this version we compute D(p_alpha||q) with IS
        n_classes = len(self.mix_densities)

        test_kde = self.get_kde_function(posteriors, self.bandwidth)
        test_densities = self.pdf(test_kde, self.reference_samples)

        def f_squared_hellinger(u):
            return (np.sqrt(u)-1)**2

        # todo: this will fail when self.divergence is a callable, and is not the right place to do it anyway
        if self.divergence.lower() == 'hd':
            f = f_squared_hellinger
        else:
            raise ValueError('only squared HD is currently implemented')

        epsilon = 1e-10
        qs = test_densities + epsilon
        rs = self.reference_density + epsilon
        iw = qs/rs  # importance weights
        p_class = self.reference_classwise_densities + epsilon
        fracs = p_class/qs

        def divergence(prev):
            # ps / qs = (prev @ p_class) / qs = prev @ (p_class / qs) = prev @ fracs
            ps_div_qs = prev @ fracs
            return np.mean(f(ps_div_qs) * iw)

        return F.optim_minimize(divergence, n_classes)
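# Standalone sketch (not part of the original module) of the importance-sampling estimator used above:
# \hat{D}_f(p||q) = (1/t) sum_i f(p(x_i)/q(x_i)) * q(x_i)/r(x_i), with the trials x_i drawn from r.
# Here p_at_x, q_at_x, r_at_x are density values already evaluated at the t reference samples; they play
# the roles of (prev @ reference_classwise_densities), test_densities and reference_density in `aggregate`.
# The helper name is illustrative.
def _demo_importance_sampling_divergence(p_at_x, q_at_x, r_at_x):
    f = lambda u: (np.sqrt(u) - 1) ** 2   # f-generator of the squared Hellinger Distance
    return np.mean(f(p_at_x / q_at_x) * (q_at_x / r_at_x))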
class KDEyCS(AggregativeSoftQuantifier):
    """
    Kernel Density Estimation model for quantification (KDEy) relying on the Cauchy-Schwarz divergence (CS) as
    the divergence measure to be minimized. This method was first proposed in the paper
    `Kernel Density Estimation for Multiclass Quantification <https://arxiv.org/abs/2401.00490>`_, in which
    the authors proposed a Monte Carlo approach for minimizing the divergence.

    The distribution matching optimization problem comes down to solving:

    :math:`\\hat{\\alpha} = \\arg\\min_{\\alpha\\in\\Delta^{n-1}} \\mathcal{D}(\\boldsymbol{p}_{\\alpha}||q_{\\widetilde{U}})`

    where :math:`p_{\\alpha}` is the mixture of class-specific KDEs with mixture parameter (hence class prevalence)
    :math:`\\alpha` defined by

    :math:`\\boldsymbol{p}_{\\alpha}(\\widetilde{x}) = \\sum_{i=1}^n \\alpha_i p_{\\widetilde{L}_i}(\\widetilde{x})`

    where :math:`p_X(\\boldsymbol{x}) = \\frac{1}{|X|} \\sum_{x_i\\in X} K\\left(\\frac{x-x_i}{h}\\right)` is the
    KDE function that uses the datapoints in X as the kernel centers.

    In KDEy-CS, the divergence is taken to be the Cauchy-Schwarz divergence given by:

    :math:`\\mathcal{D}_{\\mathrm{CS}}(p||q)=-\\log\\left(\\frac{\\int p(x)q(x)dx}{\\sqrt{\\int p(x)^2dx \\int q(x)^2dx}}\\right)`

    The authors showed that this distribution matching admits a closed-form solution.

    :param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param fit_classifier: whether to train the learner (default is True). Set to False if the
        learner has been trained outside the quantifier.
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
        be extracted from the training set; or as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        for `k`); or as a tuple (X,y) defining the specific set of data to use for validation.
    :param bandwidth: float, the bandwidth of the Kernel
    """

    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1):
        super().__init__(classifier, fit_classifier, val_split)
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
    def gram_matrix_mix_sum(self, X, Y=None):
        # this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y))
        # to contain pairwise evaluations of N(x|mu,Sigma1+Sigma2) with mu=y, where Sigma1 and Sigma2 are
        # two "scalar matrices" (h^2)*I each, so Sigma1+Sigma2 has scalar 2(h^2) (h is the bandwidth)
        h = self.bandwidth
        variance = 2 * (h**2)
        nD = X.shape[1]
        gamma = 1/(2*variance)
        norm_factor = 1/np.sqrt(((2*np.pi)**nD) * (variance**(nD)))
        gram = norm_factor * rbf_kernel(X, Y, gamma=gamma)
        return gram.sum()
    def aggregation_fit(self, classif_predictions, labels):
        P, y = classif_predictions, labels
        n = len(self.classes_)

        assert all(sorted(np.unique(y)) == np.arange(n)), \
            'label name gaps not allowed in current implementation'

        # counts_inv keeps track of the relative weight of each datapoint within its class
        # (i.e., the weight in its KDE model)
        counts_inv = 1 / (F.counts_from_labels(y, classes=self.classes_))

        # tr_tr_sums corresponds to symbol \overline{B} in the paper
        tr_tr_sums = np.zeros(shape=(n, n), dtype=float)
        for i in range(n):
            for j in range(n):
                if i > j:
                    tr_tr_sums[i, j] = tr_tr_sums[j, i]
                else:
                    block = self.gram_matrix_mix_sum(P[y == i], P[y == j] if i != j else None)
                    tr_tr_sums[i, j] = block

        # keep track of these data structures for the test phase
        self.Ptr = P
        self.ytr = y
        self.tr_tr_sums = tr_tr_sums
        self.counts_inv = counts_inv

        return self
    def aggregate(self, posteriors: np.ndarray):
        Ptr = self.Ptr
        Pte = posteriors
        y = self.ytr
        tr_tr_sums = self.tr_tr_sums

        M, nD = Pte.shape
        Minv = (1/M)  # t in the paper
        n = Ptr.shape[1]

        # becomes a constant that does not affect the optimization, no need to compute it
        # partC = 0.5*np.log(self.gram_matrix_mix_sum(Pte) * Kinv * Kinv)

        # tr_te_sums corresponds to \overline{a}*(1/Li)*(1/M) in the paper (note the constants
        # are already aggregated to tr_te_sums, so these multiplications are not carried out
        # at each iteration of the optimization phase)
        tr_te_sums = np.zeros(shape=n, dtype=float)
        for i in range(n):
            tr_te_sums[i] = self.gram_matrix_mix_sum(Ptr[y == i], Pte)

        def divergence(alpha):
            # called \overline{r} in the paper
            alpha_ratio = alpha * self.counts_inv

            # recall that tr_te_sums already accounts for the constant terms (1/Li)*(1/M)
            partA = -np.log((alpha_ratio @ tr_te_sums) * Minv)
            partB = 0.5 * np.log(alpha_ratio @ tr_tr_sums @ alpha_ratio)
            return partA + partB  # + partC

        return F.optim_minimize(divergence, n)
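# Sanity-check sketch (not part of the original module): the rescaled RBF kernel used in
# `gram_matrix_mix_sum` coincides with the Gaussian density N(x | mu=y, (2h^2)I), which is what
# makes the Cauchy-Schwarz divergence computable in closed form from Gram-matrix sums.
# The helper name and the toy vectors are illustrative assumptions.
def _demo_gram_entry_is_gaussian_density(h=0.1, d=3, seed=0):
    from scipy.stats import multivariate_normal
    rng = np.random.RandomState(seed)
    x, y = rng.rand(d), rng.rand(d)

    # same rescaling as in gram_matrix_mix_sum
    variance = 2 * (h ** 2)
    gamma = 1 / (2 * variance)
    norm_factor = 1 / np.sqrt(((2 * np.pi) ** d) * (variance ** d))
    kernel_entry = norm_factor * rbf_kernel(x.reshape(1, -1), y.reshape(1, -1), gamma=gamma)[0, 0]

    # the corresponding multivariate Gaussian density with covariance (2h^2)*I centered at y
    gaussian_entry = multivariate_normal.pdf(x, mean=y, cov=variance * np.eye(d))
    assert np.isclose(kernel_entry, gaussian_entry)
    return kernel_entry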