Source code for chainer.functions.array.broadcast

import six

from chainer import cuda
from chainer import function
from chainer.utils import type_check


def _backward_one(x, g):
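    """Reduce a gradient ``g`` of a broadcast result back to the shape of ``x``."""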
    if g is None:
        xp = cuda.get_array_module(x)
        return xp.zeros_like(x)

    if g.ndim != x.ndim:
        g = g.sum(axis=tuple(range(g.ndim - x.ndim)))
        # An input variable is always an array, not a scalar.
        # We need to convert a scalar value to a zero-dim array.
        xp = cuda.get_array_module(x)
        if xp.isscalar(g):
            g = xp.array(g)

    axis = tuple(i for i, sx in enumerate(x.shape) if sx == 1)
    if len(axis) > 0:
        return g.sum(keepdims=True, axis=axis)
    else:
        return g

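# A rough doctest-style sketch of what ``_backward_one`` computes (``np`` is
# assumed to be NumPy; the shapes are made up for illustration): the gradient
# of a broadcast output is summed back over the broadcast axes.
#
#     >>> x = np.ones((1, 3))
#     >>> g = np.ones((2, 3))               # gradient of x broadcast to (2, 3)
#     >>> _backward_one(x, g).shape
#     (1, 3)
#     >>> float(_backward_one(x, g)[0, 0])  # two rows of ones accumulate
#     2.0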

class Broadcast(function.Function):

    """Function that broadcasts given arrays."""

    def check_type_forward(self, in_types):
        type_check.expect(in_types.size() > 0)

        shapes = [t.eval().shape for t in in_types]
        r_shapes = [s[::-1] for s in shapes]
        r_filled = six.moves.zip_longest(*r_shapes, fillvalue=1)
        for ss in r_filled:
            d = max(ss)
            if not all(s == d or s == 1 for s in ss):
                expect = 'each dimension has the same size or is 1'
                actual = 'shapes: ' + ', '.join(map(str, shapes))
                raise type_check.InvalidType(expect, actual)

    def forward(self, xs):
        xp = cuda.get_array_module(*xs)
        return tuple(xp.broadcast_arrays(*xs))

    def backward(self, xs, grads):
        return tuple(_backward_one(x, g) for x, g in six.moves.zip(xs, grads))


def broadcast(*args):
    """Broadcast given variables.

    Args:
        args (:class:`~chainer.Variable` or :class:`numpy.ndarray` \
        or :class:`cupy.ndarray`):
            Input variables to be broadcasted. Each dimension of the shapes \
            of the input variables must either have the same size or be ``1``.

    Returns:
        ~chainer.Variable: :class:`~chainer.Variable` or tuple of \
            :class:`~chainer.Variable` objects which are broadcasted \
            from the given arguments.

    .. admonition:: Example

        >>> x = np.random.uniform(0, 1, (3, 2)).astype('f')
        >>> y = F.broadcast(x)
        >>> np.all(x == y.data)
        True
        >>> z = np.random.uniform(0, 1, (3, 2)).astype('f')
        >>> y, w = F.broadcast(x, z)
        >>> np.all(x == y.data) & np.all(z == w.data)
        True

    """
    return Broadcast()(*args)
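
# A hedged example of the backward pass of ``broadcast`` (``np``, ``F`` and
# ``chainer`` follow the doctest globals used in the docstrings above):
#
#     >>> x = chainer.Variable(np.ones((1, 3), dtype='f'))
#     >>> z = np.ones((2, 3), dtype='f')
#     >>> y, w = F.broadcast(x, z)
#     >>> y.grad = np.ones((2, 3), dtype='f')
#     >>> y.backward()
#     >>> x.grad.shape                     # reduced back to the shape of x
#     (1, 3)
#     >>> np.all(x.grad == 2)              # the two broadcast rows are summed
#     True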


class BroadcastTo(function.Function):

    """Function that broadcasts an array to a new shape."""

    def __init__(self, shape):
        self._shape = tuple(shape)

    def check_type_forward(self, in_types):
        type_check.expect(in_types.size() == 1)

        ndim = type_check.Variable(len(self._shape), 'len(shape)')
        type_check.expect(in_types[0].ndim <= ndim)

        shape = in_types[0].shape.eval()
        # check the shape in inverse order
        for i in six.moves.range(-1, -len(shape) - 1, -1):
            if shape[i] == self._shape[i] or shape[i] == 1:
                continue
            expect = 'in_type[0].shape[%d] == %d' % (i, self._shape[i])
            if self._shape[i] != 1:
                expect += ' or in_type[0].shape[%d] == 1' % i
            actual = 'in_type[0].shape: %s' % str(shape)
            raise type_check.InvalidType(expect, actual)

    def forward(self, xs):
        xp = cuda.get_array_module(*xs)
        x = xs[0]
        if hasattr(xp, 'broadcast_to'):
            return xp.broadcast_to(x, self._shape),
        else:
            # numpy 1.9 doesn't support broadcast_to method
            dummy = xp.empty(self._shape)
            bx, _ = xp.broadcast_arrays(x, dummy)
            return bx,

    def backward(self, xs, grads):
        return _backward_one(xs[0], grads[0]),


def broadcast_to(x, shape):
    """Broadcast a given variable to a given shape.

    Args:
        x (:class:`~chainer.Variable` or :class:`numpy.ndarray` or \
        :class:`cupy.ndarray`): Input variable to be broadcasted. A \
            :math:`(s_1, s_2, ..., s_N)`-shaped float array.
        shape (tuple): Tuple of :class:`int` of the shape of the \
            output variable.

    Returns:
        ~chainer.Variable: Output variable broadcasted to the given shape.

    .. admonition:: Example

        >>> x = np.arange(0, 3)
        >>> x
        array([0, 1, 2])
        >>> y = F.broadcast_to(x, (3, 3))
        >>> y.data
        array([[0, 1, 2],
               [0, 1, 2],
               [0, 1, 2]])

    """
    return BroadcastTo(shape)(x)
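
# A hedged sketch of the gradient of ``broadcast_to`` (same doctest globals as
# above): the upstream gradient is summed over the broadcast axes.
#
#     >>> x = chainer.Variable(np.ones(3, dtype='f'))
#     >>> y = F.broadcast_to(x, (4, 3))
#     >>> y.grad = np.ones((4, 3), dtype='f')
#     >>> y.backward()
#     >>> np.all(x.grad == 4)
#     True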