import numpy as np
[docs]class Normalizer(object):
def __init__(self, X):
self._min = np.min(X)
self._max = np.max(X)
[docs] def min_max_normal(self, X):
return (X - self._min) / (self._max - self._min)
[docs] def min_max_denormal(self, X):
return X * (self._max - self._min) + self._min
# def white_normal(self):
# pass
[docs]class MoveSample(object):
def __init__(self, feature_step, feature_stride, feature_length, target_length):
self.feature_step = feature_step
self.feature_stride = feature_stride
self.feature_length = feature_length
self.target_length = target_length
[docs] def general_move_sample(self, data):
feature = []
target = []
for i in range(len(data) - self.feature_length -
(self.feature_step-1)*self.feature_stride - self.target_length + 1):
feature.append([data[i + step*self.feature_stride: i + step*self.feature_stride + self.feature_length]
for step in range(self.feature_step)])
target.append(data[i + (self.feature_step-1) * self.feature_stride + self.feature_length:\
i + (self.feature_step-1) * self.feature_stride + self.feature_length + self.target_length])
return np.array(feature), np.array(target)
[docs]class ST_MoveSample(object):
def __init__(self, closeness_len, period_len, trend_len, target_length=1, daily_slots=24):
self._c_t = closeness_len
self._p_t = period_len
self._t_t = trend_len
self._target_length = target_length
self._daily_slots = daily_slots
# 1 init Move_Sample object
self.move_sample_closeness = MoveSample(feature_step=self._c_t, feature_stride=1,
feature_length=1, target_length=self._target_length)
self.move_sample_period = MoveSample(feature_step=self._p_t + 1, feature_stride=int(self._daily_slots),
feature_length=1, target_length=0)
self.move_sample_trend = MoveSample(feature_step=self._t_t + 1, feature_stride=int(self._daily_slots) * 7,
feature_length=1, target_length=0)
[docs] def move_sample(self, data):
# 2 general move sample
closeness, y = self.move_sample_closeness.general_move_sample(data)
period, _ = self.move_sample_period.general_move_sample(data)
trend, _ = self.move_sample_trend.general_move_sample(data)
# 3 remove the front part
min_length = min(len(closeness), len(period), len(trend))
closeness = closeness[-min_length:]
y = y[-min_length:]
period = period[-min_length:]
trend = trend[-min_length:]
# 4 remove tail of period and trend
period = period[:, :-1]
trend = trend[:, :-1]
if self._c_t and self._c_t > 0:
closeness = np.transpose(closeness, [0] + list(range(3, len(closeness.shape))) + [1, 2])
else:
closeness = np.array([])
if self._p_t and self._p_t > 0:
period = np.transpose(period, [0] + list(range(3, len(period.shape))) + [1, 2])
else:
period = np.array([])
if self._t_t and self._t_t > 0:
trend = np.transpose(trend, [0] + list(range(3, len(trend.shape))) + [1, 2])
else:
trend = np.array([])
y = np.transpose(y, [0] + list(range(2, len(y.shape))) + [1])
return closeness, period, trend, y
[docs]class SplitData(object):
[docs] @staticmethod
def split_data(data, ratio_list):
if np.sum(ratio_list) != 1:
ratio_list = np.array(ratio_list)
ratio_list = ratio_list / np.sum(ratio_list)
return [data[int(sum(ratio_list[0:e])*len(data)):
int(sum(ratio_list[0:e+1])*len(data))] for e in range(len(ratio_list))]
[docs] @staticmethod
def split_feed_dict(feed_dict, sequence_length, ratio_list):
if np.sum(ratio_list) != 1:
ratio_list = np.array(ratio_list)
ratio_list = ratio_list / np.sum(ratio_list)
return [{key: value[int(sum(ratio_list[0:e])*len(value)):int(sum(ratio_list[0:e+1])*len(value))]
if len(value) == sequence_length else value for key, value in feed_dict.items()}
for e in range(len(ratio_list))]