Communications

Workers, the Scheduler, and Clients communicate by sending each other Python objects (such as Protocol messages or user data). The communication layer handles appropriate encoding and shipping of those Python objects between the distributed endpoints. The communication layer is able to select between different transport implementations, depending on user choice or (possibly) internal optimizations.

The communication layer lives in the distributed.comm package.

Addresses

Communication addresses are canonically represented as URIs, such as tcp://127.0.0.1:1234. For compatibility with existing code, if the URI scheme is omitted, a default scheme of tcp is assumed (so 127.0.0.1:456 is really the same as tcp://127.0.0.1:456). The default scheme may change in the future.

The following schemes are currently implemented in the distributed source tree:

  • tcp is the main transport; it uses TCP sockets and allows for IPv4 and IPv6 addresses.

  • tls is a secure transport using the well-known TLS protocol over TCP sockets. Using it requires specifying keys and certificates as outlined in TLS/SSL.

  • inproc is an in-process transport using simple object queues; it eliminates serialization and I/O overhead, providing almost zero-cost communication between endpoints as long as they are situated in the same process.

Some URIs may be valid for listening but not for connecting. For example, the URI tcp:// will listen on all IPv4 and IPv6 addresses and on an arbitrary port, but you cannot connect to that address.

Higher-level APIs in distributed may accept other address formats for convenience or compatibility, for example a (host, port) pair. However, the abstract communications layer always deals with URIs.
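A higher-level API that accepts a (host, port) pair has to turn it into a URI before handing it to the communications layer. The sketch below uses a hypothetical helper, host_port_to_uri, which is not part of distributed. It assumes that IPv6 literals are written in brackets, as in tls://[::1], so the port stays unambiguous:

```python
def host_port_to_uri(host: str, port: int, scheme: str = "tcp") -> str:
    """Build a URI such as 'tcp://127.0.0.1:8786' from a (host, port) pair.

    Hypothetical helper for illustration; not part of distributed.comm.
    """
    # IPv6 literals contain ':' themselves, so bracket them to keep the port separable
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"
    return f"{scheme}://{host}:{port}"


print(host_port_to_uri("127.0.0.1", 8786))  # tcp://127.0.0.1:8786
print(host_port_to_uri("::1", 8786))        # tcp://[::1]:8786
```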

Functions

There are a number of top-level functions in distributed.comm to help deal with addresses:

distributed.comm.parse_address(addr: str, strict: bool = False) → tuple[str, str]

Split address into its scheme and scheme-dependent location string.

>>> parse_address('tcp://127.0.0.1')
('tcp', '127.0.0.1')

If strict is True, the address must include a scheme. Otherwise, an address without a scheme is given the default scheme (tcp).
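The splitting rule, including the strict flag, can be sketched in a few lines of plain Python. This is a simplified illustration of the documented behaviour, not distributed's own implementation. The function name parse_address_sketch is hypothetical, and raising ValueError in strict mode is an assumption of the sketch:

```python
from __future__ import annotations

DEFAULT_SCHEME = "tcp"  # the documented default; it may change in the future


def parse_address_sketch(addr: str, strict: bool = False) -> tuple[str, str]:
    """Split addr into (scheme, location). Simplified illustration only."""
    scheme, sep, loc = addr.rpartition("://")
    if not sep:
        # No '://' separator: the address carries no scheme, and loc is the whole string
        if strict:
            raise ValueError(f"address {addr!r} has no scheme")
        scheme = DEFAULT_SCHEME
    return scheme, loc


print(parse_address_sketch("tcp://127.0.0.1"))  # ('tcp', '127.0.0.1')
print(parse_address_sketch("127.0.0.1:456"))    # ('tcp', '127.0.0.1:456')
```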

distributed.comm.unparse_address(scheme: str, loc: str) → str

Undo parse_address().

>>> unparse_address('tcp', '127.0.0.1')
'tcp://127.0.0.1'

distributed.comm.normalize_address(addr: str) → str

Canonicalize address, adding a default scheme if necessary.

>>> normalize_address('tls://[::1]')
'tls://[::1]'
>>> normalize_address('[::1]')
'tcp://[::1]'
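Normalization only needs to detect a missing scheme and prepend the default one. A minimal sketch of that behaviour, reproducing the two examples above; the *_sketch function names are hypothetical, and the real functions may perform additional validation:

```python
DEFAULT_SCHEME = "tcp"  # the documented default; it may change in the future


def unparse_address_sketch(scheme: str, loc: str) -> str:
    # Inverse of splitting an address on '://'
    return f"{scheme}://{loc}"


def normalize_address_sketch(addr: str) -> str:
    """Add the default scheme if addr has none (illustration only)."""
    if "://" in addr:
        return addr
    return unparse_address_sketch(DEFAULT_SCHEME, addr)


print(normalize_address_sketch("tls://[::1]"))  # tls://[::1]
print(normalize_address_sketch("[::1]"))        # tcp://[::1]
```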

distributed.comm.resolve_address(addr: str) → str

Apply scheme-specific address resolution to addr, replacing all symbolic references with concrete location specifiers.

In practice, this can mean hostnames are resolved to IP addresses.

>>> resolve_address('tcp://localhost:8786')
'tcp://127.0.0.1:8786'
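For the tcp scheme, "resolution" essentially means a hostname lookup. The sketch below shows the idea with socket.getaddrinfo. It is a simplification that handles only IPv4 results and tcp://host:port addresses without IPv6 literals, and the helper name resolve_tcp_address is hypothetical:

```python
import socket


def resolve_tcp_address(addr: str) -> str:
    """Resolve the host in 'tcp://host:port' to an IPv4 address (illustration only)."""
    scheme, _, loc = addr.partition("://")
    host, _, port = loc.rpartition(":")
    # getaddrinfo consults DNS / the hosts file; restrict to IPv4 for simplicity
    infos = socket.getaddrinfo(host, int(port), family=socket.AF_INET, type=socket.SOCK_STREAM)
    ip = infos[0][4][0]  # for AF_INET, the sockaddr element is (ip, port)
    return f"{scheme}://{ip}:{port}"


# resolve_tcp_address("tcp://localhost:8786") typically returns "tcp://127.0.0.1:8786"
print(resolve_tcp_address("tcp://127.0.0.1:8786"))  # tcp://127.0.0.1:8786
```

An address that is already an IP literal passes through unchanged, which matches the idea that only symbolic references are replaced.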

distributed.comm.get_address_host(addr: str) → str

Return a hostname / IP address identifying the machine this address is located on.

In contrast to get_address_host_port(), this function should always succeed for well-formed addresses.

>>> get_address_host('tcp://1.2.3.4:80')
'1.2.3.4'
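Extracting the host from a well-formed address mostly means dropping the scheme and any port. A minimal sketch, assuming IPv6 literals are bracketed and that brackets are stripped from the result; get_host_sketch is a hypothetical name, and this is not distributed's implementation:

```python
def get_host_sketch(addr: str) -> str:
    """Return the host part of '[scheme://]host[:port]' (illustration only)."""
    loc = addr.split("://", 1)[-1]  # drop the scheme if present
    if loc.startswith("["):
        # Bracketed IPv6 literal, e.g. '[::1]:8786' -> '::1'
        return loc[1:loc.index("]")]
    host, sep, port = loc.rpartition(":")
    # Strip a trailing ':port' only if one is actually present
    return host if sep and port.isdigit() else loc


print(get_host_sketch("tcp://1.2.3.4:80"))   # 1.2.3.4
print(get_host_sketch("tcp://[::1]:8786"))   # ::1
```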

Communications API

The basic unit for dealing with established communications is the Comm object:

class distributed.comm.Comm(deserialize: bool = True)

A message-oriented communication object, representing an established communication channel. There should be only one reader and one writer at a time: to manage concurrent communications, even with a single peer, you must create distinct Comm objects.

Messages are arbitrary Python objects. Concrete implementations of this class can implement different serialization mechanisms depending on the underlying transport’s characteristics.

abstract abort()

Close the communication immediately and abruptly. Useful in destructors or generators’ finally blocks.

abstract async close()

Close the communication cleanly. This will attempt to flush outgoing buffers before actually closing the underlying transport.

This method returns a coroutine.

abstract closed()

Return whether the stream is closed.

property extra_info

Return backend-specific information about the communication, as a dict. Typically, this is information which is initialized when the communication is established and doesn’t vary afterwards.

static handshake_configuration(local: dict[str, Any], remote: dict[str, Any]) → dict[str, Any]

Find a configuration that is suitable for both the local and the remote peer.

Parameters
local

Output of handshake_info() in this process

remote

Output of handshake_info() on the remote host

See also

handshake_info

handshake_info() → dict[str, Any]

Share environment information that may differ between the two peers, e.g. compression settings.

Notes

By the time this method runs, the “auto” compression setting has been updated to an actual compression algorithm. This matters if both peers had compression set to “auto” but only one has lz4 installed. See distributed.protocol.compression._update_and_check_compression_settings().
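The Notes explain why “auto” must be resolved before the handshake: two peers could both report “auto” while only one of them can actually use lz4. The sketch below shows one way two handshake_info()-style dicts could be combined. The keys ("compression", "pickle-protocol"), the function name negotiate_handshake and the rules (keep compression only if both sides agree, use the lower pickle protocol) are illustrative assumptions, not a description of distributed's handshake_configuration():

```python
from __future__ import annotations

from typing import Any


def negotiate_handshake(local: dict[str, Any], remote: dict[str, Any]) -> dict[str, Any]:
    """Combine two handshake dicts into settings both peers accept (hypothetical rules)."""
    same_compression = local["compression"] == remote["compression"]
    return {
        # Compress only if both peers resolved to the same algorithm
        "compression": local["compression"] if same_compression else None,
        # Use the highest pickle protocol that both peers understand
        "pickle-protocol": min(local["pickle-protocol"], remote["pickle-protocol"]),
    }


local = {"compression": "lz4", "pickle-protocol": 5}  # this peer has lz4
remote = {"compression": None, "pickle-protocol": 4}  # the other peer does not
print(negotiate_handshake(local, remote))  # {'compression': None, 'pickle-protocol': 4}
```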

abstract property local_address: str

The local address.

abstract property peer_address: str

The peer’s address.

abstract async read(deserializers=None)

Read and return a message (a Python object).

This method returns a coroutine.

Parameters
deserializers: dict[str, tuple[Callable, Callable, bool]] | None

An optional dict appropriate for distributed.protocol.deserialize. See Serialization for more.

property same_host: bool

Return True if the peer is on localhost; False otherwise.

abstract async write(msg, serializers=None, on_error=None)

Write a message (a Python object).

This method returns a coroutine.

Parameters
msg
on_error: str | None

The behavior when serialization fails. See distributed.protocol.core.dumps for valid values.
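The read/write/close contract can be illustrated with a toy comm in the spirit of the inproc transport: one asyncio.Queue per direction, with objects passed by reference so there is no serialization or I/O. Everything here (QueueComm, comm_pair, the placeholder inproc:// addresses) is hypothetical, not distributed's inproc implementation, and the sketch omits the serializers, deserializers and on_error parameters:

```python
from __future__ import annotations

import asyncio
from typing import Any


class QueueComm:
    """Toy comm backed by one asyncio.Queue per direction (illustration only)."""

    def __init__(self, local: str, peer: str,
                 inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
        self._local, self._peer = local, peer
        self._inbox, self._outbox = inbox, outbox
        self._closed = False

    @property
    def local_address(self) -> str:
        return self._local

    @property
    def peer_address(self) -> str:
        return self._peer

    async def write(self, msg: Any) -> None:
        if self._closed:
            raise RuntimeError("comm is closed")
        # The object itself is enqueued: no serialization and no I/O
        await self._outbox.put(msg)

    async def read(self) -> Any:
        if self._closed:
            raise RuntimeError("comm is closed")
        return await self._inbox.get()

    def abort(self) -> None:
        self._closed = True

    async def close(self) -> None:
        # An in-memory queue has no buffers to flush, so a clean close is immediate
        self.abort()

    def closed(self) -> bool:
        return self._closed


def comm_pair(a: str, b: str) -> tuple[QueueComm, QueueComm]:
    """Return two connected endpoints: what one writes, the other reads."""
    a_to_b: asyncio.Queue = asyncio.Queue()
    b_to_a: asyncio.Queue = asyncio.Queue()
    return QueueComm(a, b, b_to_a, a_to_b), QueueComm(b, a, a_to_b, b_to_a)


async def demo() -> tuple[bool, bool, bool]:
    # Create the queues inside the running event loop
    worker, scheduler = comm_pair("inproc://worker", "inproc://scheduler")
    msg = {"op": "heartbeat", "data": [1, 2, 3]}
    await worker.write(msg)
    received = await scheduler.read()
    await worker.close()
    return received is msg, worker.closed(), scheduler.closed()


print(asyncio.run(demo()))  # (True, True, False)
```

Note that closing one endpoint does not close the other here; a real transport would also have to signal the peer.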

You don’t create Comm objects directly: you either listen for incoming communications, or connect to a peer listening for connections:

async distributed.comm.connect(addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args)

Connect to the given address (a URI such as tcp://127.0.0.1:1234) and return a Comm object once the connection is established. If the connection attempt fails, it is retried until the timeout expires.
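Retrying until a deadline can be sketched with plain asyncio. The helper retry_until_timeout and its exponential backoff policy are hypothetical, not distributed's implementation. The sketch only assumes that a failed attempt raises OSError (ConnectionRefusedError is a subclass) and reports the final failure as TimeoutError, which is itself an OSError subclass:

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_until_timeout(
    attempt: Callable[[], Awaitable[T]],
    timeout: float,
    initial_delay: float = 0.01,
    max_delay: float = 1.0,
) -> T:
    """Await attempt() until it succeeds or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        try:
            return await attempt()
        except OSError as exc:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"timed out after {timeout} s: {exc}") from exc
            # Back off exponentially, but never sleep past the deadline
            await asyncio.sleep(min(delay, remaining))
            delay = min(delay * 2, max_delay)


async def demo() -> str:
    failures = {"left": 2}

    async def flaky_connect() -> str:
        # Stand-in for opening a socket: refuse the first two attempts
        if failures["left"] > 0:
            failures["left"] -= 1
            raise ConnectionRefusedError("peer not listening yet")
        return "connected"

    return await retry_until_timeout(flaky_connect, timeout=1.0)


print(asyncio.run(demo()))  # connected
```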

distributed.comm.listen(addr, handle_comm, deserialize=True, **kwargs)

Create a listener object with the given parameters. When its start() method is called, the listener will listen on the given address (a URI such as tcp://0.0.0.0) and call handle_comm with a Comm object for each incoming connection.

handle_comm can be a regular function or a coroutine function.
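Code that accepts either kind of handler has to check whether calling it produced an awaitable. A minimal sketch using a hypothetical dispatch() helper, with strings standing in for Comm objects; distributed's own listener code may handle this differently:

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch(handle_comm: Callable[[Any], Any], comm: Any) -> Any:
    """Call handle_comm and await the result only if it is awaitable."""
    result = handle_comm(comm)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_handler(comm: Any) -> str:
    return f"sync handled {comm}"


async def async_handler(comm: Any) -> str:
    await asyncio.sleep(0)  # a real handler would read from / write to comm here
    return f"async handled {comm}"


async def main() -> None:
    print(await dispatch(sync_handler, "comm-1"))   # sync handled comm-1
    print(await dispatch(async_handler, "comm-2"))  # async handled comm-2


asyncio.run(main())
```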

Listener objects expose the following interface:

class distributed.comm.core.Listener

abstract property contact_address

An address this listener can be contacted on. This can be different from listen_address if the latter is some wildcard address such as ‘tcp://0.0.0.0:123’.

abstract property listen_address

The listening address as a URI string.

abstract async start()

Start listening for incoming connections.

abstract stop()

Stop listening. This does not shut down already established communications, but prevents accepting new ones.
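The difference between listen_address and contact_address matters when listening on a wildcard: peers cannot connect to 0.0.0.0. The sketch below derives a contact address for a tcp://host:port listen address by substituting the machine's hostname. The helper contact_address_for is hypothetical, and substituting the hostname is only one possible policy; a real listener might report a specific interface address instead:

```python
import socket

WILDCARD_HOSTS = ("0.0.0.0", "[::]")


def contact_address_for(listen_address: str) -> str:
    """Replace a wildcard host in 'scheme://host:port' with this machine's hostname."""
    scheme, _, loc = listen_address.partition("://")
    host, _, port = loc.rpartition(":")
    if host in WILDCARD_HOSTS:
        host = socket.gethostname()
    return f"{scheme}://{host}:{port}"


print(contact_address_for("tcp://10.0.0.5:8786"))  # tcp://10.0.0.5:8786 (unchanged)
print(contact_address_for("tcp://0.0.0.0:123"))    # tcp://<this machine's hostname>:123
```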

Extending the Communication Layer

Each transport is represented by a URI scheme (such as tcp) and backed by a dedicated Backend implementation, which provides entry points into all transport-specific routines.

Out-of-tree backends can be registered under the group distributed.comm.backends in setuptools entry_points. For example, a hypothetical dask_udp package would register its UDP backend class by including the following in its setup.py file:

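from setuptools import setup

setup(
    name="dask_udp",
    entry_points={
        # Register UDPBackend under the "udp" URI scheme
        "distributed.comm.backends": [
            "udp=dask_udp.backend:UDPBackend",
        ]
    },
    # ... other setup() arguments (packages, install_requires, etc.)
)

class distributed.comm.registry.Backend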

A communication backend, selected by a given URI scheme (e.g. ‘tcp’).

abstract get_address_host(loc)

Get a host name (normally an IP address) identifying the host the address is located on. loc is a scheme-less address.

get_address_host_port(loc)

Get the (host, port) tuple of the scheme-less address loc. This should only be implemented by IP-based transports.

abstract get_connector()

Get a connector object usable for connecting to addresses.

abstract get_listener(loc, handle_comm, deserialize, **connection_args)

Get a listener object for the scheme-less address loc.

abstract get_local_address_for(loc)

Get the local listening address suitable for reaching loc.

abstract resolve_address(loc)

Resolve the address into a canonical form. loc is a scheme-less address.

Simple implementations may return loc unchanged.
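The entry-point registration and the Backend interface can be tied together in a small registry keyed by URI scheme. The sketch below is loosely modelled on the description above rather than on distributed.comm.registry itself. ToyBackend, LoopbackBackend, the made-up "loop" scheme and get_backend() are all hypothetical, and only two of the Backend methods are modelled. Out-of-tree backends are discovered with importlib.metadata under the distributed.comm.backends group:

```python
from __future__ import annotations

import importlib.metadata
from abc import ABC, abstractmethod

ENTRY_POINT_GROUP = "distributed.comm.backends"


class ToyBackend(ABC):
    """Cut-down backend interface: only two of the documented methods are modelled."""

    @abstractmethod
    def get_address_host(self, loc: str) -> str: ...

    @abstractmethod
    def resolve_address(self, loc: str) -> str: ...


class LoopbackBackend(ToyBackend):
    """Hypothetical backend for a made-up 'loop' scheme with 'host:port' locations."""

    def get_address_host(self, loc: str) -> str:
        return loc.rpartition(":")[0]

    def resolve_address(self, loc: str) -> str:
        return loc  # simple implementations may return loc unchanged


# In-tree backends are registered directly by scheme
backends: dict[str, ToyBackend] = {"loop": LoopbackBackend()}


def _entry_points_in_group(group: str):
    eps = importlib.metadata.entry_points()
    if hasattr(eps, "select"):  # Python 3.10+
        return eps.select(group=group)
    return eps.get(group, [])  # Python 3.8/3.9 return a dict of groups


def get_backend(scheme: str) -> ToyBackend:
    """Return the backend for scheme, loading out-of-tree ones from entry points."""
    if scheme in backends:
        return backends[scheme]
    for ep in _entry_points_in_group(ENTRY_POINT_GROUP):
        if ep.name == scheme:
            backend = ep.load()()  # the entry point names a class; instantiate it
            backends[scheme] = backend  # cache for later lookups
            return backend
    raise ValueError(f"unknown address scheme {scheme!r}")


print(get_backend("loop").get_address_host("127.0.0.1:9000"))  # 127.0.0.1
```

With the setup.py registration shown earlier installed, get_backend("udp") would load and instantiate dask_udp.backend:UDPBackend on first use.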