"""
Functions are used to forecaset House Price Index (HPI) using ARX model.
It includes two forecast performance evaluation test:
- Testing the equality of prediction mean squared errors: David I. Harvey, Stephen J. Leybourne, Paul Newbold (1997)
- Tests for Forecast Encompassing: David I. Harvey, Stephen J. Leybourne, Paul Newbold (1998)
"""
#%%
# Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ast
from IPython.display import display
from itertools import combinations
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression # ElasticNetCV,
from sklearn.decomposition import PCA
from sklearn.model_selection import cross_validate, KFold # cross_val_score,
import scipy.stats as ss
#%%
[docs]
def lags_list_function(lags, max_lag):
"""
Generate a list of lag combinations based on the specified `lags` parameter and maximum lag value.
Parameters
----------
lags : {'Auto', 'glob', list of int}
Determines the type of lag combinations to generate:
- 'Auto': Automatically generates sequential lags up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Uses the specified list of lags directly.
max_lag : int
The maximum lag value to consider when generating lag combinations.
Returns
-------
lags_list : list of list of int
A list where each element is a list of integers representing a combination
of lag values. The specific combinations depend on the input `lags` parameter.
Examples
--------
>>> lags_list_function('Auto', 3)
[[], [1], [1, 2], [1, 2, 3]]
>>> lags_list_function('glob', 2)
[[], [1], [2], [1, 2]]
>>> lags_list_function([1, 2], 3)
[[1, 2]]
"""
full_lags = list(range(1, max_lag+1))
all_combo = []
for r in range(0, max_lag+1):
for combo in combinations(full_lags, r):
all_combo.append(list(combo))
if lags == 'Auto':
lags_list = [full_lags[:i] for i in range(0, max_lag+1)]
elif lags == 'glob':
lags_list = all_combo
else:
lags_list = [lags]
return lags_list
#%%
[docs]
def result_table(index, header, h_list, index_name, first_index='None'):
"""
Create a multi-indexed DataFrame with specified index and column headers.
Parameters
----------
index : list of str
A list of labels for the DataFrame's index.
header : str
The header label for the DataFrame's columns.
h_list : list of int
A list of integers that will be appended to the header label to create the column names.
index_name : str
The name to assign to the DataFrame's index.
first_index : str, optional
The label for the first index position. Defaults to 'None'.
Returns
-------
df : pandas.DataFrame
A DataFrame with a MultiIndex for the columns, where the first level is the `header` and
the second level corresponds to 'h=' followed by each element in `h_list`. The DataFrame's
index is set to the provided `index` list, with an optional `first_index` prepended.
Examples
--------
>>> result_table(['A', 'B', 'C'], 'Metric', [1, 2, 3], 'Category')
Metric
h=1 h=2 h=3
None NaN NaN NaN
A NaN NaN NaN
B NaN NaN NaN
C NaN NaN NaN
>>> result_table(['X', 'Y'], 'Value', [1, 2], 'Type', 'Start')
Value
h=1 h=2
Start NaN NaN
X NaN NaN
Y NaN NaN
"""
df_index = pd.Index([first_index] + index, name=index_name)
df_columns = pd.MultiIndex.from_tuples([(header, 'h='+str(i)) for i in h_list])
df = pd.DataFrame(index=df_index, columns=df_columns)
return df
#%%
[docs]
def compute_ic_cv(y, X, metric, cv_criteria='MSFE', fit_intercept=True, cv=5, shuffle=False, n_iter=20, seed=None):
"""
Compute the information criterion (IC) or cross-validation (CV) score based on the specified metric and criteria.
Parameters
----------
y : array-like or pandas.Series
The dependent variable vector (target).
X : array-like or pandas.DataFrame
The independent variable matrix (features).
metric : {'CV', 'IC'}
The type of metric to compute:
- 'CV': Cross-validation metric based on the `cv_criteria`.
- 'IC': Information criterion (e.g., BIC).
cv_criteria : {'MSFE', 'MAFE'}, optional
The criterion used to compute the CV score when `metric` is 'CV':
- 'MSFE': Mean Squared Forecast Error (negative mean squared error).
- 'MAFE': Mean Absolute Forecast Error (negative mean absolute error).
Default is 'MSFE'.
fit_intercept : bool, optional
Whether to calculate the intercept for the linear model. If set to False, no intercept will be used in calculations. Default is True.
cv : int, optional
The number of folds in cross-validation. Default is 5.
shuffle : bool, optional
Whether to shuffle the data before splitting into batches in cross-validation. Default is False.
n_iter : int, optional
The number of iterations for cross-validation when `shuffle` is True. Default is 20.
seed : int, optional
The random seed for reproducibility when shuffling data in cross-validation. Default is None.
Returns
-------
ic : float
The computed information criterion (IC) or cross-validation (CV) score.
Examples
--------
>>> compute_ic_cv(y, X, metric='CV', cv_criteria='MSFE', cv=5)
0.045
>>> compute_ic_cv(y, X, metric='IC')
210.34
"""
n_iter = n_iter if shuffle else 1
if metric == 'CV':
ics = []
for i in range(n_iter):
splitter = KFold(n_splits=cv, shuffle=shuffle, random_state=seed)
scores = cross_validate(LinearRegression(fit_intercept=fit_intercept), X, y, cv=splitter, scoring=['neg_mean_absolute_error','neg_mean_squared_error'])
# scores = cross_val_score(LinearRegression(fit_intercept=fit_intercept), X, y, cv=cv, scoring='neg_mean_squared_error')
if cv_criteria == 'MAFE':
ics.append(-scores['test_neg_mean_absolute_error'].mean())
elif cv_criteria == 'MSFE':
ics.append(-scores['test_neg_mean_squared_error'].mean())
# ics.append(-scores.mean())
ic = np.mean(ics)
elif metric == 'IC':
ic = sm.OLS(y, X).fit().info_criteria('bic') # .astype(float) should be added to X_train in CoCalc
return ic
#%%
[docs]
def lag_selector(df, lag_select, seed=None, cv_criteria='MSFE', cv=5, shuffle=False, n_iter=20, h=1, max_lag=13, exog=None, var_order='cross', y_lags='Auto', exog_lags='Auto', seasonal=False, verbose=0):
"""
Select optimal lags for the dependent and exogenous variables using information criteria (IC) or cross-validation (CV) metrics.
Parameters
----------
df : pandas.DataFrame
The input DataFrame containing the time series data. The first column is assumed to be the dependent variable ('y').
lag_select : {'IC', 'CV'}
The metric used to select the optimal lags:
- 'IC': Information criterion (e.g., BIC).
- 'CV': Cross-validation score based on `cv_criteria`.
seed : int, optional
Random seed for reproducibility in cross-validation. Default is None.
cv_criteria : {'MSFE', 'MAFE'}, optional
The criterion used for cross-validation when `lag_select` is 'CV':
- 'MSFE': Mean Squared Forecast Error (default).
- 'MAFE': Mean Absolute Forecast Error.
cv : int, optional
Number of folds for cross-validation. Default is 5.
shuffle : bool, optional
Whether to shuffle the data before splitting into batches for cross-validation. Default is False.
n_iter : int, optional
Number of iterations for cross-validation when `shuffle` is True. Default is 20.
h : int, optional
Forecast horizon, indicating how many steps ahead the model is predicting. Default is 1.
max_lag : int, optional
The maximum lag order to consider for selection. Default is 13.
exog : str, optional
Name of the exogenous variable in the DataFrame. Default is None.
var_order : {'cross', 'nested'}, optional
The order in which lag selection is performed:
- 'cross': Cross all combinations of lags for 'y' and 'exog'.
- 'nested': Select the best lags for 'y' first, then choose the best lags for 'exog' based on the selected 'y' lags. Default is 'cross'.
y_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the dependent variable:
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
exog_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the exogenous variable, if applicable:
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
seasonal : bool, optional
Whether to include seasonal dummies (monthly) in the model. Default is False.
verbose : int, optional
If greater than 1, prints detailed information about the lag selection process. Default is 0.
Returns
-------
y_lags : list of int
The optimal lags for the dependent variable ('y') based on the specified metric.
exog_lags : list of int
The optimal lags for the exogenous variable based on the specified metric.
IC : float
The minimum information criterion (IC) or cross-validation score obtained.
ics : dict
A dictionary where keys are the IC/CV scores and values are the corresponding lags for 'y' and 'exog'.
Examples
--------
>>> lag_selector_IC_CV(df, lag_select='CV', cv_criteria='MSFE', h=1)
([1, 2, 3], [1], 0.034, {...})
>>> lag_selector_IC_CV(df, lag_select='IC', exog='ExogVar', h=2, max_lag=5)
([1, 3], [1], 210.45, {...})
"""
df = df.copy()
y_lags_list = lags_list_function(y_lags, max_lag)
exog_lags_list = lags_list_function(exog_lags, max_lag) if exog else [[]]
df['const'] = 1
base_cols = ['const']
for i in range(h, max_lag + h):
df['y.L' + str(i)] = df.iloc[:,0].shift(i)
if exog:
for i in range(h, max_lag + h):
df[exog + '.L' + str(i)] = df.loc[:,exog].shift(i)
df = df.dropna() # To get the same results with ar_select_order(), we should drop NaNs before starting to get ICs.
# That means we wont have, for example, the first 13 rows even when try to get IC of lag=[1,2] when max_lag=13 and h=1
if seasonal:
dummies = pd.get_dummies(df.index.month, dtype='float').iloc[:, 1:]
dummies.columns = [f's({i},12)' for i in dummies.columns]
base_cols += [f's({i},12)' for i in dummies.columns]
dummies.index = df.index
df = df.reset_index().merge(dummies, left_on='Date', right_index=True).set_index('Date')
ics = {}
if var_order == 'cross':
for y_lags in y_lags_list:
y_lags_for_model = [i+h-1 for i in y_lags]
for exog_lags in exog_lags_list:
exog_lags_for_model = [i+h-1 for i in exog_lags]# if exog else []
cols_label = base_cols + ['y.L' + str(i) for i in y_lags_for_model] + [exog + '.L' + str(i) for i in exog_lags_for_model]
y = df['HPI']
X = df[cols_label]
ic = compute_ic_cv(y, X, metric=lag_select, cv_criteria=cv_criteria, fit_intercept=False, cv=cv, shuffle=shuffle, n_iter=n_iter, seed=seed)
ics[ic] = [y_lags, exog_lags]
ics = {k: ics[k] for k in sorted(ics)}
IC = min(ics.keys())
y_lags = ics[IC][0]
exog_lags = ics[IC][1]
else:
exog_lags = exog_lags if ((exog_lags != 'Auto') & (exog != None)) else []
exog_lags_for_model = [l+h-1 for l in exog_lags]
for y_lags in y_lags_list:
y_lags_for_model = [i+h-1 for i in y_lags]
cols_label = base_cols + ['y.L' + str(i) for i in y_lags_for_model] + [exog + '.L' + str(i) for i in exog_lags_for_model]
y = df['HPI']
X = df[cols_label]
ic = compute_ic_cv(y, X, metric=lag_select, cv_criteria=cv_criteria, fit_intercept=False, cv=cv, shuffle=shuffle, n_iter=n_iter, seed=seed)
ics[ic] = [y_lags, exog_lags]
ics = {k: ics[k] for k in sorted(ics)}
IC = min(ics.keys())
y_lags = ics[IC][0]
y_lags_for_model = [i+h-1 for i in y_lags]
exog_lags_list.remove(exog_lags)
for exog_lags in exog_lags_list:
exog_lags_for_model = [i+h-1 for i in exog_lags]
cols_label = base_cols + ['y.L' + str(i) for i in y_lags_for_model] + [exog + '.L' + str(i) for i in exog_lags_for_model]
y = df['HPI']
X = df[cols_label]
ic = compute_ic_cv(y, X, metric=lag_select, cv_criteria=cv_criteria, fit_intercept=False, cv=cv, shuffle=shuffle, n_iter=n_iter, seed=seed)
ics[ic] = [y_lags, exog_lags]
ics = {k: ics[k] for k in sorted(ics)}
IC = min(ics.keys())
exog_lags = ics[IC][1]
if verbose > 1:
print(f'h={h}, exog: {exog}, ics: {ics}')
return y_lags, exog_lags, IC, ics
#%%
[docs]
def model(df, h=1, max_lag=3, exog=None, seasonal=False, lag_select='IC', seed=None, cv=5, shuffle=False, n_iter=20, y_lags='Auto', exog_lags='Auto', var_order='cross', train_cut=0.8, verbose=0, log=False, plot=False, original_hpi=None, original_scale=True):
"""
Fit an autoregressive exogenous (ARX) model with selected lags using information criteria (IC) or cross-validation (CV) for dependent and independent variables, and evaluate its performance.
Parameters
----------
df : pandas.DataFrame
The input DataFrame containing the time series data. The first column is assumed to be the dependent variable ('y').
h : int, optional
Forecast horizon, indicating how many steps ahead the model is predicting. Default is 1.
max_lag : int, optional
The maximum lag order to consider for selection. Default is 3.
exog : str, optional
Name of the exogenous variable in the DataFrame. Default is None.
seasonal : bool, optional
Whether to include seasonal dummies (monthly) in the model. Default is False.
lag_select : {'IC', 'CV'}, optional
The metric used to select the optimal lags:
- 'IC': Information criterion (e.g., BIC).
- 'CV': Cross-validation score based on `cv_criteria`. Default is 'IC'.
seed : int, optional
Random seed for reproducibility in cross-validation. Default is None.
cv : int, optional
Number of folds for cross-validation. Default is 5.
shuffle : bool, optional
Whether to shuffle the data before splitting into batches for cross-validation. Default is False.
n_iter : int, optional
Number of iterations for cross-validation when `shuffle` is True. Default is 20.
y_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the dependent variable:
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
exog_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the exogenous variable, if applicable:
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
var_order : {'cross', 'nested'}, optional
The order in which lag selection is performed:
- 'cross': Cross all combinations of lags for 'y' and 'exog'.
- 'nested': Select the best lags for 'y' first, then choose the best lags for 'exog' based on the selected 'y' lags. Default is 'cross'.
train_cut : float or str or datetime-like, optional
The cutoff point for splitting the data into training and validation sets. Can be a float between 0 and 1 representing the proportion of the data to use for training, or a specific index value. Default is 0.8.
verbose : int, optional
Level of verbosity for debugging or detailed output. Default is 0.
log : bool, optional
If True, the data is assumed to be log-transformed and predictions will be back-transformed. Default is False.
plot : bool, optional
If True, plots the actual vs. predicted values with a vertical line indicating the training/validation split. Default is False.
original_hpi : pandas.Series, optional
Original HPI (Housing Price Index) values for back-transforming predictions. Only used if `original_scale` is True. Default is None.
original_scale : bool, optional
Whether to return the predictions on the original scale (before any transformations). Default is True.
Returns
-------
MAFE_val : float
Mean Absolute Forecast Error on the validation set.
MSFE_val : float
Mean Squared Forecast Error on the validation set.
MAFE_train : float
Mean Absolute Forecast Error on the training set.
MSFE_train : float
Mean Squared Forecast Error on the training set.
lags_set : dict
Dictionary containing the selected lags for 'y' and 'exog'.
IC : float
The minimum information criterion (IC) or cross-validation score obtained during lag selection.
res_full : pandas.Series
The full set of predictions for the dependent variable, including both training and validation periods.
Examples
--------
>>> MAFE_val, MSFE_val, MAFE_train, MSFE_train, lags_set, IC, res_full = model_IC_CV(df, h=1, max_lag=3, exog='ExogVar')
>>> print(MAFE_val, MSFE_val, lags_set)
0.034 0.002 {'y lags': [1, 2], 'exog lags': [1]}
"""
df = df.copy()
res_full = pd.Series(index=df.index)
if isinstance(train_cut, (int, float)):
train_cut = df.index[round(len(df) * train_cut)-1]
y_lags, exog_lags, IC, ics = lag_selector(df.loc[:train_cut], h=h, max_lag=max_lag, exog=exog, seasonal=seasonal, lag_select=lag_select, seed=seed, cv=cv, shuffle=shuffle, n_iter=n_iter, y_lags=y_lags, exog_lags=exog_lags, var_order=var_order, verbose=verbose)
y_lags_for_model = [i+h-1 for i in y_lags]
exog_lags_for_model = [i+h-1 for i in exog_lags] if exog else []
df['const'] = 1
base_cols = ['const']
for i in range(h, max_lag + h):
df['y.L' + str(i)] = df.iloc[:,0].shift(i)
if exog:
for i in range(h, max_lag + h):
df[exog + '.L' + str(i)] = df.loc[:,exog].shift(i)
df = df.dropna() # To get the same results with ar_select_order(), we should drop NaNs before starting to get ICs.
# That means we wont have, for example, the first 13 rows even when try to get IC of lag=[1,2] when max_lag=13 and h=1
if seasonal:
dummies = pd.get_dummies(df.index.month, dtype='float').iloc[:, 1:]
dummies.columns = [f's({i},12)' for i in dummies.columns]
base_cols += [f's({i},12)' for i in dummies.columns]
dummies.index = df.index
df = df.reset_index().merge(dummies, left_on='Date', right_index=True).set_index('Date')
cols_label = base_cols + [f'y.L{lag}' for lag in y_lags_for_model] + [f'{exog}.L{lag}' for lag in exog_lags_for_model]
Y = df['HPI']
Y_train = Y.loc[:train_cut]
X = df[cols_label]
X_train = X.loc[:train_cut]
model = sm.OLS(Y_train, X_train).fit() # .astype(float) should be added to X_train in CoCalc
predict = model.predict(X)
res_full.loc[:] = predict # res_full.update(predict) # res_full.combine_first(predict) # predict.reindex(res_full.index)
if original_scale:
if original_hpi is None:
if log:
res_full = np.exp(res_full)
Y_full = np.exp(Y)
else:
Y_full = Y
else:
Y_full = original_hpi
if log:
res_full = np.exp(res_full + np.log(original_hpi) - Y)
else:
res_full = res_full + original_hpi - Y
else:
Y_full = Y
# val_start_pos = df.index.searchsorted(train_cut, side='right')
val_start_idx = df.index[df.index.get_loc(train_cut) + 1]
e = Y_full - res_full
e_train = e.loc[:train_cut]
e_val = e.loc[val_start_idx:]
MAFE_train = e_train.abs().mean()
MAFE_val = e_val.abs().mean()
MSFE_train = e_val.abs().mean()
MSFE_val = (e_val ** 2).mean()
lags_set = {'y lags': y_lags, 'exog lags': exog_lags}
if plot:
plt.plot(res_full)
plt.axvline(x = pd.Timestamp(train_cut), color = 'r', ls='--', label = 'train cut')
plt.plot(Y_full)
return MAFE_val, MSFE_val, MAFE_train, MSFE_train, lags_set, IC, res_full #, e_train, params
#%%
[docs]
def compare_exog(df, train_cut, h_list=[1,3,6,12], seasonal=False, max_lag=3, lag_select='IC', seed=None, lag_fit_intercept=True, lag_cv=5, lag_shuffle=False, lag_iter=20, hsi_CV_select=False, hsi_fit_intercept=True, hsi_cv=4, hsi_iter=40, y_lags='Auto', exog_lags='Auto', var_order='cross', verbose=False, log=False, original_hpi=None, original_scale=True, sort_df='criteria', sort_col=-1):
"""
Compare the impact of different exogenous variables on forecasting accuracy using various evaluation metrics.
Parameters
----------
df : pandas.DataFrame
The input DataFrame containing the time series data. The first column is assumed to be the dependent variable ('HPI'), and the subsequent columns are potential exogenous variables.
train_cut : float or str or datetime-like
The cutoff point for splitting the data into training and validation sets. Can be a float between 0 and 1 representing the proportion of the data to use for training, or a specific index value.
h_list : list of int, optional
A list of forecast horizons (e.g., [1, 3, 6, 12]) to evaluate. Default is [1, 3, 6, 12].
seasonal : bool, optional
Whether to include seasonal dummies (monthly) in the model. Default is False.
max_lag : int, optional
The maximum lag order to consider for selection. Default is 3.
lag_select : {'IC', 'CV'}, optional
The metric used to select the optimal lags:
- 'IC': Information criterion (e.g., BIC).
- 'CV': Cross-validation score. Default is 'IC'.
seed : int, optional
Random seed for reproducibility in cross-validation. Default is None.
lag_fit_intercept : bool, optional
Whether to include an intercept in the lag selection linear regression model. Default is True.
lag_cv : int, optional
Number of folds for cross-validation during lag selection. Default is 5.
lag_shuffle : bool, optional
Whether to shuffle the data before splitting into batches for cross-validation during lag selection. Default is False.
lag_iter : int, optional
Number of iterations for cross-validation during lag selection when `lag_shuffle` is True. Default is 20.
hsi_CV_select : bool, optional
Whether to perform cross-validation for the selection of the UHSI (Unified Housing Sentiment Index) based on the selected lags. Default is False.
hsi_fit_intercept : bool, optional
Whether to include an intercept in the UHSI selection linear regression model. Default is True.
hsi_cv : int, optional
Number of folds for cross-validation during UHSI selection. Default is 4.
hsi_iter : int, optional
Number of iterations for cross-validation during UHSI selection when `hsi_CV_select` is True. Default is 40.
y_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the dependent variable ('HPI'):
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
exog_lags : {'Auto', 'glob', list of int}, optional
The lags to consider for the exogenous variables, if applicable:
- 'Auto': Automatically generate lag sequences up to `max_lag`.
- 'glob': Generates all possible combinations of lags up to `max_lag`.
- list of int: Use the specified lags directly. Default is 'Auto'.
var_order : {'cross', 'nested'}, optional
The order in which lag selection is performed:
- 'cross': Cross all combinations of lags for 'y' and 'exog'.
- 'nested': Select the best lags for 'y' first, then choose the best lags for 'exog' based on the selected 'y' lags. Default is 'cross'.
verbose : bool, optional
If True, print detailed output during the function execution. Default is False.
log : bool, optional
If True, the data is assumed to be log-transformed and predictions will be back-transformed. Default is False.
original_hpi : pandas.Series, optional
Original HPI (Housing Price Index) values for back-transforming predictions. Only used if `original_scale` is True. Default is None.
original_scale : bool, optional
Whether to return the predictions on the original scale (before any transformations). Default is True.
sort_df : {'criteria', 'MAFE', 'MSFE'}, optional
Criteria to sort the results by:
- 'criteria': Sort by the lag selection criteria (e.g., IC or CV).
- 'MAFE': Sort by Mean Absolute Forecast Error.
- 'MSFE': Sort by Mean Squared Forecast Error. Default is 'criteria'.
sort_col : int, optional
The column index in `sort_df` to use for sorting. Default is -1 (last column).
Returns
-------
MAFE_val_df : pandas.DataFrame
DataFrame containing the Mean Absolute Forecast Error (MAFE) on the validation set for each exogenous variable and forecast horizon.
MAFE_val_df_improve : pandas.DataFrame
DataFrame containing the percentage improvement in MAFE on the validation set for each exogenous variable compared to the baseline (no exogenous variables).
MSFE_val_df : pandas.DataFrame
DataFrame containing the Mean Squared Forecast Error (MSFE) on the validation set for each exogenous variable and forecast horizon.
MSFE_val_df_improve : pandas.DataFrame
DataFrame containing the percentage improvement in MSFE on the validation set for each exogenous variable compared to the baseline.
criteria_df : pandas.DataFrame
DataFrame containing the lag selection criteria (e.g., IC or CV) values for each exogenous variable and forecast horizon.
lags_df : pandas.DataFrame
DataFrame containing the selected lags for the dependent variable ('HPI') and each exogenous variable.
MAFE_train_df : pandas.DataFrame
DataFrame containing the Mean Absolute Forecast Error (MAFE) on the training set for each exogenous variable and forecast horizon.
MSFE_train_df : pandas.DataFrame
DataFrame containing the Mean Squared Forecast Error (MSFE) on the training set for each exogenous variable and forecast horizon.
forecast_dict : dict
A dictionary where each key is a forecast horizon (from `h_list`) and the corresponding value is a DataFrame containing the actual vs. predicted values for each exogenous variable.
df : pandas.DataFrame
The modified input DataFrame with additional columns for principal components (UHSI) if `hsi_CV_select` is True.
Examples
--------
>>> MAFE_val_df, MAFE_val_df_improve, MSFE_val_df, MSFE_val_df_improve, criteria_df, lags_df, MAFE_train_df, MSFE_train_df, forecast_dict, df = compare_exog(df, '2023-01-01', h_list=[1, 6, 12], lag_select='IC', seasonal=True)
>>> print(MAFE_val_df)
Horizon 1 Horizon 6 Horizon 12
exog_var1 0.03 0.04 0.05
exog_var2 0.02 0.03 0.04
"""
df = df.copy()
n_hsi = df.shape[1] - 1
index = list(df.columns[1:]) + ['UHSI_'+str(i) for i in h_list]
if (original_hpi is None) | (original_scale == False):
first_col = df.loc[:,'HPI']
else:
first_col = original_hpi
forecast_df = pd.DataFrame(index=first_col.index, columns=['HPI', 'None']+index)
# forecast_df.columns = forecast_df.columns[0] + ['Y_' + i for i in forecast_df.columns[1:]]
forecast_df.iloc[:,0] = first_col
forecast_dict = {h:forecast_df.copy() for h in h_list}
MAFE_val_df = result_table(index, 'MAFE_val', h_list, 'exog')
MAFE_val_df_improve = result_table(index, 'MAFE_val improve (in percent)', h_list, 'exog')
MSFE_val_df = result_table(index, 'MSFE_val', h_list, 'exog')
MSFE_val_df_improve = result_table(index, 'MSFE_val improve (in percent)', h_list, 'exog')
MAFE_train_df = result_table(index, 'MAFE_train', h_list, 'exog')
MSFE_train_df = result_table(index, 'MSFE_train', h_list, 'exog')
lags_df = result_table(index, 'lags', h_list, 'exog')
criteria_df = result_table(index, 'lag selection criteria: ' + lag_select, h_list, 'exog')
if hsi_CV_select & (lag_select in ['IC', 'CV']):
criteria_df = result_table(index, 'hsi selection criteria: CV', h_list, 'exog')
for i, exog in enumerate([None]+index):
for j, h in enumerate(h_list):
if exog == f'UHSI_{h_list[0]}':
top_hsis = criteria_df.iloc[1:n_hsi+1].sort_values(criteria_df.columns[j]).index[:10]
if verbose:
print(f'10 Selected queries for UHSI_{h}:\n{np.array(top_hsis)}')
hsi_selected = df.loc[:, top_hsis]
hsi_selected_train = hsi_selected.loc[:train_cut]
pca = PCA(n_components=1)
principal_component = pca.fit(hsi_selected_train).transform(hsi_selected)
df['UHSI_'+str(h)] = principal_component
MAFE_val, MSFE_val, MAFE_train, MSFE_train, lag_set, criteria, Y_pred = model(df, h=h, exog=exog, seasonal=seasonal, max_lag=max_lag, lag_select=lag_select, seed=seed, fit_intercept=lag_fit_intercept, cv=lag_cv, shuffle=lag_shuffle, n_iter=lag_iter, y_lags=y_lags, exog_lags=exog_lags, var_order=var_order, train_cut=train_cut, verbose=verbose, log=log, plot=False, original_hpi=original_hpi, original_scale=original_scale)
if hsi_CV_select:
MAFE_val, MSFE_val, MAFE_train, MSFE_train, lag_set, criteria, Y_pred = model(df, h=h, exog=exog, seasonal=seasonal, max_lag=max_lag, lag_select='CV', seed=seed, fit_intercept=hsi_fit_intercept, cv=hsi_cv, shuffle=True, n_iter=hsi_iter, y_lags=lag_set['y lags'], exog_lags=lag_set['exog lags'], var_order=var_order, train_cut=train_cut, verbose=verbose, log=log, plot=False, original_hpi=original_hpi, original_scale=original_scale)
criteria_df.iloc[i, j] = criteria
MAFE_val_df.iloc[i, j] = MAFE_val
MAFE_val_df_improve.iloc[i, j] = 100 * (MAFE_val_df.iloc[0, j] - MAFE_val) / MAFE_val_df.iloc[0, j]
MSFE_val_df.iloc[i, j] = MSFE_val
MSFE_val_df_improve.iloc[i, j] = 100 * (MSFE_val_df.iloc[0, j] - MSFE_val) / MSFE_val_df.iloc[0, j]
MAFE_train_df.iloc[i, j] = MAFE_train
MSFE_train_df.iloc[i, j] = MSFE_train
lags_df.iloc[i, j] = str(lag_set)
forecast_dict[h].iloc[-len(Y_pred):, i+1] = Y_pred
MAFE_val_df_improve.loc['selected'] = [MAFE_val_df_improve.iloc[criteria_df.iloc[:,i].argmin(), i] for i in range(criteria_df.shape[1])]
MSFE_val_df_improve.loc['selected'] = [MSFE_val_df_improve.iloc[criteria_df.iloc[:,i].argmin(), i] for i in range(criteria_df.shape[1])]
# Sorting dfs
if sort_df == 'MAFE':
MAFE_val_df = MAFE_val_df.loc[['None']+list(MAFE_val_df.iloc[1:,:].sort_values(MAFE_val_df.columns[sort_col]).index)]
elif sort_df == 'MSFE':
MSFE_val_df = MSFE_val_df.loc[['None']+list(MSFE_val_df.iloc[1:,:].sort_values(MSFE_val_df.columns[sort_col]).index)]
MAFE_val_df = MAFE_val_df.loc[MSFE_val_df.index]
elif sort_df == 'criteria':
criteria_df = criteria_df.loc[['None']+list(criteria_df.iloc[1:,:].sort_values(criteria_df.columns[sort_col]).index)]
MAFE_val_df = MAFE_val_df.loc[criteria_df.index]
MAFE_val_df_improve = MAFE_val_df_improve.loc[['selected'] + list(MAFE_val_df.index[1:])]
MSFE_val_df = MSFE_val_df.loc[MAFE_val_df.index]
MSFE_val_df_improve = MSFE_val_df_improve.loc[['selected'] + list(MAFE_val_df.index[1:])]
criteria_df = criteria_df.loc[MAFE_val_df.index]
MAFE_train_df = MAFE_train_df.loc[MAFE_val_df.index]
MSFE_train_df = MSFE_train_df.loc[MAFE_val_df.index]
lags_df = lags_df.loc[MAFE_val_df.index]
return MAFE_val_df, MAFE_val_df_improve, MSFE_val_df, MSFE_val_df_improve, criteria_df, lags_df, MAFE_train_df, MSFE_train_df, forecast_dict, df
#%%
[docs]
def HLN_MDM(d, h):
"""
Compute the Harvey, Leybourne, and Newbold (1997) Modified Diebold-Mariano (MDM) statistic and its p-value.
Parameters
----------
d : numpy.ndarray or pandas.Series
The array or series of forecast error differentials. This represents the difference between the forecast errors of two competing models.
h : int
The forecast horizon (number of steps ahead).
Returns
-------
MDM : float
The Modified Diebold-Mariano statistic.
pval : float
The p-value associated with the MDM statistic, under the null hypothesis that the forecast accuracy of the two models is the same.
Notes
-----
- The Harvey, Leybourne, and Newbold (1997) test modifies the Diebold-Mariano test to account for small-sample bias, especially when the forecast horizon is greater than one.
- This test is particularly useful in comparing predictive accuracy when forecasts are based on overlapping data.
References
----------
Harvey, D. I., Leybourne, S. J., & Newbold, P. (1997).
Testing the equality of prediction mean squared errors.
International Journal of Forecasting, 13(2), 281-291.
Examples
--------
>>> d = np.array([0.1, 0.2, -0.1, 0.05, 0.3])
>>> h = 1
>>> MDM, pval = HLN_MDM(d, h)
>>> print(f"MDM Statistic: {MDM}, p-value: {pval}")
MDM Statistic: 1.414213562373095, p-value: 0.1826592511440174
"""
tau = d.shape[0]
MDMfix = np.sqrt((tau +1 -2*h + h*(h-1)/tau) / tau )
d_bar = np.mean(d)
sigma_d = np.sum(d*d)
if h > 1:
for j in range(h-1):
sigma_d += 2 * np.sum(d[1+j:]*d[:tau-j-1])
sigma_d = np.sqrt(sigma_d) / tau
MDM = MDMfix * d_bar/sigma_d
pval = 2*ss.t.cdf(-abs(MDM),tau-1)
return MDM, pval
#%%
[docs]
def forecast_table(self):
"""
Generate a table comparing forecast performance across different predictors and horizons.
This table includes the Mean Absolute Forecast Error (MAFE), Mean Squared Forecast Error (MSFE),
and p-values for hypothesis tests related to equal predictive accuracy and forecast encompassing.
Hypotheses:
-----------
- H_{0,1}: Equal MAFE between the predictor model and the base model.
- H_{0,2}: Equal MSFE between the predictor model and the base model.
- H_{0,3}: The predictor model forecast encompasses the base model.
Returns
-------
result : pandas.DataFrame
A DataFrame with a MultiIndex of forecast horizons (`h`) and predictors, and columns including:
- 'HPI lags': The lags of the dependent variable (HPI).
- 'Exog lags': The lags of the exogenous variable.
- 'MAFE': The Mean Absolute Forecast Error for the predictor.
- 'MSFE': The Mean Squared Forecast Error for the predictor.
- 'MAFE improvement': The percentage improvement in MAFE relative to the base model.
- 'MSFE improvement': The percentage improvement in MSFE relative to the base model.
- 'H_{0,1}': The p-value for the hypothesis test of equal MAFE (using the Modified Diebold-Mariano test).
- 'H_{0,2}': The p-value for the hypothesis test of equal MSFE (using the Modified Diebold-Mariano test).
- 'H_{0,3}': The p-value for the hypothesis test of forecast encompassing (using the Modified Diebold-Mariano test).
Notes
-----
- The base model is represented by the 'None' predictor.
- Forecast encompassing tests whether the predictor model contains all the information in the base model.
- The method uses the Harvey, Leybourne, and Newbold (1997) Modified Diebold-Mariano test to compute p-values.
Examples
--------
>>> table = model.forecast_table()
>>> print(table)
HPI lags Exog lags MAFE MSFE MAFE improvement MSFE improvement H_{0,1} H_{0,2} H_{0,3}
h Predictors
1 None ... ... 0.0251 0.00123 0.000 0.000 0.0321 0.0287 0.1025
UHSI_1 ... ... 0.0223 0.00110 0.111 0.105 0.2103 0.1325 0.0923
UHSI_3 ... ... 0.0210 0.00105 0.162 0.147 0.1809 0.1014 0.0534
...
"""
Predictors = ['None', 'UHSI_1', 'UHSI_3', 'UHSI_6', 'UHSI_12']
result = pd.DataFrame(index=pd.MultiIndex.from_product([self.h_list, Predictors], names=['h', 'Predictors']),
columns=['HPI lags', 'Exog lags', 'MAFE', 'MSFE', 'MAFE improvement', 'MSFE improvement', 'H_{0,1}', 'H_{0,2}', 'H_{0,3}'])
for h in self.h_list:
df = self.forecast_dict[h]
e_base = (df['HPI'] - df['None'])[self.train_cut:].iloc[1:]
for exog in Predictors:
lags_set = ast.literal_eval(self.lags_df.loc[exog, ('lags', f'h={h}')])
result.loc[(h, exog), 'HPI lags'] = str(lags_set['y lags'])
result.loc[(h, exog), 'Exog lags'] = str(lags_set['exog lags'])
result.loc[(h, exog), 'MAFE'] = self.MAFE_val_df.loc[exog, ('MAFE_val', f'h={h}')]
result.loc[(h, exog), 'MSFE'] = self.MSFE_val_df.loc[exog, ('MSFE_val', f'h={h}')]
result.loc[(h, exog), 'MAFE improvement'] = (1 - result.loc[(h, exog), 'MAFE']/result.loc[(h, 'None'), 'MAFE'])
result.loc[(h, exog), 'MSFE improvement'] = (1 - result.loc[(h, exog), 'MSFE']/result.loc[(h, 'None'), 'MSFE'])
e_exog = (df['HPI'] - df[exog])[self.train_cut:].iloc[1:]
d = e_exog.abs() - e_base.abs()
MDM, pval = HLN_MDM(d,h)
result.loc[(h, exog), 'H_{0,1}'] = pval
d = e_exog**2 - e_base**2
MDM, pval = HLN_MDM(d,h)
result.loc[(h, exog), 'H_{0,2}'] = pval
d = (e_exog - e_base) * e_exog
MDM, pval = HLN_MDM(d,h)
result.loc[(h, exog), 'H_{0,3}'] = pval
return result
#%%
[docs]
def results(self, results_table=False, MAFE_val_df=False, MAFE_val_df_improve=False, MSFE_val_df=False, MSFE_val_df_improve=False, forecast_criteria_df=False, lags_df=False, MAFE_train_df=False, MSFE_train_df=False, head=10):
"""
Display selected result tables generated during the forecasting process.
Parameters
----------
results_table : bool, optional
If True, display the full results table from the forecast comparison (default is False).
MAFE_val_df : bool, optional
If True, display the Mean Absolute Forecast Error (MAFE) validation DataFrame (default is False).
MAFE_val_df_improve : bool, optional
If True, display the percentage improvement in MAFE for each predictor (default is False).
MSFE_val_df : bool, optional
If True, display the Mean Squared Forecast Error (MSFE) validation DataFrame (default is False).
MSFE_val_df_improve : bool, optional
If True, display the percentage improvement in MSFE for each predictor (default is False).
forecast_criteria_df : bool, optional
If True, display the forecast criteria DataFrame, showing the selected model criteria (default is False).
lags_df : bool, optional
If True, display the DataFrame containing the selected lags for each model (default is False).
MAFE_train_df : bool, optional
If True, display the Mean Absolute Forecast Error (MAFE) training DataFrame (default is False).
MSFE_train_df : bool, optional
If True, display the Mean Squared Forecast Error (MSFE) training DataFrame (default is False).
head : int or bool, optional
Number of rows to display from each DataFrame (default is 10).
If True, display all rows of each DataFrame.
Returns
-------
None
Displays the selected DataFrames in the Jupyter Notebook environment.
Notes
-----
- The method allows selective display of any of the key result tables generated during the model comparison process.
- It uses the `display` function from IPython to show the DataFrames.
Examples
--------
>>> model.results(results_table=True, MAFE_val_df=True, head=5)
Displays the results table and the MAFE validation DataFrame with the top 5 rows.
"""
head = len(self.MAFE_val_df) if (head == True) else head
dfs_list = [(results_table, self.results_table), (MAFE_val_df, self.MAFE_val_df), (MAFE_val_df_improve, self.MAFE_val_df_improve), (MSFE_val_df, self.MSFE_val_df), (MSFE_val_df_improve, self.MSFE_val_df_improve), (forecast_criteria_df, self.forecast_criteria_df), (lags_df, self.lags_df), (MAFE_train_df, self.MAFE_train_df), (MSFE_train_df, self.MSFE_train_df)]
for dis, df in dfs_list:
if dis:
display(df.head(head))