# Source code for best.model

"""Bayesian estimation for two groups

This module implements Bayesian estimation for two groups, providing
complete distributions for effect size, group means and their
difference, standard deviations and their difference, and the
normality of the data.

Based on:

Kruschke, J. (2012) Bayesian estimation supersedes the t
    test. Journal of Experimental Psychology: General.

"""

from abc import ABC, abstractmethod
import sys

import arviz
import numpy as np
import pymc3 as pm
from pymc3.backends.base import MultiTrace
import scipy.stats as st


class BestModel(ABC):
    """Base class for BEST models

    Defines the interface shared by the one-group and two-group models:
    a model-specification version, access to the underlying PyMC3 model
    and observed data, and a common posterior-sampling routine.
    """

    @property
    @abstractmethod
    def version(self):
        """Version of the model specification

        For details, see the page :ref:`ch-model-history`.
        """
        pass

    @property
    @abstractmethod
    def model(self) -> pm.Model:
        """The underlying PyMC3 Model object

        (This property is accessible primarily for internal purposes.)
        """
        pass

    @abstractmethod
    def observed_data(self, group_id: int):
        """Return the observed data as a NumPy array

        (This method is accessible primarily for internal purposes.)
        """
        pass

    @abstractmethod
    def __str__(self):
        pass

    def sample(self, n_samples: int, **kwargs) -> MultiTrace:
        """Draw posterior samples from the model

        (This method is accessible primarily for internal purposes.)

        Parameters
        ----------
        n_samples : int
            Number of samples to draw per chain.
        **kwargs
            Additional keyword arguments are passed to :func:`pymc3.sample`.

        Returns
        -------
        MultiTrace
            The collection of posterior samples.
        """
        kwargs['tune'] = kwargs.get('tune', 1000)

        # PyMC3 3.7 moved NUTS options from the nuts_kwargs dict to
        # top-level keyword arguments of pm.sample().
        pm_major, pm_minor, *_ = pm.__version__.split('.')
        if (int(pm_major), int(pm_minor)) < (3, 7):
            kwargs.setdefault('nuts_kwargs', {'target_accept': 0.90})
        else:
            kwargs.setdefault('target_accept', 0.9)

        # If the sampler reports problems, retry once with more tuning
        # samples before giving up and returning the (possibly suspect) trace.
        max_rounds = 2
        for r in range(max_rounds):
            with self.model:
                trace = pm.sample(n_samples, **kwargs)

            if trace.report.ok:
                break
            else:
                if r == 0:
                    kwargs['tune'] = 2000
                    print('\nDue to potentially incorrect estimates, rerunning sampling '
                          'with {} tuning samples.\n'.format(kwargs['tune']),
                          file=sys.stderr)
                else:
                    # Fix: this warning previously went to stdout while the
                    # warning above went to stderr; both are diagnostics and
                    # now consistently go to stderr.
                    print('\nThe samples maybe are still not totally okay. '
                          'Try rerunning the analysis.', file=sys.stderr)

        return trace
class BestModelOne(BestModel):
    """Model for a single-group analysis; subclass of :class:`BestModel`"""

    def __init__(self, y, ref_val):
        # y: the observed data (must be one-dimensional).
        # ref_val: the reference value the group mean is compared against
        # (enters the model only through the 'Effect size' deterministic).
        self.y = y = np.array(y)
        self.ref_val = ref_val
        assert y.ndim == 1

        # Broad, data-scaled priors in the style of Kruschke (2012): the
        # prior SD of the mean and the upper bound of sigma are 1000 times
        # the sample SD; the lower bound of sigma is the sample SD / 1000.
        self.mu_loc = mu_loc = np.mean(y)
        self.mu_scale = mu_scale = np.std(y) * 1000
        self.sigma_low = sigma_low = np.std(y) / 1000
        self.sigma_high = sigma_high = np.std(y) * 1000
        # Normality parameter nu: shifted exponential with mean nu_mean.
        # The lower bound nu_min = 2.5 keeps the Student-T variance finite
        # (it requires nu > 2).
        self.nu_min = nu_min = 2.5
        self.nu_mean = nu_mean = 30
        self._nu_param = nu_mean - nu_min

        with pm.Model() as self._model:
            mean = pm.Normal('Mean', mu=mu_loc, sd=mu_scale)
            # A uniform prior on log(sigma) is a log-uniform prior on sigma.
            logsigma = pm.Uniform('Log sigma',
                                  lower=np.log(sigma_low),
                                  upper=np.log(sigma_high))
            sigma = pm.Deterministic('Sigma', np.exp(logsigma))
            # StudentT is parameterized by precision (lam = 1 / sigma^2).
            prec = sigma ** (-2)
            nu = pm.Exponential('nu - %g' % nu_min, 1 / (nu_mean - nu_min)) + nu_min
            _ = pm.Deterministic('Normality', nu)
            _ = pm.StudentT('Data', observed=y, nu=nu, mu=mean, lam=prec)
            # SD of a Student-T with scale sigma is sigma * sqrt(nu / (nu - 2)).
            stddev = pm.Deterministic('SD', sigma * (nu / (nu - 2)) ** 0.5)
            _ = pm.Deterministic('Effect size', (mean - ref_val) / stddev)

    @property
    def version(self):
        return 'v2'

    @property
    def model(self):
        return self._model

    def observed_data(self, group_id: int):
        # A single-group analysis has only group 1.
        if group_id == 1:
            return self.y
        else:
            raise ValueError('Group ID for a single-group analysis must be 1')

    def __repr__(self):
        return 'BestModelOne(y=%r, ref_val=%r, version=%r)' \
               % (self.y, self.ref_val, self.version)

    def __str__(self):
        # Human-readable summary of the model specification.
        return ('μ ~ Normal({0.mu_loc:.2e}, {0.mu_scale:.2e})\n'
                'log(σ) ~ Uniform(log({0.sigma_low:g}), log({0.sigma_high:g}))\n'
                'ν ~ Exponential(1/{0._nu_param:g}) + {0.nu_min:g}\n'
                'y ~ t(ν, μ, σ)\n'.format(self))
class BestModelTwo(BestModel):
    """Model for a two-group analysis; subclass of :class:`BestModel`"""

    def __init__(self, y1, y2):
        # y1, y2: the observed data of the two groups (one-dimensional each).
        self.y1 = y1 = np.array(y1)
        self.y2 = y2 = np.array(y2)
        assert y1.ndim == 1
        assert y2.ndim == 1
        y_all = np.concatenate((y1, y2))

        # Broad, data-scaled priors computed from the pooled data, in the
        # style of Kruschke (2012): prior SD of the means and upper bound
        # of the sigmas are 1000x the pooled SD; lower bound is SD / 1000.
        self.mu_loc = mu_loc = np.mean(y_all)
        self.mu_scale = mu_scale = np.std(y_all) * 1000
        self.sigma_low = sigma_low = np.std(y_all) / 1000
        self.sigma_high = sigma_high = np.std(y_all) * 1000
        # Shared normality parameter nu: shifted exponential; nu_min = 2.5
        # keeps the Student-T variance finite (requires nu > 2).
        self.nu_min = nu_min = 2.5
        self.nu_mean = nu_mean = 30
        self._nu_param = nu_mean - nu_min

        with pm.Model() as self._model:
            # Note: the IDE might give a warning for these because it thinks
            # distributions like pm.Normal() don't have a string "name" argument,
            # but this is false – pm.Distribution redefined __new__, so the
            # first argument indeed is the name (a string).
            group1_mean = pm.Normal('Group 1 mean', mu=mu_loc, sd=mu_scale)
            group2_mean = pm.Normal('Group 2 mean', mu=mu_loc, sd=mu_scale)
            nu = pm.Exponential('nu - %g' % nu_min, 1 / (nu_mean - nu_min)) + nu_min
            _ = pm.Deterministic('Normality', nu)
            # Uniform priors on log(sigma) == log-uniform priors on sigma.
            group1_logsigma = pm.Uniform(
                'Group 1 log sigma', lower=np.log(sigma_low), upper=np.log(sigma_high)
            )
            group2_logsigma = pm.Uniform(
                'Group 2 log sigma', lower=np.log(sigma_low), upper=np.log(sigma_high)
            )
            group1_sigma = pm.Deterministic('Group 1 sigma', np.exp(group1_logsigma))
            group2_sigma = pm.Deterministic('Group 2 sigma', np.exp(group2_logsigma))
            # StudentT is parameterized by precision (lam = 1 / sigma^2).
            lambda1 = group1_sigma ** (-2)
            lambda2 = group2_sigma ** (-2)
            # SD of a Student-T with scale sigma is sigma * sqrt(nu / (nu - 2)).
            group1_sd = pm.Deterministic('Group 1 SD', group1_sigma * (nu / (nu - 2)) ** 0.5)
            group2_sd = pm.Deterministic('Group 2 SD', group2_sigma * (nu / (nu - 2)) ** 0.5)
            _ = pm.StudentT('Group 1 data', observed=y1, nu=nu, mu=group1_mean, lam=lambda1)
            _ = pm.StudentT('Group 2 data', observed=y2, nu=nu, mu=group2_mean, lam=lambda2)
            diff_of_means = pm.Deterministic('Difference of means', group1_mean - group2_mean)
            _ = pm.Deterministic('Difference of SDs', group1_sd - group2_sd)
            # Effect size: difference of means over the pooled (RMS) SD.
            _ = pm.Deterministic(
                'Effect size',
                diff_of_means / np.sqrt((group1_sd ** 2 + group2_sd ** 2) / 2)
            )

    @property
    def version(self):
        return 'v2'

    @property
    def model(self):
        return self._model

    def observed_data(self, group_id: int):
        if group_id == 1:
            return self.y1
        elif group_id == 2:
            return self.y2
        else:
            raise ValueError('Group ID for a two-group analysis must be 1 or 2')

    def __repr__(self):
        return 'BestModelTwo(y1=%r, y2=%r, version=%r)' \
               % (self.y1, self.y2, self.version)

    def __str__(self):
        # Human-readable summary of the model specification.
        return ('μ1 ~ Normal({0.mu_loc:g}, {0.mu_scale:g})\n'
                'μ2 ~ Normal({0.mu_loc:g}, {0.mu_scale:g})\n'
                'log(σ1) ~ Uniform(log({0.sigma_low:g}), log({0.sigma_high:g}))\n'
                'log(σ2) ~ Uniform(log({0.sigma_low:g}), log({0.sigma_high:g}))\n'
                'ν ~ Exponential(1/{0._nu_param:g}) + {0.nu_min:g}\n'
                'y1 ~ t(ν, μ1, σ1)\n'
                'y2 ~ t(ν, μ2, σ2)\n'.format(self))
class BestResults(ABC):
    """Results of an analysis"""

    def __init__(self, model: BestModel, trace: MultiTrace):
        # The model specification and the posterior samples drawn from it.
        self._model = model
        self._trace = trace

    @property
    def model(self) -> BestModel:
        return self._model

    @property
    def trace(self) -> MultiTrace:
        """The collection of posterior samples

        See the relevant `PyMC3 documentation
        <https://docs.pymc.io/api/inference.html#multitrace>`_ for details.
        """
        return self._trace

    def observed_data(self, group_id: int):
        """Return the observed data as a NumPy array

        (This method is accessible primarily for internal purposes.)
        """
        return self.model.observed_data(group_id)

    def summary(self, credible_mass: float = 0.95):
        """Return summary statistics of the results

        Parameters
        ----------
        credible_mass : float
            The highest posterior density intervals in the summary will cover
            credible_mass * 100% of the probability mass. For example,
            credible_mass=0.95 results in 95% credible intervals.
            Default: 0.95.
        """
        return arviz.summary(self.trace, hdi_prob=credible_mass)

    def hdi(self, var_name: str, credible_mass: float = 0.95):
        """Calculate the highest posterior density interval (HDI)

        This function calculates a *credible interval* which contains the
        ``credible_mass`` most likely values of the parameter, given the data.
        Also known as an HPD interval.

        Parameters
        ----------
        var_name : str
            Name of variable.
        credible_mass : float
            The HDI will cover credible_mass * 100% of the probability mass.
            Default: 0.95, i.e. a 95% HDI.

        Returns
        -------
        (float, float)
            The endpoints of the HPD
        """
        # ArviZ 0.8 renamed hpd() to hdi() (and its credible_interval
        # parameter to hdi_prob); support both for compatibility.
        az_major, az_minor, *_ = arviz.__version__.split('.')
        if (int(az_major), int(az_minor)) >= (0, 8):
            return tuple(arviz.hdi(self.trace[var_name], hdi_prob=credible_mass))
        else:
            return tuple(arviz.hpd(self.trace[var_name], credible_interval=credible_mass))

    def posterior_prob(self, var_name: str, low: float = -np.inf, high: float = np.inf):
        r"""Calculate the posterior probability that a variable is in a given interval

        The return value approximates the following probability:

        .. math:: \text{Pr}(\textit{low} < \theta_{\textit{var_name}} < \textit{high} | y_1, y_2)

        One-sided intervals can be specified by using only the ``low`` or
        ``high`` argument, for example, to calculate the probability that the
        mean of the first group is larger than that of the second one::

            best_result.posterior_prob('Difference of means', low=0)

        Parameters
        ----------
        var_name : str
            Name of variable.
        low : float, optional
            Lower limit of the interval.
            Default: :math:`-\infty` (no lower limit)
        high : float, optional
            Upper limit of the interval.
            Default: :math:`\infty` (no upper limit)

        Returns
        -------
        float
            Posterior probability that the variable is in the given interval.

        Notes
        -----
        If *p* is the result and *S* is the total number of samples, then the
        standard deviation of the result is :math:`\sqrt{p(1-p)/S}`
        (see BDA3, p. 267). For example, with 2000 samples, the errors for
        some returned probabilities are

         - 0.01 ± 0.002,
         - 0.1 ± 0.007,
         - 0.2 ± 0.009,
         - 0.5 ± 0.011,

        meaning the answer is accurate for most practical purposes.
        """
        samples = self.trace[var_name]
        # Boolean arrays multiplied element-wise act as a logical AND.
        n_match = len(samples[(low < samples) * (samples < high)])
        n_all = len(samples)
        return n_match / n_all

    def posterior_mode(self, var_name: str):
        """Calculate the posterior mode of a variable

        Parameters
        ----------
        var_name : string
            The name of the variable whose posterior mode is to be calculated.

        Returns
        -------
        float
            The posterior mode.
        """
        samples = self.trace[var_name]

        # calculate mode using kernel density estimate
        kernel = st.gaussian_kde(samples)

        # NOTE(review): covariance_factor() returns the dimensionless
        # bandwidth factor, so the evaluation grid extends only a tiny
        # margin (3 * factor^2) beyond the sample range rather than a
        # data-scaled margin — presumably intentional; confirm.
        bw = kernel.covariance_factor()
        cut = 3 * bw
        x_low = np.min(samples) - cut * bw
        x_high = np.max(samples) + cut * bw
        n = 512
        x = np.linspace(x_low, x_high, n)
        vals = kernel.evaluate(x)
        max_idx = np.argmax(vals)
        mode_val = x[max_idx]

        return mode_val
class BestResultsOne(BestResults):
    """Results of a single-group analysis; subclass of :class:`BestResults`"""
    # (Docstring fixed: it previously said "two-group", but this class holds
    # the results of a one-group analysis produced by analyze_one().)

    def __init__(self, model: BestModelOne, trace: MultiTrace):
        super().__init__(model, trace)
class BestResultsTwo(BestResults):
    """Results of a two-group analysis; subclass of :class:`BestResults`"""

    def __init__(self, model: BestModelTwo, trace: MultiTrace):
        super().__init__(model, trace)

    def observed_data(self, group_id):
        # NOTE(review): this override is behaviorally identical to
        # BestResults.observed_data and could be removed without effect.
        return self.model.observed_data(group_id)
def analyze_two(group1_data, group2_data, n_samples: int = 2000, **kwargs) -> BestResultsTwo:
    """Perform a Bayesian comparison of two groups

    Builds a :class:`BestModelTwo` from the two data sets and draws posterior
    samples from it, yielding full posterior distributions for the group
    means, standard deviations, their differences, the effect size, and the
    normality parameter. Expect the analysis to take on the order of a
    minute, depending on the amount of data (see Notes).

    Parameters
    ----------
    group1_data : list of numbers, NumPy array, or Pandas Series.
        Data of the first group analyzed and to be plotted.
    group2_data : list of numbers, NumPy array, or Pandas Series.
        Data of the second group analyzed and to be plotted.
    n_samples : int
        Number of samples *per chain* to be drawn for the analysis.
        (The number of chains depends on the number of CPU cores, but is
        at least 2.) Default: 2000.
    **kwargs
        Forwarded to :meth:`pymc3.sample`. For instance, to raise the number
        of tuning samples from the default 1000 to 2000::

            best.analyze_two(group1_data, group2_data, tune=2000)

    Returns
    -------
    BestResultsTwo
        An object that contains all the posterior samples from the model.

    Notes
    -----
    The very first call compiles the model, which adds roughly 2 minutes;
    subsequent calls reuse the compiled model. After that, a two-group
    analysis takes about 20 seconds with 45 data points per group, or about
    90 seconds with 10,000 data points per group (measured on a 2015
    MacBook). The early time estimates shown during sampling are pessimistic
    — sampling speeds up after the initial few hundred iterations.
    """
    two_group_model = BestModelTwo(group1_data, group2_data)
    posterior = two_group_model.sample(n_samples, **kwargs)
    return BestResultsTwo(two_group_model, posterior)
def analyze_one(group_data, ref_val: float = 0, n_samples: int = 2000, **kwargs) -> BestResultsOne:
    """Perform a Bayesian analysis of a single group

    Builds a :class:`BestModelOne` from the data and draws posterior samples
    from it. Typical uses are comparing observations against a fixed
    reference value (such as zero), and analyzing paired data or data from
    an experiment without a control group. Expect the analysis to take
    around a minute, depending on the amount of data.

    Parameters
    ----------
    group_data : list of numbers, NumPy array, or Pandas Series.
        Data of the group to be analyzed.
    ref_val : float
        The reference value to be compared against. This affects the plots
        and the effect size calculations. Default: 0.
    n_samples : int
        Number of samples *per chain* to be drawn for the analysis.
        (The number of chains depends on the number of CPU cores, but is
        at least 2.) Default: 2000.
    **kwargs
        Forwarded to ``pymc3.sample``. For instance, to raise the number of
        tuning samples from the default 1000 to 2000::

            best.analyze_one(group_data, tune=2000)

    Returns
    -------
    BestResultsOne
        An object that contains all the posterior samples from the model.

    Notes
    -----
    The very first call compiles the model, which adds roughly 2 minutes;
    subsequent calls reuse the compiled model. After that, an analysis takes
    about 20 seconds on a 2015 MacBook, both with 20 and 1000 data points.
    """
    one_group_model = BestModelOne(group_data, ref_val)
    posterior = one_group_model.sample(n_samples, **kwargs)
    return BestResultsOne(one_group_model, posterior)