Source code for frds.algorithms._mgarch

import warnings
from typing import List, Tuple, Union
import itertools
from dataclasses import dataclass, asdict
import numpy as np
from scipy.optimize import minimize, OptimizeResult

from frds.algorithms import GARCHModel, GJRGARCHModel

USE_CPP_EXTENSION = True

try:
    import frds.algorithms.utils.utils_ext as ext
except ImportError:
    USE_CPP_EXTENSION = False


[docs] class GARCHModel_CCC: """:doc:`/algorithms/garch-ccc` model with the following specification: - Bivariate - Constant mean - Normal noise It estimates the model parameters only. No standard errors calculated. """ @dataclass class Parameters: mu1: float = np.nan omega1: float = np.nan alpha1: float = np.nan beta1: float = np.nan mu2: float = np.nan omega2: float = np.nan alpha2: float = np.nan beta2: float = np.nan rho: float = np.nan loglikelihood: float = np.nan
[docs] def __init__(self, returns1: np.ndarray, returns2: np.ndarray) -> None: """__init__ Args: returns1 (np.ndarray): ``(T,)`` array of ``T`` returns of first asset returns2 (np.ndarray): ``(T,)`` array of ``T`` returns of second asset .. note:: ``returns`` is best to be percentage returns for optimization """ self.returns1 = np.asarray(returns1, dtype=np.float64) self.returns2 = np.asarray(returns2, dtype=np.float64) self.model1 = GARCHModel(self.returns1) self.model2 = GARCHModel(self.returns2) self.estimation_success = False self.parameters = type(self).Parameters()
[docs] def fit(self) -> Parameters: """Estimates the Multivariate GARCH(1,1)-CCC parameters via MLE Returns: Parameters: :class:`frds.algorithms.GARCHModel_CCC.Parameters` """ if self.estimation_success: return self.parameters m1, m2 = self.model1, self.model2 m1.fit() m2.fit() z1 = m1.resids / np.sqrt(m1.sigma2) z2 = m2.resids / np.sqrt(m2.sigma2) rho = np.corrcoef(z1, z2)[1, 0] m1_params = list(asdict(m1.parameters).values())[:-1] m2_params = list(asdict(m2.parameters).values())[:-1] starting_vals = [*m1_params, *m2_params, rho] # Step 4. Set bounds for parameters bounds = [ # For first returns (None, None), # No bounds for mu (1e-6, None), # Lower bound for omega (0.0, 1.0), # Bounds for alpha (0.0, 1.0), # Boudns for beta # For second returns (None, None), # No bounds for mu (1e-6, None), # Lower bound for omega (0.0, 1.0), # Bounds for alpha (0.0, 1.0), # Boudns for beta # Constant correlation (-0.99, 0.99), # Bounds for rho ] # Step 5. Set constraint for stationarity def persistence_smaller_than_one_1(params: List[float]): mu1, omega1, alpha1, beta1, mu2, omega2, alpha2, beta2, rho = params return 1.0 - (alpha1 + beta1) def persistence_smaller_than_one_2(params: List[float]): mu1, omega1, alpha1, beta1, mu2, omega2, alpha2, beta2, rho = params return 1.0 - (alpha2 + beta2) # fmt: off with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step", RuntimeWarning, ) opt: OptimizeResult = minimize( self.loglikelihood_model, starting_vals, args=(m1.backcast_value, m2.backcast_value, m1.var_bounds, m2.var_bounds), method="SLSQP", bounds=bounds, constraints=[ {"type": "ineq", "fun": persistence_smaller_than_one_1}, {"type": "ineq", "fun": persistence_smaller_than_one_2}, ], ) if opt.success: self.estimation_success = True self.parameters = type(self).Parameters(*list(opt.x, ), loglikelihood=-opt.fun) return self.parameters
[docs] def loglikelihood_model( self, params: np.ndarray, backcast1: float, backcast2: float, var_bounds1: np.ndarray, var_bounds2: np.ndarray, ) -> float: """Calculates the negative log-likelihood based on the current ``params``. Args: params (np.ndarray): [mu1, omega1, alpha1, beta1, mu2, omega2, alpha2, beta2, rho] backcast1 (float): Backcast value for initializing the first return series variance. backcast2 (float): Backcast value for initializing the second return series variance. var_bounds1 (np.ndarray): Array of variance bounds for the first return series. var_bounds2 (np.ndarray): Array of variance bounds for the second return series. Returns: float: negative log-likelihood """ # fmt: off mu1, omega1, alpha1, beta1, mu2, omega2, alpha2, beta2, rho = params resids1 = self.returns1 - mu1 resids2 = self.returns2 - mu2 var_params1 = [omega1, alpha1, beta1] var_params2 = [omega2, alpha2, beta2] backcast1 = GARCHModel.backcast(resids1) backcast2 = GARCHModel.backcast(resids2) var_bounds1 = GARCHModel.variance_bounds(resids1) var_bounds2 = GARCHModel.variance_bounds(resids2) sigma2_1 = GARCHModel.compute_variance(var_params1, resids1, backcast1, var_bounds1) sigma2_2 = GARCHModel.compute_variance(var_params2, resids2, backcast2, var_bounds2) negative_loglikelihood = -self.loglikelihood(resids1, sigma2_1, resids2, sigma2_2, rho) return negative_loglikelihood
[docs] def loglikelihood( self, resids1: np.ndarray, sigma2_1: np.ndarray, resids2: np.ndarray, sigma2_2: np.ndarray, rho: float, ) -> float: """ Computes the log-likelihood for a bivariate GARCH(1,1) model with constant correlation. Args: resids1 (np.ndarray): Residuals for the first return series. sigma2_1 (np.ndarray): Array of conditional variances for the first return series. resids2 (np.ndarray): Residuals for the second return series. sigma2_2 (np.ndarray): Array of conditional variances for the second return series. rho (float): Constant correlation. Returns: float: The log-likelihood value for the bivariate model. """ # z1 and z2 are standardized residuals z1 = resids1 / np.sqrt(sigma2_1) z2 = resids2 / np.sqrt(sigma2_2) # fmt: off log_likelihood_terms = -0.5 * ( 2 * np.log(2 * np.pi) + np.log(sigma2_1 * sigma2_2 * (1 - rho ** 2)) + (z1 ** 2 + z2 ** 2 - 2 * rho * z1 * z2) / (1 - rho ** 2) ) log_likelihood = np.sum(log_likelihood_terms) return log_likelihood
[docs] class GARCHModel_DCC(GARCHModel_CCC): """:doc:`/algorithms/garch-dcc` model with the following specification: - Bivariate - Constant mean - Normal noise It estimates the model parameters only. No standard errors calculated. """ @dataclass class Parameters: mu1: float = np.nan omega1: float = np.nan alpha1: float = np.nan beta1: float = np.nan mu2: float = np.nan omega2: float = np.nan alpha2: float = np.nan beta2: float = np.nan a: float = np.nan b: float = np.nan loglikelihood: float = np.nan
[docs] def __init__( self, returns1: Union[np.ndarray, GARCHModel], returns2: Union[np.ndarray, GARCHModel], ) -> None: """__init__ Args: returns1 (Union[np.ndarray, GARCHModel]): ``(T,)`` array of ``T`` returns of first asset. Can also be an :class:`frds.algorithms.GARCHModel`. returns2 (Union[np.ndarray, GARCHModel]): ``(T,)`` array of ``T`` returns of second asset. Can also be an :class:`frds.algorithms.GARCHModel`. .. note:: If ``returns`` is an array, it is best to be percentage returns for optimization. Estimated :class:`frds.algorithms.GARCHModel` can be used to save computation time. """ if isinstance(returns1, np.ndarray): self.returns1 = np.asarray(returns1, dtype=np.float64) self.model1 = GARCHModel(self.returns1) if isinstance(returns2, np.ndarray): self.returns2 = np.asarray(returns2, dtype=np.float64) self.model2 = GARCHModel(self.returns2) if isinstance(returns1, GARCHModel): self.model1 = returns1 self.returns1 = self.model1.returns if isinstance(returns2, GARCHModel): self.model2 = returns2 self.returns2 = self.model2.returns self.estimation_success = False self.parameters = type(self).Parameters()
[docs] def fit(self) -> Parameters: """Estimates the Multivariate GARCH(1,1)-DCC parameters via twp-step QML Returns: Parameters: :class:`frds.algorithms.GARCHModel_DCC.Parameters` """ if self.estimation_success: return self.parameters m1, m2 = self.model1, self.model2 m1.fit() m2.fit() a, b = self.starting_values() m1_params = list(asdict(m1.parameters).values())[:-1] m2_params = list(asdict(m2.parameters).values())[:-1] self.parameters = type(self).Parameters(*m1_params, *m2_params) starting_vals = [a, b] bounds = [ # DCC parameters (0.0, 1.0), # Bounds for a (0.0, 1.0), # Bounds for b ] def a_plus_b_smaller_than_one(params: List[float]): a, b = params return 1.0 - (a + b) # fmt: off with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "Values in x were outside bounds during a minimize step", RuntimeWarning, ) opt: OptimizeResult = minimize( self.loglikelihood_model, starting_vals, args=(), method="SLSQP", bounds=bounds, constraints=[ {"type": "ineq", "fun": a_plus_b_smaller_than_one}, ], ) if opt.success: self.estimation_success = True a, b = list(opt.x) self.parameters.a = a self.parameters.b = b self.parameters.loglikelihood=-opt.fun return self.parameters
[docs] def starting_values(self) -> Tuple[float, float]: """Use a grid search to find the starting values for a and b Returns: Tuple[float, float]: [a, b] """ a_grid = np.linspace(0.01, 0.5, 10) b_grid = np.linspace(0.6, 0.95, 10) max_ll = -np.inf initial_values = [0.01, 0.8] for a, b in itertools.product(a_grid, b_grid): if a + b >= 1: continue ll = -self.loglikelihood_model([a, b]) if ll > max_ll: initial_values = [a, b] return initial_values
[docs] def loglikelihood_model(self, params: np.ndarray) -> float: """Calculates the negative log-likelihood based on the current ``params``. Args: params (np.ndarray): [a, b] Returns: float: negative log-likelihood """ # fmt: off a, b = params resids1 = self.model1.resids resids2 = self.model2.resids sigma2_1 = self.model1.sigma2 sigma2_2 = self.model2.sigma2 # z1 and z2 are standardized residuals z1 = resids1 / np.sqrt(sigma2_1) z2 = resids2 / np.sqrt(sigma2_2) # The loglikelihood of the variance component (Step 1) l1 = self.model1.parameters.loglikelihood + self.model2.parameters.loglikelihood # The loglikelihood of the correlation component (Step 2) rho = self.conditional_correlations(a, b) # TODO: rare case rho is out of bounds rho = np.clip(rho, -0.99, 0.99) log_likelihood_terms = -0.5 * ( - (z1**2 + z2**2) + np.log(1 - rho ** 2) + (z1 ** 2 + z2 ** 2 - 2 * rho * z1 * z2) / (1 - rho ** 2) ) l2 = np.sum(log_likelihood_terms) negative_loglikelihood = - (l1 + l2) return negative_loglikelihood
[docs] def conditional_correlations(self, a: float, b: float) -> np.ndarray: """Computes the conditional correlations based on given a and b. Other parameters are Args: a (float): DCC parameter b (float): DCC parameter Returns: np.ndarray: array of conditional correlations """ self.model1.fit() # in case it was not estimated, no performance loss self.model2.fit() # in case it was not estimated if USE_CPP_EXTENSION: return ext.dcc_conditional_correlation( a, b, self.model1.resids, self.model2.resids, self.model1.sigma2, self.model2.sigma2, ) resids1 = self.model1.resids resids2 = self.model2.resids sigma2_1 = self.model1.sigma2 sigma2_2 = self.model2.sigma2 # z1 and z2 are standardized residuals z1 = resids1 / np.sqrt(sigma2_1) z2 = resids2 / np.sqrt(sigma2_2) Q_bar = np.corrcoef(z1, z2) q_11_bar, q_12_bar, q_22_bar = Q_bar[0, 0], Q_bar[0, 1], Q_bar[1, 1] T = len(z1) q11 = np.empty_like(z1) q12 = np.empty_like(z1) q22 = np.empty_like(z1) rho = np.zeros_like(z1) q11[0] = q_11_bar q22[0] = q_22_bar q12[0] = q_12_bar rho[0] = q12[0] / np.sqrt(q11[0] * q22[0]) for t in range(1, T): q11[t] = (1 - a - b) * q_11_bar + a * z1[t - 1] ** 2 + b * q11[t - 1] q22[t] = (1 - a - b) * q_22_bar + a * z2[t - 1] ** 2 + b * q22[t - 1] q12[t] = (1 - a - b) * q_12_bar + a * z1[t - 1] * z2[t - 1] + b * q12[t - 1] rho[t] = q12[t] / np.sqrt(q11[t] * q22[t]) return rho
[docs] class GJRGARCHModel_DCC(GARCHModel_DCC): """:doc:`/algorithms/gjr-garch-dcc` model with the following specification: - Bivariate - Constant mean - Normal noise It estimates the model parameters only. No standard errors calculated. """ @dataclass class Parameters: mu1: float = np.nan omega1: float = np.nan alpha1: float = np.nan gamma1: float = np.nan beta1: float = np.nan mu2: float = np.nan omega2: float = np.nan alpha2: float = np.nan gamma2: float = np.nan beta2: float = np.nan a: float = np.nan b: float = np.nan loglikelihood: float = np.nan
[docs] def __init__( self, returns1: Union[np.ndarray, GJRGARCHModel], returns2: Union[np.ndarray, GJRGARCHModel], ) -> None: """__init__ Args: returns1 (Union[np.ndarray, GJRGARCHModel]): ``(T,)`` array of ``T`` returns of first asset. Can also be an :class:`frds.algorithms.GJRGARCHModel`. returns2 (Union[np.ndarray, GJRGARCHModel]): ``(T,)`` array of ``T`` returns of second asset. Can also be an :class:`frds.algorithms.GJRGARCHModel`. .. note:: If ``returns`` is an array, it is best to be percentage returns for optimization. Estimated :class:`frds.algorithms.GJRGARCHModel` can be used to save computation time. """ if isinstance(returns1, np.ndarray): self.returns1 = np.asarray(returns1, dtype=np.float64) self.model1 = GJRGARCHModel(self.returns1) if isinstance(returns2, np.ndarray): self.returns2 = np.asarray(returns2, dtype=np.float64) self.model2 = GJRGARCHModel(self.returns2) if isinstance(returns1, GJRGARCHModel): self.model1 = returns1 self.returns1 = self.model1.returns if isinstance(returns2, GJRGARCHModel): self.model2 = returns2 self.returns2 = self.model2.returns self.estimation_success = False self.parameters = type(self).Parameters()
[docs] def fit(self) -> Parameters: """Estimates the Multivariate GJR-GARCH(1,1)-DCC parameters via twp-step QML Returns: Parameters: :class:`frds.algorithms.GJRGARCHModel_DCC.Parameters` """ return super().fit()
if __name__ == "__main__": import pandas as pd from pprint import pprint # df = pd.read_stata( # "https://www.stata-press.com/data/r18/stocks.dta", convert_dates=["date"] # ) df = pd.read_stata("~/Downloads/stocks.dta", convert_dates=["date"]) df.set_index("date", inplace=True) # Scale returns to percentage returns for better optimization results toyota = df["toyota"].to_numpy() * 100 nissan = df["nissan"].to_numpy() * 100 honda = df["honda"].to_numpy() * 100 model = GARCHModel_CCC(toyota, nissan) res = model.fit() pprint(res) dcc = GARCHModel_DCC(toyota, nissan) res = dcc.fit() pprint(res) toyota_garch = GARCHModel(toyota) nissan_garch = GARCHModel(nissan) dcc = GARCHModel_DCC(toyota_garch, nissan) res = dcc.fit() pprint(res) dcc = GARCHModel_DCC(toyota_garch, nissan_garch) res = dcc.fit() pprint(res) dcc = GJRGARCHModel_DCC(GJRGARCHModel(toyota), GJRGARCHModel(nissan)) res = dcc.fit() pprint(res)