# -*- coding: utf-8 -*-
#
# Author: Taylor Smith <taylor.smith@alkaline-ml.com>
#
# Common ARIMA functions
from __future__ import absolute_import
from sklearn.utils.validation import check_array, column_or_1d
import numpy as np
from ..utils import get_callable
from ..utils.array import diff
from ..compat.numpy import DTYPE
from .stationarity import KPSSTest, ADFTest, PPTest
from .seasonality import CHTest # OCSBTest
# Names exported by ``from <this module> import *``.
__all__ = [
    'get_callable',
    'is_constant',
    'ndiffs',
    'nsdiffs',
]

# Lookup table used by ``ndiffs``: maps the ``test`` string to the class
# implementing that stationarity test.
VALID_TESTS = {
    'kpss': KPSSTest,
    'adf': ADFTest,
    'pp': PPTest,
}

# Lookup table used by ``nsdiffs``: maps the ``test`` string to the class
# implementing that seasonality test.
VALID_STESTS = {
    # 'ocsb': OCSBTest,  # todo: once this is fixed, enable it
    'ch': CHTest,
}
def is_constant(x):
    """Test ``x`` for constancy.

    Determine whether a vector is composed of all of the same elements
    and nothing else.

    Parameters
    ----------
    x : array-like, shape=(n_samples,)
        The time series vector.

    Returns
    -------
    res : bool
        True when every element of ``x`` compares equal to its first
        element, otherwise False.

    Notes
    -----
    The first element is read with ``x[0]``, so ``x`` is expected to
    contain at least one sample.

    Examples
    --------
    >>> import numpy as np
    >>> x = np.array([1, 2, 3])
    >>> y = np.ones(3)
    >>> [is_constant(x), is_constant(y)]
    [False, True]
    """
    # Normalise the input to a one-dimensional array before comparing.
    x = column_or_1d(x)  # type: np.ndarray

    # Element-wise comparison against the first value, reduced with all().
    return (x == x[0]).all()
def nsdiffs(x, m, max_D=2, test='ch', **kwargs):
    """Estimate the seasonal differencing term, ``D``.

    Perform a test of seasonality for different levels of ``D`` to
    estimate the number of seasonal differences required to make a given time
    series stationary. Will select the maximum value of ``D`` for which
    the time series is judged seasonally stationary by the statistical test.

    Parameters
    ----------
    x : array-like, shape=(n_samples, [n_features])
        The array to difference.

    m : int
        The number of seasonal periods (i.e., frequency of the
        time series)

    max_D : int, optional (default=2)
        Maximum number of seasonal differences allowed. Must
        be a positive integer. The estimated value of ``D`` will not
        exceed ``max_D``.

    test : str, optional (default='ch')
        Type of unit root test of seasonality to use in order
        to detect seasonal periodicity. Currently, the only allowed value
        is 'ch'.

    **kwargs : keyword args, optional
        Forwarded, after ``m``, to the constructor of the selected
        seasonality test.

    Returns
    -------
    D : int
        The estimated seasonal differencing term. This is the maximum value
        of ``D`` such that ``D <= max_D`` and the time series is judged
        seasonally stationary. If the time series is constant, will return 0.

    Raises
    ------
    ValueError
        If ``max_D`` is not strictly positive.
    """
    if max_D <= 0:
        raise ValueError('max_D must be a positive integer')

    # get the test - this validates m internally
    testfunc = get_callable(test, VALID_STESTS)(m, **kwargs)\
        .estimate_seasonal_differencing_term

    # Validate the input (finite values only, cast to DTYPE) and flatten
    # it to a one-dimensional vector.
    x = column_or_1d(check_array(x, ensure_2d=False,
                                 force_all_finite=True, dtype=DTYPE))

    # base case: a constant series needs no seasonal differencing
    if is_constant(x):
        return 0

    D = 0
    dodiff = testfunc(x)

    # Keep differencing at the seasonal lag while the test asks for one
    # more difference (a result equal to 1) and the cap is not reached.
    while dodiff == 1 and D < max_D:
        D += 1
        x = diff(x, lag=m)

        # once the differenced series is constant there is nothing left
        # to test, so stop at the current D
        if is_constant(x):
            return D
        dodiff = testfunc(x)

    return D
def ndiffs(x, alpha=0.05, test='kpss', max_d=2, **kwargs):
    """Estimate ARIMA differencing term, ``d``.

    Perform a test of stationarity for different levels of ``d`` to
    estimate the number of differences required to make a given time
    series stationary. Will select the maximum value of ``d`` for which
    the time series is judged stationary by the statistical test.

    Parameters
    ----------
    x : array-like, shape=(n_samples, [n_features])
        The array (time series) to difference.

    alpha : float, optional (default=0.05)
        Level of the test. This is the value below which the P-value
        will be deemed significant.

    test : str, optional (default='kpss')
        Type of unit root test of stationarity to use in order to
        test the stationarity of the time-series. One of ('kpss', 'adf', 'pp')

    max_d : int, optional (default=2)
        Maximum number of non-seasonal differences allowed. Must
        be a positive integer. The estimated value of ``d`` will not
        exceed ``max_d``.

    **kwargs : keyword args, optional
        Forwarded, after ``alpha``, to the constructor of the selected
        stationarity test.

    Returns
    -------
    d : int
        The estimated differencing term. This is the maximum value of ``d``
        such that ``d <= max_d`` and the time series is judged stationary.
        If the time series is constant, will return 0. If the test yields a
        NaN P-value on the original series, 0 is returned; if it yields NaN
        after differencing, the last ``d`` with a non-NaN P-value is
        returned.

    Raises
    ------
    ValueError
        If ``max_d`` is not strictly positive.
    """
    if max_d <= 0:
        raise ValueError('max_d must be a positive integer')

    # get the test
    testfunc = get_callable(test, VALID_TESTS)(alpha, **kwargs).is_stationary

    # Validate the input (finite values only, cast to DTYPE) and flatten
    # it to a one-dimensional vector.
    x = column_or_1d(check_array(x, ensure_2d=False,
                                 force_all_finite=True, dtype=DTYPE))

    # base case, if constant return 0
    d = 0
    if is_constant(x):
        return d

    # get initial diff; the second element of the result is used below as
    # the "difference once more" flag
    pval, dodiff = testfunc(x)

    # if initially NaN, return 0
    if np.isnan(pval):
        return 0  # (d is zero, but this is more explicit to the reader)

    # Begin loop.
    while dodiff and d < max_d:
        d += 1

        # do differencing
        x = diff(x)
        if is_constant(x):
            return d

        # get new result
        pval, dodiff = testfunc(x)

        # if it's NaN now, take the last non-null one
        if np.isnan(pval):
            return d - 1

    # when d >= max_d
    return d