Shared Futures With Channels

A channel is a changing stream of futures shared between any number of clients connected to the same scheduler.

Examples

Basic Usage

Create channels from your Client:

>>> client = Client('scheduler-address:8786')
>>> chan = client.channel('my-channel')

Append futures onto a channel:

>>> from operator import add
>>> future = client.submit(add, 1, 2)
>>> chan.append(future)

A channel maintains a collection of current futures added both by your client and by others.

>>> chan.futures
deque([<Future: status: pending, key: add-12345>,
       <Future: status: pending, key: sub-56789>])

If you wish to persist the current status of tasks outside of the distributed cluster (e.g. to take a snapshot in case of shutdown), you can copy the channel's futures, since chan.futures is a Python deque.

>>> channel_copy = list(chan.futures)

You can iterate over a channel to receive futures as they are appended, including futures appended by other clients.

>>> anotherclient = Client('scheduler-address:8786')
>>> chan = anotherclient.channel('my-channel')
>>> for future in chan:
...     pass

When done writing, call flush to wait until your appends have been fully registered with the scheduler.

>>> client = Client('scheduler-address:8786')
>>> chan = client.channel('my-channel')
>>> import time
>>> future2 = client.submit(time.sleep, 2)
>>> chan.append(future2)
>>> chan.flush()

Example with local_client

Using channels with local_client allows a more decoupled version of what is possible with Data Streams with Queues: independent worker clients can build up a set of results that can later be read by a different client. This opens up Dask/Distributed to integration in a wider application environment, similar to other Python task queues such as Celery.

import operator
import random
from time import sleep

from distributed import Client, local_client

def emit(name):
    # Runs on the cluster: publish a new random number onto the named
    # channel once per second.
    with local_client() as c:
        chan = c.channel(name)
        while True:
            future = c.submit(random.random, pure=False)  # pure=False: new key each call
            chan.append(future)
            sleep(1)

def combine():
    # Runs on the cluster: pair up futures from channels 'a' and 'b'
    # and publish their sums onto the 'adds' channel.
    with local_client() as c:
        a_chan = c.channel('a')
        b_chan = c.channel('b')
        out_chan = c.channel('adds')
        for a, b in zip(a_chan, b_chan):
            future = c.submit(operator.add, a, b)
            out_chan.append(future)

client = Client()

emitters = (client.submit(emit, 'a'), client.submit(emit, 'b'))
combiner = client.submit(combine)
chan = client.channel('adds')


for future in chan:
    print(future.result())

1.782009416831722
...

All clients iterating over a channel can be stopped using the stop method:

chan.stop()

Very short-lived clients

If you wish to submit work to your cluster from a short-lived client, such as a web application view, an AWS Lambda function, or some other fire-and-forget script, channels provide a way to do this.
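
For example, a fire-and-forget handler might look like the sketch below. This is only an illustration under assumed names: process_record stands in for your own function and 'pending-work' is an arbitrary channel name, neither of which is part of the channels API.

from distributed import Client

def handle_request(record):
    # Connect, hand the work off to the cluster, and return immediately.
    client = Client('scheduler-address:8786')
    chan = client.channel('pending-work')            # assumed channel name
    future = client.submit(process_record, record)   # process_record is your own function
    chan.append(future)
    chan.flush()  # ensure the append is registered with the scheduler
    # This client can now go away; any other client connected to the same
    # scheduler can read the future from the 'pending-work' channel later.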

Further Details

Often it is desirable to respond to events from outside the distributed cluster or to instantiate a new client in order to check on the progress of a set of tasks. The channels feature makes these and many other workflows possible.
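
As a sketch of the progress-checking case, a newly created client can connect and inspect the futures already on a channel; here the 'adds' channel from the example above is assumed, and each future reports its own status:

from distributed import Client

# A brand-new client connecting after the fact.
monitor = Client('scheduler-address:8786')
chan = monitor.channel('adds')

statuses = [future.status for future in chan.futures]
finished = sum(status == 'finished' for status in statuses)
print('%d of %d tasks finished' % (finished, len(statuses)))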

This functionality is similar to queues, but additionally means that multiple clients can send data to a long-running function, rather than one client holding a queue instance.
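
As a minimal sketch of that pattern, a single long-running consumer can read from a channel that any number of separate client processes feed; the channel name 'jobs' and the handle and my_task functions below are placeholders:

from distributed import Client, local_client

def consume():
    # Long-running task: receives futures appended by any client.
    with local_client() as c:
        for future in c.channel('jobs'):
            handle(future.result())   # handle is a placeholder for your own logic

client = Client('scheduler-address:8786')
consumer = client.submit(consume)

# Meanwhile, any other client process can feed the same channel:
#     other = Client('scheduler-address:8786')
#     other.channel('jobs').append(other.submit(my_task))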

Several clients connected to the same scheduler can communicate a sequence of futures between each other through shared channels. Any client can append to a channel at any time, and all clients are updated when a channel changes. The central scheduler maintains consistency and the ordering of events. Channels also provide a clean way to extend the Dask scheduler using normal Distributed task submission.