Source code for frds.algorithms._garch

import warnings
import itertools
from typing import List
from dataclasses import dataclass
import numpy as np
from scipy.optimize import minimize, OptimizeResult

# Prefer the compiled C++ extension for the inner loops; fall back to the
# pure-Python/NumPy implementations below if it is not available.
USE_CPP_EXTENSION = True
try:
    import frds.algorithms.utils.utils_ext as ext
except ImportError:
    USE_CPP_EXTENSION = False


class GARCHModel:
    """:doc:`/algorithms/garch` model with constant mean and Normal noise

    It estimates the model parameters only. No standard errors calculated.

    This code is heavily based on the `arch <https://arch.readthedocs.io/>`_ library.
    Modifications are made for easier understanding of the code flow.
    """

    @dataclass
    class Parameters:
        mu: float = np.nan
        omega: float = np.nan
        alpha: float = np.nan
        beta: float = np.nan
        loglikelihood: float = np.nan
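
    # Model summary (added for reference; matches compute_variance below):
    #   r_t      = mu + e_t,    e_t ~ N(0, sigma2_t)
    #   sigma2_t = omega + alpha * e_{t-1}^2 + beta * sigma2_{t-1}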

    def __init__(self, returns: np.ndarray, zero_mean=False) -> None:
        """__init__

        Args:
            returns (np.ndarray): ``(T,)`` array of ``T`` returns
            zero_mean (bool): whether to use a zero-mean returns model. Defaults to False.

        .. note:: ``returns`` is best given as percentage returns (i.e., scaled by 100)
           for more stable optimization.
        """
        self.returns = np.asarray(returns, dtype=np.float64)
        self.estimation_success = False
        self.loglikelihood_final = np.nan
        self.parameters = type(self).Parameters()
        self.resids = np.empty_like(self.returns)
        self.sigma2 = np.empty_like(self.returns)
        self.backcast_value = np.nan
        self.var_bounds: np.ndarray = None
        self.zero_mean = zero_mean

    def fit(self) -> Parameters:
        """Estimates the GARCH(1,1) parameters via MLE

        Returns:
            params: :class:`frds.algorithms.GARCHModel.Parameters`
        """
        # Skip repeated estimation; return the already-estimated parameters
        if self.estimation_success:
            return self.parameters
        starting_vals = self.preparation()
        # Set bounds for parameters
        bounds = [
            (-np.inf, np.inf),  # No bounds for mu
            (1e-6, np.inf),  # Lower bound for omega
            (0.0, 1.0),  # Bounds for alpha
            (0.0, 1.0),  # Bounds for beta
        ]
        if self.zero_mean:
            bounds = bounds[1:]

        # Set constraint for stationarity
        def persistence_smaller_than_one(params: List[float]):
            alpha, beta = params[-2:]
            return 1.0 - (alpha + beta)

        # MLE via minimizing the negative log-likelihood
        # fmt: off
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                "Values in x were outside bounds during a minimize step",
                RuntimeWarning,
            )
            opt: OptimizeResult = minimize(
                self.loglikelihood_model,
                starting_vals,
                args=(self.backcast_value, self.var_bounds, self.zero_mean),
                method="SLSQP",
                bounds=bounds,
                constraints={"type": "ineq", "fun": persistence_smaller_than_one},
            )
        if opt.success:
            self.estimation_success = True
            if self.zero_mean:
                self.parameters = type(self).Parameters(0.0, *list(opt.x), loglikelihood=-opt.fun)
            else:
                self.parameters = type(self).Parameters(*list(opt.x), loglikelihood=-opt.fun)
            self.resids = self.returns - self.parameters.mu
        return self.parameters

    def preparation(self) -> List[float]:
        """Prepare starting values.

        Returns:
            List[float]: list of starting values
        """
        # Compute a starting value for the volatility process by backcasting
        if self.zero_mean:
            resids = self.returns
        else:
            resids = self.returns - np.mean(self.returns)
        self.backcast_value = self.backcast(resids)
        # Compute loose bounds for the volatility process.
        # This is to avoid NaN in MLE by avoiding zero/negative variance,
        # as well as unreasonably large variance.
        self.var_bounds = self.variance_bounds(resids)
        # Compute the starting values for MLE
        # Starting values for the volatility process
        var_params = self.starting_values(resids)
        # Starting value for mu is the sample mean return
        initial_mu = self.returns.mean()
        # Starting values are [mu, omega, alpha, beta]
        starting_vals = [initial_mu, *var_params]
        return starting_vals if not self.zero_mean else var_params

    def loglikelihood_model(
        self,
        params: np.ndarray,
        backcast: float,
        var_bounds: np.ndarray,
        zero_mean=False,
    ) -> float:
        """Calculates the negative log-likelihood based on the current ``params``.
        This function is used in optimization.

        Args:
            params (np.ndarray): [mu, omega, alpha, (gamma), beta]
            backcast (float): backcast value
            var_bounds (np.ndarray): variance bounds
            zero_mean (bool): whether the mean model is a zero-mean one

        Returns:
            float: negative log-likelihood
        """
        if zero_mean:
            resids = self.returns
            self.sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
        else:
            resids = self.returns - params[0]  # params[0] is mu
            self.sigma2 = self.compute_variance(
                params[1:], resids, backcast, var_bounds
            )
        return -self.loglikelihood(resids, self.sigma2)

    @staticmethod
    def loglikelihood(resids: np.ndarray, sigma2: np.ndarray) -> float:
        """Computes the log-likelihood assuming residuals are
        normally distributed conditional on the variance.

        Args:
            resids (np.ndarray): residuals to use in computing log-likelihood.
            sigma2 (np.ndarray): conditional variance of residuals.

        Returns:
            float: log-likelihood
        """
        l = -0.5 * (np.log(2 * np.pi) + np.log(sigma2) + resids**2.0 / sigma2)
        return np.sum(l)
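
    # Quick sanity check of the formula above (added for illustration): for a
    # single residual of 0 with unit variance, the log-likelihood equals the
    # log-density of N(0, 1) at 0, i.e. -0.5 * log(2 * pi) ~= -0.9189:
    #   GARCHModel.loglikelihood(np.array([0.0]), np.array([1.0]))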

    @staticmethod
    def compute_variance(
        params: List[float],
        resids: np.ndarray,
        backcast: float,
        var_bounds: np.ndarray,
    ) -> np.ndarray:
        """Computes the variances conditional on given parameters

        Args:
            params (List[float]): [omega, alpha, beta]
            resids (np.ndarray): residuals from mean equation
            backcast (float): backcast value
            var_bounds (np.ndarray): variance bounds

        Returns:
            np.ndarray: conditional variance
        """
        if USE_CPP_EXTENSION:
            return ext.compute_garch_variance(params, resids, backcast, var_bounds)
        omega, alpha, beta = params
        sigma2 = np.zeros_like(resids)
        sigma2[0] = omega + (alpha + beta) * backcast
        for t in range(1, len(resids)):
            sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
            sigma2[t] = GARCHModel.bounds_check(sigma2[t], var_bounds[t])
        return sigma2
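
    # Worked check of the recursion above (added for illustration): with
    # omega = 1 and alpha = beta = 0, the conditional variance is constant:
    #   GARCHModel.compute_variance([1.0, 0.0, 0.0], np.zeros(3), 1.0,
    #                               np.array([[0.0, 10.0]] * 3))
    # yields array([1., 1., 1.]).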

    def starting_values(self, resids: np.ndarray) -> List[float]:
        """Finds the optimal initial values for the volatility model via a grid
        search. For varying target persistence and alpha values, return the
        combination of alpha and beta that gives the highest loglikelihood.

        Args:
            resids (np.ndarray): residuals from the mean model

        Returns:
            List[float]: [omega, alpha, beta]
        """
        # Candidate target persistence - we don't have prior knowledge here
        persistence_grid = [0.5, 0.7, 0.9, 0.98]
        # Candidate alpha values
        alpha_grid = [0.01, 0.05, 0.1, 0.2]
        # Sample variance
        var = self.returns.var()
        initial_params = []
        max_likelihood = -np.inf
        for alpha, p in itertools.product(alpha_grid, persistence_grid):
            # Note that (alpha+beta) is the "persistence"
            beta = p - alpha
            # Note that the unconditional variance is omega/(1-persistence).
            # As we are guessing the initial value, we use the sample variance
            # as the unconditional variance to guess the omega value.
            omega = var * (1 - p)
            params = [omega, alpha, beta]
            sigma2 = self.compute_variance(
                params, resids, self.backcast_value, self.var_bounds
            )
            likelihood = self.loglikelihood(resids, sigma2)
            if likelihood > max_likelihood:
                max_likelihood = likelihood
                initial_params = params
        return initial_params

    @staticmethod
    def variance_bounds(resids: np.ndarray) -> np.ndarray:
        """Compute bounds for conditional variances using EWMA.

        This function calculates the lower and upper bounds for conditional
        variances based on the residuals provided. The bounds are computed to
        ensure numerical stability during the parameter estimation process of
        GARCH models. The function uses Exponentially Weighted Moving Average
        (EWMA) to estimate the initial variance and then adjusts these
        estimates to lie within global bounds.

        Args:
            resids (np.ndarray): residuals from the mean model.

        Returns:
            np.ndarray: an array where each row contains the lower and upper
                bounds for the conditional variance at each time point.
        """
        T = len(resids)
        tau = min(75, T)
        # Compute initial variance using EWMA
        decay_factor = 0.94
        weights = decay_factor ** np.arange(tau)
        weights /= weights.sum()
        initial_variance = np.dot(weights, resids[:tau] ** 2)
        # Compute per-period variance estimates via the EWMA recursion below
        var_bound = GARCHModel.ewma(resids, initial_variance)
        # Compute global bounds
        global_lower_bound = resids.var() / 1e8
        global_upper_bound = 1e7 * (1 + (resids**2).max())
        # Adjust var_bound to ensure it lies within global bounds
        var_bound = np.clip(var_bound, global_lower_bound, global_upper_bound)
        # Create the (T, 2) bounds matrix: [lower, upper] per time point
        var_bounds = np.vstack((var_bound / 1e6, var_bound * 1e6)).T
        return np.ascontiguousarray(var_bounds)

    @staticmethod
    def bounds_check(sigma2: float, var_bounds: np.ndarray) -> float:
        """Adjust the conditional variance at time t based on its bounds

        Args:
            sigma2 (float): conditional variance
            var_bounds (np.ndarray): lower and upper bounds

        Returns:
            float: adjusted conditional variance
        """
        if USE_CPP_EXTENSION:
            return ext.bounds_check(sigma2, var_bounds)
        lower, upper = var_bounds[0], var_bounds[1]
        # Hard floor at the lower bound
        sigma2 = max(lower, sigma2)
        # Soft cap above the upper bound: overshoots are compressed
        # logarithmically rather than clipped, keeping the objective
        # function smooth for the optimizer.
        if sigma2 > upper:
            if not np.isinf(sigma2):
                sigma2 = upper + np.log(sigma2 / upper)
            else:
                sigma2 = upper + 1000
        return sigma2
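
    # Worked example of the soft cap above (added for illustration): with an
    # upper bound of 4.0, a candidate variance of 9.0 becomes
    # 4.0 + log(9.0 / 4.0) ~= 4.81, rather than being clipped to exactly 4.0.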

    @staticmethod
    def backcast(resids: np.ndarray) -> float:
        """Computes the starting value for estimating conditional variance.

        Args:
            resids (np.ndarray): residuals

        Returns:
            float: initial value from backcasting
        """
        # Limit to the first tau observations to reduce computation
        tau = min(75, resids.shape[0])
        # Weights for the Exponentially Weighted Moving Average (EWMA)
        w = 0.94 ** np.arange(tau)
        w = w / sum(w)  # Ensure weights add up to 1
        # Let the initial value be the EWMA of the first tau squared residuals
        return float(np.sum((resids[:tau] ** 2) * w))

    @staticmethod
    def ewma(resids: np.ndarray, initial_value: float, lam=0.94) -> np.ndarray:
        """Compute the conditional variance estimates using
        Exponentially Weighted Moving Average (EWMA).

        Args:
            resids (np.ndarray): Residuals from the model.
            initial_value (float): Initial value for the conditional variance.
            lam (float): Decay factor for the EWMA.

        Returns:
            np.ndarray: Array containing the conditional variance estimates.
        """
        if USE_CPP_EXTENSION:
            return ext.ewma(resids, initial_value, lam)
        T = len(resids)
        variance = np.empty(T)
        variance[0] = initial_value  # Set the initial value
        # Compute the squared residuals
        squared_resids = resids**2
        # Compute the EWMA using the decay factor and squared residuals
        for t in range(1, T):
            variance[t] = lam * variance[t - 1] + (1 - lam) * squared_resids[t - 1]
        return variance
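

# A minimal, self-contained usage sketch (added for illustration, not part of
# the original module): simulate a GARCH(1,1) series with known parameters and
# recover them with GARCHModel. Exact estimates will vary with the RNG seed.
def _demo_garch_usage() -> None:
    rng = np.random.default_rng(42)
    T = 2000
    omega, alpha, beta = 0.1, 0.1, 0.85
    returns = np.empty(T)
    sigma2 = omega / (1 - alpha - beta)  # start at the unconditional variance
    for t in range(T):
        e = rng.standard_normal() * np.sqrt(sigma2)
        returns[t] = e
        sigma2 = omega + alpha * e**2 + beta * sigma2
    model = GARCHModel(returns, zero_mean=True)
    print(model.fit())  # estimates should be close to (0.1, 0.1, 0.85)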


class GJRGARCHModel(GARCHModel):
    """:doc:`/algorithms/gjr-garch` model with constant mean and Normal noise

    It estimates the model parameters only. No standard errors calculated.

    This code is heavily based on the `arch <https://arch.readthedocs.io/>`_ library.
    Modifications are made for easier understanding of the code flow.
    """

    @dataclass
    class Parameters:
        mu: float = np.nan
        omega: float = np.nan
        alpha: float = np.nan
        gamma: float = np.nan
        beta: float = np.nan
        loglikelihood: float = np.nan
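
    # Model summary (added for reference; matches compute_variance below):
    #   r_t      = mu + e_t,    e_t ~ N(0, sigma2_t)
    #   sigma2_t = omega + (alpha + gamma * I[e_{t-1} < 0]) * e_{t-1}^2
    #                    + beta * sigma2_{t-1}
    # where I[.] is 1 for negative shocks, so bad news raises variance more.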

    def fit(self) -> Parameters:
        """Estimates the GJR-GARCH(1,1) parameters via MLE

        Returns:
            params: :class:`frds.algorithms.GJRGARCHModel.Parameters`
        """
        starting_vals = self.preparation()
        bounds = [
            (-np.inf, np.inf),  # No bounds for mu
            (1e-6, np.inf),  # Lower bound for omega
            (1e-6, 1.0),  # Bounds for alpha
            (1e-6, 1.0),  # Bounds for gamma
            (1e-6, 1.0),  # Bounds for beta
        ]
        if self.zero_mean:
            bounds = bounds[1:]

        # Set constraint for stationarity
        def persistence_smaller_than_one(params: List[float]):
            alpha, gamma, beta = params[-3:]
            return 1.0 - (alpha + beta + gamma / 2)

        # MLE via minimizing the negative log-likelihood
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                "Values in x were outside bounds during a minimize step",
                RuntimeWarning,
            )
            opt: OptimizeResult = minimize(
                self.loglikelihood_model,
                starting_vals,
                args=(self.backcast_value, self.var_bounds, self.zero_mean),
                method="SLSQP",
                bounds=bounds,
                constraints={"type": "ineq", "fun": persistence_smaller_than_one},
            )
        if opt.success:
            self.estimation_success = True
            if self.zero_mean:
                self.parameters = type(self).Parameters(
                    0.0, *list(opt.x), loglikelihood=-opt.fun
                )
            else:
                self.parameters = type(self).Parameters(
                    *list(opt.x), loglikelihood=-opt.fun
                )
            self.resids = self.returns - self.parameters.mu
        return self.parameters

    @staticmethod
    def compute_variance(
        params: List[float],
        resids: np.ndarray,
        backcast: float,
        var_bounds: np.ndarray,
    ) -> np.ndarray:
        """Computes the variances conditional on given parameters

        Args:
            params (List[float]): [omega, alpha, gamma, beta]
            resids (np.ndarray): residuals from mean equation
            backcast (float): backcast value
            var_bounds (np.ndarray): variance bounds

        Returns:
            np.ndarray: conditional variance
        """
        if USE_CPP_EXTENSION:
            return ext.compute_gjrgarch_variance(params, resids, backcast, var_bounds)
        # fmt: off
        omega, alpha, gamma, beta = params
        sigma2 = np.zeros_like(resids)
        sigma2[0] = omega + (alpha + gamma / 2 + beta) * backcast
        for t in range(1, len(resids)):
            sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
            sigma2[t] += gamma * resids[t - 1] ** 2 if resids[t - 1] < 0 else 0
            sigma2[t] = GJRGARCHModel.bounds_check(sigma2[t], var_bounds[t])
        return sigma2

    @staticmethod
    def forecast_variance(
        params: Parameters,
        resids: np.ndarray,
        initial_variance: float,
    ) -> np.ndarray:
        """Forecast the variances conditional on given parameters and residuals.

        Args:
            params (Parameters): :class:`frds.algorithms.GJRGARCHModel.Parameters`
            resids (np.ndarray): residuals to use
            initial_variance (float): starting value of variance forecasts

        Returns:
            np.ndarray: conditional variance
        """
        # fmt: off
        omega, alpha, gamma, beta = params.omega, params.alpha, params.gamma, params.beta
        sigma2 = np.zeros_like(resids)
        sigma2[0] = initial_variance
        for t in range(1, len(resids)):
            sigma2[t] = omega + alpha * (resids[t - 1] ** 2) + beta * sigma2[t - 1]
            sigma2[t] += gamma * resids[t - 1] ** 2 if resids[t - 1] < 0 else 0
        return sigma2[1:]

    def starting_values(self, resids: np.ndarray) -> List[float]:
        """Finds the optimal initial values for the volatility model via a grid
        search. For varying target persistence, alpha, and gamma values, return
        the combination that gives the highest loglikelihood.

        Args:
            resids (np.ndarray): residuals from the mean model

        Returns:
            List[float]: [omega, alpha, gamma, beta]
        """
        # Candidate target persistence - we don't have prior knowledge here
        persistence_grid = [0.5, 0.7, 0.9, 0.98]
        # Candidate alpha values
        alpha_grid = [0.01, 0.05, 0.1, 0.2]
        gamma_grid = alpha_grid
        # Sample variance
        var = self.returns.var()
        # Backcast initial value for the volatility process
        var_bounds = self.variance_bounds(resids)
        backcast = self.backcast(resids)
        initial_params = []
        max_likelihood = -np.inf
        # fmt: off
        for alpha, gamma, p in itertools.product(alpha_grid, gamma_grid, persistence_grid):
            # Note that (alpha + gamma/2 + beta) is the "persistence"
            beta = p - alpha - gamma / 2
            # Note that the unconditional variance is omega/(1-persistence).
            # As we are guessing the initial value, we use the sample variance
            # as the unconditional variance to guess the omega value.
            omega = var * (1 - p)
            params = [omega, alpha, gamma, beta]
            sigma2 = self.compute_variance(params, resids, backcast, var_bounds)
            likelihood = self.loglikelihood(resids, sigma2)
            if likelihood > max_likelihood:
                max_likelihood = likelihood
                initial_params = params
        return initial_params
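

# A minimal usage sketch of forecast_variance (added for illustration; the
# returns are synthetic placeholders): continue the variance recursion from
# an in-sample conditional variance using the fitted parameters.
def _demo_gjrgarch_forecast() -> None:
    rng = np.random.default_rng(0)
    returns = rng.standard_normal(500) * 2.0  # synthetic "percentage" returns
    model = GJRGARCHModel(returns)
    params = model.fit()
    if model.estimation_success:
        forecasts = GJRGARCHModel.forecast_variance(
            params, model.resids[-10:], model.sigma2[-10]
        )
        print(forecasts)  # nine conditional variances following sigma2[-10]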


if __name__ == "__main__":
    import pandas as pd
    from pprint import pprint

    df = pd.read_stata(
        "https://www.stata-press.com/data/r18/stocks.dta", convert_dates=["date"]
    )
    df.set_index("date", inplace=True)
    # Scale returns to percentage returns for better optimization results
    returns = df["nissan"].to_numpy() * 100

    g = GARCHModel(returns)
    res = g.fit()
    pprint(res)

    gjr = GJRGARCHModel(returns)
    res = gjr.fit()
    pprint(res)

    from arch import arch_model

    m = arch_model(returns, p=1, o=1, q=1)
    print(m.fit(disp=False))
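
    # Added for illustration (not in the original demo): the zero-mean variant
    # of the same model, using this module's own API; mu is fixed at 0.0 in the
    # reported parameters.
    g0 = GARCHModel(returns, zero_mean=True)
    pprint(g0.fit())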