Serialization

When we communicate data between computers, we first convert that data into a sequence of bytes that can be sent across the network. Choices made in serialization can affect performance and security.

The standard Python solution to this, Pickle, is often but not always the right solution. Dask uses a number of different serialization schemes in different situations. These are extensible to allow users to control serialization in sensitive situations and also to enable library developers to plug in more performant serialization solutions.

This document first describes Dask’s default solution for serialization and then discusses ways to control and extend that serialization.

Defaults

There are three kinds of messages passed through the Dask network:

  1. Small administrative messages like “Worker A has finished task X” or “I’m running out of memory”. These are always serialized with msgpack.
  2. Movement of program data, such as Numpy arrays and Pandas dataframes. This uses a combination of pickle and custom serializers and is the topic of the next section.
  3. Computational tasks like f(x) that are defined and serialized on client processes and deserialized and run on worker processes. These are serialized using a fixed scheme decided on by those libraries. Today this is a combination of pickle and cloudpickle.
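
Item 3 above relies on cloudpickle because plain pickle serializes functions by reference to an importable name, so it typically fails on lambdas, closures, and other dynamically defined callables. The following standalone sketch (illustrative only, not Dask’s internal code) shows the difference:

import pickle
import cloudpickle

increment = lambda x: x + 1          # a lambda is not importable by name

try:
    pickle.dumps(increment)          # plain pickle typically fails here
except Exception as err:
    print("pickle failed:", err)

blob = cloudpickle.dumps(increment)  # cloudpickle serializes the function's code itself
restored = cloudpickle.loads(blob)
assert restored(41) == 42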

Serialization families

Use

For the movement of program data (item 2 above) we can use a few different families of serializers. By default the following families are built in:

  1. Pickle and cloudpickle
  2. Msgpack
  3. Custom per-type serializers that come with Dask for the special serialization of important classes of data like Numpy arrays

You can choose which families you want to use to serialize and deserialize data when you create a Client:

from dask.distributed import Client
client = Client('tcp://scheduler-address:8786',
                serializers=['dask', 'pickle'],
                deserializers=['dask', 'msgpack'])

This can be useful if, for example, you are sensitive about receiving Pickle-serialized data for security reasons.

Dask uses the serializers ['dask', 'pickle'] by default, trying to use dask custom serializers (described below) if they work and then falling back to pickle/cloudpickle.
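
These families can also be exercised directly with the serialize and deserialize functions documented in the API section below. The following is a minimal sketch, assuming we only want the msgpack family; exact header contents vary between versions:

from distributed.protocol.serialize import serialize, deserialize

data = {'x': 1, 'y': 'hello'}

# Restrict serialization to the msgpack family, which handles simple builtin types
header, frames = serialize(data, serializers=['msgpack'])

# The header records which family produced the frames, and deserialization
# can be restricted in the same way
result = deserialize(header, frames, deserializers=['msgpack'])
print(result)  # -> {'x': 1, 'y': 'hello'} (value types may depend on the msgpack version)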

Extend

These families can be extended by creating two functions, dumps and loads, which return and consume a msgpack-encodable header and a list of byte-like objects. These must then be registered in the families dictionary in distributed.protocol.serialize under an appropriate name. Here is the definition of pickle_dumps and pickle_loads to serve as an example.

import pickle

def pickle_dumps(x):
    header = {'serializer': 'pickle'}
    frames = [pickle.dumps(x)]
    return header, frames

def pickle_loads(header, frames):
    if len(frames) > 1:  # this may be cut up for network reasons
        frame = b''.join(frames)
    else:
        frame = frames[0]
    return pickle.loads(frame)

from distributed.protocol.serialize import families
families['pickle'] = (pickle_dumps, pickle_loads)

After this the name 'pickle' can be used in the serializers= and deserializers= keywords in Client and other parts of Dask.
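
As a quick check, the registered name can also be passed directly to the serialize and deserialize functions; this hedged sketch assumes the registration above has already run:

from distributed.protocol.serialize import serialize, deserialize

header, frames = serialize({'x': [1, 2, 3]}, serializers=['pickle'])
result = deserialize(header, frames, deserializers=['pickle'])
assert result == {'x': [1, 2, 3]}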

Dask Serialization Family

Use

Dask maintains its own custom serialization family that special-cases a few important types, like Numpy arrays. These serializers either operate more efficiently than Pickle, or serialize types that Pickle cannot handle.

You don’t need to do anything special to use this family of serializers. It is on by default (along with pickle). Note that Dask custom serializers may use pickle internally in some cases. It should not be considered more secure.
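
For example, NumPy arrays have a dedicated serializer in this family that can hand the network layer the array’s underlying buffer. A minimal sketch, with version-dependent details (header contents, frame types) deliberately left unasserted:

import numpy as np
from distributed.protocol.serialize import serialize, deserialize

x = np.arange(1_000_000)

# Ask for the Dask custom family only
header, frames = serialize(x, serializers=['dask'])

# The frames carry the raw array data, commonly as bytes or memoryview objects
print(type(frames[0]))

# Round-trip back to an equal array
y = deserialize(header, frames)
assert (x == y).all()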

Extend

As with serialization families in general, the Dask family in particular is also extensible. This is a good way to support custom serialization of a single type of object. The method is similar: you create serialize and deserialize functions that produce and consume a header and frames, and then register them with Dask.

class Human(object):
    def __init__(self, name):
        self.name = name

def serialize(human):
    header = {}
    frames = [human.name.encode()]
    return header, frames

def deserialize(header, frames):
    return Human(frames[0].decode())

from distributed.protocol.serialize import register_serialization
register_serialization(Human, serialize, deserialize)
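
Once registered, Human instances round-trip through Dask’s serialize and deserialize functions via the 'dask' family. A small usage sketch (the exact header contents are version dependent and not shown):

# Import the module itself to avoid shadowing the serialize/deserialize
# functions defined above
import distributed.protocol.serialize as dps

header, frames = dps.serialize(Human('Alice'), serializers=['dask'])
alice = dps.deserialize(header, frames)
assert alice.name == 'Alice'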

API

register_serialization(cls, serialize, …)
    Register a new class for dask-custom serialization
serialize(x[, serializers, on_error])
    Convert object to a header and list of bytestrings
deserialize(header, frames[, deserializers])
    Convert serialized header and list of bytestrings back to a Python object

distributed.protocol.serialize.register_serialization(cls, serialize, deserialize)

Register a new class for dask-custom serialization

Parameters:
cls: type
serialize: function
deserialize: function

Examples

>>> class Human(object):
...     def __init__(self, name):
...         self.name = name
>>> def serialize(human):
...     header = {}
...     frames = [human.name.encode()]
...     return header, frames
>>> def deserialize(header, frames):
...     return Human(frames[0].decode())
>>> register_serialization(Human, serialize, deserialize)
>>> serialize(Human('Alice'))
({}, [b'Alice'])

distributed.protocol.serialize.serialize(x, serializers=None, on_error='message')

Convert object to a header and list of bytestrings

This takes in an arbitrary Python object and returns a msgpack serializable header and a list of bytes or memoryview objects.

The serialization protocols to use are configurable: a list of names defines the set of serializers to use, in order. These names are keys in the serializer_registry dict (e.g., ‘pickle’, ‘msgpack’), which maps to the de/serialize functions. The name ‘dask’ is special, and will use the per-class serialization methods. None gives the default list ['dask', 'pickle'].

Returns:
header: dictionary containing any msgpack-serializable metadata
frames: list of bytes or memoryviews, commonly of length one

See also

deserialize
Convert header and frames back to object
to_serialize
Mark that data in a message should be serialized
register_serialization
Register custom serialization functions

Examples

>>> serialize(1)
({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])
>>> serialize(b'123')  # some special types get custom treatment
({'type': 'builtins.bytes'}, [b'123'])
>>> deserialize(*serialize(1))
1

distributed.protocol.serialize.deserialize(header, frames, deserializers=None)

Convert serialized header and list of bytestrings back to a Python object

Parameters:
header: dict
frames: list of bytes

See also

serialize