import warnings
import itertools
from typing import List
from dataclasses import dataclass
import numpy as np
from scipy.optimize import minimize, OptimizeResult
USE_CPP_EXTENSION = True
try:
import frds.algorithms.utils.utils_ext as ext
except ImportError:
USE_CPP_EXTENSION = False
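# Fall back to the pure-Python implementations below when the compiled
# C++ extension is not available.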
class GARCHModel:
""":doc:`/algorithms/garch` model with constant mean and Normal noise
It estimates the model parameters only. No standard errors calculated.
This code is heavily based on the `arch <https://arch.readthedocs.io/>`_.
Modifications are made for easier understaing of the code flow.
"""
@dataclass
class Parameters:
mu: float = np.nan
omega: float = np.nan
alpha: float = np.nan
beta: float = np.nan
loglikelihood: float = np.nan
def __init__(self, returns: np.ndarray, zero_mean=False) -> None:
"""__init__
Args:
returns (np.ndarray): ``(T,)`` array of ``T`` returns
            zero_mean (bool): whether to use a zero mean returns model. Defaults to False.

        .. note:: ``returns`` is best scaled to percentage returns (raw returns times 100) for better optimization results
"""
self.returns = np.asarray(returns, dtype=np.float64)
self.estimation_success = False
self.loglikelihood_final = np.nan
self.parameters = type(self).Parameters()
self.resids = np.empty_like(self.returns)
self.sigma2 = np.empty_like(self.returns)
self.backcast_value = np.nan
self.var_bounds: np.ndarray = None
self.zero_mean = zero_mean
def fit(self) -> Parameters:
"""Estimates the GARCH(1,1) parameters via MLE
Returns:
params: :class:`frds.algorithms.GARCHModel.Parameters`
"""
        # Avoid repeated estimation: return cached parameters if already fitted
        if self.estimation_success:
            return self.parameters
starting_vals = self.preparation()
# Set bounds for parameters
bounds = [
(-np.inf, np.inf), # No bounds for mu
(1e-6, np.inf), # Lower bound for omega
(0.0, 1.0), # Bounds for alpha
            (0.0, 1.0),  # Bounds for beta
]
if self.zero_mean:
bounds = bounds[1:]
# Set constraint for stationarity
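        # SLSQP treats "ineq" constraints as fun(x) >= 0, so this enforces alpha + beta <= 1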
def persistence_smaller_than_one(params: List[float]):
alpha, beta = params[-2:]
return 1.0 - (alpha + beta)
# MLE via minimizing the negative log-likelihood
# fmt: off
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
"Values in x were outside bounds during a minimize step",
RuntimeWarning,
)
opt: OptimizeResult = minimize(
self.loglikelihood_model,
starting_vals,
args=(self.backcast_value, self.var_bounds, self.zero_mean),
method="SLSQP",
bounds=bounds,
constraints={"type": "ineq", "fun": persistence_smaller_than_one},
)
if opt.success:
self.estimation_success = True
if self.zero_mean:
self.parameters = type(self).Parameters(0.0, *list(opt.x), loglikelihood=-opt.fun)
else:
self.parameters = type(self).Parameters(*list(opt.x), loglikelihood=-opt.fun)
self.resids = self.returns - self.parameters.mu
return self.parameters
def preparation(self) -> List[float]:
"""Prepare starting values.
Returns:
List[float]: list of starting values
"""
# Compute a starting value for volatility process by backcasting
if self.zero_mean:
resids = self.returns
else:
resids = self.returns - np.mean(self.returns)
self.backcast_value = self.backcast(resids)
# Compute a loose bound for the volatility process
# This is to avoid NaN in MLE by avoiding zero/negative variance,
# as well as unreasonably large variance.
self.var_bounds = self.variance_bounds(resids)
# Compute the starting values for MLE
# Starting values for the volatility process
var_params = self.starting_values(resids)
# Starting value for mu is the sample mean return
initial_mu = self.returns.mean()
# Starting values are [mu, omega, alpha, beta]
starting_vals = [initial_mu, *var_params]
return starting_vals if not self.zero_mean else var_params
def loglikelihood_model(
self,
params: np.ndarray,
backcast: float,
var_bounds: np.ndarray,
zero_mean=False,
) -> float:
"""Calculates the negative log-likelihood based on the current ``params``.
This function is used in optimization.
        Args:
            params (np.ndarray): [mu, omega, alpha, (gamma), beta]
            backcast (float): backcast value
            var_bounds (np.ndarray): variance bounds
            zero_mean (bool): whether ``params`` excludes ``mu``. Defaults to False.

        Returns:
            float: negative log-likelihood
        """
if zero_mean:
resids = self.returns
self.sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
else:
resids = self.returns - params[0] # params[0] is mu
self.sigma2 = self.compute_variance(
params[1:], resids, backcast, var_bounds
)
return -self.loglikelihood(resids, self.sigma2)
@staticmethod
def loglikelihood(resids: np.ndarray, sigma2: np.ndarray) -> float:
"""Computes the log-likelihood assuming residuals are
normally distributed conditional on the variance.
Args:
resids (np.ndarray): residuals to use in computing log-likelihood.
sigma2 (np.ndarray): conditional variance of residuals.
Returns:
float: log-likelihood
"""
        ll = -0.5 * (np.log(2 * np.pi) + np.log(sigma2) + resids**2.0 / sigma2)
        return np.sum(ll)
@staticmethod
def compute_variance(
params: List[float],
resids: np.ndarray,
backcast: float,
var_bounds: np.ndarray,
) -> np.ndarray:
"""Computes the variances conditional on given parameters
Args:
params (List[float]): [omega, alpha, beta]
resids (np.ndarray): residuals from mean equation
backcast (float): backcast value
var_bounds (np.ndarray): variance bounds
Returns:
np.ndarray: conditional variance
"""
if USE_CPP_EXTENSION:
return ext.compute_garch_variance(params, resids, backcast, var_bounds)
omega, alpha, beta = params
sigma2 = np.zeros_like(resids)
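        # At t=0, substitute the backcast value for both the lagged squared
        # residual and the lagged variance in the recursion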
sigma2[0] = omega + (alpha + beta) * backcast
for t in range(1, len(resids)):
sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
sigma2[t] = GARCHModel.bounds_check(sigma2[t], var_bounds[t])
return sigma2
def starting_values(self, resids: np.ndarray) -> List[float]:
"""Finds the optimal initial values for the volatility model via a grid
search. For varying target persistence and alpha values, return the
combination of alpha and beta that gives the highest loglikelihood.
Args:
resids (np.ndarray): residuals from the mean model
Returns:
List[float]: [omega, alpha, beta]
"""
# Candidate target persistence - we don't have a prior knowledge here
persistence_grid = [0.5, 0.7, 0.9, 0.98]
# Candidate alpha values
alpha_grid = [0.01, 0.05, 0.1, 0.2]
# Sample variance
var = self.returns.var()
        # Grid search over candidate alpha and persistence values
initial_params = []
max_likelihood = -np.inf
for alpha, p in itertools.product(*[alpha_grid, persistence_grid]):
# Note that (alpha+beta) is the "persistence"
beta = p - alpha
# Note that the unconditional variance is omega/(1-persistence).
# As we are guessing the initial value, we use the sample variance
# as the unconditional variance to guess the omega value.
omega = var * (1 - p)
params = [omega, alpha, beta]
sigma2 = self.compute_variance(
params, resids, self.backcast_value, self.var_bounds
)
            ll = self.loglikelihood(resids, sigma2)
            # Keep the parameters that give the highest log-likelihood so far
            if ll > max_likelihood:
                max_likelihood = ll
                initial_params = params
return initial_params
@staticmethod
def variance_bounds(resids: np.ndarray) -> np.ndarray:
"""Compute bounds for conditional variances using EWMA.
This function calculates the lower and upper bounds for conditional variances
based on the residuals provided. The bounds are computed to ensure numerical
stability during the parameter estimation process of GARCH models. The function
uses Exponentially Weighted Moving Average (EWMA) to estimate the initial variance
and then adjusts these estimates to lie within global bounds.
Args:
resids (np.ndarray): residuals from the mean model.
Returns:
np.ndarray: an array where each row contains the lower and upper bounds for the conditional variance at each time point.
"""
T = len(resids)
tau = min(75, T)
# Compute initial variance using EWMA
decay_factor = 0.94
weights = decay_factor ** np.arange(tau)
weights /= weights.sum()
initial_variance = np.dot(weights, resids[:tau] ** 2)
        # Compute rolling variance estimates via EWMA
var_bound = GARCHModel.ewma(resids, initial_variance)
# Compute global bounds
global_lower_bound = resids.var() / 1e8
global_upper_bound = 1e7 * (1 + (resids**2).max())
# Adjust var_bound to ensure it lies within global bounds
var_bound = np.clip(var_bound, global_lower_bound, global_upper_bound)
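        # Allow the conditional variance to vary within six orders of magnitude
        # of the EWMA estimate at each time point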
# Create bounds matrix
var_bounds = np.vstack((var_bound / 1e6, var_bound * 1e6)).T
return np.ascontiguousarray(var_bounds)
@staticmethod
def bounds_check(sigma2: float, var_bounds: np.ndarray) -> float:
"""Adjust the conditional variance at time t based on its bounds
Args:
sigma2 (float): conditional variance
var_bounds (np.ndarray): lower and upper bounds
Returns:
float: adjusted conditional variance
"""
if USE_CPP_EXTENSION:
return ext.bounds_check(sigma2, var_bounds)
lower, upper = var_bounds[0], var_bounds[1]
sigma2 = max(lower, sigma2)
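        # Above the upper bound, dampen the excess logarithmically rather than
        # hard-truncating, so the adjusted value still varies smoothly with the input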
if sigma2 > upper:
if not np.isinf(sigma2):
sigma2 = upper + np.log(sigma2 / upper)
else:
sigma2 = upper + 1000
return sigma2
@staticmethod
def backcast(resids: np.ndarray) -> float:
"""Computes the starting value for estimating conditional variance.
Args:
resids (np.ndarray): residuals
Returns:
float: initial value from backcasting
"""
# Limit to first tau observations to reduce computation
tau = min(75, resids.shape[0])
# Weights for Exponential Weighted Moving Average (EWMA)
w = 0.94 ** np.arange(tau)
w = w / sum(w) # Ensure weights add up to 1
        # Let the initial value be the EWMA of the first tau squared residuals
return float(np.sum((resids[:tau] ** 2) * w))
@staticmethod
def ewma(resids: np.ndarray, initial_value: float, lam=0.94) -> np.ndarray:
"""Compute the conditional variance estimates using
Exponentially Weighted Moving Average (EWMA).
Args:
resids (np.ndarray): Residuals from the model.
initial_value (float): Initial value for the conditional variance.
lam (float): Decay factor for the EWMA.
Returns:
np.ndarray: Array containing the conditional variance estimates.
"""
if USE_CPP_EXTENSION:
return ext.ewma(resids, initial_value, lam)
T = len(resids)
variance = np.empty(T)
variance[0] = initial_value # Set the initial value
# Compute the squared residuals
squared_resids = resids**2
# Compute the EWMA using the decay factors and squared residuals
for t in range(1, T):
variance[t] = lam * variance[t - 1] + (1 - lam) * squared_resids[t - 1]
return variance
class GJRGARCHModel(GARCHModel):
""":doc:`/algorithms/gjr-garch` model with constant mean and Normal noise
It estimates the model parameters only. No standard errors calculated.
This code is heavily based on the `arch <https://arch.readthedocs.io/>`_.
Modifications are made for easier understaing of the code flow.
"""
@dataclass
class Parameters:
mu: float = np.nan
omega: float = np.nan
alpha: float = np.nan
gamma: float = np.nan
beta: float = np.nan
loglikelihood: float = np.nan
def fit(self) -> Parameters:
"""Estimates the GJR-GARCH(1,1) parameters via MLE
Returns:
List[float]: [mu, omega, alpha, gamma, beta, loglikelihood]
"""
starting_vals = self.preparation()
bounds = [
(-np.inf, np.inf), # No bounds for mu
(1e-6, np.inf), # Lower bound for omega
(1e-6, 1.0), # Bounds for alpha
(1e-6, 1.0), # Bounds for gamma
            (1e-6, 1.0),  # Bounds for beta
]
if self.zero_mean:
bounds = bounds[1:]
# Set constraint for stationarity
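        # SLSQP treats "ineq" constraints as fun(x) >= 0, enforcing alpha + gamma/2 + beta <= 1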
def persistence_smaller_than_one(params: List[float]):
alpha, gamma, beta = params[-3:]
return 1.0 - (alpha + beta + gamma / 2)
# MLE via minimizing the negative log-likelihood
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
"Values in x were outside bounds during a minimize step",
RuntimeWarning,
)
opt: OptimizeResult = minimize(
self.loglikelihood_model,
starting_vals,
args=(self.backcast_value, self.var_bounds, self.zero_mean),
method="SLSQP",
bounds=bounds,
constraints={"type": "ineq", "fun": persistence_smaller_than_one},
)
if opt.success:
self.estimation_success = True
if self.zero_mean:
self.parameters = type(self).Parameters(
0.0, *list(opt.x), loglikelihood=-opt.fun
)
else:
self.parameters = type(self).Parameters(
*list(opt.x), loglikelihood=-opt.fun
)
self.resids = self.returns - self.parameters.mu
return self.parameters
@staticmethod
def compute_variance(
params: List[float],
resids: np.ndarray,
backcast: float,
var_bounds: np.ndarray,
) -> np.ndarray:
"""Computes the variances conditional on given parameters
Args:
params (List[float]): [omega, alpha, gamma, beta]
resids (np.ndarray): residuals from mean equation
backcast (float): backcast value
var_bounds (np.ndarray): variance bounds
Returns:
np.ndarray: conditional variance
"""
if USE_CPP_EXTENSION:
return ext.compute_gjrgarch_variance(params, resids, backcast, var_bounds)
# fmt: off
omega, alpha, gamma, beta = params
sigma2 = np.zeros_like(resids)
sigma2[0] = omega + (alpha + gamma/2 + beta) * backcast
for t in range(1, len(resids)):
sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
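            # Leverage term: negative shocks contribute an extra gamma * e_{t-1}^2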
sigma2[t] += gamma * resids[t - 1] ** 2 if resids[t - 1] < 0 else 0
sigma2[t] = GJRGARCHModel.bounds_check(sigma2[t], var_bounds[t])
return sigma2
@staticmethod
def forecast_variance(
params: Parameters,
resids: np.ndarray,
initial_variance: float,
) -> np.ndarray:
"""Forecast the variances conditional on given parameters and residuals.
Args:
params (Parameters): :class:`frds.algorithms.GJRGARCHModel.Parameters`
resids (np.ndarray): residuals to use
initial_variance (float): starting value of variance forecasts
Returns:
np.ndarray: conditional variance
"""
# fmt: off
omega, alpha, gamma, beta = params.omega, params.alpha, params.gamma, params.beta
sigma2 = np.zeros_like(resids)
sigma2[0] = initial_variance
for t in range(1, len(resids)):
sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
sigma2[t] += gamma * resids[t - 1] ** 2 if resids[t - 1] < 0 else 0
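        # Discard the first element, which is just the supplied initial variance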
return sigma2[1:]
def starting_values(self, resids: np.ndarray) -> List[float]:
"""Finds the optimal initial values for the volatility model via a grid
search. For varying target persistence and alpha values, return the
combination of alpha and beta that gives the highest loglikelihood.
Args:
resids (np.ndarray): residuals from the mean model
Returns:
List[float]: [omega, alpha, gamma, beta]
"""
# Candidate target persistence - we don't have a prior knowledge here
persistence_grid = [0.5, 0.7, 0.9, 0.98]
# Candidate alpha values
alpha_grid = [0.01, 0.05, 0.1, 0.2]
gamma_grid = alpha_grid
# Sample variance
var = self.returns.var()
        # Backcast initial value for the volatility process
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
initial_params = []
max_likelihood = -np.inf
# fmt: off
for alpha, gamma, p in itertools.product(*[alpha_grid, gamma_grid, persistence_grid]):
            # Note that (alpha + gamma/2 + beta) is the "persistence"
beta = p - alpha - gamma/2
# Note that the unconditional variance is omega/(1-persistence).
# As we are guessing the initial value, we use the sample variance
# as the unconditional variance to guess the omega value.
omega = var * (1 - p)
params = [omega, alpha, gamma, beta]
sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
            ll = self.loglikelihood(resids, sigma2)
            # Keep the parameters that give the highest log-likelihood so far
            if ll > max_likelihood:
                max_likelihood = ll
                initial_params = params
return initial_params
if __name__ == "__main__":
import pandas as pd
from pprint import pprint
df = pd.read_stata(
"https://www.stata-press.com/data/r18/stocks.dta", convert_dates=["date"]
)
df.set_index("date", inplace=True)
# Scale returns to percentage returns for better optimization results
returns = df["nissan"].to_numpy() * 100
g = GARCHModel(returns)
res = g.fit()
pprint(res)
gjr = GJRGARCHModel(returns)
res = gjr.fit()
pprint(res)
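    # A minimal sketch of variance forecasting with the fitted GJR-GARCH
    # parameters above. Note: seeding the forecast with the sample variance
    # of the residuals is an illustrative assumption, not part of the model.
    resids = returns - res.mu
    variance_forecasts = GJRGARCHModel.forecast_variance(res, resids, resids.var())
    print(variance_forecasts[-5:])  # last few conditional variance estimates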
from arch import arch_model
m = arch_model(returns, p=1, o=1, q=1)
print(m.fit(disp=False))