# Source code for pmdarima.arima.seasonality

# -*- coding: utf-8 -*-
#
# Author: Taylor Smith <taylor.smith@alkaline-ml.com>
#
# Tests for seasonal differencing terms, and seasonal decomposition

from collections import namedtuple

import math
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from scipy.linalg import svd
from statsmodels import api as sm
from statsmodels.tools import add_constant

from abc import ABCMeta, abstractmethod
from numpy.linalg import solve
import numpy as np

from .arima import _aicc
from ..compat.numpy import DTYPE
from .stationarity import _BaseStationarityTest
from ..utils.array import c, diff, check_endog

from ._arima import C_canova_hansen_sd_test

# Public API of this module: the seasonal decomposition helper and the two
# seasonal-differencing tests (Canova-Hansen and OCSB).
__all__ = [
    'CHTest',
    'decompose',
    'OCSBTest'
]


def decompose(x, type_, m, filter_=None):
    """Decompose the time series into trend, seasonal, and random components.

    Parameters
    ----------
    x : np.array, shape=(n_samples,)
        The time series of which the trend, seasonal, and noise/random
        components will be extracted.

    type_ : str
        The type of decomposition that will be performed - 'multiplicative'
        or 'additive'. We would use 'multiplicative' generally when we see an
        increasing trend. We use 'additive' when the trend is relatively
        stable over time.

    m : int
        The frequency in terms of number of observations. This behaves
        similarly to R's frequency for a time series (ts). Must be an
        integer greater than 1.

    filter_ : np.array, optional (default=None)
        A filter by which the convolution will be performed. Defaults to a
        simple moving-average window of length ``m``.

    Returns
    -------
    decomposed_tuple : namedtuple
        A named tuple with ``x``, ``trend``, ``seasonal``, and ``random``
        components where ``x`` is the input signal, ``trend`` is the overall
        trend, ``seasonal`` is the seasonal component, and ``random`` is the
        noisy component. The input signal ``x`` can be mostly reconstructed
        by the other three components with a number of points missing equal
        to ``m``.

    Raises
    ------
    ValueError
        If ``m`` is not an integer > 1, if ``type_`` is not one of
        'multiplicative'/'additive', or if ``x`` contains fewer than two
        full periods.

    Notes
    -----
    This function is generally used in conjunction with
    :func:`pmdarima.utils.visualization.decomposed_plot`, which plots the
    decomposed components.

    References
    ----------
    .. [1] Example of decompose using both multiplicative and additive types:
           https://anomaly.io/seasonal-trend-decomposition-in-r/index.html

    .. [2] R documentation for decompose:
           https://www.rdocumentation.org/packages/stats/versions/3.6.1/topics/decompose
    """  # noqa: E501
    multiplicative = "multiplicative"
    additive = "additive"

    # Validate ``m`` before it is used in any arithmetic so that invalid
    # values (including non-numeric ones) consistently raise ValueError.
    # NOTE: the former message referenced 'f' (the R argument name) and
    # claimed "positive integer", though the actual requirement is m > 1.
    if not (isinstance(m, int) and m > 1):
        raise ValueError("'m' should be an integer greater than 1")

    is_m_odd = (m % 2 == 1)

    # Helper function to stay consistent and concise based on 'type_':
    # division removes an effect multiplicatively, subtraction additively.
    def _decomposer_helper(a, b):
        if type_ == multiplicative:
            return a / b
        else:
            return a - b

    if filter_ is None:
        # default filter: simple moving average of window length m
        filter_ = np.ones((m,)) / m

    # We only accept the values in multiplicative or additive
    if type_ not in (multiplicative, additive):
        err_msg = "'type_' can only take values '{}' or '{}'"
        raise ValueError(err_msg.format(multiplicative, additive))

    # There needs to be at least 2 periods. This is due to the behavior of
    # convolutions and how they behave with respect to losing endpoints
    if (x.shape[0] / m) < 2:
        raise ValueError("time series has no or less than 2 periods")

    # Take half of m for the convolution / sma process.
    half_m = m // 2

    trend = np.convolve(x, filter_, mode='valid')
    if not is_m_odd:
        trend = trend[:-1]  # we remove the final index if m is even.

    # Remove the effect of the trend on the original signal and pad for
    # reshape. ``sma_xs`` selects the slice of x aligned with the trend.
    sma_xs = range(half_m, len(trend) + half_m)
    detrend = _decomposer_helper(x[sma_xs], trend)

    num_seasons = math.ceil((1.0 * trend.shape[0]) / m)
    pad_length = (num_seasons * m) - trend.shape[0]
    if pad_length > 0:
        buffer = pad_length * [np.nan]
        detrend = np.array(detrend.tolist() + buffer)

    # Determine the seasonal effect of the signal by averaging each phase
    # of the period across seasons, ignoring the NaN padding.
    m_arr = np.reshape(detrend, (num_seasons, m))
    seasonal = np.nanmean(m_arr, axis=0).tolist()

    # Rotate by half a period so the seasonal component aligns with x,
    # then tile it out to cover the full series length.
    seasonal = np.array(seasonal[half_m:] + seasonal[:half_m])
    temp = seasonal
    for i in range(m_arr.shape[0]):
        seasonal = np.concatenate((seasonal, temp))
    if pad_length > 0:
        seasonal = seasonal[:-pad_length]
    if is_m_odd:
        seasonal = seasonal[:-1]

    # We buffer the trend so it is the same length as the other outputs.
    # This counters the effects of losing data by the convolution/sma.
    buffer = [np.nan] * half_m
    trend = list(buffer + trend.tolist() + buffer)

    # Remove the trend and seasonal effects from the original signal to get
    # the random/noisy effects within the original signal.
    random = _decomposer_helper(_decomposer_helper(x, trend), seasonal)

    # Create a namedtuple so the output mirrors the output of the R function.
    decomposed = namedtuple('decomposed', 'x trend seasonal random')
    return decomposed(x, trend, seasonal, random)
class _SeasonalStationarityTest(_BaseStationarityTest, metaclass=ABCMeta):
    """Provides the base class for seasonal differencing tests such as
    the Canova-Hansen test and the Osborn-Chui-Smith-Birchenhall tests. These
    tests are used to determine the seasonal differencing term for a
    time-series.

    Parameters
    ----------
    m : int
        The seasonal period (number of observations per cycle). Must
        exceed 1 or a ValueError is raised.
    """
    def __init__(self, m):
        # NOTE: ``m`` is stored before validation; on an invalid value the
        # ValueError still propagates and the instance is discarded.
        self.m = m
        if m < 2:
            raise ValueError('m must be > 1')

    @abstractmethod
    def estimate_seasonal_differencing_term(self, x):
        """Estimate the seasonal differencing term.

        Parameters
        ----------
        x : array-like, shape=(n_samples,)
            The time series vector.
        """
class CHTest(_SeasonalStationarityTest):
    """Conduct a CH test for seasonality.

    The Canova-Hansen test for seasonal differences. Canova and Hansen
    (1995) proposed a test statistic for the null hypothesis that the seasonal
    pattern is stable. The test statistic can be formulated in terms of
    seasonal dummies or seasonal cycles. The former allows us to identify
    seasons (e.g. months or quarters) that are not stable, while the latter
    tests the stability of seasonal cycles (e.g. cycles of period 2 and 4
    quarters in quarterly data). [1]

    Parameters
    ----------
    m : int
        The seasonal differencing term. For monthly data, e.g., this would be
        12. For quarterly, 4, etc. For the Canova-Hansen test to work,
        ``m`` must exceed 1.

    Notes
    -----
    This test is generally not used directly, but in conjunction with
    :func:`pmdarima.arima.nsdiffs`, which directly estimates the number
    of seasonal differences.

    References
    ----------
    .. [1] Testing for seasonal stability using the Canova
           and Hansen test statistic: http://bit.ly/2wKkrZo

    .. [2] R source code for CH test:
           https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L148
    """
    # Critical values used for m in [2, 12]; indexed by ``m - 2`` in
    # ``estimate_seasonal_differencing_term``. Larger m values use the
    # dedicated thresholds / interpolation formula there.
    crit_vals = c(0.4617146, 0.7479655, 1.0007818, 1.2375350, 1.4625240,
                  1.6920200, 1.9043096, 2.1169602, 2.3268562, 2.5406922,
                  2.7391007)

    def __init__(self, m):
        super(CHTest, self).__init__(m=m)

    @staticmethod
    def _sd_test(wts, s):
        """Compute the Canova-Hansen test statistic for series ``wts``
        with seasonal period ``s``.
        """
        # assume no NaN values since called internally
        # also assume s > 1 since called internally
        n = wts.shape[0]

        # no use checking, because this is an internal method
        # if n <= s: raise ValueError('too few samples (%i<=%i)' % (n, s))

        frec = np.ones(int((s + 1) / 2), dtype=int)
        # lag truncation parameter for the covariance estimator
        ltrunc = int(np.round(s * ((n / 100.0) ** 0.25)))
        R1 = CHTest._seas_dummy(wts, s)

        # fit model, get residuals
        lmch = make_pipeline(
            StandardScaler(with_mean=False),
            LinearRegression()
        ).fit(R1, wts)
        # lmch = sm.OLS(wts, R1).fit(method='qr')
        residuals = wts - lmch.predict(R1)

        # translated R code:
        # multiply the residuals by the column vectors
        # Fhataux = Fhat.copy()
        # for i in range(Fhat.shape[1]):  # for (i in 1:(s-1))
        #     Fhataux[:, i] = R1[:, i] * residuals

        # more efficient numpy:
        Fhataux = (R1.T * residuals).T.astype(np.float64)

        # translated R code
        # matrix row cumsums
        # Fhat = np.ones((n, s - 1)) * np.nan
        # for i in range(n):
        #     for n in range(Fhataux.shape[1]):
        #         Fhat[i, n] = Fhataux[:i, n].sum()

        # more efficient numpy:
        Ne = Fhataux.shape[0]

        # As of v0.9.1, use the C_canova_hansen_sd_test function to compute
        # Omnw, Omfhat, A, frecob. This avoids the overhead of multiple calls
        # to C functions
        A, AtOmfhatA = C_canova_hansen_sd_test(ltrunc, Ne, Fhataux, frec, s)

        # UPDATE 01/04/2018 - we can get away without computing u, v
        # (this is also MUCH MUCH faster!!!)
        sv = svd(AtOmfhatA, compute_uv=False)  # type: np.ndarray

        # From R:
        # double.eps: the smallest positive floating-point number 'x' such
        # that '1 + x != 1'. It equals 'double.base ^ ulp.digits' if either
        # 'double.base' is 2 or 'double.rounding' is 0; otherwise, it
        # is '(double.base ^ double.ulp.digits) / 2'. Normally
        # '2.220446e-16'.
        # Numpy's float64 has an eps of 2.2204460492503131e-16
        if sv.min() < np.finfo(sv.dtype).eps:  # machine min eps
            # near-singular matrix: the statistic degenerates to 0
            return 0

        # solve against the identity matrix, then produce
        # a nasty mess of dot products... this is the (horrendous) R code:
        # (1/N^2) * sum(diag(solve(tmp) %*% t(A) %*% t(Fhat) %*% Fhat %*% A))
        # https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L321
        Fhat = Fhataux.cumsum(axis=0)
        solved = solve(AtOmfhatA, np.identity(AtOmfhatA.shape[0]))
        return (1.0 / n ** 2) * solved.dot(A.T).dot(
            Fhat.T).dot(Fhat).dot(A).diagonal().sum()

    @staticmethod
    def _seas_dummy(x, m):
        """Build the (n, m - 1) seasonal dummy matrix via a Fourier
        series, mirroring R forecast's SeasDummy.
        """
        # Here is the R code:
        # (https://github.com/robjhyndman/forecast/blob/master/R/arima.R#L132)
        #
        # SeasDummy <- function(x) {
        #   n <- length(x)
        #   m <- frequency(x)
        #   if (m == 1) {
        #     stop("Non-seasonal data")
        #   }
        #   tt <- 1:n
        #   fmat <- matrix(NA, nrow = n, ncol = 2 * m)
        #   for (i in 1:m) {
        #     fmat[, 2 * i] <- sin(2 * pi * i * tt / m)
        #     fmat[, 2 * (i - 1) + 1] <- cos(2 * pi * i * tt / m)
        #   }
        #   return(fmat[, 1:(m - 1)])
        # }

        # set up seasonal dummies using fourier series
        n = x.shape[0]

        # assume m > 1 since this function called internally...
        assert m > 1, 'This function is called internally and ' \
                      'should not encounter this issue'

        tt = np.arange(n) + 1  # 1-based time index, like R's 1:n
        fmat = np.ones((n, 2 * m)) * np.nan
        pi = np.pi

        for i in range(1, m + 1):  # for(i in 1:m)
            # subtract one, unlike the R code. in the R code, this sets
            # columns 2, 4, 6, etc... here it sets 1, 3, 5
            # fmat[,2*i] <- sin(2*pi*i*tt/m)
            fmat[:, (2 * i) - 1] = np.sin(2 * pi * i * tt / m)

            # in the R code, this sets columns 1, 3, 5, etc. here it
            # sets 0, 2, 4, etc.
            # fmat[,2*(i-1)+1] <- cos(2*pi*i*tt/m)
            fmat[:, 2 * (i - 1)] = np.cos(2 * pi * i * tt / m)

        return fmat[:, :m - 1]

    def estimate_seasonal_differencing_term(self, x):
        """Estimate the seasonal differencing term.

        Parameters
        ----------
        x : array-like, shape=(n_samples,)
            The time series vector.

        Returns
        -------
        D : int
            The seasonal differencing term. The CH test defines a set of
            critical values::

                (0.4617146, 0.7479655, 1.0007818, 1.2375350, 1.4625240,
                 1.6920200, 1.9043096, 2.1169602, 2.3268562, 2.5406922,
                 2.7391007)

            For different values of ``m``, the CH statistic is compared
            to a different critical value, and returns 1 if the computed
            statistic is greater than the critical value, or 0 if not.
        """
        if not self._base_case(x):
            return 0

        # ensure vector
        x = check_endog(x, dtype=DTYPE, preserve_series=False)

        n = x.shape[0]
        m = int(self.m)

        # too few observations to run the test reliably
        if n < 2 * m + 5:
            return 0

        chstat = self._sd_test(x, m)

        if m <= 12:
            return int(chstat > self.crit_vals[m - 2])  # R does m - 1...
        if m == 24:
            return int(chstat > 5.098624)
        if m == 52:
            return int(chstat > 10.341416)
        if m == 365:
            return int(chstat > 65.44445)

        # smoothed critical value for any other seasonal period
        return int(chstat > 0.269 * (m ** 0.928))
class OCSBTest(_SeasonalStationarityTest):
    """Perform an OCSB test of seasonality.

    Compute the Osborn, Chui, Smith, and Birchenhall (OCSB) test for an input
    time series to determine whether it needs seasonal differencing. The
    regression equation may include lags of the dependent variable. When
    ``lag_method`` = "fixed", the lag order is fixed to ``max_lag``; otherwise,
    ``max_lag`` is the maximum number of lags considered in a lag selection
    procedure that minimizes the ``lag_method`` criterion, which can be
    "aic", "bic" or corrected AIC, "aicc".

    Critical values for the test are based on simulations, which have been
    smoothed over to produce critical values for all seasonal periods

    Parameters
    ----------
    m : int
        The seasonal differencing term. For monthly data, e.g., this would be
        12. For quarterly, 4, etc. For the OCSB test to work, ``m`` must
        exceed 1.

    lag_method : str, optional (default="aic")
        The lag method to use. One of ("fixed", "aic", "bic", "aicc"). The
        metric for assessing model performance after fitting a linear model.

    max_lag : int, optional (default=3)
        The maximum lag order to be considered by ``lag_method``.

    References
    ----------
    .. [1] Osborn DR, Chui APL, Smith J, and Birchenhall CR (1988)
           "Seasonality and the order of integration for consumption",
           Oxford Bulletin of Economics and Statistics 50(4):361-377.

    .. [2] R's forecast::OCSB test source code: https://bit.ly/2QYQHno
    """
    # Maps a lag-selection method name to a callable extracting that
    # information criterion from a fitted statsmodels OLS result.
    _ic_method_map = {
        "aic": lambda fit: fit.aic,
        "bic": lambda fit: fit.bic,

        # TODO: confirm False for add_constant, since the model fit contains
        # .  a constant term
        "aicc": lambda fit: _aicc(fit, fit.nobs, False)
    }

    def __init__(self, m, lag_method="aic", max_lag=3):
        super(OCSBTest, self).__init__(m=m)

        self.lag_method = lag_method
        self.max_lag = max_lag

    @staticmethod
    def _calc_ocsb_crit_val(m):
        """Compute the OCSB critical value"""
        # See:
        # https://github.com/robjhyndman/forecast/blob/
        # 8c6b63b1274b064c84d7514838b26dd0acb98aee/R/unitRoot.R#L409
        log_m = np.log(m)
        return -0.2937411 * \
            np.exp(-0.2850853 * (log_m - 0.7656451) +
                   (-0.05983644) * ((log_m - 0.7656451) ** 2)) - 1.652202

    @staticmethod
    def _do_lag(y, lag, omit_na=True):
        """Perform the TS lagging"""
        n = y.shape[0]
        if lag == 1:
            return y.reshape(n, 1)

        # Create a 2d array of dims (n + (lag - 1), lag). This looks cryptic..
        # If there are tons of lags, this may not be super efficient...
        out = np.ones((n + (lag - 1), lag)) * np.nan
        for i in range(lag):
            out[i:i + n, i] = y

        if omit_na:
            # drop rows containing any NaN (the staggered edges)
            out = out[~np.isnan(out).any(axis=1)]
        return out

    @staticmethod
    def _gen_lags(y, max_lag, omit_na=True):
        """Create the lagged exogenous array used to fit the linear model"""
        if max_lag <= 0:
            return np.zeros(y.shape[0])

        # delegate down
        return OCSBTest._do_lag(y, max_lag, omit_na)

    @staticmethod
    def _fit_ocsb(x, m, lag, max_lag):
        """Fit the linear model used to compute the test statistic"""
        # first difference at the seasonal period
        y_first_order_diff = diff(x, m)

        # if there are no more samples, we have to bail
        if y_first_order_diff.shape[0] == 0:
            raise ValueError(
                "There are no more samples after a first-order "
                "seasonal differencing. See http://alkaline-ml.com/pmdarima/"
                "seasonal-differencing-issues.html for a more in-depth "
                "explanation and potential work-arounds."
            )

        y = diff(y_first_order_diff)
        ylag = OCSBTest._gen_lags(y, lag)

        if max_lag > -1:
            # y = tail(y, -maxlag)
            y = y[max_lag:]

        # A constant term is added in the R code's lm formula. We do that in
        # the linear model's constructor
        mf = ylag[:y.shape[0]]
        ar_fit = sm.OLS(y, add_constant(mf)).fit(method='qr')

        # Create Z4: residuals of the seasonally differenced series
        # predicted from its own lags under the fitted AR model.
        z4_y = y_first_order_diff[lag:]  # new endog
        z4_lag = OCSBTest._gen_lags(y_first_order_diff, lag)[:z4_y.shape[0], :]
        z4_preds = ar_fit.predict(add_constant(z4_lag))  # preds
        z4 = z4_y - z4_preds  # test residuals

        # Create Z5. Looks odd because y and lag depend on each other and go
        # back and forth for two stages
        z5_y = diff(x)
        z5_lag = OCSBTest._gen_lags(z5_y, lag)
        z5_y = z5_y[lag:]
        z5_lag = z5_lag[:z5_y.shape[0], :]
        z5_preds = ar_fit.predict(add_constant(z5_lag))
        z5 = z5_y - z5_preds

        # Finally, fit a linear regression on mf with z4 & z5 features added
        data = np.hstack((
            mf,
            z4[:mf.shape[0]].reshape(-1, 1),
            z5[:mf.shape[0]].reshape(-1, 1)
        ))
        return sm.OLS(y, data).fit(method='qr')

    def _compute_test_statistic(self, x):
        """Run lag selection (if requested) and return the OCSB test
        statistic (the t-value on the z5 regressor).
        """
        m = self.m
        maxlag = self.max_lag
        method = self.lag_method

        # We might try multiple lags in this case
        crit_regression = None
        if maxlag > 0 and method != 'fixed':
            try:
                icfunc = self._ic_method_map[method]
            except KeyError:
                # NOTE(review): the '%s' placeholder below is never
                # interpolated with ``method`` — the message prints
                # literally. Consider formatting it with the bad value.
                raise ValueError("'%s' is an invalid method. Must be one "
                                 "of ('aic', 'aicc', 'bic', 'fixed')")

            fits = []
            icvals = []
            for lag_term in range(1, maxlag + 1):  # 1 -> maxlag (incl)
                try:
                    fit = self._fit_ocsb(x, m, lag_term, maxlag)
                    fits.append(fit)
                    icvals.append(icfunc(fit))
                except np.linalg.LinAlgError:  # Singular matrix
                    icvals.append(np.nan)
                    fits.append(None)

            # If they're all NaN, raise
            if np.isnan(icvals).all():
                raise ValueError("All lag values up to 'maxlag' produced "
                                 "singular matrices. Consider using a longer "
                                 "series, a different lag term or a different "
                                 "test.")

            # Compute the information criterion vals
            best_index = int(np.nanargmin(icvals))
            # NOTE(review): ``best_index`` is 0-based over lag terms
            # 1..maxlag, so the selected lag term is ``best_index + 1``;
            # verify that ``best_index - 1`` matches the R implementation's
            # lag selection and is not an off-by-one.
            maxlag = best_index - 1

            # Save this in case we can't compute a better one
            crit_regression = fits[best_index]

        # Compute the actual linear model used for determining the test stat
        try:
            regression = self._fit_ocsb(x, m, maxlag, maxlag)
        except np.linalg.LinAlgError:  # Singular matrix
            if crit_regression is not None:
                regression = crit_regression
            # Otherwise we have no solution to fall back on
            else:
                raise ValueError("Could not find a solution. Try a longer "
                                 "series, different lag term, or a different "
                                 "test.")

        # Get the coefficients for the z4 and z5 matrices
        tvals = regression.tvalues[-2:]  # len 2
        return tvals[-1]  # just z5, like R does it

    def estimate_seasonal_differencing_term(self, x):
        """Estimate the seasonal differencing term.

        Parameters
        ----------
        x : array-like, shape=(n_samples,)
            The time series vector.

        Returns
        -------
        D : int
            The seasonal differencing term. For different values of ``m``,
            the OCSB statistic is compared to an estimated critical value,
            and returns 1 if the computed statistic is greater than the
            critical value, or 0 if not.
        """
        if not self._base_case(x):
            return 0

        # ensure vector
        x = check_endog(x, dtype=DTYPE, preserve_series=False)

        # Get the critical value for m
        stat = self._compute_test_statistic(x)
        crit_val = self._calc_ocsb_crit_val(self.m)
        return int(stat > crit_val)