Source code for picos.expressions.samples

# ------------------------------------------------------------------------------
# Copyright (C) 2020 Maximilian Stahlberg
#
# This file is part of PICOS.
#
# PICOS is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# PICOS is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
# ------------------------------------------------------------------------------

"""Implements :class:`Samples`."""

import random

import cvxopt

from .. import glyphs
from ..apidoc import api_end, api_start
from ..caching import cached_property
from .data import cvxopt_hcat, load_data, load_shape
from .exp_affine import ComplexAffineExpression, Constant
from .expression import Expression

_API_START = api_start(globals())
# -------------------------------


[docs]class Samples(): """A collection of data points. :Example: >>> from picos.expressions import Samples >>> # Load the column-major vectorization of six matrices. >>> data = [[[1*i, 3*i], ... [2*i, 4*i]] for i in range(1, 7)] >>> S = Samples(data) >>> S <Samples: (6 4-dimensional samples)> >>> [S.num, S.dim, S.original_shape] # Metadata. [6, 4, (2, 2)] >>> S.matrix # All samples as the columns of one matrix. <4×6 Real Constant: [4×6]> >>> print(S.matrix) [ 1.00e+00 2.00e+00 3.00e+00 4.00e+00 5.00e+00 6.00e+00] [ 2.00e+00 4.00e+00 6.00e+00 8.00e+00 1.00e+01 1.20e+01] [ 3.00e+00 6.00e+00 9.00e+00 1.20e+01 1.50e+01 1.80e+01] [ 4.00e+00 8.00e+00 1.20e+01 1.60e+01 2.00e+01 2.40e+01] >>> print(S[0].T) # The first sample (transposed for brevity). [ 1.00e+00 2.00e+00 3.00e+00 4.00e+00] >>> print(S.mean.T) # The sample mean (transposed for brevity). [ 3.50e+00 7.00e+00 1.05e+01 1.40e+01] >>> print(S.covariance) # The sample covariance matrix. [ 3.50e+00 7.00e+00 1.05e+01 1.40e+01] [ 7.00e+00 1.40e+01 2.10e+01 2.80e+01] [ 1.05e+01 2.10e+01 3.15e+01 4.20e+01] [ 1.40e+01 2.80e+01 4.20e+01 5.60e+01] >>> print(S.original[0]) # The first sample in its original shape. [ 1.00e+00 3.00e+00] [ 2.00e+00 4.00e+00] >>> U = S.select([0, 2, 4]) # Select a subset of samples by indices. >>> print(U.matrix) [ 1.00e+00 3.00e+00 5.00e+00] [ 2.00e+00 6.00e+00 1.00e+01] [ 3.00e+00 9.00e+00 1.50e+01] [ 4.00e+00 1.20e+01 2.00e+01] >>> T, V = S.partition() # Split into training and validation samples. >>> print(T.matrix) [ 1.00e+00 2.00e+00 3.00e+00] [ 2.00e+00 4.00e+00 6.00e+00] [ 3.00e+00 6.00e+00 9.00e+00] [ 4.00e+00 8.00e+00 1.20e+01] >>> print(V.matrix) [ 4.00e+00 5.00e+00 6.00e+00] [ 8.00e+00 1.00e+01 1.20e+01] [ 1.20e+01 1.50e+01 1.80e+01] [ 1.60e+01 2.00e+01 2.40e+01] """
[docs] def __new__(cls, samples=None, forced_original_shape=None, **kwargs): """Prepare a :class:`Samples` instance.""" if isinstance(samples, cls): if forced_original_shape is not None: forced_shape = load_shape(forced_original_shape) if forced_shape[0]*forced_shape[1] != samples.dim: raise ValueError("Incompatible forced original shape.") if forced_shape == samples.original_shape: # Shapes are consistent, return the existing instance. return samples else: # Make a shallow copy and change only the original shape. self = object.__new__(cls) self._cached_cvx_mat = samples._cached_cvx_mat self._cached_cvx_vec = samples._cached_cvx_vec self._cached_pic_mat = samples._cached_pic_mat self._cached_pic_vec = samples._cached_pic_vec self._original_shape = forced_shape return self else: # Return the existing instance. return samples else: # Return a new instance. self = object.__new__(cls) self._cached_cvx_mat = None self._cached_cvx_vec = None self._cached_pic_mat = None self._cached_pic_vec = None self._original_shape = None return self
[docs] def __init__(self, samples, forced_original_shape=None, always_copy=True): """Load a number of data points (samples). :param samples: Any of the following: - A tuple or list of constants, each of which denotes a sample vector. Matrices are vectorized but their :attr:`original_shape` is stored and may be used by PICOS internally. - A constant row or column vector whose entries denote scalar samples. - A constant matrix whose columns denote the samples. - Another :class:`Samples` instance. If possible, it is returned as is (:class:`Samples` instances are immutable), otherwise a shallow copy with the necessary modifications is returned instead. In any case, constants may be given as constant numeric data values (anything recognized by :func:`~.data.load_data`) or as constant PICOS expressions. :param forced_original_shape: Overwrites :attr:`original_shape` with the given shape. :param bool always_copy: If this is :obj:`False`, then data that is provided in the form of CVXOPT types is not copied but referenced if possible. This can speed up instance creation but will introduce inconsistencies if the original data is modified. Note that this argument has no impact if the ``samples`` argument already is a :class:`Samples` instance; in this case data is never copied. """ if isinstance(samples, Samples): # Handled in __new__. return elif isinstance(samples, (tuple, list)): if not samples: raise ValueError("Need at least one sample.") if all(isinstance(s, (int, float, complex)) for s in samples): # Efficiently handle a list of scalars. self._cached_cvx_mat = load_data(samples)[0].T elif all(isinstance(s, ComplexAffineExpression) and s.constant for s in samples): if len(set(s.shape for s in samples)) != 1: raise ValueError("Cannot load samples of differing shapes.") self._original_shape = samples[0].shape self._cached_pic_vec = tuple(s.vec for s in samples) else: samples = tuple( load_data(s, alwaysCopy=always_copy)[0] for s in samples) if len(set(s.size for s in samples)) != 1: raise ValueError("Cannot load samples of differing shapes.") self._original_shape = samples[0].size self._cached_cvx_vec = tuple( s if s.size[1] == 1 else s[:] for s in samples) elif isinstance(samples, Expression): samples = samples.refined if not isinstance(samples, ComplexAffineExpression): raise TypeError("Can only extract samples from a (constant) " "affine expression, not from an instance of {}." .format(type(samples).__name__)) if not samples.constant: raise TypeError("Can only extract samples from a constant " "expression, {} is not constant.".format(samples.string)) self._cached_pic_mat = samples # Treat any vector as a number of scalar samples. if self._cached_pic_mat.shape[1] == 1: self._cached_pic_mat = self._cached_pic_mat.T else: self._cached_cvx_mat = load_data(samples, alwaysCopy=always_copy)[0] # Treat any vector as a number of scalar samples. if self._cached_cvx_mat.size[1] == 1: self._cached_cvx_mat = self._cached_cvx_mat.T assert any(samples is not None for samples in ( self._cached_cvx_vec, self._cached_pic_vec, self._cached_cvx_mat, self._cached_pic_mat)) if forced_original_shape is not None: forced_shape = load_shape(forced_original_shape) if forced_shape[0]*forced_shape[1] != self.dim: raise ValueError("Incompatible forced original shape.") self._original_shape = forced_shape
def __len__(self): """Number of samples.""" return self.num def __str__(self): return glyphs.parenth( "{} {}-dimensional samples".format(self.num, self.dim)) def __repr__(self): return glyphs.repr2("Samples", self.__str__()) def __getitem__(self, i): return self.vectors[i] def __iter__(self): for vector in self.vectors: yield vector @property def _cvxopt_matrix(self): """A CVXOPT dense or sparse matrix whose columns are the samples. This cached property is used by PICOS internally as accessing the CVXOPT value of a constant PICOS expression would create a copy of the data. .. warning:: :class:`Sample` instances are supposed to be immutable, so you are expected not to modify the returned CVXOPT objects. """ if self._cached_cvx_mat is not None: pass elif self._cached_pic_mat is not None: self._cached_cvx_mat = self._cached_pic_mat.value_as_matrix elif self._cached_cvx_vec is not None: self._cached_cvx_mat = cvxopt_hcat(self._cached_cvx_vec) else: self._cached_cvx_mat = cvxopt_hcat( [s.value_as_matrix for s in self._cached_pic_vec]) return self._cached_cvx_mat @property def _cvxopt_vectors(self): """A :class:`tuple` containing the samples as CVXOPT column vectors. This cached property is used by PICOS internally as accessing the CVXOPT value of a constant PICOS expression would create a copy of the data. .. warning:: :class:`Sample` instances are supposed to be immutable, so you are expected not to modify the returned CVXOPT objects. """ if self._cached_cvx_vec is not None: pass elif self._cached_cvx_mat is not None: self._cached_cvx_vec = tuple(self._cached_cvx_mat[:, i] for i in range(self._cached_cvx_mat.size[1])) elif self._cached_pic_vec is not None: self._cached_cvx_vec = tuple( s.value_as_matrix for s in self._cached_pic_vec) else: # We need to convert from a PICOS to a CVXOPT matrix, do so in a way # that caches the result. _ = self._cvxopt_matrix assert self._cached_cvx_mat is not None self._cached_cvx_vec = tuple(self._cached_cvx_mat[:, i] for i in range(self._cached_cvx_mat.size[1])) return self._cached_cvx_vec @property def matrix(self): """A matrix whose columns are the samples.""" if self._cached_pic_mat is not None: pass else: self._cached_pic_mat = Constant(self._cvxopt_matrix) return self._cached_pic_mat @property def vectors(self): """A :class:`tuple` containing the samples as column vectors.""" if self._cached_pic_vec is not None: pass else: self._cached_pic_vec = tuple( Constant(s) for s in self._cvxopt_vectors) return self._cached_pic_vec
[docs] @cached_property def original(self): """A :class:`tuple` containing the samples in their original shape.""" shape = self.original_shape if shape[1] == 1: return self.vectors else: return tuple(sample.reshaped(shape) for sample in self)
@property def dim(self): """Sample dimension.""" if self._cached_cvx_mat is not None: return self._cached_cvx_mat.size[0] elif self._cached_pic_mat is not None: return self._cached_pic_mat.shape[0] elif self._cached_cvx_vec is not None: # NOTE: len() counts nonzero entries for sparse matrices. return self._cached_cvx_vec[0].size[0] else: return len(self._cached_pic_vec[0]) @property def num(self): """Number of samples.""" if self._cached_cvx_mat is not None: return self._cached_cvx_mat.size[1] elif self._cached_pic_mat is not None: return self._cached_pic_mat.shape[1] elif self._cached_cvx_vec is not None: return len(self._cached_cvx_vec) else: return len(self._cached_pic_vec) @property def original_shape(self): """Original shape of the samples before vectorization.""" if self._original_shape is None: self._original_shape = (self.dim, 1) return self._original_shape
[docs] @cached_property def mean(self): """The sample mean as a column vector.""" return Constant(sum(self._cvxopt_vectors) / self.num)
[docs] @cached_property def covariance(self): """The sample covariance matrix.""" if self.num == 1: return cvxopt.spmatrix([], [], [], (1, 1)) mu = self.mean.value_as_matrix X = self._cvxopt_matrix Y = mu*cvxopt.matrix(1, (1, self.num)) Z = X - Y return Constant(Z * Z.T / (self.num - 1))
[docs] def shuffled(self, rng=None): """Return a randomly shuffled instance of the samples. :param rng: A function that generates a random :class:`float` in :math:`[0, 1)`. Defaults to whatever :func:`random.shuffle` defaults to. :Example: >>> from picos.expressions import Samples >>> S = Samples(range(6)) >>> print(S.matrix) [ 0.00e+00 1.00e+00 2.00e+00 3.00e+00 4.00e+00 5.00e+00] >>> rng = lambda: 0.5 # Fake RNG for reproducibility. >>> print(S.shuffled(rng).matrix) [ 0.00e+00 5.00e+00 1.00e+00 4.00e+00 2.00e+00 3.00e+00] """ order = list(range(self.num)) random.shuffle(order, rng) S = self.__class__.__new__(self.__class__) if self._cached_cvx_mat is not None: S._cached_cvx_mat = self._cached_cvx_mat[:, order] if self._cached_cvx_vec is not None: S._cached_cvx_vec = tuple(self._cached_cvx_vec[i] for i in order) if self._cached_pic_mat is not None: # NOTE: Rename to a default string for consistency. S._cached_pic_mat = self._cached_pic_mat[:, order].renamed( glyphs.matrix(glyphs.shape(self._cached_pic_mat.shape))) if self._cached_pic_vec is not None: S._cached_pic_vec = tuple(self._cached_pic_vec[i] for i in order) return S
[docs] def partition(self, after_or_fraction=0.5): """Split the samples into two parts. :param after_or_fraction: Either a fraction strictly between zero and one that denotes the relative size of the first partition or an integer that denotes the number of samples to put in the first partition. :type after_or_fraction: int or float """ if isinstance(after_or_fraction, float): if after_or_fraction <= 0 or after_or_fraction >= 1: raise ValueError( "A partitioning fraction must be strictly between 0 and 1.") n = round(self.num * after_or_fraction) n = min(n, self.num - 1) n = max(1, n) else: n = int(after_or_fraction) if n < 1 or n >= self.num: raise ValueError("Partitioning would leave one partition empty.") A = self.__class__.__new__(self.__class__) B = self.__class__.__new__(self.__class__) if self._cached_cvx_mat is not None: A._cached_cvx_mat = self._cached_cvx_mat[:, :n] B._cached_cvx_mat = self._cached_cvx_mat[:, n:] if self._cached_cvx_vec is not None: A._cached_cvx_vec = self._cached_cvx_vec[:n] B._cached_cvx_vec = self._cached_cvx_vec[n:] if self._cached_pic_mat is not None: A._cached_pic_mat = self._cached_pic_mat[:, :n] B._cached_pic_mat = self._cached_pic_mat[:, n:] if self._cached_pic_vec is not None: A._cached_pic_vec = self._cached_pic_vec[:n] B._cached_pic_vec = self._cached_pic_vec[n:] A._original_shape = self._original_shape B._original_shape = self._original_shape return A, B
[docs] def kfold(self, k): r"""Perform :math:`k`-fold cross-validation (without shuffling). If random shuffling is desired, write ``S.shuffled().kfold(k)`` where ``S`` is your :class:`Samples` instance. To make the shuffling reproducible, see :meth:`shuffled`. :returns list(tuple): A list of :math:`k` training set and validation set pairs. .. warning:: If the number of samples :math:`n` is not a multiple of :math:`k`, then the last :math:`n \bmod k` samples will appear in every training but in no validation set. :Example: >>> from picos.expressions import Samples >>> n, k = 7, 3 >>> S = Samples(range(n)) >>> for i, (T, V) in enumerate(S.kfold(k)): ... print("Partition {}:\nT = {}V = {}" ... .format(i + 1, T.matrix, V.matrix)) Partition 1: T = [ 2.00e+00 3.00e+00 4.00e+00 5.00e+00 6.00e+00] V = [ 0.00e+00 1.00e+00] <BLANKLINE> Partition 2: T = [ 0.00e+00 1.00e+00 4.00e+00 5.00e+00 6.00e+00] V = [ 2.00e+00 3.00e+00] <BLANKLINE> Partition 3: T = [ 0.00e+00 1.00e+00 2.00e+00 3.00e+00 6.00e+00] V = [ 4.00e+00 5.00e+00] <BLANKLINE> """ if not isinstance(k, int): raise TypeError("k must be an integer.") if k < 2: raise ValueError("k must be at least two.") if k > self.num: raise ValueError("k must not exceed the number of samples.") n = self.num // k assert n >= 1 and n < self.num fold = [] indices = list(range(self.num)) for i in range(k): t = indices[:i*n] + indices[(i+1)*n:] v = indices[i*n:(i+1)*n] T = self.__class__.__new__(self.__class__) V = self.__class__.__new__(self.__class__) if self._cached_cvx_mat is not None: T._cached_cvx_mat = self._cached_cvx_mat[:, t] V._cached_cvx_mat = self._cached_cvx_mat[:, v] if self._cached_cvx_vec is not None: T._cached_cvx_vec = tuple(self._cached_cvx_vec[i] for i in t) V._cached_cvx_vec = tuple(self._cached_cvx_vec[i] for i in v) if self._cached_pic_mat is not None: T._cached_pic_mat = self._cached_pic_mat[:, t] V._cached_pic_mat = self._cached_pic_mat[:, v] if self._cached_pic_vec is not None: T._cached_pic_vec = tuple(self._cached_pic_vec[i] for i in t) V._cached_pic_vec = tuple(self._cached_pic_vec[i] for i in v) fold.append((T, V)) return fold
[docs] def select(self, indices): """Return a new :class:`Samples` instance with only selected samples. :param indices: The indices of the samples to select. """ indices = list(indices) S = self.__class__.__new__(self.__class__) if self._cached_cvx_mat is not None: S._cached_cvx_mat = self._cached_cvx_mat[:, indices] if self._cached_cvx_vec is not None: S._cached_cvx_vec = tuple(self._cached_cvx_vec[i] for i in indices) if self._cached_pic_mat is not None: S._cached_pic_mat = self._cached_pic_mat[:, indices] if self._cached_pic_vec is not None: S._cached_pic_vec = tuple(self._cached_pic_vec[i] for i in indices) if len(S) == 0: raise ValueError("Empty susbet of samples selected.") S._original_shape = self._original_shape return S
# -------------------------------------- __all__ = api_end(_API_START, globals())