Worker

Overview

Workers provide two functions:

  1. Compute tasks as directed by the scheduler

  2. Store and serve computed results to other workers or clients

Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

A typical conversation between a scheduler and two workers Alice and Bob may look like the following:

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!
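
The same exchange can be driven from a Client; a minimal sketch, assuming a scheduler is already running at scheduler:8786:

>>> from operator import add
>>> from dask.distributed import Client
>>> client = Client("scheduler:8786")
>>> x = client.submit(add, 1, 2)   # computed and held on one worker ("Alice")
>>> y = client.submit(add, x, 10)  # another worker fetches x before computing y
>>> y.result()                     # the result travels back to the client
13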

Storing Data

Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
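
From a client you can inspect each worker's .data keys with Client.run, which passes the Worker object to any function that accepts a dask_worker keyword argument; a minimal sketch (the address and keys shown are illustrative):

>>> def held_keys(dask_worker):
...     return list(dask_worker.data.keys())
>>> client.run(held_keys)
{'tcp://127.0.0.1:36075': ['x', 'y', '(df, 0)']}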

Read more: Worker Memory Management

Thread Pool

Each worker runs computations in a thread from a concurrent.futures.ThreadPoolExecutor. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently with one another. For the purposes of data locality all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask worker with many threads and a single process. This reduces communication costs and generally simplifies deployment.

If your computations are mostly Python code and don't release the GIL then it is advisable to run dask worker with many processes and one thread per process:

$ dask worker scheduler:8786 --nworkers 8 --nthreads 1

This will launch 8 worker processes each of which has its own ThreadPoolExecutor of size 1.
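
The same layout can be requested from Python when using a local cluster; a minimal sketch:

>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster(n_workers=8, threads_per_worker=1)
>>> client = Client(cluster)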

If your computations are external to Python, long-running, and don't release the GIL then beware that while the computation is running the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don't link in your own custom C/Fortran code then this topic probably doesn't apply.

Command Line tool

Use the dask worker command line tool to start an individual worker. For more details on the command line options, please have a look at the command line tools documentation.

Internal Scheduling

See dedicated page: Worker State Machine

API Documentation

class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, **kwargs)[source]

Worker node in a Dask distributed cluster

Workers perform two functions:

  1. Serve data from a local dictionary

  2. Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask worker command line application:

$ dask worker scheduler-ip:port

Use the --help flag to see more options:

$ dask worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.

State

Informational State

These attributes don’t change significantly during execution.

  • nthreads: int:

    Number of threads used by this worker process

  • executors: dict[str, concurrent.futures.Executor]:

    Executors used to perform computation. Always contains the default executor.

  • local_directory: path:

    Path on local machine to store temporary files

  • scheduler: PooledRPCCall:

    Location of scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

  • transfer_outgoing_count_limit: int

    The maximum number of concurrent outgoing data transfers. See also distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit.

  • batched_stream: BatchedSend

    A batched stream along which we communicate with the scheduler

  • log: [(message)]

    A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

  • threads: {key: int}

    The ID of the thread on which the task ran

  • active_threads: {int: key}

    The keys currently running on active threads

  • state: WorkerState

    Encapsulated state machine. See BaseWorker and WorkerState

Parameters
scheduler_ip: str, optional
scheduler_port: int, optional
scheduler_file: str, optional
host: str, optional
data: MutableMapping, type, None

The object to use for storage, builds a disk-backed LRU dict by default.

If a callable to construct the storage object is provided, it will receive the worker’s local_directory as an argument if the calling signature has an argument named worker_local_directory.

nthreads: int, optional
local_directory: str, optional

Directory where we place local resources

name: str, optional
memory_limit: int, float, string

Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9.

memory_target_fraction: float or False

Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)

memory_spill_fraction: float or False

Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)

memory_pause_fraction: float or False

Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)

max_spill: int, string or False

Limit of number of bytes to be spilled on disk. (default: read from config key distributed.worker.memory.max-spill)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
The executor(s) to use. Depending on the type, it has the following meanings:
  • Executor instance: The default executor.

  • Dict[str, Executor]: mapping names to Executor instances. If the “default” key isn’t in the dict, a “default” executor will be created using ThreadPoolExecutor(nthreads).

  • Str: The string “offload”, which refers to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.

resources: dict

Resources that this worker has like {'GPU': 2}

nanny: str

Address on which to contact nanny, if it exists

lifetime: str

Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.

lifetime_stagger: str

Amount of time like “5 minutes” to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.

lifetime_restart: bool

Whether or not to restart a worker after it has reached its lifetime. Default False.

kwargs: optional

Additional parameters to ServerNode constructor

Examples

Use the command line to start a worker:

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
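
A worker can also be started from Python inside a running event loop; a minimal sketch, where the address and parameter values are illustrative:

>>> from dask.distributed import Worker
>>> # inside an async function / event loop:
>>> worker = await Worker(
...     "tcp://127.0.0.1:8786",
...     nthreads=4,
...     memory_limit="4GB",
...     resources={"GPU": 2},
... )
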
batched_send(msg: dict[str, Any]) None[source]

Implements BaseWorker abstract method.

Send a fire-and-forget message to the scheduler through bulk comms.

If we’re not currently connected to the scheduler, the message will be silently dropped!

async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[source]

Close the worker

Close asynchronous operations running on the worker, stop all executors and comms. If requested, this also closes the nanny.

Parameters
timeout

Timeout in seconds for shutting down individual instructions

executor_wait

If True, shut down executors synchronously, otherwise asynchronously

nanny

If True, close the nanny

reason

Reason for closing the worker

Returns
str | None

None if worker already in closing state or failed, “OK” otherwise

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]

Gracefully shut down a worker

This first informs the scheduler that we’re shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal.

property data: collections.abc.MutableMapping[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

{task key: task payload} of all completed tasks, whether they were computed on this Worker or computed somewhere else and then transferred here over the network.

When using the default configuration, this is a zict buffer that automatically spills to disk whenever the target threshold is exceeded. If spilling is disabled, it is a plain dict instead. It could also be a user-defined arbitrary dict-like passed when initialising the Worker or the Nanny. Worker logic should treat this opaquely and stick to the MutableMapping API.

Note

This same collection is also available at self.state.data and self.memory_manager.data.
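
For example, spilling can be bypassed entirely by passing a plain dict class when constructing the worker; a minimal sketch, inside an async function / event loop:

>>> from dask.distributed import Worker
>>> worker = await Worker("tcp://127.0.0.1:8786", data=dict)  # keep everything in memory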

digest_metric(name: collections.abc.Hashable, value: float) None[source]

Implement BaseWorker.digest_metric by calling Server.digest_metric

async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

Execute a task. Implements BaseWorker abstract method.

async gather(who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

Endpoint used by Scheduler.rebalance() and Scheduler.replicate()

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

Implements BaseWorker abstract method

get_current_task() Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]][source]

Get the key of the task we are currently running

This only makes sense to run within a task

See also

get_worker

Examples

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[source]

Override BaseWorker method for added validation

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

Log an event under a given topic

Parameters
topic: str, list[str]

Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.

msg

Event message to log. Note this must be msgpack serializable.

See also

Client.log_event
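
A minimal sketch of logging an event from inside a task; the topic name and message are illustrative:

>>> from dask.distributed import get_worker
>>> def record_progress():
...     get_worker().log_event("progress", {"stage": "loaded", "rows": 1000})
>>> client.submit(record_progress).result()
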
async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[source]

Wait some time, then take a peer worker out of busy state. Implements BaseWorker abstract method.

async start_unsafe()[source]

Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts.

This is intended to be overridden or called by subclasses. For a safe startup, please use Server.start instead.

If death_timeout is configured, we will require this coroutine to finish before this timeout is reached. If the timeout is reached we will close the instance and raise an asyncio.TimeoutError.

transfer_outgoing_bytes: int

Current total size of open data transfers to other workers

transfer_outgoing_bytes_total: int

Total size of data transfers to other workers (including in-progress and failed transfers)

transfer_outgoing_count: int

Current number of open data transfers to other workers

transfer_outgoing_count_total: int

Total number of data transfers to other workers since the worker was started

trigger_profile() None[source]

Get a frame from all actively computing threads

Merge these frames into existing profile counts

property worker_address

For API compatibility with Nanny

Nanny

Dask workers are by default launched, monitored, and managed by a small Nanny process.

class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]

A process to manage worker processes

The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it gets to the terminate fraction of its memory limit.

The parameters for the Nanny are mostly the same as those for the Worker with exceptions listed below.
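
A minimal sketch of starting a Nanny from Python inside a running event loop (address and values illustrative); the nanny then spawns and supervises the actual worker process:

>>> from dask.distributed import Nanny
>>> # inside an async function / event loop:
>>> nanny = await Nanny("tcp://127.0.0.1:8786", nthreads=2, memory_limit="4GB")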

Parameters
env: dict, optional

Environment variables set at the time of Nanny initialization are guaranteed to be set in the Worker process as well. This argument allows you to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the option distributed.nanny.environ. Precedence is as follows:

  1. Nanny arguments

  2. Existing environment variables

  3. Dask configuration

Note

Some environment variables, like OMP_NUM_THREADS, must be set before importing numpy to have effect. Others, like MALLOC_TRIM_THRESHOLD_ (see Memory not released back to the OS), must be set before starting the Linux process. Such variables would be ineffective if set here or in distributed.nanny.environ; they must be set in distributed.nanny.pre-spawn-environ so that they are set before spawning the subprocess, even if this means poisoning the process running the Nanny.

For the same reason, be warned that changing distributed.worker.multiprocessing-method from spawn to fork or forkserver may inhibit some environment variables; if you do, you should set the variables yourself in the shell before you start dask worker.
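
For illustration, both routes for ordinary variables might look like this (MY_FLAG is a placeholder, not a real setting):

>>> import dask
>>> from dask.distributed import Nanny
>>> # configuration route, applied to every nanny started afterwards
>>> dask.config.set({"distributed.nanny.environ": {"MY_FLAG": "1"}})
>>> # per-nanny route; the Nanny argument takes precedence over the config
>>> nanny = await Nanny("tcp://127.0.0.1:8786", env={"MY_FLAG": "2"})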

See also

Worker
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][source]

Close the worker process, stop all comms.

close_gracefully(reason: str = 'nanny-close-gracefully') None[source]

A signal that we shouldn’t try to restart workers if they go away

This is used as part of the cluster shutdown process.

async instantiate() distributed.core.Status[source]

Start a local worker process

Blocks until the process is up and the scheduler is properly informed

async kill(timeout: float = 2, reason: str = 'nanny-kill') None[source]

Kill the local worker process

Blocks until both the process is down and the scheduler is properly informed

log_event(topic, msg)[source]

Log an event under a given topic

Parameters
topic: str, list[str]

Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.

msg

Event message to log. Note this must be msgpack serializable.

See also

Client.log_event
async start_unsafe()[source]

Start nanny, start local process, start watching