"""
Helper functions required for HOSA.
"""
from itertools import product
import numpy as np
from numpy.lib.stride_tricks import as_strided
from sklearn.metrics import accuracy_score, multilabel_confusion_matrix, roc_auc_score, \
balanced_accuracy_score
def sliding_window(x, window_size):
    """
    Creates a read-only sliding window view of `x` along its first axis.

    .. note::
        This function is based on the NumPy's function `sliding_window_view`. See
        `numpy.lib.stride_tricks.sliding_window_view
        <https://numpy.org/devdocs/reference/generated/numpy.lib.stride_tricks
        .sliding_window_view.html>`_.

    Args:
        x (numpy.ndarray): Input data. Any array-like with at least one dimension is accepted.
        window_size (int): Size of the sliding window.

    Returns:
        (numpy.ndarray): Returns a read-only sliding window view of the array, with shape
        ``(x.shape[0] - window_size + 1,) + x.shape[1:] + (window_size,)``.

    Raises:
        ValueError: If `x` has no dimensions, or if `window_size` is negative or larger than the
            first dimension of `x`.
    """
    # `np.asarray` copies only when it has to and always returns a base `ndarray`.
    # `np.array(..., copy=False)` raises on NumPy >= 2 when a copy is unavoidable (e.g. lists).
    x = np.asarray(x)
    if x.ndim == 0:
        raise ValueError('`x` must have at least one dimension.')
    if window_size < 0:
        raise ValueError('`window_size` cannot be negative.')
    if window_size > x.shape[0]:
        raise ValueError('`window_size` cannot be larger than the first dimension of `x`.')

    # The window becomes a new trailing axis that walks along axis 0 of the input, so it reuses
    # the stride of axis 0; all the original axes keep their own strides.
    n_windows = x.shape[0] - window_size + 1
    out_shape = (n_windows,) + x.shape[1:] + (window_size,)
    out_strides = x.strides + (x.strides[0],)
    return as_strided(x, strides=out_strides, shape=out_shape, subok=False, writeable=False)
def create_overlapping(x, y, model, n_overlapping_epochs=0, overlapping_type=None, n_stride=1,
                       n_timesteps=None):
    """
    Depending on the model chosen, prepare the data with segmented windows according to the
    number of epochs and overlapping type.

    Args:
        x (numpy.ndarray): Input data. Each row is treated as one epoch; the rows are
            concatenated and windows are cut from the resulting one-dimensional signal.
        y (numpy.ndarray or None): Target values (class labels in classification, real numbers in
            regression). If `None`, the parameter will be ignored.
        model (object): Class of the object to be optimized. Available options are:
            :class:`.RNNClassification`, :class:`.RNNRegression`, :class:`.CNNClassification` and
            :class:`.CNNRegression`. The model family is detected, case-insensitively, from
            ``str(model)``.
        n_overlapping_epochs (int): Number of epochs to be overlapped (in other words,
            the overlap duration).
        overlapping_type (str or None): Type of overlapping to perform on the data. Available
            options are `central`, where the target value corresponds to the central epoch of
            the overlapping window; `left`, where the target value corresponds to the rightmost
            epoch of the overlapping window and `right`, where the target value corresponds to
            the leftmost epoch of the overlapping window. When `n_overlapping_epochs=0`, this
            parameter is ignored.
        n_stride (int): Number of strides to apply to the data.
        n_timesteps (int): Number of timesteps to apply to the data for recurrent models,
            in other words, the number of lagged observations to be used in the model. **Only
            used when `model=RNNClassification` or `model=RNNRegression`.**

    Returns:
        tuple: Returns a tuple with the input data (`x`) and target values (`y`)—or `None` if
        `y=None`—, both in segmented window view.

    Raises:
        ValueError: If `n_overlapping_epochs` is negative, `n_stride` is smaller than one,
            `overlapping_type` is not a valid type, there is not enough data to build a single
            window, `n_timesteps` is missing for a recurrent model, or the model type is
            invalid.
    """

    def cnn(x, y, n_overlapping_epochs, overlapping_type, n_stride):
        if n_overlapping_epochs < 0:
            raise ValueError(
                'The number of overlapping epochs should be zero or a positive number.')
        # A negative stride would silently reverse/misalign `x` and `y`; zero is not a valid
        # slice step at all.
        if n_stride < 1:
            raise ValueError('The number of strides should be a positive number.')

        n_points = x.shape[1]
        if n_overlapping_epochs == 0:
            window_size = n_points
            y_windowed = y[::n_stride] if y is not None else None
        elif overlapping_type == 'central':
            # Target epoch in the middle: `n_overlapping_epochs` epochs on each side.
            window_size = n_points * (2 * n_overlapping_epochs + 1)
            y_windowed = y[n_overlapping_epochs:-n_overlapping_epochs:n_stride] \
                if y is not None else None
        elif overlapping_type == 'left':
            # Target epoch is the last one of the window, so the first targets are dropped.
            window_size = n_points * (n_overlapping_epochs + 1)
            y_windowed = y[n_overlapping_epochs::n_stride] if y is not None else None
        elif overlapping_type == 'right':
            # Target epoch is the first one of the window, so the last targets are dropped.
            window_size = n_points * (n_overlapping_epochs + 1)
            y_windowed = y[:-n_overlapping_epochs:n_stride] if y is not None else None
        else:
            raise ValueError(
                f'`{overlapping_type}` is not a valid type. The available types are: '
                f'`central`, `left` and `right`.')

        x_flatten = x.flatten()
        if window_size > len(x_flatten):
            raise ValueError('Not enough data to create the overlapping window.')

        # Build the windows over indices and keep only those starting on an epoch boundary
        # (every `n_points` samples), further subsampled by the stride. Indexing with the
        # resulting index array produces an independent, writeable copy of the data.
        idx = np.arange(len(x_flatten))
        idx_win = sliding_window(idx, window_size)[::n_points * n_stride]
        x_windowed = x_flatten[idx_win]
        return x_windowed, y_windowed

    def rnn(x, y, n_timesteps, n_overlapping_epochs, overlapping_type, n_stride):
        if n_timesteps is None:
            raise ValueError('`timesteps` must be defined.')

        x_windowed, y_windowed = cnn(x, y, n_overlapping_epochs, overlapping_type, n_stride)
        # Each sequence groups `n_timesteps` consecutive windows and is paired with the target
        # of its last window, hence the first `n_timesteps - 1` targets are discarded.
        y_windowed = y_windowed[n_timesteps - 1:] if y_windowed is not None else None
        idx = np.arange(len(x_windowed))
        idx_win = sliding_window(idx, n_timesteps)
        x_windowed = x_windowed[idx_win]
        return x_windowed, y_windowed

    # According to the model, initialize the overlapping function. The comparison is made on the
    # lowercased representation so that class names such as `CNNClassification` are recognized
    # even when the module path does not contain a lowercase `cnn`/`rnn`.
    model_name = str(model).lower()
    if 'cnn' in model_name:
        return cnn(x, y, n_overlapping_epochs, overlapping_type, n_stride)
    if 'rnn' in model_name:
        return rnn(x, y, n_timesteps, n_overlapping_epochs, overlapping_type, n_stride)
    raise ValueError('The type of the model is invalid.')
def metrics_multiclass(y_true, y_probs, n_classes, imbalance_correction=False):
    """Computes the performance metrics for classification problems.

    Multiclass classification is supported: sensitivity and specificity are computed per class
    and then averaged, using a plain mean or, if `imbalance_correction=True`, an average
    weighted by the number of true samples of each class.

    Args:
        y_true (numpy.ndarray): Ground truth (correct) labels.
        y_probs (numpy.ndarray): Probability estimates, one column per class.
        n_classes (int): Number of classes (or labels) of the classification problem.
        imbalance_correction (bool): `True` if correction for imbalance should be applied to the
            metrics; `False` otherwise.

    Returns:
        (tuple): Returns a tuple with the metrics for AUC, accuracy, sensitivity, and specificity.
    """
    predicted = np.argmax(y_probs, axis=1)

    # One 2x2 matrix per class, laid out as [[tn, fp], [fn, tp]].
    confusion = multilabel_confusion_matrix(y_true, predicted)
    true_neg = confusion[:, 0, 0]
    false_pos = confusion[:, 0, 1]
    false_neg = confusion[:, 1, 0]
    true_pos = confusion[:, 1, 1]

    if imbalance_correction:
        # Row 1 of each matrix (fn + tp) counts the true samples of that class.
        positives = confusion[:, 1, :]
        class_share = np.sum(positives, axis=1) / np.sum(positives)
        # NOTE(review): a class without true samples gives 0/0 (NaN) in the ratios below.
        sensitivity = np.average(true_pos / (true_pos + false_neg), weights=class_share)
        specificity = np.average(true_neg / (false_pos + true_neg), weights=class_share)
        accuracy = balanced_accuracy_score(y_true, predicted)
    else:
        sensitivity = np.mean(true_pos / (true_pos + false_neg))
        specificity = np.mean(true_neg / (false_pos + true_neg))
        accuracy = accuracy_score(y_true, predicted)

    if n_classes > 2:
        averaging = 'weighted' if imbalance_correction else 'macro'
        auc_value = roc_auc_score(y_true, y_probs, average=averaging, multi_class='ovr')
    else:
        # Binary problem: only the scores of the second column are passed to the AUC.
        auc_value = roc_auc_score(y_true, y_probs[:, 1])

    return auc_value, accuracy, sensitivity, specificity
def create_parameter_grid(param_grid):
    """This function generates an iterator that can be traversed through all the parameter value
    combinations.

    The order of the generated parameter combinations is deterministic, being done according to
    the total number of values to try in each parameter in descending order (parameters with the
    same number of values keep the order in which they appear in the dictionary).

    Args:
        param_grid (dict or list of dict): Dictionary with parameters names (`str`) as keys and
            lists of parameter settings to try as values. A sequence of such dictionaries is
            also accepted, in which case the combinations of each dictionary are generated one
            dictionary after the other.

    Yields:
        dict: A single combination, mapping each parameter name to one of its values. An empty
        dictionary yields one empty combination.
    """
    # A bare dictionary is the documented input, but iterating over it directly would walk
    # through its keys (strings) instead of through grids, so it is wrapped in a list first.
    grids = [param_grid] if isinstance(param_grid, dict) else param_grid

    for grid in grids:
        # Parameters with more values to try come first; `sorted` is stable, which keeps the
        # result reproducible for ties.
        keys_sorted = sorted(grid, key=lambda key: len(grid[key]), reverse=True)
        if not keys_sorted:
            yield {}
            continue
        value_lists = [grid[key] for key in keys_sorted]
        for combination in product(*value_lists):
            yield dict(zip(keys_sorted, combination))
def prepare_param_overlapping(specification):
    """
    Extracts from the given specification the parameters used to create the input and output
    overlapping, falling back to defaults for the ones that are missing.

    Args:
        specification (dict): Parameter names mapped to their values. The keys read are
            `overlapping_epochs` (default `0`), `overlapping_type` (default `None`), `stride`
            (default `1`) and `timesteps` (default `None`).

    Returns:
        tuple: Returns a tuple containing the overlapping type, number of overlapping epochs,
        strides, and timesteps.
    """
    overlapping_epochs = specification.get('overlapping_epochs', 0)
    stride = specification.get('stride', 1)
    timesteps = specification.get('timesteps')

    # The overlapping type is only meaningful when at least one epoch is overlapped.
    overlapping_type = None
    if overlapping_epochs > 0:
        overlapping_type = specification.get('overlapping_type')

    return overlapping_type, overlapping_epochs, stride, timesteps