Worker

We build a distributed network from two kinds of nodes.

  • A single scheduler node
  • Several worker nodes

This page describes the worker nodes.

Serve Data

Workers serve data from a local dictionary:

{'x': np.array(...),
 'y': pd.DataFrame(...)}

Operations include the normal dictionary operations of getting, setting, and deleting key-value pairs. In the following example we connect to two workers, collect data from one, and send it to the other.

alice = rpc(ip='192.168.0.101', port=8788)  # connect to the first worker
d = yield alice.get_data(keys=['x', 'y'])   # collect data from Alice

bob = rpc(ip='192.168.0.102', port=8788)    # connect to the second worker
yield bob.update_data(data=d)               # send that data to Bob
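
Keys can be removed in the same style; a minimal sketch, assuming a delete_data handler alongside the get_data and update_data handlers shown above:

yield alice.delete_data(keys=['x'])  # assumed handler: drop 'x' from Alice's dictionary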

However, this is only an example; typically one does not manually manage data transfer between workers. Workers handle that on their own as necessary.

Compute

Workers evaluate functions provided by the user on their data. They run a function on local data when they hold everything it needs and automatically collect missing data from peers (as shown above) when they don't:

z <- add(x, y)  # can be done with only local data
z <- add(x, a)  # need to find out where we can get 'a'

On our end the result of such a computation is just the response b'OK' and a bit of metadata; the actual result stays on the remote worker.

>>> response, metadata = yield alice.compute(function=add, keys=['x', 'a'])
>>> response
b'OK'
>>> metadata
{'nbytes': 1024}
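
When we do want the result locally we collect it with the same get_data call used earlier. This sketch assumes the computed value was stored under the key 'z', as in the pseudocode above:

>>> d = yield alice.get_data(keys=['z'])
>>> d  # a dictionary mapping the requested key to its value
{'z': ...}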

The worker also reports back to the center/scheduler whenever it completes a computation. Metadata storage is centralized but all data transfer is peer-to-peer. Here is a quick example of what happens during a call to compute:

Client:  Hey Alice!   Compute ``z <- add(x, a)``

Alice:   Hey Center!  Who has a?
Center:  Hey Alice!   Bob has a.
Alice:   Hey Bob!     Send me a!
Bob:     Hey Alice!   Here's a!

Alice:   Hey Client!  I've computed z and am holding on to it!
Alice:   Hey Center!  I have z!
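
The worker performs the middle steps of this exchange on its own. Below is a rough sketch of the gather step in the style of the rpc examples above, assuming a who_has handler on the center (that handler name is an assumption; only the worker handlers appear on this page):

center = rpc(ip='192.168.0.100', port=8787)  # the center from the transcript
who_has = yield center.who_has(keys=['a'])   # assumed: {'a': <addresses of workers holding 'a'>}
bob = rpc(ip='192.168.0.102', port=8788)     # a worker that holds 'a'
d = yield bob.get_data(keys=['a'])           # peer-to-peer transfer, as before
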
class distributed.worker.Worker(center_ip, center_port, ip=None, ncores=None, loop=None, local_dir=None, services=None, service_ports=None, name=None, **kwargs)

Worker Node

Workers perform two functions:

  1. Serve data from a local dictionary
  2. Perform computation on that data and on data from peers

Additionally, workers keep a Center informed of their data and use that Center to gather data from other workers when they need it for a computation.

You can start a worker with the dworker command line application:

$ dworker scheduler-ip:port

State

  • data: {key: object}:

    Dictionary mapping keys to actual values

  • active: {key}:

    Set of keys currently under computation

  • ncores: int:

    Number of cores used by this worker process

  • executor: concurrent.futures.ThreadPoolExecutor:

    Executor used to perform computation

  • local_dir: path:

    Path on local machine to store temporary files

  • center: rpc:

    Location of center or scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

    Ports on which those auxiliary services listen

See also

distributed.center.Center

Examples

Create centers and workers in Python:

>>> from distributed import Center, Worker
>>> c = Center('192.168.0.100', 8787)  
>>> w = Worker(c.ip, c.port)  
>>> yield w._start(port=8788)  

Or use the command line:

$ dcenter
Start center at 127.0.0.1:8787

$ dworker 127.0.0.1:8787
Start worker at:            127.0.0.1:8788
Registered with center at:  127.0.0.1:8787
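
Once a worker is running, the state attributes listed above can be inspected directly. The values below are illustrative; data starts out empty and ncores depends on the machine:

>>> w.ncores
4
>>> w.data
{}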