Source code for UCTB.model.ARIMA

import numpy as np
import pandas as pd
import statsmodels.api as sm

import warnings
# NOTE(review): with no category/module argument this installs an "ignore"
# filter for every warning in the process, not only those raised from this
# module. Presumably intended to quiet noisy model-fitting warnings -- confirm.
warnings.filterwarnings("ignore")


class ARIMA(object):
    """Seasonal ARIMA wrapper built on ``statsmodels`` ``SARIMAX``.

    The model is fitted once, at construction time, on ``time_sequence``.
    The fitted parameters are then reused by :meth:`predict` on new histories.

    Args:
        time_sequence: Training series handed to ``sm.tsa.SARIMAX``.
        order: ``(p, d, q)`` order. When ``None`` it is selected
            automatically by :meth:`get_order`.
        seasonal_order: Seasonal order forwarded unchanged to ``SARIMAX``.
        max_ar: Largest AR order tried during automatic selection.
        max_ma: Largest MA order tried during automatic selection.
        max_d: Largest differencing order used during automatic selection.

    Attributes:
        seasonal_order: The seasonal order given to the constructor.
        order: The order actually used (user-supplied or auto-selected).
        model_res: Result object returned by ``SARIMAX(...).fit(disp=False)``.
    """

    def __init__(self, time_sequence, order=None, seasonal_order=(0, 0, 0, 0), max_ar=6, max_ma=4, max_d=2):
        self.seasonal_order = seasonal_order
        self.order = self.get_order(time_sequence, order, max_ar=max_ar, max_ma=max_ma, max_d=max_d)
        model = sm.tsa.SARIMAX(time_sequence, order=self.order, seasonal_order=self.seasonal_order)
        self.model_res = model.fit(disp=False)

    def get_order(self, series, order=None, max_ar=6, max_ma=2, max_d=2):
        """Return the ``(p, d, q)`` order to use for ``series``.

        Args:
            series: Series the order is selected for.
            order: Explicit order. When not ``None`` it is returned unchanged
                and no selection is performed.
            max_ar: Largest AR order passed to ``arma_order_select_ic``.
            max_ma: Largest MA order passed to ``arma_order_select_ic``.
                Note that the default here (2) differs from the constructor
                default (4); the constructor always passes its own value.
            max_d: Upper bound on the differencing order ``d``.

        Returns:
            ``order`` itself when it was supplied, otherwise a list
            ``[p, d, q]`` where ``d`` is the smallest differencing order (at
            most ``max_d``) whose differenced series passes the ADF check, and
            ``(p, q)`` is the AIC-minimising pair for that differenced series.
        """
        if order is not None:
            return order

        d = ARIMA._differencing_order(series, max_d, ARIMA._is_stationary)
        selection = sm.tsa.stattools.arma_order_select_ic(
            np.diff(series, d), max_ar=max_ar, max_ma=max_ma, ic=['aic'])
        p, q = selection.aic_min_order
        return [p, d, q]

    @staticmethod
    def _is_stationary(series):
        """Return whether the ADF test statistic is below its 1% critical value."""
        t = ARIMA.adf_test(series, verbose=False)
        return t[0] < t[4]['1%']

    @staticmethod
    def _differencing_order(series, max_d, is_stationary):
        """Return the smallest ``d`` in ``[0, max_d]`` whose ``d``-th difference is stationary.

        ``max_d`` is returned when no smaller order satisfies ``is_stationary``,
        so the result never exceeds the requested cap.
        """
        d = 0
        # Check the cap first so that d can never be pushed past max_d.
        while d < max_d and not is_stationary(np.diff(series, d)):
            d += 1
        return d

    @staticmethod
    def adf_test(time_series, max_lags=None, verbose=True):
        """Run ``sm.tsa.stattools.adfuller`` on ``time_series``.

        Args:
            time_series: Series to test.
            max_lags: Forwarded to ``adfuller`` as ``maxlag``.
            verbose: When true, print a one-column summary table of the result.

        Returns:
            The tuple returned by ``adfuller``, unchanged.
        """
        t = sm.tsa.stattools.adfuller(time_series, maxlag=max_lags)
        if verbose:
            print(ARIMA._adf_report(t))
        return t

    @staticmethod
    def _adf_report(t):
        """Build the printable one-column summary table for an ``adfuller`` result."""
        rows = [
            ('Test Statistic Value', t[0]),
            ("p-value", t[1]),
            ("Lags Used", t[2]),
            ("Number of Observations Used", t[3]),
            ("Critical Value(1%)", t[4]['1%']),
            ("Critical Value(5%)", t[4]['5%']),
            ("Critical Value(10%)", t[4]['10%']),
        ]
        labels = [label for label, _ in rows]
        values = [value for _, value in rows]
        # Built in one step rather than via chained ``frame[col][row] = v``
        # assignment, which does not reliably write through to the frame.
        # Object dtype keeps integer entries (lags, observations) as integers.
        return pd.DataFrame({'value': values}, index=labels, dtype=object)

    def predict(self, time_sequences, forecast_step=1):
        """Forecast from several histories using the parameters fitted at construction.

        Every ``forecast_step``-th entry of ``time_sequences`` is used as a
        history: a ``SARIMAX`` model with the stored orders is built on it,
        filtered with the stored parameters (``fit`` is not called again), and
        asked for ``forecast_step`` steps ahead. The last forecast is shortened
        so that the total number of forecast steps does not exceed
        ``len(time_sequences)``.

        Args:
            time_sequences: Indexable collection of histories; each
                ``time_sequences[i]`` is passed to ``SARIMAX`` as the
                observed series.
            forecast_step: Number of steps forecast per history, and the
                stride used to walk through ``time_sequences``.

        Returns:
            ``np.ndarray`` of dtype ``float32`` stacking one ``(steps, 1)``
            forecast per visited history.

        Raises:
            ValueError: If ``forecast_step`` is less than 1.

        NOTE(review): when ``len(time_sequences)`` is not a multiple of
        ``forecast_step`` the last forecast is shorter than the others and the
        final stacking into a single array may fail -- confirm intended usage.
        """
        if forecast_step < 1:
            raise ValueError("forecast_step must be a positive integer")

        total = len(time_sequences)
        result = []
        for i in range(0, total, forecast_step):
            # Shorten the final forecast so it does not run past the end.
            fs = min(forecast_step, total - i)
            model = sm.tsa.SARIMAX(time_sequences[i], order=self.order, seasonal_order=self.seasonal_order)
            model_res = model.filter(self.model_res.params)
            # np.asarray: the forecast is not guaranteed to be a bare ndarray.
            p = np.asarray(model_res.forecast(fs)).reshape([-1, 1])
            result.append(p)
        return np.array(result, dtype=np.float32)