Scheduler

Scheduler layout

The scheduler orchestrates which workers work on which tasks in what order. It tracks the current state of the entire cluster. It consists of several coroutines running in a single event loop.

class distributed.scheduler.Scheduler(center=None, loop=None, resource_interval=1, resource_log_size=1000, max_buffer_size=2069891072.0, delete_interval=500, ip=None, services=None, heartbeat_interval=500, **kwargs)[source]

Dynamic distributed task scheduler

The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.

All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.

The scheduler communicates with the outside world either by adding pairs of in/out queues or by responding to a new IOStream (the Scheduler can operate as a typical distributed Server). It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dscheduler executable:

$ dscheduler 127.0.0.1:8787  # address of center

Or when an Executor starts up and connects to a Center:

>>> e = Executor('127.0.0.1:8787')  
>>> e.scheduler  
Scheduler(...)

Users typically do not interact with the scheduler except through Plugins. See http://distributed.readthedocs.io/en/latest/plugins.html

State

  • tasks: {key: task}:

    Dictionary mapping key to task, either a dask task or a serialized dict like: {'function': b'xxx', 'args': b'xxx'} or {'task': b'xxx'}

  • dependencies: {key: {key}}:

    Dictionary showing which keys depend on which others

  • dependents: {key: {key}}:

    Dictionary showing which keys are dependent on which others

  • waiting: {key: {key}}:

    Dictionary like dependencies but excludes keys already computed

  • waiting_data: {key: {key}}:

    Dictionary like dependents but excludes keys already computed

  • ready: deque(key)

    Keys that are ready to run, but not yet assigned to a worker

  • ncores: {worker: int}:

    Number of cores owned by each worker

  • idle: {worker}:

    Set of workers that are not fully utilized

  • services: {str: port}:

    Other services running on this scheduler, like HTTP

  • worker_info: {worker: {str: data}}:

    Information about each worker

  • host_info: {hostname: dict}:

    Information about each worker host

  • who_has: {key: {worker}}:

    Where each key lives. The current state of distributed memory.

  • has_what: {worker: {key}}:

    What worker has what keys. The transpose of who_has.

  • who_wants: {key: {client}}:

    Which clients want each key. The active targets of computation.

  • wants_what: {client: {key}}:

    What keys are wanted by each client. The transpose of who_wants.

  • nbytes: {key: int}:

    Number of bytes for a key as reported by workers holding that key.

  • processing: {worker: {key: cost}}:

    Keys currently in execution on each worker and their expected durations

  • task_duration: {key-prefix: time}

    Time we expect certain functions to take, e.g. {'sum': 0.25}

  • occupancy: {worker: time}

    Expected runtime for all tasks currently processing on a worker

  • stacks: {worker: [keys]}:

    List of keys waiting to be sent to each worker

  • released: {keys}

    Set of keys that are known, but released from memory

  • unrunnable: {key}

    Keys that we are unable to run

  • restrictions: {key: {hostnames}}:

    A set of hostnames per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain hosts. These restrictions don’t include a worker port. Any worker on that hostname is deemed valid.

  • loose_restrictions: {key}:

    Set of keys for which we are allowed to violate restrictions (see above) if no valid workers are present.

  • keyorder: {key: tuple}:

    A score per key that determines its priority

  • scheduler_queues: [Queues]:

    A list of Tornado Queues from which we accept stimuli

  • report_queues: [Queues]:

    A list of Tornado Queues on which we report results

  • streams: [IOStreams]:

    A list of Tornado IOStreams from which we both accept stimuli and report results

  • coroutines: [Futures]:

    A list of active futures that control operation

  • exceptions: {key: Exception}:

    A dict mapping keys to remote exceptions

  • tracebacks: {key: list}:

    A dict mapping keys to remote tracebacks stored as a list of strings

  • exceptions_blame: {key: key}:

    A dict mapping a key to another key on which it depends that has failed

  • deleted_keys: {key: {workers}}

    Locations of workers that have keys that should be deleted

  • loop: IOLoop:

    The running Tornado IOLoop
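
To make the shapes of this state concrete, here is a small, purely illustrative snapshot of a two-worker cluster running a three-task chain. All keys, addresses, and numbers are invented for the example; only the container shapes follow the descriptions above.

# Hypothetical scheduler state for workers alice and bob and keys x <- y <- z,
# where x has already been computed and sits in alice's memory.
tasks        = {'x': {'function': b'xxx', 'args': b'xxx'},
                'y': {'function': b'xxx', 'args': b'xxx'},
                'z': {'function': b'xxx', 'args': b'xxx'}}
dependencies = {'x': set(), 'y': {'x'}, 'z': {'y'}}
dependents   = {'x': {'y'}, 'y': {'z'}, 'z': set()}

ncores       = {'alice:8000': 4, 'bob:8000': 4}
who_has      = {'x': {'alice:8000'}}               # x lives on alice
has_what     = {'alice:8000': {'x'}, 'bob:8000': set()}
nbytes       = {'x': 1000}

waiting      = {'y': set(), 'z': {'y'}}            # y is ready to run, z waits on y
waiting_data = {'x': {'y'}, 'y': {'z'}}            # who still needs each key's data
processing   = {'alice:8000': {}, 'bob:8000': {}}
stacks       = {'alice:8000': [], 'bob:8000': []}
occupancy    = {'alice:8000': 0.0, 'bob:8000': 0.0}

who_wants    = {'z': {'client-1'}}                 # one client asked for z
wants_what   = {'client-1': {'z'}}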

add_client(stream, client=None)[source]

Listen to messages from an IOStream

add_plugin(plugin)[source]

Add external plugin to scheduler

See http://distributed.readthedocs.io/en/latest/plugins.html

broadcast(stream=None, msg=None, workers=None)[source]

Broadcast message to workers, return all results

cancel(stream, keys=None, client=None)[source]

Stop execution on a list of keys

cleanup()[source]

Clean up queues and coroutines, prepare to stop

clear_data_from_workers()[source]

This is intended to be run periodically.

The self._delete_periodic_callback attribute holds a PeriodicCallback that runs this every self.delete_interval milliseconds.

close(stream=None)[source]

Send cleanup signal to all coroutines then wait until finished

coerce_address(addr)[source]

Coerce possible input addresses to canonical form

Handles lists, strings, bytes, tuples, or aliases

correct_time_delay(worker, msg)[source]

Apply offset time delay in message times

Operates in place

ensure_idle_ready()[source]

Run ready tasks on idle workers

Work stealing policy

If some workers are idle while others are not, if there are no globally ready tasks, and if there are tasks in worker stacks, then we start to pull preferred tasks from overburdened workers and deploy them back into the global pool in the following manner.

We determine the number of tasks to reclaim as the number of all tasks in all stacks times the fraction of idle workers to all workers. We sort the stacks by size and walk through them, reclaiming half of each stack until we have enough tasks to fill the global pool. We are careful not to reclaim tasks that are restricted to run on certain workers.
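
A minimal sketch of that reclaiming arithmetic, assuming stacks are plain lists of keys and that restricted tasks are simply put back on their stacks; the real coroutine interleaves this with the rest of the event loop:

def reclaim_tasks(stacks, ncores, idle, restrictions):
    # Target: all stacked tasks times the fraction of workers that are idle.
    total = sum(map(len, stacks.values()))
    n_reclaim = int(total * len(idle) / len(ncores))

    reclaimed = []
    # Walk the largest stacks first, reclaiming half of each stack.
    for worker, stack in sorted(stacks.items(), key=lambda kv: len(kv[1]), reverse=True):
        for _ in range(len(stack) // 2):
            if len(reclaimed) >= n_reclaim:
                return reclaimed
            key = stack.pop()
            if key in restrictions:
                stack.insert(0, key)   # never steal a restricted task
            else:
                reclaimed.append(key)
    return reclaimed                   # destined for the global ready deque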

ensure_in_play(key)[source]

Ensure that a key is on track to enter memory in the future

This will only act on keys currently in self.released.

ensure_occupied_stacks(worker)[source]

Send tasks to worker while it has tasks and free cores

These tasks may come from the worker’s own stacks or from the global ready deque.

We update the idle workers set appropriately.
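
A schematic version of this fill loop, assuming the worker's own stack is preferred over the global ready deque and using a hypothetical send_task callable in place of the real worker communication:

def ensure_occupied(stacks, ready, processing, ncores, idle, worker, send_task):
    # Keep handing out work while the worker has free cores and tasks exist.
    while len(processing[worker]) < ncores[worker]:
        if stacks[worker]:                 # the worker's own stack first (assumption)
            key = stacks[worker].pop()
        elif ready:                        # otherwise the global ready deque
            key = ready.popleft()
        else:
            break
        processing[worker][key] = 0.5      # placeholder expected duration
        send_task(worker, key)
    # A worker with spare cores is idle; a saturated one is not.
    if len(processing[worker]) < ncores[worker]:
        idle.add(worker)
    else:
        idle.discard(worker)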

finished()[source]

Wait until all coroutines have ceased

forget(key)[source]

Forget a key if no one cares about it

This removes all knowledge of how to produce a key from the scheduler. This is almost exclusively called by release_held_data

gather(stream=None, keys=None)[source]

Collect data in from workers

handle_messages(in_queue, report, client=None)[source]

Master coroutine. Handles inbound messages.

This runs once per Queue or Stream.

handle_queues(scheduler_queue, report_queue)[source]

Register new control and report queues to the Scheduler

identity(stream)[source]

Basic information about ourselves and our cluster

log_state(msg='')[source]

Log current full state of the scheduler

mark_failed(key, failing_key=None)[source]

When a task fails, mark it and all dependent tasks as failed

mark_key_in_memory(key, workers=None, type=None)[source]

Mark that a key now lives in distributed memory

mark_missing_data(keys=None, key=None, worker=None, **kwargs)[source]

Mark that certain keys have gone missing. Recover.

See also

recover_missing

mark_ready_to_run(key)[source]

Mark a task as ready to run.

If the task should be assigned to a worker then make that determination and assign appropriately. Otherwise place the task in the ready queue.

Trigger appropriate workers if idle.

See also

decide_worker, Scheduler.ensure_occupied

mark_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)[source]

Mark that a task has erred on a particular worker

mark_task_finished(key=None, worker=None, nbytes=None, type=None, compute_start=None, compute_stop=None, transfer_start=None, transfer_stop=None, **kwargs)[source]

Mark that a task has finished execution on a particular worker

put(msg)[source]

Place a message into the scheduler’s queue

recover_missing(key)[source]

Recover a recently lost piece of data

This assumes that we’ve already removed this key from who_has/has_what.

release_held_data(keys=None)[source]

Mark that a key is no longer externally required to be in memory

release_live_dependencies(key)[source]

We no longer need to keep data in memory to compute this

This occurs after we’ve computed it or after we’ve forgotten it

remove_worker(stream=None, address=None)[source]

Mark that a worker no longer seems responsive

replicate(stream=None, keys=None, n=None, workers=None, branching_factor=2)[source]

Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters:

keys: Iterable

list of keys to replicate

n: int

Number of replications we expect to see within the cluster

branching_factor: int, optional

The number of workers that can copy data in each generation

See also

Scheduler.rebalance
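
As a rough model of why the tree copy needs only a few rounds: assuming every current holder of a key seeds branching_factor new workers in each generation (an assumption drawn from the parameter description, not the exact implementation), coverage grows geometrically:

# Model of tree-copy growth: holders multiply by (1 + branching_factor) each round.
def generations_needed(n, branching_factor=2, initial_holders=1):
    holders, generations = initial_holders, 0
    while holders < n:
        holders += holders * branching_factor
        generations += 1
    return generations

generations_needed(100)   # -> 5 rounds to reach 100 replicas with the default factor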

report(msg)[source]

Publish updates to all listening Queues and Streams

restart()[source]

Restart all workers. Reset local state

scatter(stream=None, data=None, workers=None, client=None, broadcast=False)[source]

Send data out to workers

start(port=8786, start_queues=True)[source]

Clear out old state and restart all running coroutines

update_data(who_has=None, nbytes=None, client=None)[source]

Learn that new data has entered the network from an external source

update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, loose_restrictions=None)[source]

Add new computations to the internal dask graph

This happens whenever the Executor calls submit, map, get, or compute.
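
For example, a single submit on the Executor side arrives at the scheduler as one update_graph call (illustrative; the exact message contents depend on the client):

>>> e = Executor('127.0.0.1:8787')
>>> future = e.submit(lambda x: x + 1, 10)  # reaches the scheduler via update_graph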

workers_list(workers)[source]

List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
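
A hypothetical illustration, with made-up addresses:

>>> scheduler.workers_list(['alice'])       # hostname: every worker on that host
['alice:8000']
>>> scheduler.workers_list(['alice:8000'])  # full address: passes through unchanged
['alice:8000']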

distributed.scheduler.decide_worker(dependencies, stacks, processing, who_has, has_what, restrictions, loose_restrictions, nbytes, key)[source]

Decide which worker should take task

>>> dependencies = {'c': {'b'}, 'b': {'a'}}
>>> stacks = {'alice:8000': ['z'], 'bob:8000': []}
>>> processing = {'alice:8000': set(), 'bob:8000': set()}
>>> who_has = {'a': {'alice:8000'}}
>>> has_what = {'alice:8000': {'a'}}
>>> nbytes = {'a': 100}
>>> restrictions = {}
>>> loose_restrictions = set()

We choose the worker that has the data on which ‘b’ depends (alice has ‘a’)

>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'alice:8000'

If both Alice and Bob have dependencies then we choose the less-busy worker

>>> who_has = {'a': {'alice:8000', 'bob:8000'}}
>>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'a'}}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'bob:8000'

Optionally provide restrictions of where jobs are allowed to occur

>>> restrictions = {'b': {'alice', 'charlie'}}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               restrictions, loose_restrictions, nbytes, 'b')
'alice:8000'

If the task requires data communication, then we choose to minimize the number of bytes sent between workers. This takes precedence over worker occupancy.

>>> dependencies = {'c': {'a', 'b'}}
>>> who_has = {'a': {'alice:8000'}, 'b': {'bob:8000'}}
>>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'b'}}
>>> nbytes = {'a': 1, 'b': 1000}
>>> stacks = {'alice:8000': [], 'bob:8000': []}
>>> decide_worker(dependencies, stacks, processing, who_has, has_what,
...               {}, set(), nbytes, 'c')
'bob:8000'