Local Cluster

For convenience you can start a local cluster from your Python session.

>>> from distributed import Client, LocalCluster
>>> cluster = LocalCluster()
LocalCluster("127.0.0.1:8786", workers=8, nthreads=8)
>>> client = Client(cluster)
<Client: scheduler=127.0.0.1:8786 processes=8 cores=8>

You can dynamically scale this cluster up and down:

>>> cluster.scale(10)  # scale up to ten workers
>>> cluster.scale(1)   # and back down to one
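
If the workload varies over time, you can also let the cluster choose its own size within bounds via adaptive scaling (the bounds below are only illustrative):

>>> cluster.adapt(minimum=1, maximum=8)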

Alternatively, a LocalCluster is made for you automatically if you create a Client with no arguments:

>>> from distributed import Client
>>> client = Client()
>>> client
<Client: scheduler=127.0.0.1:8786 processes=8 cores=8>
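
When you are finished you can shut the client and cluster down explicitly:

>>> client.close()
>>> cluster.close()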

Note

Within a Python script you need to start the local cluster inside the if __name__ == '__main__' block, because worker processes may re-import the script when they start:

from distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster()
    client = Client(cluster)
    # Your code follows here

API

class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None, processes=True, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, **worker_kwargs)[source]

Create local Scheduler and Workers

This creates a “cluster” of a scheduler and workers running on the local machine.

Parameters:
n_workers: int

Number of workers to start

processes: bool

Whether to use processes (True) or threads (False). Defaults to True

threads_per_worker: int

Number of threads per worker

scheduler_port: int

Port of the scheduler. Defaults to 0, which chooses a random open port; pass 8786 to use the conventional fixed port

silence_logs: logging level

Level of logs to print out to stdout. logging.WARN by default. Use a falsey value like False or None for no change.

host: string

Host address on which the scheduler will listen, defaults to only localhost

ip: string

Deprecated. See host above.

dashboard_address: str

Address on which to listen for the Bokeh diagnostics server like ‘localhost:8787’ or ‘0.0.0.0:8787’. Defaults to ‘:8787’. Set to None to disable the dashboard. Use ‘:0’ for a random port.

diagnostics_port: int

Deprecated. See dashboard_address.

asynchronous: bool (False by default)

Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use.

worker_kwargs: dict

Extra worker arguments, will be passed to the Worker constructor.

blocked_handlers: List[str]

A list of handler names to disallow on the Scheduler, like ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

Extra keyword arguments to pass to the running services

security : Security or bool, optional

Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically.

protocol: str (optional)

Protocol to use, like tcp://, tls://, or inproc://. Defaults to a sensible choice given the other keyword arguments, such as processes and security

interface: str (optional)

Network interface to use. Defaults to lo/localhost

worker_class: Worker

Class to use when instantiating workers.

Examples

>>> cluster = LocalCluster()  # Create a local cluster with as many workers as cores  # doctest: +SKIP
>>> cluster  # doctest: +SKIP
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster  # doctest: +SKIP

Scale the cluster to three workers

>>> cluster.scale(3)  # doctest: +SKIP

Pass extra keyword arguments to Bokeh

>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})  # doctest: +SKIP
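
Run an in-process, threads-only cluster (an illustrative combination of the processes, n_workers, and threads_per_worker parameters documented above)

>>> LocalCluster(processes=False, n_workers=1, threads_per_worker=4)  # doctest: +SKIP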