import numpy as np
from copy import deepcopy
from equadratures.parameter import Parameter
from equadratures import Weight
from equadratures.poly import Poly
from equadratures.basis import Basis
import equadratures.plot as plot
class PolyTree(object):
""" Definition of a polynomial tree object.
Parameters
----------
splitting_criterion : str, optional
The type of splitting_criterion to use in the fit function. Options include ``model_aware`` which fits polynomials for each candidate split, ``model_agnostic`` which uses a standard deviation based model-agnostic split criterion [1], and ``loss_gradient`` which uses a gradient based splitting criterion similar to that in [2].
max_depth : int, optional
The maximum depth which the tree will grow to.
min_samples_leaf : int, optional
The minimum number of samples per leaf node.
order : int, optional
The order of the generated orthogonal polynomials.
basis : str, optional
The type of index set used for the basis. Options include: ``univariate``, ``total-order``, ``tensor-grid``, ``sparse-grid`` and ``hyperbolic-basis``.
search : str, optional
The method of search to be used. Options are ``grid`` or ``exhaustive``.
samples : int, optional
The number of evenly spaced candidate split locations to search per feature if ``grid`` search is chosen.
verbose : bool, optional
If ``True``, print progress information during fitting (useful for debugging).
all_data : bool, optional
Store data at all nodes in :class:`~PolyTree` (instead of only leaf nodes).
split_dims : list, optional
List of dimensions along which to make splits.
k : float, optional
The smoothing parameter. Ranges from 0.0 to 1.0, with 0.0 giving no smoothing and 1.0 giving maximum smoothing.
distribution : str, optional
The type of input parameter distributions. Either ``uniform`` or ``data``.
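poly_method : str, optional
The method used to fit the polynomial at each node. Passed to :class:`~equadratures.poly.Poly` as ``method``.
poly_solver_args : dict, optional
Optional arguments for the solver used when fitting each node's polynomial. Passed to :class:`~equadratures.poly.Poly` as ``solver_args``.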
Example
-------
>>> tree = polytree.PolyTree()
>>> X = np.loadtxt('inputs.txt')
>>> X_test = np.loadtxt('inputs_test.txt')
>>> y = np.loadtxt('outputs.txt')
>>> tree.fit(X,y)
>>> y_test = tree.predict(X_test)
References
----------
1. Wang, Y., Witten, I. H., (1997) Inducing Model Trees for Continuous Classes. In Proc. of the 9th European Conf. on Machine Learning Poster Papers. 128-137. `Paper <https://researchcommons.waikato.ac.nz/handle/10289/1183>`__
2. Broelemann, K., Kasneci, G., (2019) A Gradient-Based Split Criterion for Highly Accurate and Transparent Model Trees. In Int. Joint Conf. on Artificial Intelligence (IJCAI). 2030-2037. `Paper <https://www.ijcai.org/Proceedings/2019/0281.pdf>`__
3. Chan, T. F., Golub, G. H., LeVeque, R. J., (1983) Algorithms for computing the sample variance: Analysis and recommendations. The American Statistician. 37(3): 242–247. `Paper <https://www.tandfonline.com/doi/abs/10.1080/00031305.1983.10483115>`__
"""
def __init__(self, splitting_criterion='model_aware', max_depth=5, min_samples_leaf=None, order=1, basis='total-order', search='exhaustive', samples=50, verbose=False,
poly_method="least-squares", poly_solver_args={},all_data=False,split_dims=None,k=0.05,distribution='uniform'):
self.splitting_criterion = splitting_criterion
self.max_depth = max_depth
self.min_samples_leaf = min_samples_leaf
self.order = order
self.basis = basis
self.tree = None
self.search = search
self.samples = samples
self.verbose = verbose
self.cardinality = None
self.poly_method = poly_method
self.poly_solver_args = poly_solver_args
self.actual_max_depth = 0
self.all_data = all_data
self.k = k
self.distribution = distribution
if split_dims is not None:
split_dims = [split_dims] if not isinstance(split_dims, list) else split_dims
assert all(isinstance(dim, int) for dim in split_dims), "split_dims should be a list of ints"
self.split_dims = split_dims
assert max_depth >= 0, "max_depth must be >= 0"
assert order > 0, "order must be a positive integer"
assert samples > 0, "samples must be a positive integer"
assert k > 0, "k must be a positive number"
def get_splits(self):
""" Returns all of the data splits made.
Returns
-------
list
A list of splits made in the format of a nested list: [[split, dimension], ...]
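Example
-------
A minimal sketch, assuming ``tree`` is a fitted :class:`~PolyTree`:

>>> splits = tree.get_splits()
>>> threshold, dim = splits[0]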
"""
def _search_tree(node, splits):
if node["children"]["left"] != None:
if [node["threshold"], node["j_feature"]] not in splits:
splits.append([node["threshold"], node["j_feature"]])
splits = _search_tree(node["children"]["left"], splits)
if node["children"]["right"] != None:
if [node["threshold"], node["j_feature"]] not in splits:
splits.append([node["threshold"], node["j_feature"]])
splits = _search_tree(node["children"]["right"], splits)
return splits
return _search_tree(self.tree, [])
def _split_data(self, j_feature, threshold, X, y):
idx_left = np.where(X[:, j_feature] <= threshold)[0]
idx_right = np.delete(np.arange(0, len(X)), idx_left)
assert len(idx_left) + len(idx_right) == len(X)
return (X[idx_left], y[idx_left]), (X[idx_right], y[idx_right])
def get_polys(self):
"""
Returns the polynomials fitted at each leaf node of the tree.
Returns
-------
list
A list of Poly objects.
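Example
-------
A minimal sketch, assuming ``tree`` is a fitted :class:`~PolyTree`:

>>> polys = tree.get_polys()
>>> coeffs = polys[0].coefficients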
"""
def _search_tree(node, polys):
if node["children"]["left"] == None and node["children"]["right"] == None:
polys.append(node["poly"])
if node["children"]["left"] != None:
polys = _search_tree(node["children"]["left"], polys)
if node["children"]["right"] != None:
polys = _search_tree(node["children"]["right"], polys)
return polys
return _search_tree(self.tree, [])
def fit(self, X, y):
"""
Fits the PolyTree to the provided data.
Parameters
----------
X : numpy.ndarray
Training input data
y : numpy.ndarray
Training output data
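Example
-------
A minimal sketch, assuming training arrays ``X`` with shape (number_of_observations, dimensions) and ``y`` with shape (number_of_observations,):

>>> tree = PolyTree(max_depth=3, order=2)
>>> tree.fit(X, y)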
"""
def _build_tree():
global index_node_global
def _splitter(node):
# Extract data
X, y = node["data"]
depth = node["depth"]
N, d = X.shape
# Dimensions to split along
if self.split_dims is None:
self.split_dims = range(d)
# Find feature splits that might improve loss
did_split = False
if self.splitting_criterion == "model_aware":
loss_best = node["loss"]
elif self.splitting_criterion == "model_agnostic" or self.splitting_criterion=="loss_gradient":
loss_best = np.inf
else:
raise Exception("invalid splitting_criterion")
data_best = None
polys_best = None
j_feature_best = None
threshold_best = None
if self.verbose:
polys_fit = 0
# Perform threshold split search only if node has not hit max depth
if (depth >= 0) and (depth < self.max_depth):
if self.splitting_criterion != "loss_gradient":
for j_feature in range(d):
last_threshold = np.inf
if self.search == 'exhaustive':
threshold_search = X[:, j_feature]
elif self.search == 'grid':
if self.samples > N:
samples = N
else:
samples = self.samples
threshold_search = np.linspace(np.min(X[:,j_feature]), np.max(X[:,j_feature]), num=samples)
else:
raise ValueError("Incorrect search type! Must be 'exhaustive' or 'grid'")
# Perform threshold split search on j_feature
for threshold in np.unique(np.sort(threshold_search)):
# Split data based on threshold
(X_left, y_left), (X_right, y_right) = self._split_data(j_feature, threshold, X, y)
N_left, N_right = len(X_left), len(X_right)
# Do not attempt to split if split conditions not satisfied
if not (N_left >= self.min_samples_leaf and N_right >= self.min_samples_leaf):
continue
# Compute weight loss function
if self.splitting_criterion == "model_aware":
loss_left, poly_left = _fit_poly(X_left, y_left)
loss_right, poly_right = _fit_poly(X_right, y_right)
loss_split = (N_left*loss_left + N_right*loss_right) / N
if self.verbose: polys_fit += 2
elif self.splitting_criterion == "model_agnostic":
loss_split = np.std(y) - (N_left*np.std(y_left) + N_right*np.std(y_right)) / N
# Update best parameters if loss is lower
if loss_split < loss_best:
did_split = True
loss_best = loss_split
if self.splitting_criterion == "model_aware": polys_best = [poly_left, poly_right]
data_best = [(X_left, y_left), (X_right, y_right)]
j_feature_best = j_feature
threshold_best = threshold
# Gradient based splitting criterion from ref. [2]
else:
# Fit a single poly to parent node
loss, poly = _fit_poly(X, y)
# Now run the splitting algo using gradients from this poly
did_split, j_feature_best, threshold_best = self._find_split_from_grad(poly, X, y.reshape(-1,1))
# If model_agnostic or gradient based, fit polynomials to the children now that we have a split
if self.splitting_criterion != "model_aware" and did_split:
(X_left, y_left), (X_right, y_right) = self._split_data(j_feature_best, threshold_best, X, y)
loss_left, poly_left = _fit_poly(X_left, y_left)
loss_right, poly_right = _fit_poly(X_right, y_right)
N_left, N_right = len(X_left), len(X_right)
loss_best = (N_left*loss_left + N_right*loss_right) / N
polys_best = [poly_left, poly_right]
if self.splitting_criterion == "loss_gradient": data_best = [(X_left, y_left), (X_right, y_right)]
if self.verbose: polys_fit += 2
if self.verbose and did_split: print("Node (X.shape = {}) fitted with {} polynomials generated".format(X.shape, polys_fit))
elif self.verbose: print("Node (X.shape = {}) failed to fit after {} polynomials generated".format(X.shape, polys_fit))
if did_split and depth > self.actual_max_depth:
self.actual_max_depth = depth
# Return the best result
result = {"did_split": did_split,
"loss": loss_best,
"polys": polys_best,
"data": data_best,
"j_feature": j_feature_best,
"threshold": threshold_best,
"N": N}
return result
def _fit_poly(X, y):
N, d = X.shape
myParameters = []
for dimension in range(d):
values = X[:,dimension]
values_min = np.amin(values)
values_max = np.amax(values)
if (values_min - values_max) ** 2 < 0.01:
values_min -= 0.01
values_max += 0.01
myParameters.append(Parameter(distribution='Uniform', lower=values_min, upper=values_max, order=self.order))
else:
if self.distribution == 'uniform':
myParameters.append(Parameter(distribution='Uniform', lower=values_min, upper=values_max, order=self.order))
elif self.distribution == 'data':
input_dist = Weight(values, support=[values_min, values_max], pdf=False)
myParameters.append(Parameter(distribution='data',weight_function=input_dist,order=self.order))
if self.basis == "hyperbolic-basis":
myBasis = Basis(self.basis, orders=[self.order for _ in range(d)], q=0.5)
else:
myBasis = Basis(self.basis, orders=[self.order for _ in range(d)])
container["index_node_global"] += 1
poly = Poly(myParameters, myBasis, method=self.poly_method, sampling_args={'sample-points':X, 'sample-outputs':y}, solver_args=self.poly_solver_args)
poly.set_model()
mse = np.linalg.norm(y - poly.get_polyfit(X).reshape(-1)) ** 2 / N
return mse, poly
def _create_node(X, y, depth, container):
poly_loss, poly = _fit_poly(X, y)
node = {"name": "node",
"index": container["index_node_global"],
"loss": poly_loss,
"poly": poly,
"data": (X, y),
"n_samples": len(X),
"j_feature": None,
"threshold": None,
"children": {"left": None, "right": None},
"depth": depth,
"flag": False}
container["index_node_global"] += 1
return node
def _split_traverse_node(node, container):
result = _splitter(node)
if not result["did_split"]:
return
node["j_feature"] = result["j_feature"]
node["threshold"] = result["threshold"]
if not self.all_data:
del node["data"]
(X_left, y_left), (X_right, y_right) = result["data"]
poly_left, poly_right = result["polys"]
node["children"]["left"] = _create_node(X_left, y_left, node["depth"]+1, container)
node["children"]["right"] = _create_node(X_right, y_right, node["depth"]+1, container)
node["children"]["left"]["poly"] = poly_left
node["children"]["right"]["poly"] = poly_right
# Split nodes
_split_traverse_node(node["children"]["left"], container)
_split_traverse_node(node["children"]["right"], container)
container = {"index_node_global": 0}
root = _create_node(X, y, 0, container)
_split_traverse_node(root, container)
return root
N, d = X.shape
if self.basis == "hyperbolic-basis":
self.cardinality = Basis(self.basis, orders=[self.order for _ in range(d)], q=0.5).get_cardinality()
else:
self.cardinality = Basis(self.basis, orders=[self.order for _ in range(d)]).get_cardinality()
if self.min_samples_leaf is None or self.min_samples_leaf == self.cardinality:
self.min_samples_leaf = int(np.ceil(self.cardinality * 1.25))
elif self.cardinality > self.min_samples_leaf:
print("WARNING: Basis cardinality ({}) greater than the minimum samples per leaf ({}). This may cause reduced performance.".format(self.cardinality, self.min_samples_leaf))
self.k *= self.min_samples_leaf
self.tree = _build_tree()
def prune(self, X, y, tol=0.0, percent=False):
""" Prunes the tree that you have fitted.
Parameters
----------
X : numpy.ndarray
Training input data
y : numpy.ndarray
Training output data
tol : float, optional
Pruning tolerance. A node's children are pruned if they improve the loss by less than this tolerance.
percent : bool, optional
If true, tol is taken as a percentage of the parent node's error. Otherwise, tol is taken to be an absolute value.
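Example
-------
A minimal sketch, assuming a fitted ``tree`` and held-out arrays ``X_test`` and ``y_test``:

>>> tree.prune(X_test, y_test, tol=5.0, percent=True)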
"""
if percent: tol /= 100.0
def pruner(node, X_subset, y_subset):
if X_subset.shape[0] < 1:
node["test_loss"] = 0
node["n_samples"] = 0
return node
node["test_loss"] = np.linalg.norm(y_subset - node["poly"].get_polyfit(X_subset).reshape(-1)) ** 2 / X_subset.shape[0]
is_left = node["children"]["left"] is not None
is_right = node["children"]["right"] is not None
if is_left and is_right:
(X_left, y_left), (X_right, y_right) = self._split_data(node["j_feature"], node["threshold"], X_subset, y_subset)
node["children"]["left"] = pruner(node["children"]["left"], X_left, y_left)
node["children"]["right"] = pruner(node["children"]["right"], X_right, y_right)
lower_loss = ( node["children"]["left"]["test_loss"] * node["children"]["left"]["n_samples"] + node["children"]["right"]["test_loss"] * node["children"]["right"]["n_samples"] ) / ( node["children"]["left"]["n_samples"] + node["children"]["right"]["n_samples"] )
node["lower_loss"] = lower_loss
if percent:
loss_eps = tol * node["test_loss"]
else:
loss_eps = tol
if self.verbose: print(lower_loss + loss_eps, node["test_loss"])
if lower_loss + loss_eps > node["test_loss"]:
if self.verbose: print("prune",lower_loss, node["test_loss"], node["children"]["left"]["test_loss"], node["children"]["left"]["n_samples"], node["children"]["right"]["test_loss"], node["children"]["right"]["n_samples"])
node["children"]["left"] = None
node["children"]["right"] = None
return node
assert self.tree is not None, "Run fit() before prune()"
(X_left, y_left), (X_right, y_right) = self._split_data(self.tree["j_feature"], self.tree["threshold"], X, y)
self.tree["children"]["left"] = pruner(self.tree["children"]["left"], X_left, y_left)
self.tree["children"]["right"] = pruner(self.tree["children"]["right"], X_right, y_right)
def predict(self, X):
""" Evaluates the the polynomial tree approximation of the data.
Parameters
----------
X : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) at which to evaluate the tree fit.
Returns
-------
numpy.ndarray
Array with shape (number_of_observations,) containing the polynomial approximations from the tree.
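Example
-------
A minimal sketch, assuming a fitted ``tree`` and test inputs ``X_test``:

>>> y_pred = tree.predict(X_test)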
"""
def _predict(node, indexes):
y_pred[indexes, node["depth"], 0] = node["poly"].get_polyfit(X[indexes]).reshape(-1)
y_pred[indexes, node["depth"], 1] = np.full(fill_value=node["n_samples"], shape=len(indexes))
no_children = node["children"]["left"] is None and \
node["children"]["right"] is None
if no_children: return
idx_left = np.where(X[indexes, node["j_feature"]] <= node["threshold"])[0]
idx_right = np.where(X[indexes, node["j_feature"]] > node["threshold"])[0]
_predict(node["children"]["left"], indexes[idx_left])
_predict(node["children"]["right"], indexes[idx_right])
assert self.tree is not None
y_pred = np.full((X.shape[0], self.actual_max_depth + 2, 2), np.nan)
_predict(self.tree, np.arange(0, X.shape[0]))
smoothed_y_pred = np.zeros(shape=(X.shape[0]))
# For each observation, start from its deepest available prediction and blend in
# ancestor-node predictions on the way back to the root (smoothing controlled by self.k)
for n in range(0, X.shape[0]):
i = self.actual_max_depth + 1
while np.isnan(y_pred[n][i][0]) and i > 0:
i -= 1
smoothed_y = y_pred[n][i][0]
while i > 0:
n_i = y_pred[n][i][1]
if n_i == 0: break
smoothed_y = (smoothed_y * n_i + y_pred[n][i][0] * self.k) / (self.k + n_i)
i -= 1
smoothed_y_pred[n] = smoothed_y
return smoothed_y_pred
def apply(self, X):
""" Returns the leaf node index for each observation in the data.
Parameters
----------
X : numpy.ndarray
Array with shape (number_of_observations, dimensions) at which to evaluate the tree fit.
Returns
-------
numpy.ndarray
A numpy.ndarray of shape (number_of_observations,) containing the leaf node index for each observation in X.
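Example
-------
A minimal sketch, assuming a fitted ``tree`` and test inputs ``X_test``:

>>> inode = tree.apply(X_test)
>>> node = tree.get_node(inode[0])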
"""
def _apply(node, indexes):
no_children = node["children"]["left"] is None and \
node["children"]["right"] is None
if no_children:
inode[indexes] = node["index"]
return
idx_left = np.where(X[indexes, node["j_feature"]] <= node["threshold"])[0]
idx_right = np.where(X[indexes, node["j_feature"]] > node["threshold"])[0]
_apply(node["children"]["left"], indexes[idx_left])
_apply(node["children"]["right"], indexes[idx_right])
if X.ndim == 1: X = X.reshape(1,-1)
inode = np.zeros(shape=X.shape[0],dtype=int)
_apply(self.tree, np.arange(0, X.shape[0]))
return inode
def get_leaves(self):
""" Returns the node indices for all leaf nodes.
Returns
-------
list
Contains the node indices of all leaf nodes.
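Example
-------
A minimal sketch, assuming a fitted ``tree``:

>>> leaves = tree.get_leaves()
>>> leaf_polys = [tree.get_node(inode)["poly"] for inode in leaves]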
"""
def _recurse(node,leaf_list):
no_children = node["children"]["left"] is None and \
node["children"]["right"] is None
if no_children:
leaf_list.append(node["index"])
return
_recurse(node["children"]["left"],leaf_list)
_recurse(node["children"]["right"],leaf_list)
leaf_list = []
_recurse(self.tree,leaf_list)
return leaf_list
def get_mean_and_variance(self):
""" Computes the mean and variance of the polynomial tree model.
Returns
-------
tuple
Tuple (mean,variance) containing two floats; the approximated mean and variance from the fitted PolyTree.
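Example
-------
A minimal sketch, assuming a fitted ``tree``:

>>> mean, variance = tree.get_mean_and_variance()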
"""
# Get volume of polytree domain
root_poly = self.tree["poly"]
root_vol = self._calc_domain_vol(root_poly)
# Get leaf nodes
leaves = self.get_leaves()
# Summation over all leaf nodes in the tree
mean = 0.
var = 0.
for leaf in leaves:
leaf_poly = self.get_node(leaf)["poly"]
leaf_vol = self._calc_domain_vol(leaf_poly)
coeffs = leaf_poly.coefficients
# Compute mean
mean += (leaf_vol/root_vol) * float(coeffs[0])
# Compute variance
tmp = 0.
for i in range(0, len(coeffs)):
tmp += float(coeffs[i]**2)
var += (leaf_vol/root_vol) * tmp
var -= mean**2
return mean, var
def get_graphviz(self, X=None, feature_names=None, file_name=None):
""" Generates a graphviz visualisation of the PolyTree.
Parameters
----------
X : numpy.ndarray, optional
An ndarray with shape (dimensions) containing an input vector for a given sample, to highlight in the tree.
feature_names : list, optional
A list of the names of the features used in the training data.
file_name : str, optional
Filename to write the graphviz data to. If ``None`` (default), the tree is rendered in place; if ``'source'``, the raw graphviz string is returned.
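Example
-------
A minimal sketch, assuming a fitted ``tree`` trained on two features; the feature names here are hypothetical:

>>> tree.get_graphviz(feature_names=['temperature', 'pressure'], file_name='tree.dot')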
"""
from graphviz import Digraph
g = Digraph('g', node_attr={'shape': 'record', 'height': '.1'})
if feature_names is None:
dim = self.tree["poly"].dimensions
feature_names = ['x_%d'%i for i in range(dim)]
def _build_graphviz_recurse(node, parent_node_index=0, parent_depth=0, edge_label="",labelangle=0):
# Empty node
if node is None:
return
# Create node
node_index = node["index"]
if node["children"]["left"] is None and node["children"]["right"] is None:
threshold_str = ""
leaf = True
else:
threshold_str = "{} <= {:.3f}\\n".format(feature_names[node['j_feature']], node["threshold"])
leaf = False
if "lower_loss" in node:
label_str = "node {} \\n {} n_samples = {}\\n loss = {:.6f}\\n lower_loss = {}".format(node_index,threshold_str, node["n_samples"], node["test_loss"], node["lower_loss"])
elif "test_loss" in node:
label_str = "node {} \\n {} n_samples = {}\\n loss = {:.6f}".format(node_index,threshold_str, node["n_samples"], node["test_loss"])
else:
label_str = "node {} \\n {} n_samples = {}\\n loss = {:.6f}".format(node_index,threshold_str, node["n_samples"], node["loss"])
# Create node
if leaf:
nodeshape = "rectangle"
style = ["rounded"]
fillcolor = "#E4fEE4"
else:
nodeshape = "rectangle"
style = ["filled"]
fillcolor = "#EBFAFF"
if node["flag"]:
style.append('bold')
bordercolor = "black"
fontcolor = "black"
g.attr('node', label=label_str, shape=nodeshape)
g.node('{}'.format(node_index),
color=bordercolor, style=', '.join(style),
fillcolor=fillcolor, fontcolor=fontcolor)
# Create edge
if parent_depth > 0:
if node["flag"]:
edgecolor = 'orange'
style = 'bold'
else:
edgecolor = 'black'
style = 'solid'
if parent_depth > 1: edge_label = '' # Only label True/False for root node
g.edge('{}'.format(parent_node_index),
'{}'.format(node_index), headlabel=edge_label, color=edgecolor,style=style,labeldistance="2.5",labelangle=labelangle)
# Traverse child or append leaf value
_build_graphviz_recurse(node["children"]["left"],
parent_node_index=node_index,
parent_depth=parent_depth + 1,
edge_label="True",labelangle="45")
_build_graphviz_recurse(node["children"]["right"],
parent_node_index=node_index,
parent_depth=parent_depth + 1,
edge_label="False",labelangle="-45")
def _flag_tree_walk(node,X):
node["flag"] = True
if node["children"]["left"] is None and \
node["children"]["right"] is None:
return
if X[node["j_feature"]] <= node["threshold"]:
return _flag_tree_walk(node["children"]["left"],X)
if X[node["j_feature"]] > node["threshold"]:
return _flag_tree_walk(node["children"]["right"],X)
# Flag the node path to highlight later
if X is not None:
_flag_tree_walk(self.tree,X)
# Build graph
_build_graphviz_recurse(self.tree,
parent_node_index=0,
parent_depth=0,
edge_label="")
if file_name == 'source':
return g.source
elif file_name is None:
try:
g.render(view=True)
except Exception:
file_name = 'tree.dot'
print("GraphViz source file written to " + file_name + " and can be viewed using an online renderer. Alternatively, you can install graphviz on your system to render it locally.")
if file_name is not None: # not elif here as file_name might be updated in try-except above
with open(file_name, "w") as file:
file.write(str(g.source))
def get_node(self, inode):
""" Returns the node corresponding to a given node number.
Parameters
----------
inode : int
The node number.
Returns
-------
dict
Dictionary containing the data for the requested node.
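Example
-------
A minimal sketch, assuming a fitted ``tree``; here the first leaf node is retrieved:

>>> inode = tree.get_leaves()[0]
>>> node = tree.get_node(inode)
>>> leaf_poly = node["poly"]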
"""
# Find node with given index inode. Traverse all children until correct node found.
def _get_node_from_n(node):
if node is not None: # Need to check if node is None here as below _get_node_from_n() calls on children will result in None if leaf node
if node["index"] == inode:
return node
else:
result = _get_node_from_n(node["children"]["right"])
if result is None:
result = _get_node_from_n(node["children"]["left"])
return result
else:
return None
return _get_node_from_n(self.tree)
def get_paths(self, X=None):
""" Returns the tree paths for the leaf nodes in the tree.
Parameters
----------
X : numpy.ndarray, optional
Array with shape (number_of_observations, dimensions) to apply the tree to. If given, paths will only be returned for leaves which contain observations.
Returns
-------
dict
Dictionary containing a dict for each leaf node. Indexed by the node indices for the leaf nodes.
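Example
-------
A minimal sketch, assuming a fitted ``tree``; the path to the first leaf node is retrieved:

>>> paths = tree.get_paths()
>>> path = paths[tree.get_leaves()[0]]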
"""
def _find_path(node, path, i):
"""
Private recursive function to find path through a tree for a given leaf node.
"""
node_index = node["index"]
info = {'node':node_index,'j':node["j_feature"],'thresh':node["threshold"]}
path.append(info)
if node_index == i:
return True
left = False
right = False
if node["children"]["left"] is not None:
left = _find_path(node["children"]["left"], path, i)
if node["children"]["right"] is not None:
right = _find_path(node["children"]["right"], path, i)
if left or right:
return True
path.remove(info)
return False
# Get leaf node id's
if X is None:
leave_id = self.get_leaves()
else:
# Get leaf nodes
leave_id = self.apply(X)
# Loop through leaves and find path for each.
paths = {}
for leaf in np.unique(leave_id):
path_leaf = []
_find_path(self.tree, path_leaf, leaf)
# Set split info to None for leaf node
path_leaf[-1]["j"] = None
path_leaf[-1]["thresh"] = None
# Save in dict
paths[leaf] = path_leaf
return paths
def plot_decision_surface(self, ij, ax=None, X=None, y=None, max_depth=None, label=True,
color='data',colorbar=True,show=True,kwargs={}):
""" Plots the decision boundaries of the PolyTree over a 2D surface. See :meth:`~equadratures.plot.plot_decision_surface` for full description. """
return plot.plot_decision_surface(self,ij,ax,X,y,max_depth,label,color,colorbar,show,kwargs)
def _find_split_from_grad(self, model, X, y):
""" Private method to find the optimal split point for a tree node based on the training data in that node.
Parameters
----------
model : Poly
An instance of the Poly class, corresponding to the Poly belonging to the tree node.
X : numpy.ndarray
An ndarray with shape (number_of_observations, dimensions) containing the input data belonging to the tree node.
y : numpy.ndarray
An ndarray with shape (number_of_observations, 1) containing the response data belonging to the tree node.
Returns
-------
tuple
Tuple (did_split, split_dim, split_val), where:
did_split (bool): True if a split was found, otherwise False.
split_dim (int): The dimension in X within which the best split was found.
split_val (float): The location of the best split.
"""
renorm = True
N,D = np.shape(X)
# Gradient of loss wrt model coefficients
P = model.get_poly(X).T
r = y-model.get_polyfit(X)
g = r*P
# Sum of gradients
gsum = g.sum(axis=0)
# Loop through all dimensions in X
split_dim = None
split_val = None
gain_max = -np.inf
for d in self.split_dims:
# Sort along feature i
sort = np.argsort(X[:,d])
Xd = X[sort,d]
# Find unique values along one column. #TODO - grid search option
_,splits = np.unique(Xd,return_index=True)
splits = splits[1:]
# Number of samples on left and right split
N_l = splits
N_r = N - N_l
# Only take splits where both children have more than `min_samples_leaf` samples
idx = np.minimum(N_l, N_r) >= self.min_samples_leaf
splits = splits[idx]
N_l = N_l[idx].reshape(-1,1)
N_r = N_r[idx].reshape(-1,1)
# If we've run out of candidate splits, skip
if len(splits) <= 1:
continue
# Sums of gradients for left and right
gsum_left = g[sort,:].cumsum(axis=0)
gsum_left = gsum_left[splits-1,:]
gsum_right = gsum - gsum_left
# Renorm. gradients to zero mean and unit std
if renorm:
mu_l, mu_r, sigma_l, sigma_r = self._get_mean_and_sigma(P[:,1:],splits,N_l,N_r,sort)
gsum_left = self._renormalise( gsum_left, 1/sigma_l, -mu_l/sigma_l)
gsum_right = self._renormalise(gsum_right, 1/sigma_r, -mu_r/sigma_r)
# Compute the Gain (see Eq. (6) in [2])
gain = (gsum_left**2).sum(axis=1)/N_l.reshape(-1) + (gsum_right**2).sum(axis=1)/N_r.reshape(-1)
# Find best gain and compare with previous best
best_idx = np.argmax(gain)
gain = gain[best_idx]
if gain > gain_max:
gain_max = gain
best_split_dim = d
best_split_val = 0.5*(Xd[splits[best_idx] - 1] + Xd[splits[best_idx]])
# If gain_max is still == -np.inf, we must have passed through all features w/o finding a split,
# so return False. Otherwise return True and the split details.
if gain_max == -np.inf:
return False, None, None
else:
return True, best_split_dim, best_split_val
@staticmethod
def _get_mean_and_sigma(X,splits,N_l,N_r,sort):
""" Computes mean and standard deviation of the data in array X, when it is
split in two by the threshold values in the splits array. The data is offset by
its mean to avoid catastrophic cancellation when computing the variance (see ref. [3]).
Parameters
----------
X : numpy.ndarray
Array with dimensions (N, ndim) containing the orthogonal polynomials P.
splits : numpy.ndarray
Array of split locations.
N_l : numpy.ndarray
Array containing info on number of samples to left of splits.
N_r : numpy.ndarray
Array containing info on number of samples to right of splits.
sort : numpy.ndarray
Index array to reorder X.
"""
# Min value of sigma (for stability later)
epsilon = 0.001
# Reorder, and shift X by mean
mu = np.reshape(np.mean(X, axis=0), (1, -1))
Xshift = X[sort] - mu
# Cumulative sums (and sums of squares) for left and right splits
Xsum_l = Xshift.cumsum(axis=0)
Xsum_r = Xsum_l[-1:,:] - Xsum_l
X2sum_l = (Xshift**2).cumsum(axis=0)
X2sum_r = X2sum_l[-1:,:] - X2sum_l
# Compute mean of left and right side for all splits
mu_l = Xsum_l[splits-1,:] / N_l
mu_r = Xsum_r[splits-1,:] / N_r
# Compute standard deviation of left and right side for all splits
sigma_l = np.sqrt(np.maximum(X2sum_l[splits-1,:]/(N_l-1)-mu_l**2, epsilon**2))
sigma_r = np.sqrt(np.maximum(X2sum_r[splits-1,:]/(N_r-1)-mu_r**2, epsilon**2))
# Correct for previous shift
mu_l = mu_l + mu
mu_r = mu_r + mu
return mu_l, mu_r, sigma_l, sigma_r
@staticmethod
def _renormalise(gradients, a, c):
"""
Renormalises gradients according to Eq. (14) of [2].
Parameters
----------
gradients : numpy.ndarray
Array with shape (n_samples, n_params), containing the gradients.
a : numpy.ndarray
Array with shape (n_samples, n_params-1) containing the normalisation factors.
c : numpy.ndarray
Array with shape (n_samples, n_params-1) containing the normalisation offsets.
Returns
-------
gradients : numpy.ndarray
Array with shape (n_samples, n_params) containing the renormalised gradients.
"""
c = c*gradients[:,0].reshape(-1,1)
gradients[:,1:] = gradients[:,1:] * a + c
return gradients
@staticmethod
def _calc_domain_vol(polynomial):
params = polynomial.parameters
vol = 1.
for param in params:
vol *= param.upper - param.lower
return vol