Worker

Overview

Workers provide two functions:

  1. Compute tasks as directed by the scheduler
  2. Store and serve computed results to other workers or clients

Each worker contains a ThreadPool that it uses to evaluate tasks as requested by the scheduler. It stores the results of these tasks locally and serves them to other workers or clients on demand. If the worker is asked to evaluate a task for which it does not have all of the necessary data then it will reach out to its peer workers to gather the necessary dependencies.

A typical conversation between a scheduler and two workers Alice and Bob may look like the following:

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!
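
From the client's point of view this exchange is produced by ordinary dependent submissions; the scheduler and workers handle the rest. A sketch, assuming a scheduler running at the placeholder address scheduler:port:

>>> from operator import add
>>> from distributed import Client
>>> client = Client('scheduler:port')
>>> x = client.submit(add, 1, 2)     # runs on some worker, say Alice
>>> y = client.submit(add, x, 10)    # may run on Bob, who fetches x from Alice
>>> y.result()
13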

Storing Data

Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls.

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

This .data attribute is a MutableMapping that is typically a combination of in-memory and on-disk storage with an LRU policy to move data between them.
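
From a connected Client (assumed here to be named client) you can inspect this mapping across the cluster with Client.run, which calls a function on every worker; if the function accepts a dask_worker argument it receives the worker object itself. The addresses and keys in the output below are illustrative:

>>> def stored_keys(dask_worker):
...     return list(dask_worker.data.keys())
>>> client.run(stored_keys)
{'tcp://127.0.0.1:36075': ['x', 'y'], 'tcp://127.0.0.1:36076': ['(df, 0)']}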

Spill Excess Data to Disk

Short version: to enable workers to spill excess data to disk, start dask-worker with the --memory-limit option. Pass auto to have it guess how many bytes to keep in memory, or pass an integer if you know how many bytes it should use:

$ dask-worker scheduler:port --memory-limit=auto  # 75% of available RAM
$ dask-worker scheduler:port --memory-limit=2e9  # two gigabytes

Some workloads may produce more data at one time than there is available RAM on the cluster. In these cases Workers may choose to write excess values to disk. This causes some performance degradation because writing to and reading from disk is generally slower than accessing memory, but is better than running out of memory entirely, which can cause the system to halt.

If the dask-worker --memory-limit=NBYTES keyword is set during initialization then the worker will store at most NBYTES of data (as measured with sizeof) in memory. After that it will start storing least recently used (LRU) data in a temporary directory. Workers serialize data for writing to disk with the same system used to write data on the wire, a combination of pickle and the default compressor.

Now whenever new data comes in it will push out old data until at most NBYTES of data is in RAM. If an old value is requested it is read back from disk, possibly pushing other values down to disk in turn.
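
The policy described above can be sketched as a small mapping of our own that pickles least recently used values to a temporary directory once an in-memory byte budget is exceeded. This is only an illustration of the idea; the worker's real container measures data with sizeof and reuses the serialization machinery mentioned above, and the SpillBuffer class here is made up:

import os
import pickle
import sys
import tempfile
from collections import OrderedDict

class SpillBuffer(OrderedDict):
    """Toy LRU mapping: keep at most `limit` bytes in memory, pickle the rest to disk."""

    def __init__(self, limit):
        super().__init__()
        self.limit = limit                      # in-memory byte budget, like NBYTES above
        self.in_memory = 0
        self.directory = tempfile.mkdtemp()

    def _path(self, key):
        return os.path.join(self.directory, str(abs(hash(key))))

    def __setitem__(self, key, value):
        if key in self:                         # replacing a value: drop its old size
            self.in_memory -= sys.getsizeof(super().__getitem__(key))
        super().__setitem__(key, value)
        self.move_to_end(key)                   # most recently used goes to the end
        self.in_memory += sys.getsizeof(value)
        while self.in_memory > self.limit and len(self) > 1:   # keep at least the newest value
            old_key, old_value = self.popitem(last=False)      # least recently used
            self.in_memory -= sys.getsizeof(old_value)
            with open(self._path(old_key), 'wb') as f:
                pickle.dump(old_value, f)       # spill to disk

    def __getitem__(self, key):
        try:
            value = super().__getitem__(key)
            self.move_to_end(key)               # refresh recency on access
            return value
        except KeyError:
            path = self._path(key)
            if not os.path.exists(path):
                raise
            with open(path, 'rb') as f:         # read a spilled value back from disk
                value = pickle.load(f)
            self[key] = value                   # may push other values out in turn
            return value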

It is still possible to run out of RAM on a worker. Here are a few possible issues:

  1. The objects being stored take up more RAM than is stated with the __sizeof__ protocol. If you use custom classes then we encourage adding a faithful __sizeof__ method to your class that returns an accurate accounting of the bytes used (see the sketch after this list).
  2. Computations and communications may take up additional RAM not accounted for. It is wise to leave enough headroom for your most memory-hungry function to run once per active thread on the machine.
  3. It is possible to misjudge the amount of RAM on the machine. Using the --memory-limit=auto heuristic sets the value to 75% of the return value of psutil.virtual_memory().total.
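
For the first point, a faithful __sizeof__ lets byte measurements that honor the protocol, such as sys.getsizeof, see the true footprint of a wrapper object. A minimal sketch; the Packet class is hypothetical:

>>> import sys
>>> class Packet:
...     """Wraps a bytes payload and reports its real memory footprint."""
...     def __init__(self, payload):
...         self.payload = payload
...     def __sizeof__(self):
...         # object overhead plus the payload this object keeps alive
...         return object.__sizeof__(self) + sys.getsizeof(self.payload)
>>> sys.getsizeof(Packet(b'x' * 10**7)) > 10**7
True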

Thread Pool

Each worker sends computations to a thread in a concurrent.futures.ThreadPoolExecutor for computation. These computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other. For the purposes of data locality all threads within a worker are considered the same worker.

If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment.
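
For example, on an eight-core machine this might be a single process with eight threads, using the flags documented under Command Line tool below:

$ dask-worker scheduler:8786 --nprocs 1 --nthreads 8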

If your computations are mostly Python code and don't release the GIL then it is advisable to run dask-worker with many processes and one thread per process:

$ dask-worker scheduler:8786 --nprocs 8 --nthreads 1

If your computations are external to Python, long-running, and don't release the GIL, then beware that while such a computation runs the worker process will not be able to communicate with other workers or with the scheduler. This situation should be avoided. If you don't link in your own custom C/Fortran code then this topic probably doesn't apply to you.

Command Line tool

Use the dask-worker command line tool to start an individual worker. Here are the available options:

$ dask-worker --help
Usage: dask-worker [OPTIONS] SCHEDULER

Options:
  --worker-port INTEGER  Serving worker port, defaults to randomly assigned
  --http-port INTEGER    Serving http port, defaults to randomly assigned
  --nanny-port INTEGER   Serving nanny port, defaults to randomly assigned
  --port INTEGER         Deprecated, see --nanny-port
  --host TEXT            Serving host. Defaults to an ip address that can
                         hopefully be visible from the scheduler network.
  --nthreads INTEGER     Number of threads per process. Defaults to number of
                         cores
  --nprocs INTEGER       Number of worker processes.  Defaults to one.
  --name TEXT            Alias
  --memory-limit TEXT    Number of bytes before spilling data to disk
  --no-nanny
  --help                 Show this message and exit.
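
For example, to start a worker with four threads, a two-gigabyte memory limit, and the alias alice (the scheduler address is a placeholder):

$ dask-worker scheduler:8786 --nthreads 4 --memory-limit=2e9 --name alice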

Internal Scheduling

Internally, tasks that come to the worker proceed through the following pipeline:

[Diagram: Dask worker task states]

The worker also tracks data dependencies that are required to run the tasks above. These flow through a simpler pipeline:

[Diagram: Dask worker dependency states]

As tasks arrive they are prioritized and put into a heap. They are then taken from this heap in turn to have any remote dependencies collected. For each dependency we select a worker at random that has that data and collect the dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200 MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around 10-50) so as to avoid overly fragmenting our network bandwidth. After all dependencies for a task are in memory we transition the task to the ready state and put it into a second heap of tasks that are ready to run.
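
The heaps mentioned above are ordinary priority heaps ordered by the scheduler-assigned priority tuples. A toy sketch of just that piece (keys and priorities here are made up; the real worker tracks far more state):

>>> import heapq
>>> ready = []
>>> heapq.heappush(ready, ((0, 1), 'y'))   # (priority tuple, key)
>>> heapq.heappush(ready, ((0, 0), 'x'))
>>> heapq.heappop(ready)                   # lowest priority tuple runs first
((0, 0), 'x')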

We collect tasks from this heap and hand each one to a thread from a local thread pool to execute.

Optionally, this task may identify itself as a long-running task (see Tasks launching tasks), at which point it secedes from the thread pool.
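
A task can opt into this itself by calling distributed.secede from inside the task body, as described in Tasks launching tasks. A sketch, assuming a connected Client named client; the wait_on_external_service function is made up:

>>> import time
>>> from distributed import secede
>>> def wait_on_external_service():
...     secede()            # hand the thread back to the worker's pool
...     time.sleep(3600)    # stand-in for a long external wait
...     return 'done'
>>> future = client.submit(wait_on_external_service)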

A task either errs or its result is put into memory. In either case a response is sent back to the scheduler.

API Documentation

class distributed.worker.Worker(*args, **kwargs)

Worker node in a Dask distributed cluster

Workers perform two functions:

  1. Serve data from a local dictionary
  2. Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask-worker command line application:

$ dask-worker scheduler-ip:port

Use the --help flag to see more options:

$ dask-worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track its computations.

State

Informational State

These attributes don’t change significantly during execution.

  • ncores: int:

    Number of cores used by this worker process

  • executor: concurrent.futures.ThreadPoolExecutor:

    Executor used to perform computation

  • local_dir: path:

    Path on local machine to store temporary files

  • scheduler: rpc:

    Location of scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

    The ports on which the services above are running

  • total_connections: int

    The maximum number of concurrent connections we want to see

  • total_comm_nbytes: int

  • batched_stream: BatchedSend

    A batched stream along which we communicate to the scheduler

  • log: [(message)]

    A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

  • data: {key: object}:

    Dictionary mapping keys to actual values

  • task_state: {key: string}:

    The state of all tasks that the scheduler has asked us to compute. Valid states include waiting, constrained, executing, memory, and erred

  • tasks: {key: dict}

    The function, args, kwargs of a task. We run this when appropriate

  • dependencies: {key: {deps}}

    The data needed by this key to run

  • dependents: {dep: {keys}}

    The keys that use this dependency

  • data_needed: deque(keys)

    The keys whose data we still lack, arranged in a deque

  • waiting_for_data: {key: {deps}}

    A dynamic version of dependencies. All dependencies that we still don't have for a particular key.

  • ready: [keys]

    Keys that are ready to run. Stored in a LIFO stack

  • constrained: [keys]

    Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque

  • executing: {keys}

    Keys that are currently executing

  • executed_count: int

    The number of tasks that this worker has run in its lifetime

  • long_running: {keys}

    A set of keys of tasks that are running and have started their own long-running clients.

  • dep_state: {dep: string}:

    The state of all dependencies required by our tasks. Valid states include waiting, flight, and memory

  • who_has: {dep: {worker}}

    Workers that we believe have this data

  • has_what: {worker: {deps}}

    The data that we care about that we think a worker has

  • pending_data_per_worker: {worker: [dep]}

    The data on each worker that we still want, prioritized as a deque

  • in_flight_tasks: {task: worker}

    All dependencies that are coming to us in current peer-to-peer connections and the workers from which they are coming.

  • in_flight_workers: {worker: {task}}

    The workers from which we are currently gathering data and the dependencies we expect from those connections

  • comm_bytes: int

    The total number of bytes in flight

  • suspicious_deps: {dep: int}

    The number of times a dependency has not been where we expected it

  • nbytes: {key: int}

    The size of a particular piece of data

  • types: {key: type}

    The type of a particular piece of data

  • threads: {key: int}

    The ID of the thread on which the task ran

  • exceptions: {key: exception}

    The exception caused by running a task if it erred

  • tracebacks: {key: traceback}

    The traceback from a task that erred

  • startstops: {key: [(str, float, float)]}

    Log of transfer, load, and compute times for a task

  • priorities: {key: tuple}

    The priority of a key given by the scheduler. Determines run order.

  • durations: {key: float}

    Expected duration of a task

  • resource_restrictions: {key: {str: number}}

    Abstract resources required to run a task

Parameters:

scheduler_ip: str

scheduler_port: int

ip: str, optional

ncores: int, optional

loop: tornado.ioloop.IOLoop

local_dir: str, optional

Directory where we place local resources

name: str, optional

heartbeat_interval: int

Milliseconds between heartbeats to scheduler

memory_limit: int

Number of bytes of data to keep in memory before using disk

executor: concurrent.futures.Executor

resources: dict

Resources that this worker has, like {'GPU': 2}

See also

distributed.scheduler.Scheduler, distributed.nanny.Nanny

Examples

Use the command line to start a worker:

$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
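
Workers are also commonly created for you from Python, for example by a LocalCluster, which starts a scheduler and several workers in one step. A sketch, assuming LocalCluster accepts the n_workers and threads_per_worker keywords:

>>> from distributed import Client, LocalCluster
>>> cluster = LocalCluster(n_workers=2, threads_per_worker=2)
>>> client = Client(cluster)
>>> client.submit(lambda x: x + 1, 10).result()
11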