API

Executor

Executor(address[, start, loop, timeout]) Drive computations on a distributed cluster
Executor.cancel(futures) Cancel running futures
Executor.compute(args[, sync]) Compute dask collections on cluster
Executor.gather(futures[, errors]) Gather futures from distributed memory
Executor.get(dsk, keys, **kwargs) Compute dask graph
Executor.has_what([workers]) Which keys are held by which workers
Executor.map(func, *iterables, **kwargs) Map a function on a sequence of arguments
Executor.ncores([workers]) The number of threads/cores available on each worker node
Executor.persist(collections) Persist dask collections on cluster
Executor.rebalance([futures, workers]) Rebalance data within network
Executor.replicate(futures[, n, workers, ...]) Set replication of futures within network
Executor.restart() Restart the distributed network
Executor.run(function, *args, **kwargs) Run a function on all workers outside of task scheduling system
Executor.scatter(data[, workers, broadcast]) Scatter data into distributed memory
Executor.shutdown([timeout]) Send shutdown signal and wait until scheduler terminates
Executor.submit(func, *args, **kwargs) Submit a function application to the scheduler
Executor.upload_file(filename) Upload local package to workers
Executor.who_has([futures]) The workers storing each future’s data

Future

Future(key, executor) A remotely running computation
Future.cancel() Cancel the request to run this future
Future.cancelled() Returns True if the future has been cancelled
Future.done() Is the computation complete?
Future.exception() Return the exception of a failed task
Future.result() Wait until computation completes.
Future.traceback() Return the traceback of a failed task

Other

as_completed(fs) Return futures in the order in which they complete
distributed.diagnostics.progress(*futures, ...) Track progress of futures
wait(fs[, timeout, return_when]) Wait until all futures are complete

Executor

class distributed.executor.Executor(address, start=True, loop=None, timeout=3)[source]

Drive computations on a distributed cluster

The Executor connects users to a distributed compute cluster. It provides an asynchronous user interface around functions and futures. This class resembles executors in concurrent.futures but also allows Future objects within submit/map calls.

Parameters:

address: string, tuple, or ``Scheduler``

This can be the address of a Scheduler server, either as a string '127.0.0.1:8787' or tuple ('127.0.0.1', 8787) or it can be a local Scheduler object.

See also

distributed.scheduler.Scheduler
Internal scheduler

Examples

Provide cluster’s head node address on initialization:

>>> executor = Executor('127.0.0.1:8787')  

Use submit method to send individual computations to the cluster

>>> a = executor.submit(add, 1, 2)  
>>> b = executor.submit(add, 10, 20)  

Continue using submit or map on results to build up larger computations

>>> c = executor.submit(add, a, b)  

Gather results with the gather method.

>>> executor.gather([c])  
[33]
cancel(futures)[source]

Cancel running futures

This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible

Parameters:

futures: list of Futures
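
Examples

A minimal sketch (the executor e, the long-running function slow, and the resulting future are assumed for illustration):

>>> fut = e.submit(slow, 1)  
>>> e.cancel([fut])  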
compute(args, sync=False)[source]

Compute dask collections on cluster

Parameters:

args: iterable of dask objects or single dask object

Collections like dask.array, dask.dataframe, or dask.value objects

sync: bool (optional)

Returns Futures if False (default) or concrete values if True

Returns:

List of Futures if input is a sequence, or a single future otherwise

See also

Executor.get
Normal synchronous dask.get function

Examples

>>> from dask import do
>>> from operator import add
>>> x = do(add)(1, 2)
>>> y = do(add)(x, x)
>>> xx, yy = executor.compute([x, y])  
>>> xx  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
>>> xx.result()  
3
>>> yy.result()  
6

Also support single arguments

>>> xx = executor.compute(x)  
gather(futures, errors='raise')[source]

Gather futures from distributed memory

Accepts a future, nested container of futures, iterator, or queue. The return type will match the input type.

Returns:

Future results

See also

Executor.scatter
Send data out to cluster

Examples

>>> from operator import add  
>>> e = Executor('127.0.0.1:8787')  
>>> x = e.submit(add, 1, 2)  
>>> e.gather(x)  
3
>>> e.gather([x, [x], x])  # support lists and dicts 
[3, [3], 3]
>>> seq = e.gather(iter([x, x]))  # support iterators 
>>> next(seq)  
3
get(dsk, keys, **kwargs)[source]

Compute dask graph

Parameters:

dsk: dict

keys: object, or nested lists of objects

restrictions: dict (optional)

A mapping of {key: {set of worker hostnames}} that restricts where jobs can take place

See also

Executor.compute
Compute asynchronous collections

Examples

>>> from operator import add  
>>> e = Executor('127.0.0.1:8787')  
>>> e.get({'x': (add, 1, 2)}, 'x')  
3
has_what(workers=None)[source]

Which keys are held by which workers

Parameters:

workers: list (optional)

A list of worker addresses, defaults to all

Examples

>>> x, y, z = e.map(inc, [1, 2, 3])  
>>> e.has_what()  
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
map(func, *iterables, **kwargs)[source]

Map a function on a sequence of arguments

Arguments can be normal objects or Futures

Parameters:

func: callable

iterables: Iterables, Iterators, or Queues

pure: bool (defaults to True)

Whether or not the function is pure. Set pure=False for impure functions like np.random.random.

workers: set, iterable of sets

A set of worker hostnames on which computations may be performed. Leave empty to default to all workers (common case)

Returns:

List, iterator, or Queue of futures, depending on the type of the inputs.

See also

Executor.submit
Submit a single function

Examples

>>> L = executor.map(func, sequence)  
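
As a further sketch, map accepts several iterables and the keyword arguments described above (add and inc here are assumed helper functions):

>>> L = executor.map(add, [1, 2, 3], [10, 20, 30])  
>>> L = executor.map(inc, range(100), pure=False)  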
nbytes(keys=None, summary=True)[source]

The bytes taken up by each key on the cluster

This is as measured by sys.getsizeof which may not accurately reflect the true cost.

Parameters:

keys: list (optional)

A list of keys, defaults to all keys

summary: boolean, (optional)

Summarize keys into key types

See also

Executor.who_has

Examples

>>> x, y, z = e.map(inc, [1, 2, 3])  
>>> e.nbytes(summary=False)  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': 28,
 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28,
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> e.nbytes(summary=True)  
{'inc': 84}
ncores(workers=None)[source]

The number of threads/cores available on each worker node

Parameters:

workers: list (optional)

A list of workers that we care about specifically. Leave empty to receive information about all workers.

Examples

>>> e.ncores()  
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
persist(collections)[source]

Persist dask collections on cluster

Starts computation of the collection on the cluster in the background. Provides a new dask collection that is semantically identical to the previous one, but now based off of futures currently in execution.

Parameters:

collections: sequence or single dask object

Collections like dask.array, dask.dataframe, or dask.value objects

Returns:

List of collections, or single collection, depending on type of input.

See also

Executor.compute

Examples

>>> xx = executor.persist(x)  
>>> xx, yy = executor.persist([x, y])  
rebalance(futures=None, workers=None)[source]

Rebalance data within network

Move data between workers to roughly balance memory burden. This either affects a subset of the keys/workers or the entire network, depending on keyword arguments.

This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.

Parameters:

futures: list, optional

A list of futures to balance, defaults to all data

workers: list, optional

A list of workers on which to balance, defaults to all workers
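
Examples

A minimal sketch (the executor e and the function inc are assumptions):

>>> futures = e.map(inc, range(10))  
>>> e.rebalance(futures)  # balance only these futures  
>>> e.rebalance()  # balance all data on all workers  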

replicate(futures, n=None, workers=None, branching_factor=2)[source]

Set replication of futures within network

This performs a tree copy of the data throughout the network individually on each piece of data.

This operation blocks until complete. It does not guarantee replication of data to future workers.

Parameters:

futures: list of futures

Futures we wish to replicate

n: int, optional

Number of processes on the cluster on which to replicate the data. Defaults to all.

workers: list of worker addresses

Workers on which we want to restrict the replication. Defaults to all.

branching_factor: int, optional

The number of workers that can copy data in each generation

Examples

>>> x = e.submit(func, *args)  
>>> e.replicate([x])  # send to all workers  
>>> e.replicate([x], n=3)  # send to three workers  
>>> e.replicate([x], workers=['alice', 'bob'])  # send to specific  
>>> e.replicate([x], n=1, workers=['alice', 'bob'])  # send to one of specific workers  
>>> e.replicate([x], n=1)  # reduce replications 
restart()[source]

Restart the distributed network

This kills all active work, deletes all data on the network, and restarts the worker processes.
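
For example, to recover a cluster after a failure (a sketch, assuming an executor e):

>>> e.restart()  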

run(function, *args, **kwargs)[source]

Run a function on all workers outside of task scheduling system

This calls a function on all currently known workers immediately, blocks until those results come back, and returns the results as a dictionary keyed by worker address. This method is generally used for side effects, such as collecting diagnostic information or installing libraries.

Parameters:

function: callable

*args: arguments for remote function

**kwargs: keyword arguments for remote function

workers: list

Workers on which to run the function. Defaults to all known workers.

Examples

>>> e.run(os.getpid)  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321,
 '192.168.0.102:9000': 5555}

Restrict computation to particular workers with the workers= keyword argument.

>>> e.run(os.getpid, workers=['192.168.0.100:9000',
...                           '192.168.0.101:9000'])  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321}
scatter(data, workers=None, broadcast=False)[source]

Scatter data into distributed memory

Parameters:

data: list, iterator, dict, or Queue

Data to scatter out to workers. Output type matches input type.

workers: list of tuples (optional)

Optionally constrain locations of data. Specify workers as hostname/port pairs, e.g. ('127.0.0.1', 8787).

broadcast: bool (defaults to False)

Whether to send each data element to all workers. By default we round-robin based on number of cores.

Returns:

List, dict, iterator, or queue of futures matching the type of input.

See also

Executor.gather
Gather data back to local process

Examples

>>> e = Executor('127.0.0.1:8787')  
>>> e.scatter([1, 2, 3])  
[<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>,
 <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>,
 <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> e.scatter({'x': 1, 'y': 2, 'z': 3})  
{'x': <Future: status: finished, key: x>,
 'y': <Future: status: finished, key: y>,
 'z': <Future: status: finished, key: z>}

Constrain location of data to subset of workers

>>> e.scatter([1, 2, 3], workers=[('hostname', 8788)])  

Handle streaming sequences of data with iterators or queues

>>> seq = e.scatter(iter([1, 2, 3]))  
>>> next(seq)  
<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>

Broadcast data to all workers

>>> [future] = e.scatter([element], broadcast=True)  

shutdown(timeout=10)[source]

Send shutdown signal and wait until scheduler terminates
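
For example (a sketch, assuming an executor e):

>>> e.shutdown(timeout=5)  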

start(**kwargs)[source]

Start scheduler running in separate thread

submit(func, *args, **kwargs)[source]

Submit a function application to the scheduler

Parameters:

func: callable

*args:

**kwargs:

pure: bool (defaults to True)

Whether or not the function is pure. Set pure=False for impure functions like np.random.random.

workers: set, iterable of sets

A set of worker hostnames on which computations may be performed. Leave empty to default to all workers (common case)

allow_other_workers: bool (defaults to False)

Used with workers. Indicates whether or not the computations may be performed on workers that are not in the workers set(s).

Returns:

Future

See also

Executor.map
Submit on many arguments at once

Examples

>>> c = executor.submit(add, a, b)  
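
The keyword arguments described above can be sketched as follows (the hostname is illustrative, not a real address):

>>> c = executor.submit(add, a, b, pure=False)  
>>> c = executor.submit(add, a, b, workers={'127.0.0.1'}, allow_other_workers=True)  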
upload_file(filename)[source]

Upload local package to workers

This sends a local file up to all worker nodes. This file is placed into a temporary directory on Python’s system path so any .py or .egg files will be importable.

Parameters:

filename: string

Filename of .py or .egg file to send to workers

Examples

>>> executor.upload_file('mylibrary.egg')  
>>> from mylibrary import myfunc  
>>> L = e.map(myfunc, seq)  
who_has(futures=None)[source]

The workers storing each future’s data

Parameters:

futures: list (optional)

A list of futures, defaults to all data

Examples

>>> x, y, z = e.map(inc, [1, 2, 3])  
>>> e.who_has()  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'],
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> e.who_has([x, y])  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}

CompatibleExecutor

class distributed.executor.CompatibleExecutor(address, start=True, loop=None, timeout=3)[source]

A concurrent.futures-compatible Executor

A subclass of Executor that conforms to concurrent.futures API, allowing swapping in for other Executors.

map(func, *iterables, **kwargs)[source]

Map a function on a sequence of arguments

Returns:

iter_results: iterable

Iterable yielding results of the map.

See also

Executor.map
for more info
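
Examples

A minimal sketch (inc is an assumed helper function):

>>> e = CompatibleExecutor('127.0.0.1:8787')  
>>> results = e.map(inc, [1, 2, 3])  
>>> list(results)  
[2, 3, 4]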

Future

class distributed.executor.Future(key, executor)[source]

A remotely running computation

A Future is a local proxy to a result running on a remote worker. A user manages future objects in the local Python process to determine what happens in the larger cluster.

See also

Executor
Creates futures

Examples

Futures typically emerge from Executor computations

>>> my_future = executor.submit(add, 1, 2)  

We can track the progress and results of a future

>>> my_future  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>

We can get the result or the exception and traceback from the future

>>> my_future.result()  
cancel()[source]

Cancel the request to run this future

cancelled()[source]

Returns True if the future has been cancelled

done()[source]

Is the computation complete?

exception()[source]

Return the exception of a failed task

See also

Future.traceback
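
Examples

A minimal sketch (the executor e and the failing helper div are assumptions):

>>> fut = e.submit(div, 1, 0)  
>>> error = fut.exception()  # the remote ZeroDivisionError instance  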

result()[source]

Wait until computation completes. Gather result to local process

traceback()[source]

Return the traceback of a failed task

This returns a traceback object. You can inspect this object using the traceback module. Alternatively if you call future.result() this traceback will accompany the raised exception.

See also

Future.exception

Examples

>>> import traceback  
>>> tb = future.traceback()  
>>> traceback.format_tb(tb)  
[...]

Other

distributed.executor.as_completed(fs)[source]

Return futures in the order in which they complete

This returns an iterator that yields the input future objects in the order in which they complete. Calling next on the iterator will block until the next future completes, irrespective of order.

This function does not return futures in the order in which they are input.
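
For example (a sketch, assuming an executor e and a function inc):

>>> futures = e.map(inc, range(3))  
>>> for future in as_completed(futures):  
...     print(future.result())  # results appear in completion order, not input order  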

distributed.diagnostics.progress(*futures, **kwargs)[source]

Track progress of futures

This operates differently in the notebook and the console

  • Notebook: This returns immediately, leaving an IPython widget on screen
  • Console: This blocks until the computation completes
Parameters:

futures: Futures

A list of futures or keys to track

notebook: bool (optional)

Running in the notebook or not (defaults to guess)

multi: bool (optional)

Track different functions independently (defaults to True)

complete: bool (optional)

Track all keys (True) or only keys that have not yet run (False) (defaults to True)

Examples

>>> progress(futures)  
[########################################] | 100% Completed |  1.7s
distributed.executor.wait(fs, timeout=None, return_when='ALL_COMPLETED')[source]

Wait until all futures are complete

Parameters:

fs: list of futures

Returns:

Named tuple of completed, not completed
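
For example (a sketch, assuming a list of futures already submitted to the cluster):

>>> done, not_done = wait(futures)  
>>> done, not_done = wait(futures, timeout=60)  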