import tensorflow as tf
from ..model_unit import BaseModel
from ..model_unit import DCGRUCell
from tensorflow.contrib import legacy_seq2seq
[docs]class DCRNN(BaseModel):
"""
Args:
num_nodes(int): Number of nodes in the graph, e.g. number of stations in NYC-Bike dataset.
num_diffusion_matrix: Number of diffusion matrix used in model.
num_rnn_units: Number of RNN units.
num_rnn_layers: Number of RNN layers
max_diffusion_step: Number of diffusion steps
seq_len: Input sequence length
use_curriculum_learning(bool): model's prediction (True) or the previous ground truth in training (False).
input_dim: Dimension of the input feature
output_dim: Dimension of the output feature
cl_decay_steps: When use_curriculum_learning=True, cl_decay_steps is used to adjust the ratio of using ground
true labels, where with more training steps, the ratio drops.
target_len(int): Output sequence length.
lr(float): Learning rate
epsilon: epsilon in Adam
optimizer_name(str): 'sgd' or 'Adam' optimizer
code_version(str): Current version of this model code, which will be used as filename for saving the model
model_dir(str): The directory to store model files. Default:'model_dir'.
gpu_device(str): To specify the GPU to use. Default: '0'.
"""
def __init__(self,
num_nodes,
num_diffusion_matrix,
num_rnn_units=64,
num_rnn_layers=1,
max_diffusion_step=2,
seq_len=6,
use_curriculum_learning=False,
input_dim=1,
output_dim=1,
cl_decay_steps=1000,
target_len=1,
lr=1e-4,
epsilon=1e-3,
optimizer_name='Adam',
code_version='DCRNN-QuickStart',
model_dir='model_dir',
gpu_device='0', **kwargs):
super(DCRNN, self).__init__(code_version=code_version, model_dir=model_dir, gpu_device=gpu_device)
self._num_nodes = num_nodes
self._num_diffusion_matrix = num_diffusion_matrix
self._num_rnn_units = num_rnn_units
self._num_rnn_layers = num_rnn_layers
self._max_diffusion_step = max_diffusion_step
self._seq_len = seq_len
self._use_curriculum_learning = use_curriculum_learning
self._input_dim = input_dim
self._output_dim = output_dim
self._target_len = target_len
self._cl_decay_steps = cl_decay_steps
self._optimizer_name = optimizer_name
self._lr = lr
# self._batch_size = batch_size
self._epsilon = epsilon
[docs] def build(self, init_vars=True, max_to_keep=5):
with self._graph.as_default():
inputs = tf.placeholder(tf.float32, shape=(None, self._seq_len,
self._num_nodes, self._input_dim), name='inputs')
labels = tf.placeholder(tf.float32, shape=(None, self._target_len,
self._num_nodes, self._output_dim), name='labels')
diffusion_matrix = tf.placeholder(tf.float32, shape=(self._num_diffusion_matrix, self._num_nodes,
self._num_nodes), name='diffusion_matrix')
batch_size = tf.shape(inputs)[0]
self._input['inputs'] = inputs.name
self._input['target'] = labels.name
self._input['diffusion_matrix'] = diffusion_matrix.name
go_symbol = tf.zeros(shape=(tf.shape(inputs)[0], self._num_nodes * self._output_dim))
cell = DCGRUCell(self._num_rnn_units, self._input_dim, self._num_diffusion_matrix, diffusion_matrix,
max_diffusion_step=self._max_diffusion_step, num_nodes=self._num_nodes)
cell_with_projection = DCGRUCell(self._num_rnn_units, self._input_dim,
self._num_diffusion_matrix, diffusion_matrix,
max_diffusion_step=self._max_diffusion_step,
num_nodes=self._num_nodes, num_proj=self._output_dim)
encoding_cells = [cell] * self._num_rnn_layers
decoding_cells = [cell] * (self._num_rnn_layers - 1) + [cell_with_projection]
encoding_cells = tf.contrib.rnn.MultiRNNCell(encoding_cells, state_is_tuple=True)
decoding_cells = tf.contrib.rnn.MultiRNNCell(decoding_cells, state_is_tuple=True)
global_step = tf.train.get_or_create_global_step()
# Outputs: (batch_size, timesteps, num_nodes, output_dim)
with tf.variable_scope('DCRNN_SEQ'):
inputs_unstack = tf.unstack(tf.reshape(inputs, (batch_size,
self._seq_len, self._num_nodes * self._input_dim)),
axis=1)
labels_unstack = tf.unstack(
tf.reshape(labels[..., :self._output_dim],
(batch_size, self._target_len, self._num_nodes * self._output_dim)), axis=1)
labels_unstack.insert(0, go_symbol)
def _compute_sampling_threshold(global_step, k):
"""
Computes the sampling probability for scheduled sampling using inverse sigmoid.
global_step:
k:
:return:
"""
return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)
def _loop_function_train(prev, i):
# Return either the model's prediction or the previous ground truth in training.
if self._use_curriculum_learning:
c = tf.random_uniform((), minval=0, maxval=1.)
threshold = _compute_sampling_threshold(global_step, self._cl_decay_steps)
result = tf.cond(tf.less(c, threshold), lambda: labels_unstack[i], lambda: prev)
else:
result = labels_unstack[i]
return result
def _loop_function_test(prev, i):
# Return the prediction of the model in testing.
return prev
a, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs_unstack, dtype=tf.float32)
with tf.variable_scope('train', reuse=False):
train_outputs, _ = legacy_seq2seq.rnn_decoder(labels_unstack, enc_state, decoding_cells,
loop_function=_loop_function_train)
with tf.variable_scope('text', reuse=True):
test_outputs, _ = legacy_seq2seq.rnn_decoder(labels_unstack, enc_state, decoding_cells,
loop_function=_loop_function_test)
# Project the output to output_dim.
train_outputs = tf.stack(train_outputs[:-1], axis=1)
test_outputs = tf.stack(test_outputs[:-1], axis=1)
# Configure optimizer
optimizer = tf.train.AdamOptimizer(self._lr, epsilon=float(self._epsilon))
if self._optimizer_name == 'sgd':
optimizer = tf.train.GradientDescentOptimizer(self._lr)
loss = tf.sqrt(tf.reduce_mean(tf.square(train_outputs - labels[:, :, :, 0])))
train_op = optimizer.minimize(loss)
self._output['prediction'] = test_outputs.name
self._output['loss'] = loss.name
self._op['train_op'] = train_op.name
super(DCRNN, self).build(init_vars=init_vars, max_to_keep=5)
# Define your '_get_feed_dict function‘, map your input to the tf-model
def _get_feed_dict(self,
inputs,
diffusion_matrix,
target=None,):
feed_dict = {
'inputs': inputs,
'diffusion_matrix': diffusion_matrix,
}
if target is not None:
feed_dict['target'] = target
return feed_dict