Spans

Note

This is an experimental feature and may rapidly change without a deprecation cycle.

Dask offers various diagnostics and Fine Performance Metrics about tasks, grouping them by their prefix (commonly, the name of the function being called). This can be suboptimal:

  • your client code may be sophisticated enough that correlating lines of client code with the tasks being run on the cluster may prove difficult;

  • the same function may be applied to different parts of your workflow, with a different performance profile;

  • you may be in a multitenancy setup, where part of the load on the cluster was not caused by your client code.

In these situations, it may be useful to attach meaningful tags to your workflow, or segments of it. To do so, you should use the span() context manager inside the client code.

For example:

import dask.config
import dask.array as da
from distributed import Client, span

# Read important note below
dask.config.set({"optimization.fuse.active": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        a = da.read_zarr(...)
    with span("ML preprocessing"):
        a = preprocess(a)
    with span("Model training"):
        model = train(a)
    model = model.compute()

Note how the span() context manager can be nested. The example will create the following spans on the scheduler:

  • ("Alice's workflow", )

  • ("Alice's workflow", "data load")

  • ("Alice's workflow", "ML preprocessing")

  • ("Alice's workflow", "Model training")

Each of the spans will be mapped to the tasks matching the segment of the graph that was defined inside its context manager. The parent span will be mapped to all tasks of its children.

Tags are arbitrary and nothing stops you from parameterizing them; for example

>>> with span(f"{user}'s workflow"):
...     ...

This may give you:

  • ("Alice's workflow", "data load")

  • ("Bob's workflow", "data load")

  • etc.

This is useful, for example, if you want to observe all the workload submitted by Alice while hiding Bob's activity, or alternatively all the data loading activity regardless of who submitted it.

The possibilities are more or less endless - instead of, or in addition to, a username at the top, you could store information on what dataset you're processing, etc.

The default span

If you don’t use the span() context manager, your tasks will be automatically attributed to the ("default", ) span.

Viewing the spans

You can use span tags in the Fine Performance Metrics dashboard widget to filter your workload:

Span selection in the Fine Performance Metrics dashboard

Additionally, spans can be queried using scheduler extensions or run_on_scheduler(); see Dask Developer API.
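
For example, a minimal sketch using Client.run_on_scheduler() (the helper name below is arbitrary; it simply reads the "spans" scheduler extension described in the Dask Developer API section and assumes the Client named client from the example above):

def list_span_names(dask_scheduler):
    ext = dask_scheduler.extensions["spans"]  # SpansSchedulerExtension
    return [s.name for s in ext.spans.values()]

print(client.run_on_scheduler(list_span_names))
# e.g. [("default",), ("Alice's workflow",), ("Alice's workflow", "data load"), ...]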

User API

Important

Dataframes have a minimum granularity of a single call to compute() or persist() and can't be broken down further into groups of operations - if the example above used dataframes, everything would have been uniformly tagged as "Alice's workflow", as that is the span that's active during compute().

In other collections, such as arrays and delayed objects, spans that don’t wrap around a call to compute() or persist() can get lost during the optimization phase. To prevent this issue, you must set

>>> dask.config.set({"optimization.fuse.active": False})

Or in dask.yaml:

optimization:
  fuse:
    active: false

A possible workaround, which also works for dataframes, is to perform intermediate calls to persist(). Note however that this can significantly impact optimizations and reduce overall performance.

with span("Alice's workflow"):
    with span("data load"):
        a = dd.read_parquet(...).persist()
    with span("ML preprocessing"):
        b = preprocess(a).persist()
        del a  # Release distributed memory for a as soon as possible
    with span("Model training"):
        model = train(b).persist()
        del b  # Release distributed memory for b as soon as possible
        model = model.compute()

distributed.span(*tags: str) → collections.abc.Iterator[str][source]

Tag a group of tasks to be part of a certain group, called a span.

This context manager can be nested, thus creating sub-spans. If you close and re-open a span context manager with the same tag, you’ll end up with two separate spans.
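
For example (a minimal sketch; the tag name is arbitrary):

>>> with span("etl") as id1:
...     ...
>>> with span("etl") as id2:
...     ...
>>> id1 != id2  # two separate spans, despite the identical tag
True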

Every cluster defines a global "default" span when no span has been defined by the client; the default span is automatically closed and reopened when all tasks associated with it have been completed - in other words, when the cluster is idle save for tasks that are explicitly annotated by a span. Note that, in some edge cases, you may end up with overlapping default spans, e.g. if a worker crashes and all unique tasks that were in memory on it need to be recomputed.

You may capture the ID of a span on the client to match it with the Span objects on the scheduler:

>>> client = Client()
>>> with span("my workflow") as span_id:
...     client.submit(lambda: "Hello world!").result()
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>

Notes

You may retrieve the current span with dask.get_annotations().get("span"). You can do so in the client code as well as from inside a task.
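
For example, a minimal sketch (the exact layout of the annotation is an implementation detail; see observe_tasks() below):

import dask
from distributed import span

with span("my workflow"):
    # Returns the annotation of the innermost active span, i.e. a dict with
    # its full "name" and "ids" history.
    current = dask.get_annotations().get("span")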

Dask Developer API

Intended audience

This section is only of interest to developers maintaining Dask or writing scheduler extensions, e.g. to create an alternative dashboard or to store metrics long-term.

Spans can be accessed on the scheduler through Scheduler.extensions["spans"], which contains a singleton instance of SpansSchedulerExtension. In turn, the extension contains a mapping of all Span objects, plus a variety of convenience methods to access and aggregate them.

Note how Span objects offer a variety of methods that the dashboard currently doesn't use - such as start/stop times, task counts, and size of the output.
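
For instance, a minimal sketch of such a query, run through Client.run_on_scheduler() (assuming a connected Client named client; the helper name is arbitrary):

def span_report(dask_scheduler):
    ext = dask_scheduler.extensions["spans"]
    return [
        {
            "name": s.name,
            "done": s.done,
            "duration": s.duration,          # cumulative task runtime, in seconds
            "nbytes_total": s.nbytes_total,  # bytes produced by the span tree
            "states": dict(s.states),        # current task counts by state
        }
        for s in ext.spans.values()
    ]

print(client.run_on_scheduler(span_report))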

class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])[source]
property active_cpu_seconds: float

Return the number of CPU seconds that were made available on the cluster while this Span was running; in other words, (Span.stop - Span.enqueued) * Scheduler.total_nthreads.

This accounts for workers joining and leaving the cluster while this Span was active. If this Span is the output of merge(), gaps between the input spans are not counted.

See also

enqueued
stop
nthreads_intervals
distributed.scheduler.SchedulerState.total_nthreads
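
For example, a hypothetical cluster utilization metric could be derived from it (a sketch; span is any Span object obtained from the scheduler extension):

def utilization(span):
    # Fraction of the CPU time available on the cluster while the span was
    # running that was actually spent computing its tasks.
    available = span.active_cpu_seconds
    return span.duration / available if available else 0.0
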
property all_durations: dict[str, float]

Cumulative duration of all completed actions in this span tree, by action

See also

duration
distributed.scheduler.TaskGroup.all_durations
property annotation: dict[str, tuple[str, ...]] | None

Rebuild the dask graph annotation which contains the full id history

Note that this may not match the original annotation in case of TaskGroup collision.

children: list[Span]

Direct children of this span, sorted by creation time

property code: list[tuple[SourceCode, ...]]

Code snippets, sent by the client on compute(), persist(), and submit().

Only populated if distributed.diagnostics.computations.nframes is non-zero.

property cumulative_worker_metrics: dict[tuple[collections.abc.Hashable, ...], float]

Replica of Worker.digests_total and Scheduler.cumulative_worker_metrics, but only for the metrics that can be attributed to the current span tree. The span id has been removed from the key.

At the moment of writing, all keys are ("execute", <task prefix>, <activity>, <unit>) or ("p2p", <where>, <activity>, <unit>) but more may be added in the future with a different format; please test e.g. for k[0] == "execute".
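
For example, a minimal sketch that sums the seconds spent in each activity of the "execute" context for a given Span object (the helper name is arbitrary):

from collections import defaultdict

def execute_seconds_by_activity(span):
    out = defaultdict(float)
    for key, value in span.cumulative_worker_metrics.items():
        if key[0] == "execute" and key[-1] == "seconds":
            _, _prefix, activity, _unit = key
            out[activity] += value
    return dict(out)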

property done: bool

Return True if all tasks in this span tree are completed; False otherwise.

See also

distributed.scheduler.TaskGroup.done

Notes

This property may transition from True to False, e.g. when a new sub-span is added or when a worker that contained the only replica of a task in memory crashes and the task needs to be recomputed.

property duration: float

The total amount of time spent on all tasks in this span tree

See also

all_durations
distributed.scheduler.TaskGroup.duration
enqueued: float

Time when the span first appeared on the scheduler. The same property on parent spans is always less than or equal to this.

See also

start
stop
groups: set[TaskGroup]

Notes

TaskGroups are forgotten by the Scheduler when the last task is forgotten, but remain referenced here indefinitely. If a user calls compute() twice on the same collection, you’ll have more than one group with the same tg.name in this set! For the same reason, while the same TaskGroup object is guaranteed to be attached to exactly one Span, you may have different TaskGroups with the same key attached to different Spans.

id: str

Unique ID, generated by span() and taken from TaskState.annotations["span"]["ids"][-1]. Matches distributed.scheduler.TaskState.group.span_id and distributed.worker_state_machine.TaskState.span_id.

static merge(*items: distributed.spans.Span) → distributed.spans.Span[source]

Merge multiple spans into a synthetic one. The input spans must not be related to each other.

name: tuple[str, ...]

(<tag>, <tag>, …) Matches TaskState.annotations["span"]["name"], both on the scheduler and the worker.

property nbytes_total: int

The total number of bytes that this span tree has produced

See also

distributed.scheduler.TaskGroup.nbytes_total
property nthreads_intervals: list[tuple[float, float, int]]

Returns
List of tuples:

  • begin timestamp

  • end timestamp

  • Scheduler.total_nthreads during this interval

When the Span is the output of merge(), the intervals may not be contiguous.

See also

enqueued
stop
active_cpu_seconds
distributed.scheduler.SchedulerState.total_nthreads
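
In other words, active_cpu_seconds can be thought of as the sum over these intervals (a sketch, not necessarily the actual implementation):

def active_cpu_seconds_from_intervals(span):
    return sum(
        (end - begin) * nthreads
        for begin, end, nthreads in span.nthreads_intervals
    )
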
property start: float

Earliest time when a task belonging to this span tree started computing; 0 if no task has finished computing yet.

See also

enqueued
stop
distributed.scheduler.TaskGroup.start

Notes

This is not updated until at least one task has finished computing. It could move backwards as tasks complete.

property states: dict[TaskStateState, int]

The number of tasks currently in each state in this span tree; e.g. {"memory": 10, "processing": 3, "released": 4, ...}.

See also

distributed.scheduler.TaskGroup.states
property stop: float

When this span tree finished computing, or current timestamp if it didn’t finish yet.

See also

enqueued
start
done
distributed.scheduler.TaskGroup.stop

Notes

This differs from TaskGroup.stop when there are unfinished tasks; it will also never be zero.

traverse_groups() → Iterator[TaskGroup][source]

All TaskGroups belonging to this branch of the span tree

traverse_spans() → collections.abc.Iterator[distributed.spans.Span][source]

Top-down recursion of all spans belonging to this branch of the span tree, including self

class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)[source]

Scheduler extension for spans support

find_by_tags(*tags: str) → collections.abc.Iterator[distributed.spans.Span][source]

Yield all spans that contain any of the given tags. When a tag is shared both by a span and its (grand)children, only return the parent.

heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) → None[source]

Triggered by SpansWorkerExtension.heartbeat().

Populate Span.cumulative_worker_metrics() with data from the worker.

merge_all() → distributed.spans.Span[source]

Return a synthetic Span which is the sum of all spans

merge_by_tags(*tags: str) → distributed.spans.Span[source]

Return a synthetic Span which is the sum of all spans containing the given tags
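
A hypothetical usage sketch, run on the scheduler through Client.run_on_scheduler() (the helper name and tag are arbitrary; assumes a connected Client named client):

def alice_cumulative_duration(dask_scheduler):
    ext = dask_scheduler.extensions["spans"]
    merged = ext.merge_by_tags("Alice's workflow")
    return merged.duration  # total task runtime across all of Alice's spans

print(client.run_on_scheduler(alice_cumulative_duration))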

observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...]) → dict[Key, dict][source]

Acknowledge the existence of runnable tasks on the scheduler. These may be new tasks, tasks that were previously unrunnable, or tasks that have already been fed into this method.

Attach newly observed tasks to either the desired span or to ("default", ). Update TaskGroup.span_id and wipe TaskState.annotations["span"].

Returns
Updated 'span' annotations: {key: {"name": (..., ...), "ids": (..., ...)}}
root_spans: list[Span]

Only the spans that don’t have any parents, sorted by creation time. This is a convenience helper structure to speed up searches.

spans: dict[str, Span]

All Span objects by id

spans_search_by_name: defaultdict[tuple[str, ...], list[Span]]

All spans, keyed by their full name and sorted by creation time. This is a convenience helper structure to speed up searches.

spans_search_by_tag: defaultdict[str, list[Span]]

All spans, keyed by the individual tags that make up their name and sorted by creation time. This is a convenience helper structure to speed up searches.

class distributed.spans.SpansWorkerExtension(worker: Worker)[source]

Worker extension for spans support

collect_digests() → None[source]

Make a local copy of Worker.digests_total_since_heartbeat. We can’t just parse it directly in heartbeat() as the event loop may be yielded between its call and self.worker.digests_total_since_heartbeat.clear(), causing the scheduler to become misaligned with the workers.

heartbeat() → dict[tuple[collections.abc.Hashable, ...], float][source]

Apportion the metrics that do have a span to the Spans on the scheduler

Returns
{(context, span_id, prefix, activity, unit): value}

See also

SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
distributed.worker.Worker.get_metrics