"""The polynomial parent class; one of the main building blocks in Effective Quadratures."""
from equadratures.stats import Statistics
from equadratures.parameter import Parameter
from equadratures.basis import Basis
from equadratures.solver import Solver
from equadratures.subsampling import Subsampling
from equadratures.quadrature import Quadrature
from equadratures.datasets import score
import equadratures.plot as plot
import scipy.stats as st
import numpy as np
from copy import deepcopy
MAXIMUM_ORDER_FOR_STATS = 8
CARD_LIMIT_SOFT = int(50e3)
class Poly(object):
""" Definition of a polynomial object.
Parameters
----------
parameters : list
A list of parameters, where each element of the list is an instance of the Parameter class.
basis : Basis
An instance of the Basis class corresponding to the multi-index set used.
method : str, optional
The method used for computing the coefficients. Should be one of: ``compressed-sensing``, ``least-squares``, ``minimum-norm``, ``numerical-integration``, ``least-squares-with-gradients``, ``least-absolute-residual``, ``huber``, ``elastic-net``, ``relevance-vector-machine`` or ``custom-solver``. See :class:`~equadratures.solver.Solver` for further details.
sampling_args : dict, optional
A dict containing optional arguments centered around the specific sampling strategy:
- **mesh** (str): Available options are: ``monte-carlo``, ``sparse-grid``, ``tensor-grid``, ``induced``, or ``user-defined``. Note that when the ``sparse-grid`` option is invoked, the sparse pseudospectral approximation method [1] is adopted. One can think of this as being the correct way to use sparse grids in the context of polynomial chaos [2] techniques.
- **subsampling-algorithm** (str): The ``subsampling-algorithm`` input refers to the optimisation technique used for subsampling. In the aforementioned sampling strategies, we generate a logarithmic factor more samples than required and prune them down using an optimisation technique (see [1]). Existing optimisation strategies include: ``qr``, ``lu``, ``svd``, ``newton``. These refer to QR with column pivoting [2], LU with row pivoting [3], singular value decomposition with subset selection [2] and a convex relaxation via Newton's method for determinant maximization [4]. Note that if the ``tensor-grid`` option is selected, then subsampling will depend on whether the Basis argument is a total order index set, hyperbolic basis or a tensor order index set.
- **sampling-ratio** (float): Denotes the extent of undersampling or oversampling required. For values equal to unity (default), the number of rows and columns of the associated Vandermonde-type matrix are equal.
- **sample-points** (numpy.ndarray): A numpy ndarray with shape (number_of_observations, dimensions) that corresponds to a set of sample points over the parameter space.
- **sample-outputs** (numpy.ndarray): A numpy ndarray with shape (number_of_observations, 1) that corresponds to model evaluations at the sample points. Note that if ``sample-points`` is provided as an input, then the code expects ``sample-outputs`` too.
- **sample-gradients** (numpy.ndarray): A numpy ndarray with shape (number_of_observations, dimensions) that corresponds to a set of sample gradient values over the parameter space.
solver_args : dict, optional
Optional arguments centered around the specific solver used for computing the coefficients. See :class:`Solver<equadratures.solver.Solver>`.
override_cardinality: bool, optional
Option to override the soft cardinality limit of 50E3. By default the limit is enforced in order to avoid excessively long run times when the polynomial coefficients are computed.
Examples
--------
>>> # Subsampling from a tensor grid
>>> param = eq.Parameter(distribution='uniform', lower=-1., upper=1., order=3)
>>> basis = eq.Basis('total order')
>>> poly = eq.Poly(parameters=[param, param], basis=basis, method='least-squares', sampling_args={'mesh':'tensor-grid', 'subsampling-algorithm':'svd', 'sampling-ratio':1.0})
>>> # User-defined data with compressive sensing
>>> X = np.loadtxt('inputs.txt')
>>> y = np.loadtxt('outputs.txt')
>>> param = eq.Parameter(distribution='uniform', lower=-1., upper=1., order=3)
>>> basis = eq.Basis('total order')
>>> poly = eq.Poly([param, param], basis, method='compressed-sensing', sampling_args={'sample-points':X, 'sample-outputs':y})
>>> # Using a sparse grid
>>> param = eq.Parameter(distribution='uniform', lower=-1., upper=1., order=3)
>>> basis = eq.Basis('sparse-grid', level=7, growth_rule='exponential')
>>> poly = eq.Poly(parameters=[param, param], basis=basis, method='numerical-integration')
References
----------
1. Constantine, P. G., Eldred, M. S., Phipps, E. T., (2012) Sparse Pseudospectral Approximation Method. Computer Methods in Applied Mechanics and Engineering. 1-12. `Paper <https://www.sciencedirect.com/science/article/pii/S0045782512000953>`__
2. Xiu, D., Karniadakis, G. E., (2002) The Wiener-Askey Polynomial Chaos for Stochastic Differential Equations. SIAM Journal on Scientific Computing, 24(2), `Paper <https://epubs.siam.org/doi/abs/10.1137/S1064827501387826?journalCode=sjoce3>`__
3. Seshadri, P., Iaccarino, G., Ghisu, T., (2018) Quadrature Strategies for Constructing Polynomial Approximations. Uncertainty Modeling for Engineering Applications. Springer, Cham, 2019. 1-25. `Preprint <https://arxiv.org/pdf/1805.07296.pdf>`__
4. Seshadri, P., Narayan, A., Sankaran M., (2017) Effectively Subsampled Quadratures for Least Squares Polynomial Approximations. SIAM/ASA Journal on Uncertainty Quantification, 5(1). `Paper <https://epubs.siam.org/doi/abs/10.1137/16M1057668>`__
5. Rogers, S., Girolami, M., (2016) Variability in predictions. In: A First Course in Machine Learning, Second Edition (2nd. ed.). Chapman & Hall/CRC. `Book <https://github.com/wwkenwong/book/blob/master/Simon%20Rogers%2C%20Mark%20Girolami%20A%20First%20Course%20in%20Machine%20Learning.pdf>`__
"""
def __init__(self, parameters, basis, method=None, sampling_args=None, solver_args={}, variable=None, override_cardinality=False):
try:
len(parameters)
except TypeError:
parameters = [parameters]
self.parameters = parameters
self.basis = deepcopy(basis)
self.method = method
self.sampling_args = sampling_args
self.solver_args = solver_args
self.override = override_cardinality
self.dimensions = len(parameters)
self.orders = []
self.gradient_flag = False
self.variable = variable
for i in range(0, self.dimensions):
self.orders.append(self.parameters[i].order)
if not self.basis.orders :
self.basis.set_orders(self.orders)
# Initialize some default values!
self.inputs = None
self.outputs = None
self.output_variances = None
self.subsampling_algorithm_name = None
self.sampling_ratio = 1.0
self.statistics_object = None
self.parameters_order = [ parameter.order for parameter in self.parameters]
self.highest_order = np.max(self.parameters_order)
if self.method is not None:
if self.method == 'numerical-integration' or self.method == 'integration':
self.mesh = self.basis.basis_type
if self.basis.basis_type != 'tensor-grid' and self.basis.basis_type != 'sparse-grid' and self.basis.basis_type != 'univariate':
raise ValueError('a tensor-grid, sparse-grid or univariate basis must be used with the numerical-integration Poly method')
elif self.method == 'least-squares' or self.method == 'least-absolute-residual' or self.method=='huber' or self.method=='elastic-net':
self.mesh = 'tensor-grid'
elif self.method == 'least-squares-with-gradients':
self.gradient_flag = True
self.mesh = 'tensor-grid'
elif self.method == 'compressed-sensing' or self.method == 'compressive-sensing':
self.mesh = 'monte-carlo'
elif self.method == 'minimum-norm':
self.mesh = 'monte-carlo'
elif self.method == 'relevance-vector-machine':
self.mesh = 'monte-carlo'
# Now depending on user inputs, override these default values!
sampling_args_flag = 0
if self.sampling_args is not None:
if 'mesh' in sampling_args:
self.mesh = sampling_args.get('mesh')
sampling_args_flag = 1
if 'sampling-ratio' in sampling_args:
self.sampling_ratio = float(sampling_args.get('sampling-ratio'))
sampling_args_flag = 1
if 'subsampling-algorithm' in sampling_args:
self.subsampling_algorithm_name = sampling_args.get('subsampling-algorithm')
sampling_args_flag = 1
if 'sample-points' in sampling_args:
self.inputs = sampling_args.get('sample-points')
sampling_args_flag = 1
self.mesh = 'user-defined'
if 'sample-outputs' in sampling_args:
self.outputs = sampling_args.get('sample-outputs')
sampling_args_flag = 1
if 'sample-gradients' in sampling_args:
self.gradients = sampling_args.get('sample-gradients')
sampling_args_flag = 1
if 'gram-schmidt-correction' in sampling_args:
R_Psi = sampling_args.get('gram-schmidt-correction')
self.inv_R_Psi = np.linalg.inv(R_Psi)
sampling_args_flag = 1
if 'correlations' in sampling_args:
self.corr = sampling_args.get('correlations')
sampling_args_flag = 1
if sampling_args_flag == 0:
raise ValueError('None of the specified sampling_args keys were recognised. Valid sampling arguments include: mesh, sampling-ratio, subsampling-algorithm, sample-points, sample-outputs, sample-gradients, gram-schmidt-correction and correlations.')
# Additional optional sampling_args
if 'sample-output-variances' in sampling_args:
self.output_variances = sampling_args.get('sample-output-variances')
# Check cardinality before setting points and weights etc (unless override_cardinality=True)
if not self.override:
cardinality = self.basis.get_cardinality()
if cardinality >= CARD_LIMIT_SOFT:
raise Exception('Cardinality %.1e >= soft cardinality limit %.1e. Computing polynomial coefficients may take a long time. To override this, set override_cardinality=True.'
%(cardinality,CARD_LIMIT_SOFT))
# Set solver, points and weight etc
self._set_solver()
self._set_subsampling_algorithm()
self._set_points_and_weights()
else:
print('WARNING: Method not declared.')
def plot_polyfit_1D(self, ax=None, uncertainty=True, output_variances=None, number_of_points=200, show=True):
""" Plots a 1D only polynomial fit to the data. See :meth:`~equadratures.plot.plot_polyfit_1D` for full description. """
return plot.plot_polyfit_1D(self,ax,uncertainty,output_variances,number_of_points,show)
def plot_model_vs_data(self, ax=None, sample_data=None, metric='adjusted_r2', show=True):
""" Plots the polynomial approximation against the true data. See :meth:`~equadratures.plot.plot_model_vs_data` for full description. """
return plot.plot_model_vs_data(self,ax,sample_data,metric,show)
def plot_sobol(self, ax=None, order=1, show=True, labels=None, kwargs={}):
""" Plots a polynomial's Sobol' indices of a given order. See :meth:`~equadratures.plot.plot_sobol` for full description. """
return plot.plot_sobol(self,ax,order,show,labels,kwargs)
def plot_parameters(self, ax=None, cols=2, show=True):
""" Plots the probability density functions for all Parameters within a Polynomial. See :meth:`~equadratures.plot.plot_parameters` for full description. """
return plot.plot_parameters(self, ax, cols, show)
def plot_total_sobol(self, ax=None, show=True, labels=None, kwargs={}):
""" Plots a polynomial's total-order Sobol' indices. See :meth:`~equadratures.plot.plot_total_sobol` for full description. """
return plot.plot_total_sobol(self,ax,show,labels,kwargs)
def plot_sobol_heatmap(self,parameters=None,show=True,ax=None):
""" Generates a heatmap showing the first and second order Sobol indices. See :meth:`~equadratures.plot.plot_sobol_heatmap` for full description. """
return plot.plot_sobol_heatmap(self,parameters,show,ax)
def _set_parameters(self, parameters):
""" Private function that sets the parameters. Required by the Correlated class. """
self.parameters = parameters
self._set_points_and_weights()
def get_parameters(self):
""" Returns the list of parameters.
Returns
-------
list
Contains the :class:`~equadratures.parameter.Parameter` objects belonging to the polynomial.
"""
return self.parameters
def get_summary(self, filename=None, tosay=False):
""" A simple utility that writes a file summarising what the polynomial approximation has determined.
Parameters
----------
filename : str, optional
The filename to write to. If ``None``, the summary is written to *effective-quadratures-output.txt*.
tosay : bool, optional
True will replace "-" signs with "minus" when writing to file, for compatibility with text-to-speech utilities such as the ``say`` command.
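Examples
--------
A minimal usage sketch, assuming ``poly`` is a polynomial that has already been fitted with :meth:`~equadratures.poly.Poly.set_model`; the filename is arbitrary:
>>> poly.get_summary(filename='poly-summary.txt')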
"""
prec = '{:.3g}'
if self.dimensions == 1:
parameter_string = str('parameter.')
else:
parameter_string = str('parameters.')
introduction = str('Your problem has been defined by '+str(self.dimensions)+' '+parameter_string)
added = str('Their distributions are given as follows:')
for i in range(0, self.dimensions):
added_new = ('\nParameter '+str(i+1)+' '+str(self.parameters[i].get_description()))
if i == 0:
if self.variable is not None:
title = str('This polynomial concerns the output variable '+str(self.variable) + '.\n')
added = title + introduction + added_new
else:
added = introduction + added_new
else:
added = added + added_new
if self.statistics_object is not None:
mean_value, var_value = self.get_mean_and_variance()
X = self.get_points()
y_eval = self.get_polyfit(X)
y_valid = self._model_evaluations
a,b,r,_,_ = st.linregress(y_eval.flatten(),y_valid.flatten())
r2 = r**2
statistics = '\n \nA summary of computed output statistics is given below:\nThe mean is estimated to be '+ prec.format(mean_value) +\
' while the variance is ' + prec.format(var_value) +'.\nFor the data available, the polynomial approximation had an R-squared value of '+prec.format(r2)+'.'
if self.dimensions > 1:
sobol_indices_array = np.argsort(self.get_total_sobol_indices())
final_value = sobol_indices_array[-1] + 1
statistics_extra = str('\nAdditionally, the most important parameter--based on the total Sobol indices--was found to be parameter '+str(final_value)+'.')
statistics = statistics + statistics_extra
added = added + statistics
if(tosay is True):
added = added.replace('e-','e minus')
added = added.replace('minus0','minus')
if filename is None:
filename = 'effective-quadratures-output.txt'
output_file = open(filename, 'w')
output_file.write(added)
output_file.close()
def _set_subsampling_algorithm(self):
""" Private function that sets the subsampling algorithm based on the user-defined method. """
polysubsampling = Subsampling(self.subsampling_algorithm_name)
self.subsampling_algorithm_function = polysubsampling.get_subsampling_method()
def _set_solver(self):
""" Private function that sets the solver depending on the user-defined method. """
self.solver = Solver.select_solver(self.method, self.solver_args)
if self.method.lower()=="elastic-net":
self.solver.elements=self.basis.elements
def _set_points_and_weights(self):
""" Private function that sets the quadrature points. """
if hasattr(self, 'corr'):
corr = self.corr
else:
corr = None
self.quadrature = Quadrature(parameters=self.parameters, basis=self.basis, \
points=self.inputs, mesh=self.mesh, corr=corr)
quadrature_points, quadrature_weights = self.quadrature.get_points_and_weights()
if self.subsampling_algorithm_name is not None:
P = self.get_poly(quadrature_points)
W = np.mat( np.diag(np.sqrt(quadrature_weights)))
A = W * P.T
self.A = A
self.P = P
mm, nn = A.shape
m_refined = int(np.round(self.sampling_ratio * nn))
z = self.subsampling_algorithm_function(A, m_refined)
self._quadrature_points = quadrature_points[z,:]
self._quadrature_weights = quadrature_weights[z] / np.sum(quadrature_weights[z])
else:
self._quadrature_points = quadrature_points
self._quadrature_weights = quadrature_weights
P = self.get_poly(quadrature_points)
W = np.mat( np.diag(np.sqrt(quadrature_weights)))
A = W * P.T
self.A = A
self.P = P
def get_model_evaluations(self):
""" Returns the output model evaluations used to fit the polynomial.
Returns
-------
numpy.ndarray
Array containing the output model evaluations.
"""
return self._model_evaluations
def get_mean_and_variance(self):
""" Computes the mean and variance of the model.
Returns
-------
tuple
Tuple containing two floats; the approximated mean and variance from the polynomial fit.
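Examples
--------
A minimal sketch, assuming ``poly`` is a polynomial that has already been fitted with :meth:`~equadratures.poly.Poly.set_model`:
>>> mean, variance = poly.get_mean_and_variance()
>>> std = np.sqrt(variance)  # standard deviation of the model output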
"""
self._set_statistics()
return self.statistics_object.get_mean(), self.statistics_object.get_variance()
def get_skewness_and_kurtosis(self):
""" Computes the skewness and kurtosis of the model.
Returns
-------
tuple
Tuple containing two floats; the approximated skewness and kurtosis from the polynomial fit.
"""
self._set_statistics()
return self.statistics_object.get_skewness(), self.statistics_object.get_kurtosis()
def _set_statistics(self):
""" Private method that is used within the statistics routines. """
if self.statistics_object is None:
if hasattr(self, 'inv_R_Psi'):
# quad_pts, quad_wts = self.quadrature.get_points_and_weights()
N_quad = 20000
quad_pts = self.corr.get_correlated_samples(N=N_quad)
quad_wts = 1.0 / N_quad * np.ones(N_quad)
poly_vandermonde_matrix = self.get_poly(quad_pts)
elif self.method != 'numerical-integration' and self.dimensions <= 6 and self.highest_order <= MAXIMUM_ORDER_FOR_STATS:
quad = Quadrature(parameters=self.parameters, basis=Basis('tensor-grid', orders= np.array(self.parameters_order) + 1), \
mesh='tensor-grid', points=None)
quad_pts, quad_wts = quad.get_points_and_weights()
poly_vandermonde_matrix = self.get_poly(quad_pts)
elif self.mesh == 'monte-carlo':
quad = Quadrature(parameters=self.parameters,
basis=self.basis, points=None, mesh=self.mesh, oversampling=10.0)
quad_pts, quad_wts = quad.get_points_and_weights()
N_quad = len(quad_wts)
quad_wts = 1.0 / N_quad * np.ones(N_quad)
poly_vandermonde_matrix = self.get_poly(quad_pts)
else:
poly_vandermonde_matrix = self.get_poly(self._quadrature_points)
quad_pts, quad_wts = self.get_points_and_weights()
if self.highest_order <= MAXIMUM_ORDER_FOR_STATS and (self.basis.basis_type.lower() == 'total-order'
or self.basis.basis_type.lower() == 'hyperbolic-basis'):
self.statistics_object = Statistics(self.parameters, self.basis, self.coefficients, quad_pts, \
quad_wts, poly_vandermonde_matrix, max_sobol_order=self.highest_order)
else:
self.statistics_object = Statistics(self.parameters, self.basis, self.coefficients, quad_pts, \
quad_wts, poly_vandermonde_matrix, max_sobol_order=MAXIMUM_ORDER_FOR_STATS)
def get_sobol_indices(self, order):
""" Computes the Sobol' indices.
Parameters
----------
order : int
The order of the Sobol' indices required.
Returns
-------
dict
A dict comprising Sobol' indices and constituent mixed orders of the parameters.
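Examples
--------
A minimal sketch, assuming ``poly`` is a fitted multivariate polynomial; the dict keys identify the parameter (or combination of parameters) each index corresponds to:
>>> first_order = poly.get_sobol_indices(order=1)
>>> second_order = poly.get_sobol_indices(order=2)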
"""
self._set_statistics()
return self.statistics_object.get_sobol(order)
def get_total_sobol_indices(self):
""" Computes the total Sobol' indices.
Returns
-------
numpy.ndarray
Array of length d (number of input parameters) of total Sobol' indices.
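Examples
--------
A minimal sketch, assuming ``poly`` is a fitted multivariate polynomial; the most influential parameter can then be located with :func:`numpy.argmax`:
>>> total_indices = poly.get_total_sobol_indices()
>>> most_important = np.argmax(total_indices)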
"""
self._set_statistics()
return self.statistics_object.get_sobol_total()
def get_conditional_skewness_indices(self, order):
""" Computes the skewness indices.
Parameters
----------
order : int
The highest order of the skewness indices required.
Returns
-------
dict
A dict comprising skewness indices and constituent mixed orders of the parameters.
"""
self._set_statistics()
return self.statistics_object.get_conditional_skewness(order)
def get_conditional_kurtosis_indices(self, order):
""" Computes the kurtosis indices.
Parameters
----------
order : int
The highest order of the kurtosis indices required.
Returns
-------
dict
A dict comprising kurtosis indices and constituent mixed orders of the parameters.
"""
self._set_statistics()
return self.statistics_object.get_conditional_kurtosis(order)
def set_model(self, model=None, model_grads=None):
""" Computes the coefficients of the polynomial via the method selected.
If model evaluations and/or gradients have not yet been provided to Poly via ``sample-outputs`` and/or ``sample-gradients``, they can be given here via the ``model`` and ``model_grads`` arguments.
Parameters
----------
model : ~collections.abc.Callable,numpy.ndarray
The function that needs to be approximated. In the absence of a callable function, the input can be an ndarray of model evaluations at the quadrature points.
model_grads : ~collections.abc.Callable,numpy.ndarray
The gradient of the function that needs to be approximated. In the absence of a callable gradient function, the input can be an ndarray of gradient evaluations at the quadrature points.
Examples
--------
Defining a simple univariate example
>>> x = np.linspace(-2,2,20).reshape(-1,1)
>>> f = lambda x: x**2 - 1
>>> param = eq.Parameter(lower=-2,upper=2,order=2)
>>> basis = eq.Basis('univariate')
Least-squares regression of existing data
>>> y = f(x)
>>> poly = eq.Poly(param,basis,method='least-squares',sampling_args={'mesh': 'user-defined','sample-points':x,'sample-outputs':y})
>>> poly.set_model()
>>> eq.datasets.score(f(xtest),poly.get_polyfit(xtest),metric='rmse')
5.438959822042073e-16
Numerical integration with model provided as a callable function
>>> poly = eq.Poly(param,basis,method='numerical-integration')
>>> poly.set_model(f)
>>> eq.datasets.score(f(xtest),poly.get_polyfit(xtest),metric='rmse')
5.363652779335998e-15
Numerical integration, manually providing model evaluations at quadrature points
>>> poly = eq.Poly(param,basis,method='numerical-integration')
>>> x = poly.get_points()
>>> y = f(x)
>>> poly.set_model(y)
>>> eq.datasets.score(f(xtest),poly.get_polyfit(xtest),metric='rmse')
5.363652779335998e-15
Raises
------
ValueError
model values should be a column vector.
"""
if (model is None) and (self.outputs is not None):
self._model_evaluations = self.outputs
else:
if callable(model):
y = evaluate_model(self._quadrature_points, model)
else:
y = model
if y.shape[0] != self._quadrature_points.shape[0]:
    raise ValueError('the number of model evaluations (%d) must match the number of quadrature points (%d).' % (y.shape[0], self._quadrature_points.shape[0]))
if y.shape[1] != 1:
raise ValueError( 'model values should be a column vector.')
self._model_evaluations = y
if self.gradient_flag:
if (model_grads is None) and (self.gradients is not None):
grad_values = self.gradients
else:
if callable(model_grads):
grad_values = evaluate_model_gradients(self._quadrature_points, model_grads, 'matrix')
else:
grad_values = model_grads
p, q = grad_values.shape
self._gradient_evaluations = np.zeros((p*q,1))
W = np.diag(np.sqrt(self._quadrature_weights))
counter = 0
for j in range(0,q):
for i in range(0,p):
self._gradient_evaluations[counter] = W[i,i] * grad_values[i,j]
counter = counter + 1
del grad_values
self.statistics_object = None
self._set_coefficients()
def _set_coefficients(self, user_defined_coefficients=None):
""" Computes the polynomial approximation coefficients.
Parameters
----------
user_defined_coefficients : numpy.ndarray, optional
A numpy.ndarray of shape (N, 1) where N corresponds to the N coefficients provided by the user
"""
# Check to ensure that if there any NaNs, a different basis must be used and solver must be changed
# to least squares!
if user_defined_coefficients is not None:
self.coefficients = user_defined_coefficients
return
indices_with_nans = np.argwhere(np.isnan(self._model_evaluations))[:,0]
if len(indices_with_nans) != 0:
print('WARNING: One or more of your model evaluations have resulted in a NaN. We found '+str(len(indices_with_nans))+' NaNs out of '+str(len(self._model_evaluations))+'.')
print('The code will now use a least-squares technique that will ignore input-output pairs of your model that have NaNs. This will likely compromise computed statistics.')
self.inputs = np.delete(self._quadrature_points, indices_with_nans, axis=0)
self.outputs = np.delete(self._model_evaluations, indices_with_nans, axis=0)
self.subsampling_algorithm_name = None
number_of_basis_to_prune_down = self.basis.cardinality - len(self.outputs)
if number_of_basis_to_prune_down > 0:
self.basis.prune(number_of_basis_to_prune_down + self.dimensions) # To make it an over-determined system!
self.method = 'least-squares'
self.mesh = 'user-defined'
self._set_solver()
self._set_points_and_weights()
self.set_model()
if self.mesh == 'sparse-grid':
counter = 0
multi_index = []
coefficients = np.empty([1])
multindices = np.empty([1, self.dimensions])
for tensor in self.quadrature.list:
P = self.get_poly(tensor.points, tensor.basis.elements)
W = np.diag(np.sqrt(tensor.weights))
A = np.dot(W , P.T)
_, _ , counts = np.unique( np.vstack( [tensor.points, self._quadrature_points]), axis=0, return_index=True, return_counts=True)
indices = [i for i in range(0, len(counts)) if counts[i] == 2]
b = np.dot(W , self._model_evaluations[indices])
del counts, indices
coefficients_i = self.solver.get_coefficients(A, b) * self.quadrature.sparse_weights[counter]
multindices_i = tensor.basis.elements
coefficients = np.vstack([coefficients_i, coefficients])
multindices = np.vstack([multindices_i, multindices])
counter = counter + 1
multindices = np.delete(multindices, multindices.shape[0]-1, 0)
coefficients = np.delete(coefficients, coefficients.shape[0]-1)
unique_indices, indices , counts = np.unique(multindices, axis=0, return_index=True, return_counts=True)
coefficients_final = np.zeros((unique_indices.shape[0], 1))
for i in range(0, unique_indices.shape[0]):
for j in range(0, multindices.shape[0]):
if np.array_equiv( unique_indices[i,:] , multindices[j,:]):
coefficients_final[i] = coefficients_final[i] + coefficients[j]
self.coefficients = coefficients_final
self.basis.elements = unique_indices
else:
P = self.get_poly(self._quadrature_points)
W = np.diag(np.sqrt(self._quadrature_weights))
A = np.dot(W , P.T)
b = np.dot(W , self._model_evaluations)
if self.gradient_flag:
# Now, we can reduce the number of rows!
dP = self.get_poly_grad(self._quadrature_points)
C = cell2matrix(dP, W)
G = np.vstack([A, C])
r = np.linalg.matrix_rank(G)
m, n = A.shape
print('Gradient computation: The rank of the stacked matrix is '+str(r)+'.')
print('The number of unknown basis terms is '+str(n))
if n > r:
print('WARNING: Please increase the number of samples; one way to do this would be to increase the sampling-ratio.')
self.coefficients = self.solver.get_coefficients(A, b, C, self._gradient_evaluations)
else:
self.coefficients = self.solver.get_coefficients(A, b)
def get_multi_index(self):
""" Returns the multi-index set of the basis.
Returns
-------
numpy.ndarray
Array of the coefficients with size (cardinality_of_basis, dimensions).
"""
return self.basis.elements
def get_coefficients(self):
""" Returns the coefficients of the polynomial approximation.
Returns
-------
numpy.ndarray
The coefficients with size (number_of_coefficients, 1).
"""
return self.coefficients
def get_points(self):
""" Returns the samples based on the sampling strategy.
Returns
-------
numpy.ndarray
The sampled quadrature points with shape (number_of_samples, dimension).
"""
return self._quadrature_points
def get_weights(self):
""" Computes quadrature weights.
Returns
-------
numpy.ndarray
Array of the corresponding quadrature weights with shape (number_of_samples, 1).
"""
return self._quadrature_weights
def get_points_and_weights(self):
""" Returns the samples and weights based on the sampling strategy.
Returns
-------
tuple
Tuple containing two ndarrays; the sampled quadrature points with shape (number_of_samples, dimension), and the corresponding quadrature weights with shape (number_of_samples, 1).
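Examples
--------
A minimal sketch, assuming ``poly`` has been constructed and ``f`` is a placeholder callable model:
>>> pts, wts = poly.get_points_and_weights()
>>> y = evaluate_model(pts, f)  # evaluate the model at the quadrature points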
"""
return self._quadrature_points, self._quadrature_weights
def get_polyfit(self, stack_of_points, uq=False):
""" Evaluates the /polynomial approximation of a function (or model data) at prescribed points.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the polynomial fit must be evaluated.
uq : bool, optional
If ``True``, the estimated uncertainty (standard deviation) of the polynomial approximation is also returned (see [5]).
Returns
-------
numpy.ndarray
Array with shape (number_of_observations, 1) corresponding to the polynomial approximation of the model.
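Examples
--------
A minimal sketch, assuming ``poly`` is a fitted polynomial and ``X_new`` is a hypothetical ndarray of evaluation points:
>>> y_approx = poly.get_polyfit(X_new)
>>> y_approx, y_std = poly.get_polyfit(X_new, uq=True)  # also return the estimated uncertainty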
"""
N = len(self.coefficients)
if uq:
return np.dot(self.get_poly(stack_of_points).T , self.coefficients.reshape(N, 1)), self._get_polystd(stack_of_points)
else:
return np.dot(self.get_poly(stack_of_points).T , self.coefficients.reshape(N, 1))
def get_polyfit_grad(self, stack_of_points, dim_index = None):
""" Evaluates the gradient of the polynomial approximation of a function (or model data) at prescribed points.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the gradient of the polynomial fit must be evaluated.
dim_index : int, optional
Index of dimension to evaluate gradient for.
Returns
-------
numpy.ndarray
Array of shape (dimensions, number_of_observations) corresponding to the polynomial gradient approximation of the model.
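Examples
--------
A minimal sketch, assuming ``poly`` is a fitted polynomial and ``X_new`` is a hypothetical ndarray of evaluation points:
>>> grads = poly.get_polyfit_grad(X_new)  # shape (dimensions, number_of_observations)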
"""
N = len(self.coefficients)
if stack_of_points.ndim == 1:
no_of_points = 1
else:
no_of_points, _ = stack_of_points.shape
H = self.get_poly_grad(stack_of_points, dim_index=dim_index)
grads = np.zeros((self.dimensions, no_of_points ) )
if self.dimensions == 1:
return np.dot(self.coefficients.reshape(N,), H)
for i in range(0, self.dimensions):
grads[i,:] = np.dot(self.coefficients.reshape(N,) , H[i] )
return grads
def get_polyfit_hess(self, stack_of_points):
""" Evaluates the hessian of the polynomial approximation of a function (or model data) at prescribed points.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the Hessian of the polynomial fit must be evaluated.
Returns
-------
numpy.ndarray
Array of shape (dimensions, dimensions, number_of_observations) corresponding to the polynomial Hessian approximation of the model.
"""
if stack_of_points.ndim == 1:
no_of_points = 1
else:
no_of_points, _ = stack_of_points.shape
H = self.get_poly_hess(stack_of_points)
if self.dimensions == 1:
return np.dot(self.coefficients.T , H)
hess = np.zeros((self.dimensions, self.dimensions, no_of_points))
for i in range(0, self.dimensions):
for j in range(0, self.dimensions):
hess[i, j, :] = np.dot(self.coefficients.T , H[i * self.dimensions + j])
return hess
def get_polyfit_function(self):
""" Returns a callable polynomial approximation of a function (or model data).
Returns
-------
~collections.abc.Callable
A callable function.
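Examples
--------
A minimal sketch, assuming ``poly`` is a fitted polynomial; the returned callable can be passed to downstream routines such as an optimiser:
>>> polyfit = poly.get_polyfit_function()
>>> y_approx = polyfit(X_new)  # X_new is a hypothetical ndarray of evaluation points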
"""
N = len(self.coefficients)
return lambda x: np.dot( self.get_poly(x).T , self.coefficients.reshape(N, 1) )
def get_polyfit_grad_function(self):
""" Returns a callable for the gradients of the polynomial approximation of a function (or model data).
Returns
-------
~collections.abc.Callable
A callable function.
"""
return lambda x : self.get_polyfit_grad(x)
def get_polyfit_hess_function(self):
""" Returns a callable for the hessian of the polynomial approximation of a function (or model data).
Returns
-------
~collections.abc.Callable
A callable function.
"""
return lambda x : self.get_polyfit_hess(x)
def get_poly(self, stack_of_points, custom_multi_index=None):
""" Evaluates the value of each polynomial basis function at a set of points.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number of observations, dimensions) at which the polynomial must be evaluated.
custom_multi_index : numpy.ndarray, optional
Array containing a custom multi-index set, in the format given by :meth:`~equadratures.basis.Basis.get_elements`.
Returns
-------
numpy.ndarray
Array of shape (cardinality, number_of_observations) corresponding to the polynomial basis function evaluations at the stack_of_points.
"""
if custom_multi_index is None:
basis = self.basis.elements
else:
basis = custom_multi_index
basis_entries, dimensions = basis.shape
if stack_of_points.ndim == 1:
no_of_points = 1
else:
no_of_points, _ = stack_of_points.shape
p = {}
# Save time by returning if univariate!
if dimensions == 1:
poly , _ , _ = self.parameters[0]._get_orthogonal_polynomial(stack_of_points, int(np.max(basis)))
return poly
else:
for i in range(0, dimensions):
if len(stack_of_points.shape) == 1:
stack_of_points = np.array([stack_of_points])
p[i] , _ , _ = self.parameters[i]._get_orthogonal_polynomial(stack_of_points[:,i], int(np.max(basis[:,i])) )
# One loop for polynomials
polynomial = np.ones((basis_entries, no_of_points))
for k in range(dimensions):
basis_entries_this_dim = basis[:, k].astype(int)
polynomial *= p[k][basis_entries_this_dim]
if hasattr(self, 'inv_R_Psi'):
polynomial = self.inv_R_Psi.T @ polynomial
return polynomial
def get_poly_grad(self, stack_of_points, dim_index = None):
""" Evaluates the gradient for each of the polynomial basis functions at a set of points, with respect to each input variable.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the gradient must be evaluated.
dim_index : int, optional
Index of the dimension to evaluate the gradient for.
Returns
-------
list
A list with d elements, where d corresponds to the dimension of the problem. Each element is a numpy.ndarray of shape (cardinality, number_of_observations) corresponding to the gradient polynomial evaluations at the stack_of_points.
"""
# "Unpack" parameters from "self"
basis = self.basis.elements
basis_entries, dimensions = basis.shape
if len(stack_of_points.shape) == 1:
if dimensions == 1:
# a 1d array of inputs, and each input is 1d
stack_of_points = np.reshape(stack_of_points, (len(stack_of_points),1))
else:
# a 1d array representing 1 point, in multiple dimensions!
stack_of_points = np.array([stack_of_points])
no_of_points, _ = stack_of_points.shape
p = {}
dp = {}
# Save time by returning if univariate!
if dimensions == 1:
_ , dpoly, _ = self.parameters[0]._get_orthogonal_polynomial(stack_of_points, int(np.max(basis) ) )
return dpoly
else:
for i in range(0, dimensions):
if len(stack_of_points.shape) == 1:
stack_of_points = np.array([stack_of_points])
p[i] , dp[i], _ = self.parameters[i]._get_orthogonal_polynomial(stack_of_points[:,i], int(np.max(basis[:,i])) )
# One loop for polynomials
R = []
if dim_index is None:
dim_index = range(dimensions)
for v in range(dimensions):
if not(v in dim_index):
R.append(np.zeros((basis_entries, no_of_points)))
else:
polynomialgradient = np.ones((basis_entries, no_of_points))
for k in range(dimensions):
basis_entries_this_dim = basis[:,k].astype(int)
if k==v:
polynomialgradient *= dp[k][basis_entries_this_dim]
else:
polynomialgradient *= p[k][basis_entries_this_dim]
if hasattr(self, 'inv_R_Psi'):
polynomialgradient = self.inv_R_Psi.T @ polynomialgradient
R.append(polynomialgradient)
return R
def get_poly_hess(self, stack_of_points):
""" Evaluates the Hessian for each of the polynomial basis functions at a set of points, with respect to each input variable.
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the Hessian must be evaluated.
Returns
-------
list
A list with d^2 elements, where d corresponds to the dimension of the model. Each element is a numpy.ndarray of shape (cardinality, number_of_observations) corresponding to the hessian polynomial evaluations at the stack_of_points.
"""
# "Unpack" parameters from "self"
basis = self.basis.elements
basis_entries, dimensions = basis.shape
if stack_of_points.ndim == 1:
no_of_points = 1
else:
no_of_points, _ = stack_of_points.shape
p = {}
dp = {}
d2p = {}
# Save time by returning if univariate!
if dimensions == 1:
_, _, d2poly = self.parameters[0]._get_orthogonal_polynomial(stack_of_points, int(np.max(basis)))
return d2poly
else:
for i in range(0, dimensions):
if len(stack_of_points.shape) == 1:
stack_of_points = np.array([stack_of_points])
p[i], dp[i], d2p[i] = self.parameters[i]._get_orthogonal_polynomial(stack_of_points[:, i],
int(np.max(basis[:, i]) + 1))
H = []
for w in range(0, dimensions):
gradDirection1 = w
for v in range(0, dimensions):
gradDirection2 = v
polynomialhessian = np.zeros((basis_entries, no_of_points))
for i in range(0, basis_entries):
temp = np.ones((1, no_of_points))
for k in range(0, dimensions):
if k == gradDirection1 == gradDirection2:
polynomialhessian[i, :] = d2p[k][int(basis[i, k])] * temp
elif k == gradDirection1:
polynomialhessian[i, :] = dp[k][int(basis[i, k])] * temp
elif k == gradDirection2:
polynomialhessian[i, :] = dp[k][int(basis[i, k])] * temp
else:
polynomialhessian[i, :] = p[k][int(basis[i, k])] * temp
temp = polynomialhessian[i, :]
if hasattr(self, 'inv_R_Psi'):
polynomialhessian = self.inv_R_Psi.T @ polynomialhessian
H.append(polynomialhessian)
return H
def get_polyscore(self,X_test=None,y_test=None,metric='adjusted_r2'):
""" Evaluates the accuracy of the polynomial approximation using the selected accuracy metric.
Training accuracy is evaluated on the data used for fitting the polynomial. Testing accuracy is evaluated on new data if it is provided by the ``X_test`` and ``y_test`` arguments (both must be provided together).
Parameters
----------
X_test : numpy.ndarray, optional
An ndarray with shape (number_of_observations, dimensions), containing new ``X_test`` data (optional).
y_test : numpy.ndarray, optional
An ndarray with shape (number_of_observations, 1) containing new ``y_test`` data (optional).
metric : str, optional
An optional string containing the scoring metric to use. Available options are: ``adjusted_r2``, ``r2``, ``mae``, ``rmse``, or ``normalised_mae`` (default: ``adjusted_r2``).
Returns
-------
float
If ``X_test`` and ``y_test`` are ``None``, the training score of the model is returned as a float.
tuple
If ``X_test`` and ``y_test`` are given, a tuple containing the train and test scores of the model is returned.
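Examples
--------
A minimal sketch, assuming ``poly`` is a fitted polynomial; ``X_new`` and ``y_new`` are hypothetical held-out test data:
>>> train_score = poly.get_polyscore()  # adjusted R-squared on the training data
>>> train_rmse, test_rmse = poly.get_polyscore(X_test=X_new, y_test=y_new, metric='rmse')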
"""
X = self.get_points()
y_pred = self.get_polyfit(X)
train_score = score(self._model_evaluations, y_pred,metric, X=X)
if X_test is not None and y_test is not None:
y_pred_test = self.get_polyfit(X_test)
test_score = score(y_test,y_pred_test,metric,X=X_test)
return train_score, test_score
else:
return train_score
def _get_polystd(self, stack_of_points):
""" Private function to evaluate the uncertainty of the polynomial approximation at prescribed points, following the approach from [5].
Parameters
----------
stack_of_points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the polynomial variance must be evaluated.
Returns
-------
numpy.ndarray
Array of shape (number_of_observations,1) corresponding to the uncertainty (one standard deviation) of the polynomial approximation at each point.
"""
# Training data
X_train = self._quadrature_points
y_train = self._model_evaluations
# Define covariance matrix - TODO: allow non-diagonal matrix?
# Empirical variance
if self.output_variances is None:
mse = ((y_train - self.get_polyfit(X_train))**2).mean()
data_variance = np.full(X_train.shape[0],mse)
# User defined variance (scalar)
elif np.isscalar(self.output_variances):
data_variance = np.full(X_train.shape[0],self.output_variances)
# User defined variance (array)
else:
data_variance = self.output_variances
Sigma = np.diag(data_variance)
# Construct Q, the pseudoinverse of the weighted orthogonal polynomial matrix P
P = self.get_poly(self._quadrature_points)
W = np.diag(np.sqrt(self._quadrature_weights))
A = np.dot(W, P.T)
Q = np.dot( _inv( np.dot(A.T, A) ), A.T)
# Construct A matrix for test points, but omit weights
X_test = stack_of_points
Po = self.get_poly(X_test)
Ao = Po.T
# Propagate the uncertainties
Sigma_X = np.dot( np.dot(Q, Sigma), Q.T)
Sigma_F = np.dot( np.dot(Ao, Sigma_X), Ao.T)
std_F = np.sqrt( np.diag(Sigma_F) )
return std_F.reshape(-1,1)
class Graphpolys(object):
"""
Constructor for generating polynomials over graphs.
Parameters
----------
Graph : networkx.Graph
A networkx Graph object.
data_train : dict
A dictionary of training data, containing the keys ``X``, ``Y`` and ``Z``.
poly : Poly
A Poly object that defines a polynomial over the subspace.
edge_weight : float, optional
Edge weight for Graph edges, default=15.
"""
def __init__(self, Graph, data_train, poly, edge_weight=15):
self.Graph = Graph
self.data_train = data_train
self.poly = poly
self.edge_weight = edge_weight
for _, _, e in Graph.edges(data=True):
e["weight"] = edge_weight
def _stratified_model_admm(self, shape, Lap, loss_proximal_func, regulariser_proximal_func, graph_data=dict(), \
relative_tolerance=1e-5, absolute_tolerance=1e-5, num_jobs=4, \
max_cg_iters=10, max_iters=1000, rho=1, tau_decrement=2, tau_increment=2, mu=10, \
rho_min=0.1, rho_max=1.0):
"""
Fits a Laplacian regularised stratified model using ADMM.
This code is based on the implementation in cvxgrp/strat_models.
@article{strat_models,
author = {Jonathan Tuck and Shane Barratt and Stephen Boyd},
title = {A Distributed Method for Fitting {L}aplacian Regularized Stratified Models},
journal = {Journal of Machine Learning Research},
year = {2021},
note = {To appear}
}
"""
import multiprocessing as mp
import scipy as sc
optimal_solution = False
n = np.prod(shape)
m = Lap.shape[0]
# Retrieve data from ``graph_data``
# alpha_init
if 'alpha_init' in graph_data:
alpha = graph_data['alpha_init'].copy()
else:
alpha = np.zeros((m,) + shape)
primal_residual = np.zeros(alpha.shape)
primal_residual_tilde = np.zeros(alpha.shape)
dual_residual = np.zeros(alpha.shape)
dual_residual_tilde = np.zeros(alpha.shape)
# alpha_tilde
if 'alpha_tilde' in graph_data:
alpha_tilde = graph_data['alpha_tilde'].copy()
else:
alpha_tilde = alpha.copy()
# alpha_hat
if 'alpha_hat' in graph_data:
alpha_hat = graph_data['alpha_hat'].copy()
else:
alpha_hat = alpha.copy()
# u
if 'u' in graph_data:
u = graph_data['u'].copy()
else:
u = np.zeros(alpha.shape)
# u_tilde
if 'u_tilde' in graph_data:
u_tilde = graph_data['u_tilde'].copy()
else:
u_tilde = np.zeros(alpha.shape)
# Multiprocessing
if m <= num_jobs:
num_jobs = m
proximal_pool = mp.Pool(num_jobs)
for iter_j in range(1, max_iters):
# Update alpha
alpha = loss_proximal_func(t=1./rho, nu=alpha_hat-u, warm_start=alpha, pool=proximal_pool)
# Update alpha_tilde
alpha_tilde = regulariser_proximal_func(t=1./rho, nu=alpha_hat-u_tilde, warm_start=alpha_tilde, \
pool=proximal_pool)
# Update alpha_hat
S = Lap + 2.0 * rho * sc.sparse.eye(m)
M = sc.sparse.diags(1./S.diagonal() )
indices = np.ndindex(shape)
equ_rhs = rho * (alpha.T + alpha_tilde.T + u.T + u_tilde.T)
for j, index in enumerate(indices):
index_value = index[::-1]
solution = sc.sparse.linalg.cg(S, equ_rhs[index_value], \
M=M, x0=alpha_hat.T[index_value], \
maxiter=max_cg_iters)
solution = solution[0]
dual_residual.T[index_value] = -rho * (solution - alpha_hat.T[index_value])
dual_residual_tilde.T[index_value] = dual_residual.T[index_value]
alpha_hat.T[index_value] = solution
# Updates
primal_residual = alpha - alpha_hat
primal_residual_tilde = alpha_tilde - alpha_hat
u += alpha - alpha_hat
u_tilde += alpha_tilde - alpha_hat
# Calculation of residual norms and epsilon values
primal_residual_norm = np.linalg.norm(np.append(primal_residual, primal_residual_tilde), 2)
dual_residual_norm = np.linalg.norm(np.append(dual_residual, dual_residual_tilde), 2)
primal_eps = np.sqrt(2. * m * n) * absolute_tolerance + relative_tolerance * \
np.max([primal_residual_norm, dual_residual_norm])
dual_eps = np.sqrt(2. * m * n) * absolute_tolerance + relative_tolerance * \
np.linalg.norm(rho * np.append(u, u_tilde))
# Breaking condition!
if primal_residual_norm <= primal_eps and \
dual_residual_norm <= dual_eps:
optimal_solution = True
break
rho_update = rho
if primal_residual_norm > mu * dual_residual_norm:
rho_update = tau_increment * rho
elif dual_residual_norm > mu * primal_residual_norm:
rho_update = rho / tau_decrement
rho_update = np.clip(rho_update, rho_min, rho_max)
u *= rho / rho_update
u_tilde *= rho / rho_update
rho = rho_update
proximal_pool.close()
proximal_pool.join()
output = {'alpha': alpha, \
'alpha_tilde': alpha_tilde, \
'alpha_hat': alpha_hat, \
'u': u, \
'u_tilde': u_tilde}
# Complete later!
result = {'iterations': iter_j, \
'optimal' :optimal_solution}
return output, result
def predict(self, data, score=True):
"""
Predict.
Parameters
----------
data : dict
A dictionary with test data.
score : bool, optional
If ``True``, the root-mean-square error of the prediction is returned along with the prediction.
Returns
-------
Y_pred : numpy.ndarray
Predicted data.
mean_error : float
The root-mean-square error of the prediction; only returned if ``score`` is ``True``.
"""
import torch
X = torch.from_numpy(data['X'])
X = torch.tensor(self.poly.get_poly(X)).double().t()
alpha = torch.tensor(([self.Graph._node[z]['alpha_tilde'] for z in data["Z"] ]))
Y_pred = (X.unsqueeze(-1) * alpha).sum(1).numpy()
if score:
residuals = (data['Y'] - Y_pred)**2
mean_error = np.sqrt(np.mean(residuals))
return Y_pred, mean_error
else:
return Y_pred
def _graph_to_data(self, shape):
"""
Vectorises the variables in G --> returning a dict.
"""
alpha_init = np.zeros(shape)
alpha_tilde_init = np.zeros(shape)
alpha_hat_init = np.zeros(shape)
u_init = np.zeros(shape)
u_tilde_init = np.zeros(shape)
for i, node in enumerate(self.Graph.nodes()):
vertex = self.Graph._node[node]
if 'alpha' in vertex:
alpha_init[i] = vertex['alpha']
if 'alpha_tilde' in vertex:
alpha_tilde_init[i] = vertex['alpha_tilde']
if 'alpha_hat' in vertex:
alpha_hat_init[i] = vertex['alpha_hat']
if 'u' in vertex:
u_init[i] = vertex['u']
if 'u_tilde' in vertex:
u_tilde_init[i] = vertex['u_tilde']
data = {
'alpha_init': alpha_init,
'alpha_tilde_init': alpha_tilde_init,
'u_init': u_init,
'u_tilde_init': u_tilde_init
}
return data
def fit(self):
"""
Fit polynomial to data.
"""
import networkx as nx
import torch
# Step 1. Calculate the Laplacian matrix
L = nx.laplacian_matrix(self.Graph)
nodelist = self.Graph.nodes()
K = L.shape[0]
# Step 2. Get the data in the right format
cache = self.loss_function(self.data_train)
# Step 3. Compute the proximal loss
def proximal_loss(t, nu, warm_start, pool, cache=cache):
XtX = cache['XtX']
XtY = cache['XtY']
n = cache['n']
# LU factorisation of X'X + 1/(2t) * I
Alu = torch.lu(XtX + 1./(2 * t) * torch.eye(n).unsqueeze(0).double())
b = XtY + 1./(2 * t) * torch.from_numpy(nu)
x = torch.lu_solve(b, *Alu).numpy()
return x
def proximal_residual(t, nu, warm_start, pool, lambda_val=1e-4):
return nu / (1. + t * lambda_val)
G_to_data = self._graph_to_data(cache['alpha_shape'])
result, info = self._stratified_model_admm(shape=cache['shape'], \
Lap=L, \
loss_proximal_func=proximal_loss, \
regulariser_proximal_func=proximal_residual, \
graph_data=G_to_data)
print(info)
return self._output_to_graph(result)
def loss_function(self, data):
import torch
X = data['X']
Y = data['Y']
Z = data['Z']
if X.ndim == 1:
X = X.reshape(-1, 1)
N, _ = X.shape
_, m = Y.shape
n = self.poly.basis.cardinality # verify!
K = len(self.Graph.nodes())
shape = (n, m)
alpha_shape = (K, ) + shape
for x, y, z in zip(X, Y, Z):
vertex = self.Graph._node[z]
if 'X' in vertex:
vertex['X'] += [x]
vertex['Y'] += [y]
else:
vertex['X'] = [x]
vertex['Y'] = [y]
XtX = torch.zeros(K, n, n).double()
XtY = torch.zeros(K, n, m).double()
# Filter out empty nodes
nodes = [[i, node] for i, node in enumerate(self.Graph.nodes()) if self.Graph._node[node] != {}]
for i, node in nodes:
vertex = self.Graph._node[node]
X = torch.tensor(vertex['X']).double()
Y = torch.tensor(vertex['Y']).double()
X = torch.tensor(self.poly.get_poly(X)).double()
XtX[i] = X @ X.t()
XtY[i] = X @ Y
del vertex
cache = {'XtX': XtX, \
'XtY': XtY,
'n': n, \
'alpha_shape': alpha_shape, \
'shape': shape}
return cache
def _output_to_graph(self, output):
alpha = output['alpha']
alpha_tilde = output['alpha_tilde']
alpha_hat = output['alpha_hat']
u = output['u']
u_tilde = output['u_tilde']
for k, node in enumerate(self.Graph.nodes()):
vertex = self.Graph._node[node]
vertex['alpha'] = alpha[k]
vertex['alpha_tilde'] = alpha_tilde[k]
vertex['alpha_hat'] = alpha_hat[k]
vertex['u'] = u[k]
vertex['u_tilde'] = u_tilde[k]
return self.Graph
def _inv(M):
"""
Private function to compute inverse of matrix M, where M is a numpy.ndarray.
"""
ll, mm = M.shape
M2 = M + 1e-10 * np.eye(ll)
L = np.linalg.cholesky(M2)
inv_L = np.linalg.inv(L)
inv_M = inv_L.T @ inv_L
return inv_M
def evaluate_model_gradients(points, fungrad, format='matrix'):
""" Evaluates the model gradient at given values.
Parameters
----------
points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the gradient must be evaluated.
fungrad : ~collections.abc.Callable
A callable argument for the function's gradients.
format : str, optional
The format in which the output is to be provided: ``matrix`` will output a numpy.ndarray of shape (number_of_observations, dimensions) with gradient values, while ``vector`` will stack all the vectors in this matrix to yield a numpy.ndarray with shape (number_of_observations x dimensions, 1).
Returns
-------
numpy.ndarray
Array of gradient evaluations.
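Examples
--------
A minimal sketch for a two-dimensional model f(x) = x[0]**2 + x[1]**2; ``pts`` is a hypothetical ndarray of points with two columns:
>>> fungrad = lambda x: [2.0 * x[0], 2.0 * x[1]]
>>> grad_matrix = evaluate_model_gradients(pts, fungrad, format='matrix')
>>> grad_vector = evaluate_model_gradients(pts, fungrad, format='vector')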
"""
dimensions = len(points[0,:])
if format == 'matrix':
grad_values = np.zeros((len(points), dimensions))
# For loop through all the points
for i in range(0, len(points)):
output_from_gradient_call = fungrad(points[i,:])
for j in range(0, dimensions):
grad_values[i,j] = output_from_gradient_call[j]
return grad_values
elif format == 'vector':
grad_values = np.zeros((len(points) * dimensions, 1))
# For loop through all the points
counter = 0
for i in range(0, len(points)):
output_from_gradient_call = fungrad(points[i,:])
for j in range(0, dimensions):
grad_values[counter, 0] = output_from_gradient_call[j]
counter = counter + 1
return np.mat(grad_values)
else:
raise ValueError('evaluate_model_gradients(): format must be either matrix or vector!')
def evaluate_model(points, function):
"""
Evaluates the model function at given values.
Parameters
----------
points : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which the model must be evaluated.
function : ~collections.abc.Callable
A callable argument for the function.
Returns
-------
numpy.ndarray
Array of function evaluations.
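Examples
--------
A minimal sketch for a two-dimensional model; ``pts`` is a hypothetical ndarray of points with two columns:
>>> f = lambda x: x[0]**2 + x[1]**2
>>> y = evaluate_model(pts, f)  # returns an ndarray of shape (number_of_observations, 1)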
"""
function_values = np.zeros((len(points), 1))
for i in range(0, len(points)):
function_values[i,0] = function(points[i,:])
return function_values
def vector_to_2D_grid(coefficients, index_set):
""" Handy function that converts a vector of coefficients into a matrix based on index set values.
Parameters
----------
coefficients : numpy.ndarray
An ndarray with shape (N, 1) where N corresponds to the number of coefficient values.
index_set : numpy.ndarray
The multi-index set of the basis.
Returns
-------
tuple
Tuple (x,y,z,max_order), containing the x,y values of the meshgrid, the coefficient values, and the highest order.
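Examples
--------
A minimal sketch, assuming ``poly`` is a fitted two-dimensional polynomial; the returned grids can be passed to a contour or surface plotting routine:
>>> x, y, z, max_order = vector_to_2D_grid(poly.get_coefficients(), poly.get_multi_index())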
"""
max_order = int(np.max(index_set)) + 1
x, y = np.mgrid[0:max_order, 0:max_order]
z = np.full(x.shape, float('NaN'))
indices = index_set.astype(int)
l = len(coefficients)
coefficients = np.reshape(coefficients, (1, l))
z[indices[:,0], indices[:,1]] = coefficients
return x, y, z, max_order
def cell2matrix(G, W):
dimensions = len(G)
G0 = G[0] # Which by default has to exist!
C0 = G0.T
rows, cols = C0.shape
BigC = np.zeros((dimensions*rows, cols))
counter = 0
for i in range(0, dimensions):
K = np.dot(W, G[i].T)
for j in range(0, rows):
for k in range(0,cols):
BigC[counter,k] = K[j,k]
counter = counter + 1
BigC = np.mat(BigC)
return BigC