Source code for frds.measures._long_run_mes

from typing import Tuple
import numpy as np
from frds.algorithms import GJRGARCHModel, GJRGARCHModel_DCC

USE_CPP_EXTENSION = True
try:
    from frds.measures import measures_ext as ext
except ImportError:
    USE_CPP_EXTENSION = False


[docs] class LongRunMarginalExpectedShortfall: """:doc:`/measures/long_run_mes`"""
[docs] def __init__(self, firm_returns: np.ndarray, market_returns: np.ndarray) -> None: """__init__ Args: firm_returns (np.ndarray): ``(n_days,)`` array of the firm raw returns. market_returns (np.ndarray): ``(n_days,)`` array of the market raw returns. .. note:: Raw returns should be used! They are automatically converted to log returns. Do NOT use percentage returns. """ # Convert raw returns to log returns self.firm_returns = np.log(1 + np.asarray(firm_returns, dtype=np.float64)) self.market_returns = np.log(1 + np.asarray(market_returns, dtype=np.float64)) assert self.firm_returns.shape == self.market_returns.shape # Scale to percentage (log) returns # This is for better (GJR)GARCH estimation self.firm_returns *= 100 self.market_returns *= 100 self.firm_model = GJRGARCHModel(self.firm_returns) self.market_model = GJRGARCHModel(self.market_returns) self.dcc_model = GJRGARCHModel_DCC(self.firm_model, self.market_model)
[docs] def estimate(self, h=22, S=10_000, C=-0.1, random_seed=42) -> float: """h-step-ahead LRMES forecasts conditional on a systemic event of market decline C Args: h (int, optional): h-period-ahead prediction horizon. Defaults to 22. S (int, optional): sample size used in simulation to generate LRMES forecasts. Defaults to 10000. C (float, optional): market decline used to define systemic event. Defaults to -0.1, i.e. -10%. random_seed (int, optional): random seed. Defaults to 42. Returns: float: the firm's LRMES forecast """ rng = np.random.default_rng(random_seed) # Estimate the (GJR)GARCH-DCC model self.dcc_model.fit() if not self.dcc_model.estimation_success: raise RuntimeError("(GJR)GARCH-DCC model estimation failed!") # Construct GJR-GARCH-DCC standardized innovations for the sample # See the step 1 of Computing LRMES section firm_variances = self.firm_model.sigma2 firm_resids = self.firm_model.resids market_variances = self.market_model.sigma2 market_resids = self.market_model.resids # Conditional correlations a, b = self.dcc_model.parameters.a, self.dcc_model.parameters.b rho = self.dcc_model.conditional_correlations(a, b) # Standarized residuals z_m = market_resids / np.sqrt(market_variances) z_i = firm_resids / np.sqrt(firm_variances) # Firm shock orthogonal to market xi_i = (z_i - rho * z_m) / np.sqrt(1 - rho**2) sample = np.array([xi_i, z_m]) # Sample with replacement S*h innovations # sample = sample.T[rng.choice(sample.shape[1], (S, h), replace=True)] sample = np.zeros((S, h, 2)) # Sample with replacement for s in range(S): idx = rng.choice(len(xi_i), size=h, replace=True) sample[s, :, 0] = xi_i[idx] sample[s, :, 1] = z_m[idx] assert sample.shape == (S, h, 2) Q_bar = np.corrcoef(z_i, z_m) firm_avg_return = 0.0 n_systemic_event = 0 for s in range(S): # Each simulation inv = sample[s, :, :] # shape=(h,2) assert inv.shape == (h, 2) if USE_CPP_EXTENSION: firm_return, systemic_event = ext.simulation( inv, C, self.firm_model.sigma2[-1], self.market_model.sigma2[-1], self.firm_model.resids[-1], self.market_model.resids[-1], self.dcc_model.parameters.a, self.dcc_model.parameters.b, rho[-1], Q_bar, self.firm_model.parameters.mu, self.firm_model.parameters.omega, self.firm_model.parameters.alpha, self.firm_model.parameters.gamma, self.firm_model.parameters.beta, self.market_model.parameters.mu, self.market_model.parameters.omega, self.market_model.parameters.alpha, self.market_model.parameters.gamma, self.market_model.parameters.beta, ) else: firm_return, systemic_event = self.simulation( innovation=inv, C=C, firm_var=self.firm_model.sigma2[-1], mkt_var=self.market_model.sigma2[-1], firm_resid=self.firm_model.resids[-1], mkt_resid=self.market_model.resids[-1], a=self.dcc_model.parameters.a, b=self.dcc_model.parameters.b, rho=rho[-1], Q_bar=Q_bar, ) firm_avg_return += firm_return n_systemic_event += systemic_event if n_systemic_event == 0: return 0.0 return -firm_avg_return / n_systemic_event
[docs] def simulation( self, innovation: np.ndarray, C: float, firm_var: float, mkt_var: float, firm_resid: float, mkt_resid: float, a: float, b: float, rho: float, Q_bar: np.ndarray, ) -> Tuple[float, bool]: """A simulation to compute the firm's return given the parameters. This method should be used internally. Args: innovation (np.ndarray): ``(h,2)`` array of market and firm innovations C (float): market decline used to define systemic event. Defaults to -0.1, i.e. -10%. firm_var (float): the firm conditional variance at time :math:`T`, used as starting value in forecast mkt_var (float): the market conditional variance at time :math:`T`, used as starting value in forecast firm_resid (float): the firm residual at time :math:`T`, used as starting value in forecast mkt_resid (float): the market residual at time :math:`T`, used as starting value in forecast a (float): DCC parameter b (float): DCC parameter rho (float): the last conditional correlation at time :math:`T`, used as starting value in forecast Q_bar (np.ndarray): ``(2,2)`` array of sample correlation matrix of standarized residuals Returns: Tuple[float, bool]: tuple of the firm return and whether a systemic event occurs """ q_i_bar = Q_bar[0, 0] q_m_bar = Q_bar[1, 1] q_im_bar = Q_bar[1, 0] q_i, q_m, q_im = 1.0, 1.0, rho pi = self.firm_model.parameters mu_i = pi.mu omega_i, alpha_i, gamma_i, beta_i = pi.omega, pi.alpha, pi.gamma, pi.beta pm = self.market_model.parameters mu_m = pm.mu omega_m, alpha_m, gamma_m, beta_m = pm.omega, pm.alpha, pm.gamma, pm.beta firm_return = np.empty(shape=len(innovation)) mkt_return = np.empty(shape=len(innovation)) # lagged residuals resid_i_tm1 = firm_resid resid_m_tm1 = mkt_resid # lagged conditonal variance firm_var_tm1 = firm_var mkt_var_tm1 = mkt_var # lagged standarized residuals firm_innov_tm1 = resid_i_tm1 / np.sqrt(firm_var_tm1) mkt_innov_tm1 = resid_m_tm1 / np.sqrt(mkt_var_tm1) for h in range(len(innovation)): # fmt: off # Each iteration is a one-step-ahead forecast # conditional variance this time firm_var_t = omega_i + alpha_i * (resid_i_tm1**2) + beta_i * firm_var_tm1 firm_var_t += gamma_i * (resid_i_tm1**2) if resid_i_tm1 < 0 else 0 mkt_var_t = omega_m + alpha_m * (resid_m_tm1**2) + beta_m * mkt_var_tm1 mkt_var_t += gamma_m * (resid_m_tm1**2) if resid_m_tm1 < 0 else 0 # conditional correlation this time q_i = (1 - a - b) * q_i_bar + a * firm_innov_tm1**2 + b * q_i q_m = (1 - a - b) * q_m_bar + a * mkt_innov_tm1**2 + b * q_m q_im = (1 - a - b) * q_im_bar + a * firm_innov_tm1*mkt_innov_tm1 + b * q_im rho_h = q_im / np.sqrt(q_i * q_m) # innovations this time firm_innov = innovation[h, 0] mkt_innov = innovation[h, 1] # market excess return # or, residual this time, conditional volatility * innovation (z_m) epsilon_m = np.sqrt(mkt_var_t) * mkt_innov # Beta of the firm = cov/var(mkt) # Beta = rho_h * np.sqrt(firm_var_t) / np.sqrt(mkt_var_t) # epsilon_i = Beta * epsilon_m epsilon_i = np.sqrt(firm_var_t) * ( rho_h * mkt_innov + np.sqrt(1 - rho_h**2) * firm_innov ) mkt_return[h] = mu_m + epsilon_m firm_return[h] = mu_i + epsilon_i firm_var_tm1 = firm_var_t mkt_var_tm1 = mkt_var_t resid_i_tm1 = epsilon_i resid_m_tm1 = epsilon_m firm_innov_tm1 = resid_i_tm1 / np.sqrt(firm_var_tm1) mkt_innov_tm1 = mkt_innov # Convert back to original scale mkt_return /= 100 firm_return /= 100 # systemic event if over the prediction horizon, # the market falls by more than C systemic_event = np.exp(np.sum(mkt_return)) - 1 < C # no need to simulate firm returns if there is no systemic event if not systemic_event: return 0.0, False # firm return over the horizon firmret = np.exp(np.sum(firm_return)) - 1 return firmret, True