API Reference

Communicators

chainermn.create_communicator(communicator_name='pure_nccl', mpi_comm=None, **kwargs)

Create a ChainerMN communicator.

Different communicators provide different approaches to communication, so they have different performance characteristics. The default communicator pure_nccl is expected to perform well in a variety of environments, so one need not change communicators in most cases. However, you may need to choose another communicator depending on your computing platform and the availability of the NCCL library. The following communicators are available.

Name        CPU   GPU   NCCL               Recommended Use Cases

pure_nccl         OK    Required (>= v2)   pure_nccl is recommended when NCCL2 is available in the environment.
flat              OK                       N/A
naive       OK    OK                       Testing on CPU mode

The pure_nccl communicator supports multiple data types, FP32 and FP16, in gradient exchange. The communication data type is determined from chainer.global_config.dtype and allreduce_grad_dtype. When allreduce_grad_dtype is the default value None, FP32 is used if chainer.global_config.dtype is numpy.float32 and FP16 otherwise. The allreduce_grad_dtype parameter, which is either numpy.float16 or numpy.float32, overrides chainer.global_config.dtype.

The table below summarizes the data type selection in gradient exchange.

                      allreduce_grad_dtype
global_config.dtype   None   numpy.float16   numpy.float32

chainer.mixed16       FP16   FP16            FP32
numpy.float16         FP16   FP16            FP32
numpy.float32         FP32   FP16            FP32

Other communicators, namely flat and naive, support only float32 communication, regardless of the model's dtype. This is due to MPI's limited support of float16.
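
For example, the following sketch forces FP16 gradient exchange regardless of the model dtype (a minimal sketch, assuming NCCL2 is available):

import numpy
import chainermn

# Gradients are cast to FP16 for the all-reduce even if the model itself is FP32.
comm = chainermn.create_communicator(
    'pure_nccl', allreduce_grad_dtype=numpy.float16)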

Parameters
  • communicator_name – The name of communicator (naive, flat, or pure_nccl)

  • mpi_comm – MPI4py communicator

  • allreduce_grad_dtype – Data type of gradient used in All-Reduce. If None, the dtype of a model is used.

Returns

ChainerMN communicator that implements methods defined in chainermn.CommunicatorBase
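
A minimal end-to-end sketch, assuming one GPU per process and an MPI launcher such as mpiexec; MyModel is a hypothetical chainer.Chain:

import chainer
import chainermn

comm = chainermn.create_communicator('pure_nccl')    # requires NCCL2
device = comm.intra_rank                             # one GPU per process

model = chainer.links.Classifier(MyModel())          # MyModel is a placeholder
chainer.cuda.get_device_from_id(device).use()
model.to_gpu()

optimizer = chainermn.create_multi_node_optimizer(
    chainer.optimizers.Adam(), comm)
optimizer.setup(model)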

class chainermn.CommunicatorBase

Interface definition of all communicators.

All communicators that have a compatible set of methods with this class are supposed to work in ChainerMN's parallel computation implementation. The methods are named after MPI functions; for example, bcast() comes from MPI_Bcast().

There are two types of methods: those that handle Python objects have the _obj suffix, while those without a suffix handle ndarrays and arrays filled with scalar values. So the set of methods would be

[send, recv, bcast, gather, allreduce] * [ '_obj', '']

(with a few exceptions so far: alltoall, multi_node_mean_grad, split and bcast_data). Methods are also supposed to be written in this order. All of these methods must be implemented in an implementation class, or it cannot be instantiated at runtime.

Note

As most implementations of _obj-suffixed methods involve Python object pickling and unpickling, there is an implicit size limit.

TODO(kuenishi): as of now no implementation class actually has allreduce method.

abstract allgather(x)

A primitive of inter-process all-gather communication.

This method tries to invoke all-gather communication within the communicator. All processes in the communicator are expected to invoke allgather(). This method relies on mpi4py fast communication optimized for numpy arrays, as well as send() and recv().

Note that this method can only handle data of the same shape across all processes, and cannot handle tuple data.

Parameters

x (numpy/cupy array) – Array to be gathered.

Returns

Received arrays.

Return type

ys (tuple of numpy/cupy array)
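
A usage sketch, assuming comm was created with chainermn.create_communicator(); every process must pass an array of the same shape:

import numpy as np

x = np.arange(4, dtype=np.float32) + comm.rank   # same shape on every process
ys = comm.allgather(x)                           # tuple of comm.size arrays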

abstract allreduce(data)

Allreduce operation among processes

Performs an aggregation operation using data from all processes and returns the result of the aggregation to all processes.

TODO(kuenishi): add op argument once we find a use case for operations other than ‘SUM’.

Parameters

data (ndarray) – the data to aggregate among all nodes.

Returns

Sum of all data from all processes.

allreduce_grad(model, zero_fill=False)

Averages Chainer model gradients across processes.

Deprecated since version v7.0.0: This API is deprecated. Please use multi_node_mean_grad() instead.

Parameters
  • link (Link) – Link object.

  • zero_fill – A knob to control whether to fill the gradients of initialized but unused Links (which are None internally) with zero-valued arrays. Every gradient must be an array on every process for performing all-reduce, whereas a gradient might be an array or None after backward computation. Gradients of uninitialized Links are always skipped. If it is False, gradients of unused Links are simply skipped.

abstract allreduce_obj(obj)

Apply a reduce operation to all objects and spread the result.

For example, with integers and summation, the equivalent local code is:

>>> from functools import reduce
>>> reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15

The only operation currently supported is summation.

TODO(kuenishi): support other operations such as ‘MAX’, ‘MIN’ and ‘PROD’ with op argument once we need any of them.

Parameters

obj – An arbitrary object to which the reduce operation is applied. It must have a corresponding operation method, e.g. __add__().

Returns

The result of the operation applied to all objects.
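
A usage sketch, assuming comm is an existing communicator and compute_local_value() is a hypothetical per-process computation:

local_value = compute_local_value()       # hypothetical per-process number
total = comm.allreduce_obj(local_value)   # summed over all processes
mean = total / comm.size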

abstract alltoall(xs)

All-to-all implementation for ndarray

Parameters

xs (tuple of numpy/cupy array) –

Returns

Received arrays. The length of the tuple equals the communicator size.

Return type

ys (tuple of numpy/cupy array)

abstract bcast(data, max_buf_len=None, root=0)

Broadcasts an ndarray from root process to all processes

Parameters
  • data (numpy/cupy array) – for root process, the data to broadcast. For non-root processes, this argument is ignored.

  • max_buf_len (int) – Length of send buffer.

  • root (int) – rank of the process that has the data to broadcast.

Returns

The data sent from root process

Return type

ys (numpy/cupy array)

abstract bcast_data(model)

Broadcast Chainer model parameter data

abstract bcast_obj(obj, max_buf_len=None, root=0)

Broadcasts an arbitrary object from root to all non-root processes.

Parameters
  • obj – arbitrary object to broadcast from the root to all non-root processes. This argument is ignored at non-root processes.

  • max_buf_len (int) – max length of the send buffer

  • root (int) – rank of the root process that sends the object

Returns

an object sent from the root process.
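
A usage sketch, assuming comm is an existing communicator (the dictionary contents are illustrative; keep the pickling size limit in mind):

config = {'lr': 0.01, 'n_epoch': 20} if comm.rank == 0 else None
config = comm.bcast_obj(config, root=0)   # every process now holds the same dict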

finalize()

Finalizes and cleans up internal resources.

The communicator SHALL NOT be used after calling this finalize(). The behaviour is undefined when calling finalize on the same communicator multiple times.

abstract gather(data, root=0)

Gathers an ndarray from all processes to root process

Parameters
  • data (ndarray or scalar) – for the root process this is ignored. For non-root processes, the data to send to the root process.

  • root (int) – rank of the process who receives the data.

Returns

For the root process, the ndarrays sent from non-root processes. For non-root processes, the return value is unspecified.

abstract gather_obj(obj, root=0)

Gathers arbitrary objects from all non-root processes to the root.

Parameters
  • obj – arbitrary object to send to the root process. The root process will receive this argument included in the returned list.

  • root (int) – rank of the root node who receives all objects.

Returns

A list of objects sent from all processes.

TODO(kuenishi): make sure the ordering of objects in the returned list.

get_config(name=None)

Get configuration value(s)

Parameters

name (str) – Name of the configuration to get. If it is None, all config names and values are returned.

Returns

The actual value of the configuration if it is on, or None if it is off. If None is given as name, None or a dictionary of names and configuration values is returned.

property inter_rank

The rank of this node in the cluster.

property inter_size

Number of nodes that participate in the cluster.

property intra_rank

Intra rank (process id in the machine) of this process.

abstract multi_node_mean_grad(model, zero_fill=False)

Averages Chainer model gradients across processes.

Parameters
  • link (Link) – Link object.

  • zero_fill – A knob to control whether to fill the gradients of initialized but unused Links (which are None internally) with zero-valued arrays. Every gradient must be an array on every process for performing all-reduce, whereas a gradient might be an array or None after backward computation. Gradients of uninitialized Links are always skipped. If it is False, gradients of unused Links are simply skipped.

property rank

Rank (process id in the cluster) of this process in integer.

abstract recv(source, tag)

Receives an ndarray from source.

To receive the message, the sender must send the data with send().

Parameters
  • source (int) – Rank of the source process

  • tag (int) – The tag to specifically receive the message

Returns

The data sent from source process

abstract recv_obj(source, tag)

Receives an arbitrary Python object from source process with a tag.

Parameters
  • source (int) – Rank number of sender process, to selectively receive the object.

  • tag – tag to identify the message.

Returns

an object sent from the source by send_obj.
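
A usage sketch of the send_obj()/recv_obj() pair, assuming comm is an existing communicator (ranks and tag are illustrative):

if comm.rank == 0:
    comm.send_obj({'step': 1}, dest=1, tag=0)
elif comm.rank == 1:
    msg = comm.recv_obj(source=0, tag=0)   # receives {'step': 1}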

abstract scatter(xs, root=0)

A primitive of inter-process scatter communication.

This method tries to invoke scatter communication within the communicator. All processes in the communicator are expected to invoke scatter().

Parameters
  • xs (tuple of numpy/cupy array) – Arrays to be scattered.

  • root (int) – Rank of root process.

Returns

Received arrays.

Return type

ys (numpy/cupy array)

abstract send(data, dest, tag)

Sends an ndarray to destination

Receiver must invoke recv() to wait for the message.

Parameters
  • data – data to be sent (tuple, list or raw numpy/cupy array)

  • dest (int) – Rank of the destination process

  • tag (int) – The tag to identify the message

abstract send_obj(obj, dest, tag)

Sends an arbitrary Python object to destination with a tag.

Parameters
  • obj – Arbitrary object to send to receiver.

  • dest (int) – Rank number of receiver process (destination).

  • tag – tag to identify the message.

set_config(name, **kwargs)

Set configuration(s) on/off

The usage of configurations depends on each communicator. See create_communicator() for available configurations.

Parameters
  • name (str) – Name of configuration to set.

  • value – Give arbitrary object to set.

  • kwargs – Arbitrary arguments depending on each configuration.

property size

Number of processes of the cluster.

abstract split(color, key)

A function analogous to MPI_Comm_split.

This method splits the inter MPI communicator and returns a wrapped ChainerMN communicator.

Parameters
  • color (int) – Index of new group. The process with the same color will be assigned to the same group.

  • key (int) – Controls rank assignment. Each process is assigned a rank in the new group ordered by the value of key. If you do not care about the rank, you can simply specify the original rank.

Returns

CommunicatorBase
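
A usage sketch that splits an existing communicator comm into two halves (the color rule is illustrative):

color = 0 if comm.rank < comm.size // 2 else 1
sub_comm = comm.split(color, key=comm.rank)   # original rank order kept within each group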

Optimizers and Evaluators

chainermn.create_multi_node_optimizer(actual_optimizer, communicator, double_buffering=False, zero_fill=True)

Create a multi node optimizer from a Chainer optimizer.

Parameters
  • actual_optimizer – Chainer optimizer (e.g., chainer.optimizers.Adam).

  • communicator – ChainerMN communicator.

  • double_buffering – If True, all-reduce and other processing (such as forward and backward) are overlapped using double buffering. There are cases where accuracy is affected because the gradients of the previous iteration are used for update. This flag is supported by PureNcclCommunicator only.

  • zero_fill – A knob to control whether to fill the gradients of initialized but unused Links (which are None internally) with zero-valued arrays. Every gradient must be an array on every process for performing all-reduce, whereas a gradient might be an array or None after backward computation. Gradients of uninitialized Links are always skipped. If it is False, gradients of unused Links are simply skipped.

Returns

The multi node optimizer based on actual_optimizer.

chainermn.create_multi_node_evaluator(actual_evaluator, communicator)

Create a multi node evaluator from a normal evaluator.

This method actually patches the evaluator to work in a multi-node environment. It adds several hidden attributes starting with the _mn_ prefix.

Parameters
  • actual_evaluator – evaluator to be patched (e.g., chainer.training.extensions.Evaluator)

  • communicator – ChainerMN communicator

Returns

The multi-node patched actual_evaluator.

Note

Once patched, the original evaluator does not work correctly in a non-MPI environment.
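
A usage sketch, assuming test_iter, model, device, comm and trainer already exist in your training script:

from chainer.training import extensions

evaluator = extensions.Evaluator(test_iter, model, device=device)
evaluator = chainermn.create_multi_node_evaluator(evaluator, comm)
trainer.extend(evaluator)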

class chainermn.extensions.GenericMultiNodeEvaluator(comm, iterator, target, device=None, converter=<chainer.dataset.convert._ArbitraryCallableConverter object>, root=0, **kwargs)

Generic multi-node evaluator for non-allreducible evaluation.

This is for evaluating a dataset that cannot be evenly divided across all processes in the communicator, or for evaluation calculations to which a simple add-and-divide style of averaging among processes is not applicable.

Users are recommended to implement their own local calculation calc_local() (e.g. at each distributed GPU) and aggregation aggregate() of its results, although built-in implementations of those two methods are provided.

It has several drawbacks: 1) users need to implement the aggregation themselves, and 2) it is not compatible with Evaluator.

Note

No automatic support of Reporter is provided; set it up in the initialize() method.

Parameters
  • comm – ChainerMN communicator object

  • iterator – An iterator for test dataset. Must be non-repeated.

  • target (callable) – A model to evaluate with test dataset

  • device (int or chainer.backend.Device) – A device indicator to send data with converter. Not used when the converter is not using any devices.

  • converter (callable) – A converter. Default value is chainer.dataset.concat_examples() .

  • root (int) – Rank number of root process to run bcast and gather with.

  • progress_hook (callable) – A callable that receives a single argument for progress indication. The callable is only called at the root process.

aggregate(results)

A generic aggregation method.

Override this method for original aggregation calculation. By default, it just does nothing but returns the input. This method is called once and only once across the cluster, at root process. Reporting can be run here.

Parameters

results (list) – List of return values of calc_local() obtained from all nodes.

calc_local(*args, **kwargs)

A generic method for local calculation.

Override this method to run your own local calculation. Otherwise, results are calculated with the original target and the test dataset.

Parameters
  • args – Result of converter when it is tuple.

  • kwargs – Result of converter when it is dict.

Returns

An arbitrary value may be returned, but it must not be None.
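
A subclassing sketch; do_local_metric is a hypothetical user-defined function:

class MyEvaluator(chainermn.extensions.GenericMultiNodeEvaluator):

    def calc_local(self, *args, **kwargs):
        # args/kwargs are the converter output for one batch; return anything but None.
        return do_local_metric(*args, **kwargs)

    def aggregate(self, results):
        # Called once, at the root process, with the local results from all processes.
        return sum(results) / len(results)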

Dataset Utilities

chainermn.scatter_dataset(dataset, comm, root=0, shuffle=False, seed=None, max_buf_len=268435456, *, force_equal_length=True)

Scatter the given dataset to the workers in the communicator.

The dataset of worker root (i.e., the worker whose comm.rank is root) is scattered to all workers. The given datasets of the other workers are ignored. The dataset is split into sub-datasets of almost equal size and scattered to the workers. To create a sub-dataset, chainer.datasets.SubDataset is used.

Note

Make sure the force_equal_length flag is not turned off for multi-node evaluators or multi-node updaters, which assume that the iterators have the same length among processes in order to work correctly.

Parameters
  • dataset – A dataset (e.g., list, numpy.ndarray, chainer.datasets.TupleDataset, …).

  • comm – ChainerMN communicator or MPI4py communicator.

  • shuffle (bool) – If True, the order of examples is shuffled before being scattered.

  • root (int) – The root process of the scatter operation.

  • seed (int) – Seed for the generator used for the permutation of indices. If an integer convertible to a 32-bit unsigned integer is specified, it is guaranteed that each sample in the given dataset always belongs to a specific subset. If None, the permutation is changed randomly.

  • max_buf_len (int) – Max buffer size to be used at broadcasting binaries. Must not be larger than 2147483647.

  • force_equal_length (bool) – Force the scattered fragments of the dataset to have equal length. If True, the number of scattered examples is guaranteed to be equal among processes, and the scattered datasets may contain duplicated examples among processes. Otherwise, the number of scattered examples may differ among processes, but the scattered examples are guaranteed to have no duplication among processes; this is intended for strict evaluation of a test dataset, to avoid duplicated examples.

Returns

Scattered dataset.
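
A usage sketch in which only rank 0 loads the dataset (MNIST is just an example):

if comm.rank == 0:
    train, test = chainer.datasets.get_mnist()
else:
    train, test = None, None

train = chainermn.scatter_dataset(train, comm, shuffle=True)
test = chainermn.scatter_dataset(test, comm, shuffle=True)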

chainermn.scatter_index(n_total_samples, comm, root=0, *, force_equal_length=True)

Scatters only indices to avoid a heavy dataset broadcast

This is the core functionality of scatter_dataset, which is almost equivalent to the following code snippet:

n_total_samples = len(dataset)
(b, e) = scatter_index(n_total_samples, comm)
order = None
if shuffle:
    order = numpy.random.RandomState(seed).permutation(
        n_total_samples)
    order = comm.bcast_obj(order)
dataset = SubDataset(dataset, b, e, order)

Note

Make sure the force_equal_length flag is not turned off for multi-node evaluators or multi-node updaters, which assume that the iterators have the same length among processes in order to work correctly.

Parameters
  • n_total_samples (int) – number of total samples to scatter

  • comm – ChainerMN communicator object

  • root (int) – root rank to coordinate the operation

  • force_equal_length (bool) – Force the scattered fragments of the index range to have equal length. If True, the number of scattered indices is guaranteed to be equal among processes, and the scattered indices may contain duplicates among processes. Otherwise, the number of scattered indices may differ among processes, but the scattered indices are guaranteed to have no duplication among processes; this is intended for strict evaluation of a test dataset, to avoid duplicated examples.

Returns

A tuple of two integers that stand for the beginning and ending offsets of the assigned sub-part of the samples. The ending offset is exclusive.

chainermn.datasets.create_empty_dataset(dataset)

Creates an empty dataset for models with no inputs and outputs.

This function generates an empty dataset: __getitem__() only returns None. The resulting dataset is compatible with the original one. Such datasets are used for models which neither take any inputs nor return any outputs; we expect models, e.g., whose forward() starts with chainermn.functions.recv() and ends with chainermn.functions.send().

Parameters

dataset – Dataset to convert.

Returns

A dataset consisting only of the patterns in the original one.

Return type

TransformDataset
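
A usage sketch, assuming train is an existing dataset whose contents are not needed on this process:

# Keep an iterator of the right length but drop the data; the model receives
# its real inputs via chainermn.functions.recv() instead.
empty_train = chainermn.datasets.create_empty_dataset(train)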

Functions

chainermn.functions.send(x, communicator, rank, tag=0)

Send elements to target process.

This function returns a dummy variable only holding the computational graph. If backward() is invoked by this dummy variable, it will try to receive gradients from the target process and send them back to the parent nodes.

Parameters
  • x (Variable) – Variable holding a matrix which you would like to send.

  • communicator (chainer.communicators.CommunicatorBase) – ChainerMN communicator.

  • rank (int) – Target process specifier.

  • tag (int) – Optional message ID (MPI feature).

Returns

A dummy variable with no actual data, only holding the computational graph. Please refer to chainermn.functions.pseudo_connect for details.

Return type

Variable

chainermn.functions.recv(communicator, rank, delegate_variable=None, tag=0, force_tuple=False)

Receive elements from target process.

This function returns data received from the target process. If backward() is invoked, it will try to send gradients to the target process. The received array will be on the current CUDA device if the corresponding send() is invoked with arrays on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Note

If you define a non-connected computational graph on one process, you have to use delegate_variable to specify the output of the previous computational graph component. Otherwise backward() does not work well. Please refer to chainermn.functions.pseudo_connect for details.

Parameters
  • communicator (chainer.communicators.CommunicatorBase) – ChainerMN communicator.

  • rank (int) – Target process specifier.

  • delegate_variable (chainer.Variable) – Pointer to the other non-connected component.

  • tag (int) – Optional message ID (MPI feature).

  • force_tuple (bool) – If False (the default) a Variable will be returned when the number of outputs is one. Otherwise, this method returns a tuple even when the number of outputs is one.

Returns

Data received from target process. If backward() is invoked by this variable, it will send gradients to the target process.

Return type

Variable
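
A sketch of a point-to-point, model-parallel forward pass; model_part0, model_part1 and x are hypothetical:

if comm.rank == 0:
    h = model_part0(x)
    phi = chainermn.functions.send(h, comm, rank=1)   # keep phi to drive backward()
elif comm.rank == 1:
    h = chainermn.functions.recv(comm, rank=0)
    y = model_part1(h)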

chainermn.functions.pseudo_connect(delegate_variable, *actual_variables)

Connect an independent connected graph component.

This function is implemented to return the received arguments directly, except the first delegate_variable. In backward computation, it returns the received gradients directly, adding a zero gradient corresponding to delegate_variable. The details of delegate_variable are described in the following notes.

Note

In a model-parallel framework, models on each process might have many non-connected components. Here we call a given graph non-connected when multiple inter-process communications are needed for its computation. For example, consider the following:

class ConnectedGraph(chainermn.MultiNodeChainList):

    def __init__(self, comm):
        super(ConnectedGraph, self).__init__(comm)
        self.add_link(ConnectedGraphSub(), rank_in=3, rank_out=1)

This model receives inputs from the rank=3 process and sends its outputs to the rank=1 process. The entire graph can be seen as one connected component ConnectedGraphSub. Please refer to the documentation of MultiNodeChainList for details.

On the other hand, see the next example:

class NonConnectedGraph(chainermn.MultiNodeChainList):

    def __init__(self, comm):
        super(NonConnectedGraph, self).__init__(comm)
        self.add_link(NonConnectedGraphSubA(), rank_in=3, rank_out=1)
        self.add_link(NonConnectedGraphSubB(), rank_in=1, rank_out=2)

This model consists of two components: at first, NonConnectedGraphSubA receives inputs from rank=3 process and sends its outputs to rank=1 process, and then NonConnectedGraphSubB receives inputs from rank=1 process and sends its outputs to rank=2 process. Here multiple inter-process communications are invoked between NonConnectedGraphSubA and NonConnectedGraphSubB, so it is regarded as non-connected.

Such non-connected models can be problematic in backward computation. Chainer traces back the computational graph from the output variable; however, a naive implementation of chainermn.functions.recv does not take any inputs, but rather receives inputs by MPI_Recv, where the backward path vanishes.

To prevent this, dummy variables, which we call delegate_variable, are used. In principle, chainermn.functions.send does not return any outputs because it sends data to the other process by MPI_Send. However, chainermn.functions.send returns a dummy, empty variable in our implementation, which is called delegate_variable. This variable does not hold any data; it is just used for retaining the backward computation path. We can guarantee the backward computation just by passing delegate_variable to the next chainermn.functions.recv (chainermn.functions.recv has an optional argument to receive delegate_variable).

Note

In some cases the intermediate graph component returns model outputs. See the next example:

class NonConnectedGraph2(chainermn.MultiNodeChainList):

    def __init__(self, comm):
        super(NonConnectedGraph2, self).__init__(comm)
        self.add_link(NonConnectedGraphSubA(), rank_in=1, rank_out=None)
        self.add_link(NonConnectedGraphSubB(), rank_in=None, rank_out=1)

This model first receives inputs from the rank=1 process and makes model outputs (specified by rank_out=None) in NonConnectedGraphSubA. Then, using model inputs (specified by rank_in=None), NonConnectedGraphSubB sends its outputs to the rank=1 process. Since MultiNodeChainList.__call__ returns outputs of the last component (in this case, outputs of NonConnectedGraphSubB), a naive implementation cannot output the returned value of NonConnectedGraphSubA as the model outputs. In this case, pseudo_connect should be used.

pseudo_connect takes two arguments. The first one, delegate_variable, is what we explained in the above note; in this case, the returned value of NonConnectedGraphSubB corresponds to delegate_variable. The second one, actual_variables, is “what we want delegate_variable to imitate”. In NonConnectedGraph2, we obtain the returned value of NonConnectedGraphSubB as the model outputs, but what we actually want is the returned value of NonConnectedGraphSubA. At the same time we want to trace back this resulting variable in backward computation. Using pseudo_connect, we can make a variable whose data is the same as the returned value of NonConnectedGraphSubA, but which traces back NonConnectedGraphSubB first.

pseudo_connect should also be used in some pathological cases, for example, where multiple chainermn.functions.send occurs sequentially.

Parameters
  • delegate_variable (chainer.Variable) – Pointer to the previous non-connected graph component.

  • actual_variables (tuple of chainer.Variable) – Actual values that delegate_variable imitates.

Returns

A variable with the given values combined with the delegating variable.

Return type

tuple of chainer.Variable

chainermn.functions.bcast(comm, x, root=0)

Differentiable broadcast communication between workers.

This function invokes broadcast communications among processes specified by the communicator. Backward will be invoked as with ordinary chainer functions, where gradients are gathered to the root process and summed up.

The received array will be on the current CUDA device if x on the invoking process is on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Parameters
  • comm – ChainerMN communicator.

  • x (chainer.Variable) – Variable to be sent.

Returns

Broadcasted variable.

Return type

y (chainer.Variable)

chainermn.functions.gather(comm, x, root=0)

Differentiable gather communication between workers.

This function invokes gather communications among processes specified by the communicator. Backward will be invoked as with ordinary chainer functions, where gradients are scattered from the root process to each slave.

The received array will be on the current CUDA device if x on the root process is on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Parameters
  • comm – ChainerMN communicator.

  • x (chainer.Variable) – Variable to be sent.

Returns

Gathered variables. None for slaves.

Return type

ys (chainer.Variable)

chainermn.functions.scatter(comm, xs, root=0)

Differentiable scatter communication between workers.

This function invokes scatter communications among processes specified by the communicator. Backward will be invoked as with ordinary chainer functions, where gradients are gathered to the root process.

The received array will be on the current CUDA device if xs on the root process is on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Parameters
  • comm – ChainerMN communicator.

  • xs (list of chainer.Variable) – Variables to be scattered for master process. None for slave process.

Returns

Scattered variable.

Return type

y (chainer.Variable)

chainermn.functions.alltoall(comm, xs)

Differentiable all-to-all communication between workers.

This function invokes all-to-all communications among processes specified by the communicator. Backward will be invoked as with ordinary chainer functions, just passing input gradients back. Unlike point-to-point communication such as chainermn.functions.send and chainermn.functions.recv, users need not care about delegate variables, since backward() will not be invoked until all gradients from the output direction arrive. Please refer to chainermn.functions.pseudo_connect for details about delegate variables.

The received array will be on the current CUDA device on the invoking process if xs is on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Parameters
  • comm – ChainerMN communicator.

  • xs (list of chainer.Variables) – Variables to send.

Returns

Received variables.

Return type

ys (list of chainer.Variables)

chainermn.functions.allgather(comm, x)

Differentiable all-gather communication between workers.

This function invokes all-gather communications among processes specified by the communicator. Backward will be invoked as with ordinary chainer functions, where gradients are reduced to each process.

The received array will be on the current CUDA device on the invoking process if x is on GPU. Please be aware that the current CUDA device must be the intended one. (https://docs-cupy.chainer.org/en/stable/tutorial/basic.html#current-device)

Parameters
  • comm – ChainerMN communicator.

  • x (chainer.Variables) – Variables to send.

Returns

Received variables.

Return type

ys (list of chainer.Variables)

Iterators

chainermn.iterators.create_multi_node_iterator(actual_iterator, communicator, rank_master=0)

Create a multi node iterator from a Chainer iterator.

This iterator shares the same batches among multiple processes, simply broadcasting batches from the master process to the slave processes in each iteration. The master process obtains batches from actual_iterator, for which you can specify any Chainer iterator (e.g. chainer.iterators.SerialIterator).

Here is an example situation. When we train a sequence-to-sequence model, where the encoder and the decoder are located on two different processes, we want to share the same batches on each process so that inputs for the encoder and output teacher signals for the decoder stay consistent.

In order to use the multi node iterator, first create the iterator from Chainer iterator and ChainerMN communicator:

iterator = chainermn.iterators.create_multi_node_iterator(
    chainer.iterators.SerialIterator(
        dataset, batch_size, shuffle=True),
    communicator)

Then you can use it as the ordinary Chainer iterator:

updater = chainer.training.StandardUpdater(iterator, optimizer)
trainer = training.Trainer(updater)
trainer.run()

Since this iterator shares batches over the network in each iteration, communication might be heavy. If you train your model-parallel network on an extremely large dataset, you can also consider using chainermn.iterators.create_synchronized_iterator.

The current multi-node iterator supports numpy.float32 or tuples of numpy.float32 as the data type of the batch elements.

Note

create_multi_node_iterator and serialize of the created iterators must be called at the same time by the master and slaves, otherwise it falls into deadlock, because they synchronize the internal states of the iterators.

Parameters
  • actual_iterator – Chainer iterator (chainer.iterators.SerialIterator and chainer.iterators.MultiprocessIterator are supported).

  • communicator – ChainerMN communicator.

  • rank_master – process rank to be master.

Returns

The master-slave iterator based on actual_iterator.

chainermn.iterators.create_synchronized_iterator(actual_iterator, communicator)

Create a synchronized iterator from a Chainer iterator.

This iterator shares the same batches on multiple processes, using the same random number generators so that the order of batch shuffling stays the same.

Here is an example situation. When we train a sequence-to-sequence model, where the encoder and the decoder are located on two different processes, we want to share the same batches on each process so that inputs for the encoder and output teacher signals for the decoder stay consistent.

In order to use the synchronized iterator, first create the iterator from Chainer iterator and ChainerMN communicator:

iterator = chainermn.iterators.create_synchronized_iterator(
    chainer.iterators.SerialIterator(
        dataset, batch_size, shuffle=True),
    communicator)

Then you can use it as the ordinary Chainer iterator:

updater = chainer.training.StandardUpdater(iterator, optimizer)
trainer = training.Trainer(updater)
trainer.run()

The resulting iterator shares the same shuffling order among processes in the specified communicator.

Parameters
  • actual_iterator – Chainer iterator (e.g., chainer.iterators.SerialIterator).

  • communicator – ChainerMN communicator.

Returns

The synchronized iterator based on actual_iterator.

Trainer extensions

class chainermn.extensions.AllreducePersistent(model, comm)

Chainer extension to average persistent variables over workers.

When called, this extension invokes all-reduce communication among workers to compute averages of persistent variables in the model. Persistent variables are updated to the averages. Currently, we ignore integer persistent variables, and only float persistent variables are handled.

This extension is mainly to improve the running mean and variance of BatchNormalization by increasing the effective number of examples. We do not need to call this frequently; call just before storing or evaluating the model.

Parameters
  • model (chainer.link.Link) – Target link object.

  • comm (ChainerMN communicator) – communicator to compute averages.
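
A usage sketch; the trigger is illustrative, and the extension is typically run right before snapshots or evaluation:

allreduce_persistent = chainermn.extensions.AllreducePersistent(model, comm)
trainer.extend(allreduce_persistent, trigger=(1, 'epoch'))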

chainermn.extensions.multi_node_snapshot(comm, snapshot, replica_sets)

Create trainer extension for multi-node snapshots

Provides a generic multi-node snapshot saving and auto-load feature in a multi-node environment, leveraging the power of single-node snapshots.

In many cases the snapshot target may differ between processes; e.g. often only the trainer of the rank 0 process has extensions such as LogReport, so as not to clutter terminal output. Just loading the snapshot at one process and broadcasting it to the other processes does not work in that case.

This wrapper addresses that issue by defining sets of replicas, where within each set the target object is replicated and supposed to be the same among processes. For example, in the following trainer, only the trainer at rank 0 has special extensions and the others do not:

trainer = Trainer(updater)
if comm.rank == 0:
    trainer.extend(extensions.DumpGraph('main/loss'))
    trainer.extend(extensions.LogReport())
    trainer.extend(extensions.PrintReport(
        ['epoch', 'main/loss', 'validation/main/loss',
         'main/accuracy', 'validation/main/accuracy', 'elapsed_time']))
    trainer.extend(extensions.ProgressBar())

This case can be described with two replica sets, where each set can be represented as a single integer that indicates a rank number, or as an iterable set/list/generator of integers, like this:

replica_sets = [[0], range(1, comm.size)]

Here the first replica set is described as [0], or in short just 0, and the second replica set is range(1, comm.size), representing the rest of the processes other than 0. The remaining set can be omitted from the list, so in that case it can be simplified further:

replica_sets = [0,]

In this case, the snapshot will be saved at the rank 0 process and at the rank 1 process; the latter represents the replica set range(1, comm.size). With this setup, auto-loading at initialization of the snapshot extension works cleanly after a restart, even if the size of the communicator differs.

Once the replica sets are defined, it can be easily extended:

replica_sets = [0,]
snapshot = multi_node_snapshot(comm, extensions.snapshot(),
                               replica_sets)
trainer.extend(snapshot, trigger=(1, 'epoch'))

More examples of replica set representations follow:

code               nproc   actual sets

[0]                4       [{0}, {1, 2, 3}]
[0, 1]             4       [{0}, {1}, {2, 3}]
[[0, 1], [2, 3]]   4       [{0, 1}, {2, 3}]
[]                 4       [{0, 1, 2, 3}]
[range(0, 8, 2)]   8       [set(range(0, 8, 2)), set(range(1, 8, 2))]

Parameters
  • comm (ChainerMN communicator) – communicator object

  • snapshot – Snapshot extension object obtained via snapshot() .

  • replica_sets – list of replica set definition, where a replica set can be defined by single integer as rank number, or iterable integers.

Returns

A trainer extension that wraps snapshot and properly controls the number of snapshots.

chainermn.create_multi_node_checkpointer(name, comm, cp_interval=5, gc_interval=5, path=None)

Create multi-node checkpointer object

Generational snapshot extension to allow fault tolerance; it keeps several old snapshots so that a synchronized snapshot can be rolled back to at each MPI process. Snapshot files are identified as ‘<name>.<rank>.<iteration>’.

  • <name> … identifier of the run the snapshot is kept for

  • <rank> … which process owned the model

  • <iteration> … the iteration number.

This extension keeps several files for each execution and allows users to resume the whole job from the latest snapshot of each MPI process, at the iteration where all snapshots agree.

As this object is a usual Chainer extension, users can just create this object and pass it to the trainer as an extension:

checkpointer = create_multi_node_checkpointer(name=run_id, comm=comm)
trainer.extend(checkpointer, trigger=(25, 'iteration'))

To run recovery at startup, before the first iteration, run

checkpointer.maybe_load(trainer, optimizer)

before trainer.run(). If nothing is recovered (i.e. no snapshot is found), trainer.updater.iteration will remain 0. Otherwise it will have the value from the snapshot and the training will resume from that iteration. optimizer is optional, but passing it lets the multi-node optimizer avoid the initial broadcast when all snapshot data among nodes is already in sync.

Note

Make sure that checkpointer.maybe_load is called after all extensions with states, such as ExponentialShift, are set to the trainer.

Note

The checkpointer is deprecated. Please use chainermn.extensions.multi_node_snapshot() instead.

After training finishes without errors, all those temporary checkpoints will be cleaned up at all nodes.

Another example of using the checkpointer without a trainer would be:

checkpointer = create_multi_node_checkpointer(name=run_id, comm=comm)
checkpointer.maybe_load(obj_you_want_to_snap, optimizer)

while True: ## Training loop
    ...
    updater.update()
    ...
    checkpointer.save(obj_you_want_to_snap)  # Make a checkpoint
Parameters
  • name (str) – unique id of the run

  • comm – communicator in ChainerMN

  • cp_interval (int) – minimum number of checkpoints to preserve

  • gc_interval (int) – interval to collect non-preserved checkpoints

Configurations