Source code for chainer.functions.connection.n_step_gru

import itertools

import numpy
import six

from chainer import cuda
from chainer.functions.activation import sigmoid
from chainer.functions.activation import tanh
from chainer.functions.array import concat
from chainer.functions.array import reshape
from chainer.functions.array import split_axis
from chainer.functions.array import stack
from chainer.functions.connection import linear
from chainer.functions.connection import n_step_rnn
from chainer.functions.connection.n_step_rnn import get_random_state
from chainer.functions.noise import dropout

if cuda.cudnn_enabled:
    cudnn = cuda.cudnn
    libcudnn = cuda.cudnn.cudnn
    _cudnn_version = libcudnn.getVersion()


class NStepGRU(n_step_rnn.BaseNStepRNN):

    def __init__(self, n_layers, states, train=True):
        n_step_rnn.BaseNStepRNN.__init__(self, n_layers, states,
                                         rnn_dir='uni', rnn_mode='gru',
                                         train=train)


class NStepBiGRU(n_step_rnn.BaseNStepRNN):

    def __init__(self, n_layers, states, train=True):
        n_step_rnn.BaseNStepRNN.__init__(self, n_layers, states,
                                         rnn_dir='bi', rnn_mode='gru',
                                         train=train)


[docs]def n_step_gru( n_layers, dropout_ratio, hx, ws, bs, xs, train=True, use_cudnn=True): """Stacked Uni-directional Gated Recurrent Unit function. This function calculates stacked Uni-directional GRU with sequences. This function gets an initial hidden state :math:`h_0`, an input sequence :math:`x`, weight matrices :math:`W`, and bias vectors :math:`b`. This function calculates hidden states :math:`h_t` for each time :math:`t` from input :math:`x_t`. .. math:: r_t &= \\sigma(W_0 x_t + W_3 h_{t-1} + b_0 + b_3) \\\\ z_t &= \\sigma(W_1 x_t + W_4 h_{t-1} + b_1 + b_4) \\\\ h'_t &= \\tanh(W_2 x_t + b_2 + r_t \\cdot (W_5 h_{t-1} + b_5)) \\\\ h_t &= (1 - z_t) \\cdot h'_t + z_t \\cdot h_{t-1} As the function accepts a sequence, it calculates :math:`h_t` for all :math:`t` with one call. Six weight matrices and six bias vectors are required for each layers. So, when :math:`S` layers exists, you need to prepare :math:`6S` weigth matrices and :math:`6S` bias vectors. If the number of layers ``n_layers`` is greather than :math:`1`, input of ``k``-th layer is hidden state ``h_t`` of ``k-1``-th layer. Note that all input variables except first layer may have different shape from the first layer. Args: n_layers(int): Number of layers. dropout_ratio(float): Dropout ratio. hx (chainer.Variable): Variable holding stacked hidden states. Its shape is ``(S, B, N)`` where ``S`` is number of layers and is equal to ``n_layers``, ``B`` is mini-batch size, and ``N`` is dimention of hidden units. ws (list of list of chainer.Variable): Weight matrices. ``ws[i]`` represents weights for i-th layer. Each ``ws[i]`` is a list containing six matrices. ``ws[i][j]`` is corresponding with ``W_j`` in the equation. Only ``ws[0][j]`` where ``0 <= j < 3`` is ``(I, N)`` shape as they are multiplied with input variables. All other matrices has ``(N, N)`` shape. bs (list of list of chainer.Variable): Bias vectors. ``bs[i]`` represnents biases for i-th layer. Each ``bs[i]`` is a list containing six vectors. ``bs[i][j]`` is corresponding with ``b_j`` in the equation. Shape of each matrix is ``(N,)`` where ``N`` is dimention of hidden units. xs (list of chainer.Variable): A list of :class:`~chainer.Variable` holding input values. Each element ``xs[t]`` holds input value for time ``t``. Its shape is ``(B_t, I)``, where ``B_t`` is mini-batch size for time ``t``, and ``I`` is size of input units. Note that this functions supports variable length sequences. When sequneces has different lengths, sort sequences in descending order by length, and transpose the sorted sequence. :func:`~chainer.functions.transpose_sequence` transpose a list of :func:`~chainer.Variable` holding sequence. So ``xs`` needs to satisfy ``xs[t].shape[0] >= xs[t + 1].shape[0]``. train (bool): If ``True``, this function executes dropout. use_cudnn (bool): If ``True``, this function uses cuDNN if available. Returns: tuple: This functions returns a tuple concaining three elements, ``hy`` and ``ys``. - ``hy`` is an updated hidden states whose shape is same as ``hx``. - ``ys`` is a list of :class:`~chainer.Variable` . Each element ``ys[t]`` holds hidden states of the last layer corresponding to an input ``xs[t]``. Its shape is ``(B_t, N)`` where ``B_t`` is mini-batch size for time ``t``, and ``N`` is size of hidden units. Note that ``B_t`` is the same value as ``xs[t]``. """ return n_step_gru_base(n_layers, dropout_ratio, hx, ws, bs, xs, train, use_cudnn, use_bi_direction=False)
[docs]def n_step_bigru( n_layers, dropout_ratio, hx, ws, bs, xs, train=True, use_cudnn=True): """Stacked Bi-directional Gated Recurrent Unit function. This function calculates stacked Bi-directional GRU with sequences. This function gets an initial hidden state :math:`h_0`, an input sequence :math:`x`, weight matrices :math:`W`, and bias vectors :math:`b`. This function calculates hidden states :math:`h_t` for each time :math:`t` from input :math:`x_t`. .. math:: r^{f}_t &= \\sigma(W^{f}_0 x_t + W^{f}_3 h_{t-1} + b^{f}_0 + b^{f}_3) \\\\ z^{f}_t &= \\sigma(W^{f}_1 x_t + W^{f}_4 h_{t-1} + b^{f}_1 + b^{f}_4) \\\\ h^{f'}_t &= \\tanh(W^{f}_2 x_t + b^{f}_2 + r^{f}_t \\cdot (W^{f}_5 h_{t-1} + b^{f}_5)) \\\\ h^{f}_t &= (1 - z^{f}_t) \\cdot h^{f'}_t + z^{f}_t \\cdot h_{t-1} \\\\ r^{b}_t &= \\sigma(W^{b}_0 x_t + W^{b}_3 h_{t-1} + b^{b}_0 + b^{b}_3) \\\\ z^{b}_t &= \\sigma(W^{b}_1 x_t + W^{b}_4 h_{t-1} + b^{b}_1 + b^{b}_4) \\\\ h^{b'}_t &= \\tanh(W^{b}_2 x_t + b^{b}_2 + r^{b}_t \\cdot (W^{b}_5 h_{t-1} + b^{b}_5)) \\\\ h^{b}_t &= (1 - z^{b}_t) \\cdot h^{b'}_t + z^{b}_t \\cdot h_{t-1} \\\\ h_t &= [h^{f}_t; h^{f}_t] \\\\ where :math:`W^{f}` is weight matrices for forward-GRU, :math:`W^{b}` is weight matrices for backward-GRU. As the function accepts a sequence, it calculates :math:`h_t` for all :math:`t` with one call. Six weight matrices and six bias vectors are required for each layers. So, when :math:`S` layers exists, you need to prepare :math:`6S` weigth matrices and :math:`6S` bias vectors. If the number of layers ``n_layers`` is greather than :math:`1`, input of ``k``-th layer is hidden state ``h_t`` of ``k-1``-th layer. Note that all input variables except first layer may have different shape from the first layer. Args: n_layers(int): Number of layers. dropout_ratio(float): Dropout ratio. hx (chainer.Variable): Variable holding stacked hidden states. Its shape is ``(S, B, N)`` where ``S`` is number of layers and is equal to ``n_layers``, ``B`` is mini-batch size, and ``N`` is dimention of hidden units. ws (list of list of chainer.Variable): Weight matrices. ``ws[i]`` represents weights for i-th layer. Each ``ws[i]`` is a list containing six matrices. ``ws[i][j]`` is corresponding with ``W_j`` in the equation. Only ``ws[0][j]`` where ``0 <= j < 3`` is ``(I, N)`` shape as they are multiplied with input variables. All other matrices has ``(N, N)`` shape. bs (list of list of chainer.Variable): Bias vectors. ``bs[i]`` represnents biases for i-th layer. Each ``bs[i]`` is a list containing six vectors. ``bs[i][j]`` is corresponding with ``b_j`` in the equation. Shape of each matrix is ``(N,)`` where ``N`` is dimention of hidden units. xs (list of chainer.Variable): A list of :class:`~chainer.Variable` holding input values. Each element ``xs[t]`` holds input value for time ``t``. Its shape is ``(B_t, I)``, where ``B_t`` is mini-batch size for time ``t``, and ``I`` is size of input units. Note that this functions supports variable length sequences. When sequneces has different lengths, sort sequences in descending order by length, and transpose the sorted sequence. :func:`~chainer.functions.transpose_sequence` transpose a list of :func:`~chainer.Variable` holding sequence. So ``xs`` needs to satisfy ``xs[t].shape[0] >= xs[t + 1].shape[0]``. train (bool): If ``True``, this function executes dropout. use_cudnn (bool): If ``True``, this function uses cuDNN if available. use_bi_direction (bool): If ``True``, this function uses Bi-direction GRU. Returns: tuple: This functions returns a tuple concaining three elements, ``hy`` and ``ys``. - ``hy`` is an updated hidden states whose shape is same as ``hx``. - ``ys`` is a list of :class:`~chainer.Variable` . Each element ``ys[t]`` holds hidden states of the last layer corresponding to an input ``xs[t]``. Its shape is ``(B_t, N)`` where ``B_t`` is mini-batch size for time ``t``, and ``N`` is size of hidden units. Note that ``B_t`` is the same value as ``xs[t]``. """ return n_step_gru_base(n_layers, dropout_ratio, hx, ws, bs, xs, train, use_cudnn, use_bi_direction=True)
def n_step_gru_base(n_layers, dropout_ratio, hx, ws, bs, xs, train, use_cudnn, use_bi_direction): """Base function for Stack GRU/BiGRU functions. This function is used at :func:`chainer.functions.n_step_bigru` and :func:`chainer.functions.n_step_gru`. This function's behavior depends on argument ``use_bi_direction``. Args: n_layers(int): Number of layers. dropout_ratio(float): Dropout ratio. hx (chainer.Variable): Variable holding stacked hidden states. Its shape is ``(S, B, N)`` where ``S`` is number of layers and is equal to ``n_layers``, ``B`` is mini-batch size, and ``N`` is dimention of hidden units. ws (list of list of chainer.Variable): Weight matrices. ``ws[i]`` represents weights for i-th layer. Each ``ws[i]`` is a list containing six matrices. ``ws[i][j]`` is corresponding with ``W_j`` in the equation. Only ``ws[0][j]`` where ``0 <= j < 3`` is ``(I, N)`` shape as they are multiplied with input variables. All other matrices has ``(N, N)`` shape. bs (list of list of chainer.Variable): Bias vectors. ``bs[i]`` represnents biases for i-th layer. Each ``bs[i]`` is a list containing six vectors. ``bs[i][j]`` is corresponding with ``b_j`` in the equation. Shape of each matrix is ``(N,)`` where ``N`` is dimention of hidden units. xs (list of chainer.Variable): A list of :class:`~chainer.Variable` holding input values. Each element ``xs[t]`` holds input value for time ``t``. Its shape is ``(B_t, I)``, where ``B_t`` is mini-batch size for time ``t``, and ``I`` is size of input units. Note that this functions supports variable length sequences. When sequneces has different lengths, sort sequences in descending order by length, and transpose the sorted sequence. :func:`~chainer.functions.transpose_sequence` transpose a list of :func:`~chainer.Variable` holding sequence. So ``xs`` needs to satisfy ``xs[t].shape[0] >= xs[t + 1].shape[0]``. train (bool): If ``True``, this function executes dropout. use_cudnn (bool): If ``True``, this function uses cuDNN if available. activation (str): Activation function name. Please select ``tanh`` or ``relu``. use_bi_direction (bool): If ``True``, this function uses Bi-direction GRU. .. seealso:: :func:`chainer.functions.n_step_rnn` :func:`chainer.functions.n_step_birnn` """ xp = cuda.get_array_module(hx, hx.data) if use_cudnn and xp is not numpy and cuda.cudnn_enabled and \ _cudnn_version >= 5000: states = get_random_state().create_dropout_states(dropout_ratio) # flatten all input variables inputs = tuple(itertools.chain( (hx, ), itertools.chain.from_iterable(ws), itertools.chain.from_iterable(bs), xs)) if use_bi_direction: rnn = NStepBiGRU(n_layers, states, train=train) else: rnn = NStepGRU(n_layers, states, train=train) ret = rnn(*inputs) hy, = ret[:1] ys = ret[1:] return hy, ys else: direction = 2 if use_bi_direction else 1 hx = split_axis.split_axis(hx, n_layers * direction, axis=0, force_tuple=True) hx = [reshape.reshape(h, h.shape[1:]) for h in hx] xws = [concat.concat([w[0], w[1], w[2]], axis=0) for w in ws] hws = [concat.concat([w[3], w[4], w[5]], axis=0) for w in ws] xbs = [concat.concat([b[0], b[1], b[2]], axis=0) for b in bs] hbs = [concat.concat([b[3], b[4], b[5]], axis=0) for b in bs] xs_next = xs hy = [] for layer in six.moves.range(n_layers): def _one_directional_loop(di): # di=0, forward GRU # di=1, backward GRU xs_list = xs_next if di == 0 else reversed(xs_next) layer_idx = direction * layer + di h = hx[layer_idx] h_list = [] for x in xs_list: batch = x.shape[0] if h.shape[0] > batch: h, h_rest = split_axis.split_axis(h, [batch], axis=0) else: h_rest = None if layer > 0: x = dropout.dropout(x, ratio=dropout_ratio, train=train) gru_x = linear.linear(x, xws[layer_idx], xbs[layer_idx]) gru_h = linear.linear(h, hws[layer_idx], hbs[layer_idx]) W_r_x, W_z_x, W_x = split_axis.split_axis(gru_x, 3, axis=1) U_r_h, U_z_h, U_x = split_axis.split_axis(gru_h, 3, axis=1) r = sigmoid.sigmoid(W_r_x + U_r_h) z = sigmoid.sigmoid(W_z_x + U_z_h) h_bar = tanh.tanh(W_x + r * U_x) h_bar = (1 - z) * h_bar + z * h if h_rest is not None: h = concat.concat([h_bar, h_rest], axis=0) else: h = h_bar h_list.append(h_bar) return h, h_list # Forward GRU h, h_forward = _one_directional_loop(di=0) hy.append(h) if use_bi_direction: # Backward GRU h, h_backward = _one_directional_loop(di=1) h_backward.reverse() # Concat xs_next = [concat.concat([hfi, hbi], axis=1) for (hfi, hbi) in six.moves.zip(h_forward, h_backward)] hy.append(h) else: # Uni-directional GRU xs_next = h_forward ys = xs_next hy = stack.stack(hy) return hy, tuple(ys)