Local Cluster

For convenience you can start a local cluster from your Python session.

>>> from distributed import Client, LocalCluster
>>> cluster = LocalCluster()
LocalCluster("", workers=8, ncores=8)
>>> client = Client(cluster)
<Client: scheduler= processes=8 cores=8>

You can dynamically scale this cluster up and down:

>>> worker = cluster.start_worker(ncores=2)
>>> cluster.stop_worker(worker)

Alternatively, a LocalCluster is created for you automatically if you create a Client with no arguments:

>>> from distributed import Client
>>> client = Client()
>>> client
<Client: scheduler= processes=8 cores=8>


Within a Python script you need to start a local cluster in the if __name__ == '__main__' block:

if __name__ == '__main__':
    cluster = LocalCluster()
    client = Client(cluster)
    # Your code follows here
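Putting the pieces together, a minimal runnable script might look like this (a sketch; it assumes dask.distributed is installed, and uses processes=False so the threaded workers also run cleanly outside a script):

```python
from distributed import Client, LocalCluster

if __name__ == '__main__':
    # processes=False starts threaded workers in this process;
    # drop it to get the default process-based workers.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                           processes=False, dashboard_address=None)
    client = Client(cluster)

    # Submit work to the cluster and collect the results
    futures = client.map(lambda x: x * 2, range(4))
    print(client.gather(futures))  # [0, 2, 4, 6]

    client.close()
    cluster.close()
```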


class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None, processes=True, loop=None, start=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, **worker_kwargs)[source]

Create local Scheduler and Workers

This creates a “cluster” of a scheduler and workers running on the local machine.

n_workers: int

Number of workers to start

processes: bool

Whether to use processes (True) or threads (False). Defaults to True.

threads_per_worker: int

Number of threads per worker

scheduler_port: int

Port of the scheduler. Defaults to 0, which chooses a random open port; pass 8786 to use the conventional fixed port.

silence_logs: logging level

Level of logs to print out to stdout. logging.WARN by default. Use a falsey value like False or None for no change.

ip: string

IP address on which the scheduler will listen, defaults to only localhost

dashboard_address: str

Address on which to listen for the Bokeh diagnostics server, like 'localhost:8787' or ''. Defaults to ':8787'. Set to None to disable the dashboard. Use port 0 for a random port.

diagnostics_port: int

Deprecated. See dashboard_address.

asynchronous: bool (False by default)

Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use.

kwargs: dict

Extra worker arguments; these will be passed to the Worker constructor.

blocked_handlers: List[str]

A list of strings specifying handlers to disallow on the Scheduler, like ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

Extra keywords to hand to the running services

security: Security (optional)

Object configuring TLS communication security for the cluster

protocol: str (optional)

Protocol to use, like tcp://, tls://, or inproc://. Defaults to a sensible choice given other keyword arguments like processes and security.
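With asynchronous=True the cluster and client can be awaited from async code; a minimal sketch (assumes a recent dask.distributed; parameters other than asynchronous are illustrative):

```python
import asyncio

from distributed import Client, LocalCluster

async def main():
    # asynchronous=True makes startup awaitable instead of blocking
    cluster = await LocalCluster(asynchronous=True, processes=False,
                                 n_workers=1, dashboard_address=None)
    client = await Client(cluster, asynchronous=True)

    # In asynchronous mode, awaiting a future yields its result
    result = await client.submit(sum, [1, 2, 3])

    await client.close()
    await cluster.close()
    return result

print(asyncio.run(main()))  # 6
```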


>>> c = LocalCluster()  # Create a local cluster with as many workers as cores  # doctest: +SKIP
>>> c  # doctest: +SKIP
LocalCluster("", workers=8, ncores=8)
>>> client = Client(c)  # connect to local cluster  # doctest: +SKIP

Add a new worker to the cluster

>>> w = c.start_worker(ncores=2)  # doctest: +SKIP

Shut down the extra worker

>>> c.stop_worker(w)  # doctest: +SKIP

Pass extra keyword arguments to Bokeh

>>> LocalCluster(service_kwargs={'bokeh': {'prefix': '/foo'}})  # doctest: +SKIP
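Extra keyword arguments are forwarded to each Worker; for instance, a per-worker memory_limit can be set this way (a sketch, assuming dask.distributed is installed and your Worker version accepts memory_limit):

```python
from distributed import LocalCluster

if __name__ == '__main__':
    # memory_limit is passed through **worker_kwargs to each Worker
    cluster = LocalCluster(n_workers=1, processes=False,
                           dashboard_address=None, memory_limit='512MB')
    print(len(cluster.workers))  # 1
    cluster.close()
```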

Close the cluster

>>> c.close()  # doctest: +SKIP

scale_down(workers)[source]

Remove workers from the cluster

Given a list of worker addresses this function should remove those workers from the cluster. This may require tracking which jobs are associated to which worker address.

This can be implemented either as a function or as a Tornado coroutine.

scale_up(n, **kwargs)[source]

Bring the total count of workers up to n

This function/coroutine should bring the total number of workers up to the number n.

This can be implemented either as a function or as a Tornado coroutine.
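Recent distributed releases also expose the simpler cluster.scale(n), which brings the worker count to n in either direction; a sketch (assumes dask.distributed is installed):

```python
from distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=1, processes=False,
                           dashboard_address=None)
    client = Client(cluster)

    cluster.scale(3)            # grow the pool to three workers
    client.wait_for_workers(3)  # scaling is asynchronous, so wait

    cluster.scale(1)            # retire workers back down to one
    client.close()
    cluster.close()
```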


start_worker(**kwargs)[source]

Add a new worker to the running cluster

port: int (optional)

Port on which to serve the worker, defaults to 0 or random

ncores: int (optional)

Number of threads to use. Defaults to number of logical cores

Returns: The created Worker or Nanny object. Can be discarded.


>>> c = LocalCluster()  # doctest: +SKIP
>>> c.start_worker(ncores=2)  # doctest: +SKIP

stop_worker(w)[source]

Stop a running worker


>>> c = LocalCluster()  # doctest: +SKIP
>>> w = c.start_worker(ncores=2)  # doctest: +SKIP
>>> c.stop_worker(w)  # doctest: +SKIP