Streaming deserialization

The typical SerializeAsync and DeserializeAsync methods complete only when the entire job is done. For DeserializeAsync in particular, this means that no part of the deserialized data is available until all of it has been deserialized.

There are times, however, when obtaining the deserialized elements progressively is useful. For example, the stream may contain a very long sequence of elements, and processing them incrementally instead of all at once can save memory or improve performance. Or the stream may be an intentionally long-lived generator stream that emits values over long periods of time, where the receiver needs each value as soon as it is produced, before the stream ends.

The DeserializeEnumerableAsync methods address such use cases.

First, identify how the msgpack values to be streamed are presented. Two forms are supported:

  1. A stream that contains multiple msgpack structures back to back, with no envelope (such as a msgpack array) around them.
  2. A stream that contains a single msgpack structure, within which is a sequence to be streamed (e.g. a msgpack array of elements).
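To make the two forms concrete, the following sketch shows what each looks like on the wire for the integers 1, 2 and 3. It relies only on the msgpack encoding rules for positive fixint and fixarray; the variable names and the IsFixArrayHeader helper are illustrative and are not part of the library.

```csharp
using System;

// Form 1: no envelope. Three msgpack integers (1, 2, 3) written back to back.
// Each positive fixint (0x00-0x7f) is a complete msgpack structure on its own.
byte[] noEnvelope = { 0x01, 0x02, 0x03 };

// Form 2: envelope. One msgpack array that holds the same three integers.
// 0x93 is a fixarray header: the high nibble 0x9 marks the type and the
// low nibble carries the element count (3).
byte[] withEnvelope = { 0x93, 0x01, 0x02, 0x03 };

// Hypothetical helper, for illustration only.
static bool IsFixArrayHeader(byte b) => (b & 0xF0) == 0x90;

Console.WriteLine(IsFixArrayHeader(noEnvelope[0]));   // False
Console.WriteLine(IsFixArrayHeader(withEnvelope[0])); // True
Console.WriteLine(withEnvelope[0] & 0x0F);            // 3
```

In the first form the reader cannot know up front how many structures will follow. In the second form the array header announces the count before any element arrives.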

Sequence with no envelope

A sequence of msgpack structures that is not wrapped in an array or any other enclosing structure is said to have no envelope. To asynchronously enumerate each of these structures, use the DeserializeEnumerableAsync methods that take no MessagePackSerializer.StreamingEnumerationOptions<T, TElement> parameter, such as DeserializeEnumerableAsync<T>(PipeReader, CancellationToken).

private static readonly MessagePackSerializer Serializer = new();

async Task ReadListAsync(PipeReader reader)
{
    await foreach (Person? item in Serializer.DeserializeEnumerableAsync<Person>(reader))
    {
        // Process item here.
    }
}

[GenerateShape]
internal partial record Person(int Age);
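When the data arrives as a Stream rather than a PipeReader, one option is to wrap the stream with PipeReader.Create from System.IO.Pipelines. The sketch below is only an illustration: ConsumeStreamAsync is a hypothetical helper, not a library API, and it takes the consuming method as a delegate so that it stays independent of the serializer.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical helper: wraps a Stream in a PipeReader, hands the reader to
// the supplied consumer, and completes the reader when the consumer is done.
static async Task ConsumeStreamAsync(Stream stream, Func<PipeReader, Task> consume)
{
    // With default options, completing the reader also disposes the stream.
    PipeReader reader = PipeReader.Create(stream);
    try
    {
        await consume(reader);
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
```

Assuming a method shaped like ReadListAsync above, a call might look like `await ConsumeStreamAsync(File.OpenRead(path), ReadListAsync);`, where `path` stands for whatever file holds the sequence.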

Sequence within an envelope

A sequence of msgpack structures found within a larger structure (e.g. a msgpack array) is said to have an envelope. Asynchronously enumerating these structures requires first parsing through the envelope preamble to navigate to the sequence. After the sequence has been enumerated, the remainder of the envelope is parsed so that the reader is left at a valid position, at the end of the overall msgpack structure.

The path through the envelope is described by an expression given to the MessagePackSerializer.StreamingEnumerationOptions<T, TElement> argument, which is passed to any of the DeserializeEnumerableAsync methods that accept it as a parameter.

private static readonly MessagePackSerializer Serializer = new();

async Task ReadFamilyMembersAsync(PipeReader reader)
{
    MessagePackSerializer.StreamingEnumerationOptions<Family, Person> options = new(f => f.Members);
    await foreach (Person? item in Serializer.DeserializeEnumerableAsync(reader, options))
    {
        // Process item here.
    }
}

[GenerateShape]
internal partial record Family(Person[] Members);

internal record Person(int Age);

A path from envelope to sequence may step through properties and index into arrays or even dictionaries. However, not every valid C# expression is accepted as a path.
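As a rough sketch of such paths, the two options below reach a Person sequence through an array index and through a dictionary key. The Town type, its members, the key "Smith" and the using directives are assumptions made for illustration; whether a particular expression is accepted is decided by the library.

```csharp
using System.Collections.Generic;
using Nerdbank.MessagePack;
using PolyType;

// Path: property -> array index -> property that holds the sequence.
MessagePackSerializer.StreamingEnumerationOptions<Town, Person> byIndex =
    new(t => t.Families[0].Members);

// Path: property -> dictionary key -> property that holds the sequence.
MessagePackSerializer.StreamingEnumerationOptions<Town, Person> byKey =
    new(t => t.FamiliesByName["Smith"].Members);

// Hypothetical envelope type used only for this illustration.
[GenerateShape]
internal partial record Town(Family[] Families, Dictionary<string, Family> FamiliesByName);

internal record Family(Person[] Members);

internal record Person(int Age);
```

Either options object would then be passed to DeserializeEnumerableAsync in the same way as in ReadFamilyMembersAsync above.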