Tuesday, 3 July 2018

Pipe Dreams, part 2

Pipelines - a guided tour of the new IO API in .NET, part 2

In part 1, we discussed some of the problems that exist in the familiar Stream API, and we had an introduction to the Pipe, PipeWriter and PipeReader APIs, looking at how to write to a single Pipe and then consume the data from that Pipe; we also discussed how FlushAsync() and ReadAsync() work together to keep both sides of the machinery working, dealing with "empty" and "full" scenarios - suspending the reader when there is nothing to do, and resuming it when data arrives; and suspending the writer when it is out-pacing the reader (over-filling the pipe), and resuming when the reader has caught up; and we discussed what it means to "resume" here, in terms of the threading model.

In this part, we're going to discuss the memory model of pipelines: where does the data actually live? We'll also start looking at how we can use pipelines in realistic scenarios to fulfil real needs.

The memory model: where are all my datas?

In part 1, we spoke about how the pipe owns all the buffers, allowing the writer to request a buffer via GetMemory() or GetSpan(), with the committed data later being exposed to the reader via the .Buffer on the result of ReadAsync() - which is a ReadOnlySequence<byte>, i.e. some number of segments of data.

So what actually happens?

Each Pipe instance has a reference to a MemoryPool<byte> - a new device in System.Memory for, unsurprisingly, creating a memory pool. You can specify a specific MemoryPool<byte> in the options when creating a Pipe, but by default (and, I imagine, almost always) - a shared application-wide pool (MemoryPool<byte>.Shared) is used.

The MemoryPool<byte> concept is very open-ended. The default implementation simply makes use of ArrayPool<byte>.Shared (the application-wide array-pool), renting arrays as needed, and returning them when done. This ArrayPool<T> is implemented using WeakReference, so pooled arrays are collectible if memory pressure demands it. However, when you call GetMemory(someSize) or GetSpan(someSize), the pipe doesn't simply ask the memory pool for that amount; instead, it tracks a "segment" internally. A new "segment" will be (by default - this is configurable) the larger of someSize and 2048 bytes. Requesting a non-trivial amount of memory means that we aren't filling the system with tiny arrays, which would significantly impact garbage collection. When you Advance(bytesWritten) in the writer, it:

  • moves an internal counter that tracks how much of the current segment has been used
  • updates the end of the "available to be read" chain for the reader; if we've just written the first bytes of an empty segment, this will mean adding a new segment to the chain, otherwise it'll mean increasing the end marker of the final segment of the existing chain

It is this "available to be read" chain that we fetch in ReadAsync(); and as we AdvanceTo in the reader - when entire segments are consumed, the pipe hands those segments back to the memory pool. From there, they can be reused many times. And as a direct consequence of the two points above, we can see that most of the time, even with multiple calls to Advance in the writer, we may end up with a single segment in the reader, with multiple segments happening either at segment boundaries, or where the reader is falling behind the writer, and data is starting to accumulate.

What this achieves just using the default pool is:

  • we don't need to keep allocating every time we call GetMemory() / GetSpan()
  • we don't need a separate array per GetMemory() / GetSpan() - we'll often just get a different range of the same "segment"
  • a relatively small number of non-trivial buffer arrays are used
  • they are automatically recycled without needing lots of library code
  • when not being used, they are available for garbage collection

This also explains why the approach of requesting a very small amount in GetMemory() / GetSpan() and then checking the size can be so successful: we have access to the rest of the unused part of the current segment. Meaning: with a segment size of 2048, of which 200 bytes were already used by previous writes - even if we only ask for 5 bytes, we'll probably find we have 1848 bytes available to play with. Or possibly more - remember that obtaining an array from ArrayPool<T>.Shared is also an "at least this big" operation.
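
To make that concrete, here's a minimal sketch of the "ask small, use what you're given" pattern - WriteAll is a hypothetical helper, but GetSpan and Advance are the real API:

void WriteAll(PipeWriter writer, ReadOnlySpan<byte> source)
{
    while (!source.IsEmpty)
    {
        // ask for a token amount ("at least 1 byte")...
        Span<byte> span = writer.GetSpan(1);
        // ...then use however much we were actually offered
        int bytes = Math.Min(span.Length, source.Length);
        source.Slice(0, bytes).CopyTo(span);
        writer.Advance(bytes);
        source = source.Slice(bytes);
    }
}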

Zero copy buffers

Something else to notice in this setup is that we get data buffering without any data copying. The writer asked for a buffer, and wrote the data to where it needed to be the first time, on the way in. This then acted as a buffer between the writer and the reader without any need to copy data around. And if the reader couldn't process all the data yet, it was able to push data back into the pipe simply by saying explicitly what it did consume. There was no need to maintain a separate backlog of data for the reader, something that is very common in protocol processing code using Stream.

It is this combination of features that makes the memory aspect of pipeline code so friendly. You could do all of this with Stream, but it is an excruciating amount of error-prone code to do it, and even more if you want to do it well - and you'd pretty much have to implement it separately for each scenario. Pipelines makes good memory handling the default simple path - the pit of success.

More exotic memory pools

You aren't limited to the memory model discussed; you can implement your own custom memory pool! The advantage of the default pool is that it is simple. In particular, it doesn't really matter if we aren't 100% perfect about returning every segment - if we somehow drop a pipe on the floor, the worst that can happen is that the garbage collector collects the abandoned segments at some point. They won't go back into the pool, but that's fine.

You can, however, do much more interesting things. Imagine, for example, a MemoryPool<byte> that takes huge slabs of memory - either managed memory via a number of very large arrays, or unmanaged memory via Marshal.AllocHGlobal (note that Memory<T> and Span<T> are not limited to arrays - all they require is some kind of contiguous memory), leasing blocks of this larger chunk as required. This has great potential, but it becomes increasingly important to ensure that segments are reliably returned. Most systems shouldn't need this, but it is good that the flexibility is offered.
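
To give a flavour, here's a heavily stripped-down sketch of what an unmanaged-memory pool might look like (unsafe code, using System.Buffers and System.Runtime.InteropServices); for simplicity it allocates per rental rather than leasing from slabs, and a real implementation would need far more care around lifetime:

public sealed class UnmanagedMemoryPool : MemoryPool<byte>
{
    public override int MaxBufferSize => int.MaxValue;

    public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
        => new Lease(minBufferSize <= 0 ? 4096 : minBufferSize);

    protected override void Dispose(bool disposing) { }

    // MemoryManager<byte> lets us expose unmanaged memory as Memory<byte>
    private sealed unsafe class Lease : MemoryManager<byte>
    {
        private byte* _ptr;
        private readonly int _length;

        public Lease(int length)
        {
            _ptr = (byte*)Marshal.AllocHGlobal(length);
            _length = length;
        }

        public override Span<byte> GetSpan()
            => new Span<byte>(_ptr, _length);

        // unmanaged memory never moves, so "pinning" is a no-op
        public override MemoryHandle Pin(int elementIndex = 0)
            => new MemoryHandle(_ptr + elementIndex);

        public override void Unpin() { }

        protected override void Dispose(bool disposing)
        {
            if (_ptr != null)
            {
                Marshal.FreeHGlobal((IntPtr)_ptr);
                _ptr = null;
            }
        }
    }
}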

Useful pipes in real systems

The example that we used in part 1 was of a single Pipe that was written and read by the same code. That's clearly not a realistic scenario (unless we're trying to mock an "echo" server), so what can we do for more realistic scenarios? First, we need to connect our pipelines to something. We don't usually want a Pipe in isolation; we want a pipe that integrates with a common system or API. So; let's start by seeing what this would look like.

Here we need a bit of caveat and disclaimer: the pipelines released in .NET Core 2.1 do not include any endpoint implementations. Meaning: the Pipe machinery is there, but nothing is shipped inside the box that actually connects pipes with any other existing systems - like shipping the abstract Stream base-type, but without shipping FileStream, NetworkStream, etc. Yes, that sounds frustrating, but it was a pragmatic reality of time constraints. Don't panic! There are... "lively" conversations going on right now about which bindings to implement with which priority; and there are a few community offerings to bridge the most obvious gaps for today.

Since we find ourselves in that position, we might naturally ask: "what does it take to connect pipelines to another data backend?".

Perhaps a good place to start would be connecting a pipe to a Stream. I know what you're thinking: "Marc, but in part 1 you went out of your way to say how terrible Stream is!". I haven't changed my mind; it isn't necessarily ideal - for any scenario-specific Stream implementation (such as NetworkStream or FileStream) we could have a dedicated pipelines-based endpoint that talked directly to that service with minimal indirection; but it is a useful first step:

  • it gives us immediate access to a huge range of API surfaces - anything that can expose data via Stream, and anything that can act as a middle-layer via wrapped streams (encryption, compression, etc)
  • it hides all the wrinkly bits of the Stream API behind a clear unambiguous surface
  • it gives us almost all of the advantages that we have mentioned so far

So, let's get started! The first thing we need to think about is: what is the direction here? As previously mentioned, a Stream is ambiguous - and could be read-only, write-only, or read-write. Let's assume we want to deal with the most general case: a read-write stream that acts in a duplex manner - this will give us access to things like sockets (via NetworkStream). This means we're actually going to want two pipes - one for the input, one for the output. Pipelines helps clear this up for us, by declaring an interface expressly for this: IDuplexPipe. This is a very simple interface, and being handed an IDuplexPipe is analogous to being handed the ends of two pipes - one marked "in", one marked "out":

interface IDuplexPipe
{
    PipeReader Input { get; }
    PipeWriter Output { get; }
}

What we want to do, then, is create a type that implements IDuplexPipe, but using 2 Pipe instances internally:

  • one Pipe will be the output buffer (from the consumer's perspective), which will be filled by caller-code writing to Output - and we'll have a loop that consumes this Pipe and pushes the data into the underlying Stream (to be written to the network, or whatever the stream does)
  • one Pipe will be the input buffer (from the consumer's perspective); we'll have a loop that reads data from the underlying Stream (from the network, etc) and pushes it into the Pipe, where it will be drained by caller-code reading from Input

This approach immediately solves a wide range of problems that commonly affect people using Stream:

  • we now have input/output buffers that decouple stream access from the read/write caller-code, without having to add BufferedStream or similar to prevent packet fragmentation (for the writing code), and to make it very easy to continue receiving more data while we process it (for the reading code especially, so we don't have to keep pausing while we ask for more data)
  • if the caller-code is writing faster than the stream Write can process, the back-pressure feature will kick in, throttling the caller-code so we don't end up with a huge buffer of unsent data
  • if the stream Read is out-pacing the caller-code that is consuming the data, the back-pressure will kick in here too, throttling our stream read loop so we don't end up with a huge buffer of unprocessed data
  • both the read and write implementations benefit from all the memory pool goodness that we discussed above
  • the caller-code doesn't ever need to worry about backlog of data (incomplete frames), etc - the pipe deals with it

So what might that look like?

Essentially, all we need to do is something like:

class StreamDuplexPipe : IDuplexPipe
{
    Stream _stream;
    Pipe _readPipe, _writePipe;

    public PipeReader Input => _readPipe.Reader;
    public PipeWriter Output => _writePipe.Writer;
    
    // ... more here
}

Note that we have two different pipes; the caller gets one end of each pipe - and our code will act on the other end of each pipe.

Pumping the pipe

So what does the code look like to interact with the stream? We need two methods, as discussed above. The first - and simplest - has a loop that reads data from the _stream and pushes it to _readPipe, to be consumed by the calling code; the core of this method could be something like

while (true)
{
    // note we'll usually get *much* more than we ask for
    var buffer = _readPipe.Writer.GetMemory(1);
    int bytes = await _stream.ReadAsync(buffer);
    _readPipe.Writer.Advance(bytes);
    if (bytes == 0) break; // source EOF
    var flush = await _readPipe.Writer.FlushAsync();
    if (flush.IsCompleted || flush.IsCanceled) break;
}

This loop asks the pipe for a buffer, then uses the new netcoreapp2.1 overload of Stream.ReadAsync that accepts a Memory<byte> to populate that buffer - we'll discuss what to do if you don't have an API that takes Memory<byte> shortly. When the read is complete, it commits that-many bytes to the pipe using Advance, then it invokes FlushAsync() on the pipe to (if needed) awaken the reader, or pause the write loop while the back-pressure eases. Note we should also check the outcome of the Pipe's FlushAsync() - it could tell us that the pipe's consumer has signalled that they've finished reading the data they want (IsCompleted), or that the pipe itself was shut down (IsCanceled).

Note that in both cases, we want to ensure that we tell the pipe when this loop has exited - however it exits - so that we don't end up with the calling code awaiting forever on data that will never come. Accidents happen, and sometimes the call to _stream.ReadAsync (or any other method) might throw an exception, so a good way to do this is with a try/finally block:

Exception error = null;
try
{
    // our loop from the previous sample
}
catch(Exception ex) { error = ex; }
finally { _readPipe.Writer.Complete(error); }

If you prefer, you could also use two calls to Complete - one at the end of the try (for success) and one inside the catch (for failure).

The second method we need is a bit more complex; we need a loop that consumes data from _writePipe and pushes it to _stream. The core of this could be something like:

while (true)
{
    var read = await _writePipe.Reader.ReadAsync();
    var buffer = read.Buffer;
    if (read.IsCanceled) break;
    if (buffer.IsEmpty && read.IsCompleted) break;

    // write everything we got to the stream
    foreach (var segment in buffer)
    {
        await _stream.WriteAsync(segment);
    }
    _writePipe.Reader.AdvanceTo(buffer.End);
    await _stream.FlushAsync();    
}

This awaits some data (which could be in multiple buffers), and checks some exit conditions; as before, we can give up if IsCanceled, but the next check is more subtle: we don't want to stop writing just because the producer indicated that they've written everything they wanted to (IsCompleted), or we might not write the last few segments of their data - we need to continue until we've written all their data, hence the additional buffer.IsEmpty check. This is simplified in this case because we're always writing everything - we'll see a more complex example shortly. Once we have data, we write each of the non-contiguous buffers to the stream sequentially - because Stream can only write one buffer at a time (again, I'm using the netcoreapp2.1 overload here that accepts ReadOnlyMemory<byte>, but we aren't restricted to this). Once we have written the buffers, we tell the pipe that we have consumed the data, and flush the underlying Stream.

In "real" code we might want to be a bit more aggressive about optimizing to reduce flushing the underlying stream until we know there is no more data readily available, perhaps using the _writePipe.Reader.TryRead(...) method in addition to _writePipe.Reader.ReadAsync() method; this method works similarly to ReadAsync() but is guaranteed to always return synchronously - useful for testing "did the writer append something while I was busy?". But the above illustrates the point.

Additionally, as before, we would want to add a try/finally, so that we always call _writePipe.Reader.Complete() when we exit.

We can use the PipeScheduler to start these two pumps, which will ensure that they run in the intended context, and our loops start pumping data. We'd have a little more house-keeping to add (we'd probably want a mechanism to Close()/Dispose() the underlying stream, etc) - but as you can see, it doesn't have to be a huge task to connect an IDuplexPipe to a source that wasn't designed with pipelines in mind.
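
As a sketch (and only a sketch - error handling and shutdown omitted), the constructor might wire the pumps up something like this, where ReceiveFromStreamAsync and SendToStreamAsync are my names for the two loops above:

public StreamDuplexPipe(Stream stream)
{
    _stream = stream;
    _readPipe = new Pipe();
    _writePipe = new Pipe();

    // start the two pumps; each loop signals its own outcome
    // via Complete(...), so fire-and-forget is acceptable here
    PipeScheduler.ThreadPool.Schedule(
        state => _ = ((StreamDuplexPipe)state).ReceiveFromStreamAsync(), this);
    PipeScheduler.ThreadPool.Schedule(
        state => _ = ((StreamDuplexPipe)state).SendToStreamAsync(), this);
}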

Here's one I made earlier...

I've simplified the above a little (not too much, honest) to make it concise for discussion, but you still probably don't want to start copying/pasting chunks from here to try and get it to work. I'm not claiming they are the perfect solution for all situations, but as part of the 2.0 work for StackExchange.Redis, we have implemented a range of bindings for pipelines that we are making available on nuget - unimaginatively titled Pipelines.Sockets.Unofficial (nuget, github); this includes:

  • converting a duplex Stream to an IDuplexPipe (like the above)
  • converting a read-only Stream to a PipeReader
  • converting a write-only Stream to a PipeWriter
  • converting an IDuplexPipe to a duplex Stream
  • converting a PipeReader to a read-only Stream
  • converting a PipeWriter to a write-only Stream
  • converting a Socket to an IDuplexPipe directly (without going via NetworkStream)

The first six are all available via static methods on StreamConnection; the last is available via SocketConnection.

StackExchange.Redis is very involved in Socket work, so we are very interested in how to connect pipelines to sockets; for redis connections without TLS, we can connect our Socket directly to the pipeline:

  • Socket ⇄ SocketConnection

For redis connections with TLS (in particular: cloud redis providers), we can connect the pieces thusly:

  • Socket ⇄ NetworkStream ⇄ SslStream ⇄ StreamConnection

Both of these configurations give us a Socket at one end, and an IDuplexPipe at the other, and it begins to show how we can orchestrate pipelines as part of a more complex system. Perhaps more importantly, it gives us room in the future to change the implementation. As examples of future possibilities:

  • Tim Seaward has been working on Leto, which provides TLS capability as an IDuplexPipe directly, without requiring SslStream (and thus: no stream inverters)
  • between Tim Seaward, David Fowler and Ben Adams, there are a range of experimental or in-progress network layers directly implementing pipelines without using managed sockets, including "libuv", "RIO" (Registered IO), and most recently, "magma" - which pushes the entire TCP stack into user code to reduce syscalls.

It'll be interesting to see how this space develops!

But my existing API doesn't talk in Span<byte> or Memory<byte>!

When writing code to pump data from a pipe to another system (such as a Socket), it is very likely you'll bump into APIs that don't take Memory<byte> or Span<byte>. Don't panic, all is not lost! You still have multiple ways of breaking out of that world into something more ... traditional.

The first trick, for when you have a Memory<T> or ReadOnlyMemory<T>, is MemoryMarshal.TryGetArray(...). This takes in a memory and attempts to get an ArraySegment<T> that describes the same data in terms of a T[] vector and an int offset/count pair. Obviously this can only work if the memory was based on a vector, which is not always the case - so this can fail on exotic memory pools (we'll see a sketch of this route shortly).

Our second escape hatch is MemoryMarshal.GetReference(...). This takes in a span and returns a reference (actually a "managed pointer", aka ref T) to the start of the data. Once we have a ref T, we can use unsafe C# to get an unmanaged pointer to the data, useful for APIs that talk in such:

Span<byte> span = ...
fixed(byte* ptr = &MemoryMarshal.GetReference(span))
{
    // ...
}

It can still do this if the length of the span is zero, returning a reference to where the zeroth item would have been, and it even works for a default span where there never was any backing memory. This last one requires a slight word of caution because a ref T is not usually expected to be null, but that's exactly what you get here. Essentially, as long as you don't ever try to dereference this kind of null reference: you'll be fine. If you use fixed to convert it to an unmanaged pointer, you get back a null (zero) pointer, which is more expected (and can be useful in some P/Invoke scenarios).

MemoryMarshal is essentially synonymous with unsafe code, even if the method you're calling doesn't require the unsafe keyword. It is perfectly valid to use it, but if you use it incorrectly, it reserves the right to hurt you - so just be careful.
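
Coming back to the first trick: here's what the TryGetArray route might look like as a sketch, with SomeLegacyWrite standing in for whatever array-based API you're stuck with:

void WriteToLegacyApi(ReadOnlyMemory<byte> memory)
{
    if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
    {
        // array-backed memory: hand the underlying array straight over
        SomeLegacyWrite(segment.Array, segment.Offset, segment.Count);
    }
    else
    {
        // exotic (non-array) memory: fall back to a one-off copy
        byte[] copy = memory.ToArray();
        SomeLegacyWrite(copy, 0, copy.Length);
    }
}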

What about the app-code end of the pipe?

OK, we've got our IDuplexPipe, and we've seen how to connect the "business end" of both pipes to your backend data service of choice. Now; how do we use it in our app code?

As in our example from part 1, we're going to hand the PipeWriter from IDuplexPipe.Output to our outbound code, and the PipeReader from IDuplexPipe.Input to our inbound code.

The outbound code is typically very simple, and is usually a very direct port to get from Stream-based code to PipeWriter-based. The key difference, once again, is that you don't control the buffers. A typical implementation might look something like:

ValueTask<bool> Write(SomeMessageType message, PipeWriter writer)
{
    // (this may be multiple GetSpan/Advance calls, or a loop,
    // depending on what makes sense for the message/protocol)
    var span = writer.GetSpan(...);
    // TODO: ... actually write the message
    int bytesWritten = ... // from writing
    writer.Advance(bytesWritten);

    return FlushAsync(writer);
}

private static async ValueTask<bool> FlushAsync(PipeWriter writer)
{
    // apply back-pressure etc
    var flush = await writer.FlushAsync();
    // tell the calling code whether any more messages
    // should be written
    return !(flush.IsCanceled || flush.IsCompleted);
}

The first part of Write is our business code - we do whatever we need to write the data to the buffers from writer; typically this will include multiple calls to GetSpan(...) and Advance(). When we've written our message, we can flush it to ensure the pump is active, and apply back-pressure. For very large messages we could also flush at intermediate points, but for most simple scenarios: flushing once per message is fine.
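
For example, if our protocol were ASCII lines and SomeMessageType were just a string, a sketch of the write code (reusing the FlushAsync helper above) might be:

ValueTask<bool> Write(string message, PipeWriter writer)
{
    // ASCII: one byte per char, plus one byte for the '\n' terminator
    Span<byte> span = writer.GetSpan(message.Length + 1);
    int bytes = Encoding.ASCII.GetBytes(message, span);
    span[bytes++] = (byte)'\n';
    writer.Advance(bytes);
    return FlushAsync(writer);
}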

If you're wondering why I split the FlushAsync code into a separate method: that's because I want to await the result of FlushAsync to check the exit conditions, so it needs to be in an async method. The most efficient way to access memory here is via the Span<byte> API, and Span<byte> is a ref struct type; as a consequence we cannot use a Span<byte> local variable in an async method. A pragmatic solution is to simply split the methods, so one method deals with the Span<byte> work, and another method deals with the async aspect.

Random aside: async code, hot synchronous paths, and async machinery overhead

The machinery involved in async / await is pretty good, but it can still involve a surprising amount of stack work - you can see this on sharplab.io - take a look at the generated machinery for the OurCode.FlushAsync method - and the entirety of struct <FlushAsync>d__0. Now, this code is not terrible - it tries hard to avoid allocations in the synchronous path - but it is unnecessary. There are two ways to significantly improve this; one is to not await at all, which is often possible if the await is the last line in a method and we don't need to process the results: don't await - just remove the async and return the task - complete or incomplete. We can't do that here, because we need to check the state of the result, but we can optimize for success by checking whether the task is already complete (via .IsCompletedSuccessfully - if it has completed but faulted, we still want to use the await to make sure the exception behaves correctly). If it is successfully completed, we're allowed to access the .Result; so we could also write our FlushAsync method as:

private static ValueTask<bool> Flush(PipeWriter writer)
{
    bool GetResult(FlushResult flush)
        // tell the calling code whether any more messages
        // should be written
        => !(flush.IsCanceled || flush.IsCompleted);

    async ValueTask<bool> Awaited(ValueTask<FlushResult> incomplete)
        => GetResult(await incomplete);

    // apply back-pressure etc
    var flushTask = writer.FlushAsync();

    return flushTask.IsCompletedSuccessfully
        ? new ValueTask<bool>(GetResult(flushTask.Result))
        : Awaited(flushTask);
}

This completely avoids the async/await machinery in the most common case: synchronous completion - as we can see again on sharplab.io. I should emphasize: there's absolutely no point doing this if the code is usually (or exclusively) going to actually be asynchronous; it only helps when the result is usually (or exclusively) going to be available synchronously.

And what about the reader?

As we've seen many times, the reader is often slightly more complicated - we can't know that a single "read" operation will contain exactly one inbound message. We may need to loop until we have all the data we need, and we may have additional data that we need to push back. So let's assume we want to consume a single message of some kind:

async ValueTask<SomeMessageType> GetNextMessage(
    PipeReader reader,
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        var read = await reader.ReadAsync(cancellationToken);
        if (read.IsCanceled) ThrowCanceled();

        // can we find a complete frame?
        var buffer = read.Buffer;
        if (TryParseFrame(
            buffer,
            out SomeMessageType nextMessage,
            out SequencePosition consumedTo))
        {
            reader.AdvanceTo(consumedTo);
            return nextMessage;
        }
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (read.IsCompleted) ThrowEOF();        
    }
}

Here we obtain some data from the pipe, checking exit conditions like cancellation. Next, we try to find a message; what this means depends on your exact code - this could mean:

  • looking through the buffer for some sentinel value such as an ASCII line-ending, then treating everything up to that point as a message (discarding the line ending)
  • parsing a well-defined binary frame header, obtaining the payload length, checking that we have that much data, and processing it
  • or anything else you want!

If we do manage to find a message, we can tell the pipe to discard the data that we've consumed - by AdvanceTo(consumedTo), which uses whatever our own frame-parsing code told us that we consumed. If we don't manage to find a message, the first thing to do is tell the pipe that we consumed nothing despite trying to read everything - by reader.AdvanceTo(buffer.Start, buffer.End). At this point, there are two possibilities:

  • we haven't got enough data yet
  • the pipe is dead and there will never be enough data

Our check on read.IsCompleted tests this, reporting failure in the latter case; otherwise we continue the loop, and await more data. What is left, then, is our frame parsing - we've reduced complex IO management down to simple operations; for example, if our messages are separated by line-feed sentinels:

private static bool TryParseFrame(
    ReadOnlySequence<byte> buffer,
    out SomeMessageType nextMessage,
    out SequencePosition consumedTo)
{
    // find the end-of-line marker
    var eol = buffer.PositionOf((byte)'\n');
    if (eol == null)
    {
        nextMessage = default;
        consumedTo = default;
        return false;
    }

    // read past the line-ending
    consumedTo = buffer.GetPosition(1, eol.Value);
    // consume the data
    var payload = buffer.Slice(0, eol.Value);
    nextMessage = ReadSomeMessageType(payload);
    return true;
}

Here PositionOf tries to find the first location of a line-feed. If it can't find one, we give up. Otherwise, we set consumedTo to be "the line-feed plus one" (so we consume the line-feed), and we slice our buffer to create a sub-range that represents the payload without the line-feed, which we can then parse (however we choose). Finally, we report success, and can rejoice at the simplicity of parsing linux-style line-endings.

What's the point here?

With minimal code that is very similar to the most naïve and simple Stream version (without any nice features), our app code now has a reader and writer chain that automatically exploits a wide range of capabilities to ensure efficient and effective processing. Again, you can do all these things with Stream, but it is really, really hard to do well and reliably. By pushing all these features into the framework, multiple code-bases can benefit from a single implementation. It also gives future scope for interesting custom pipeline endpoints and decorators that work directly on the pipeline API.

Summary

In this section, we looked at the memory model used by pipelines, and how it helps us avoid allocations. Then we looked at how we might integrate pipelines into existing APIs and systems such as Stream - and we introduced Pipelines.Sockets.Unofficial as an available utility library. We looked at the options available for integrating span/memory code with APIs that don't offer those options, and finally we looked at what the actual calling code might look like when talking to pipelines (taking a brief side step into how to optimize async code that is usually synchronous). In the third and final part, we'll look at how we combine all these learning points when looking at a real-world library such as StackExchange.Redis - discussing what complications the code needed to solve, and how pipelines made it simple to do so.

Monday, 2 July 2018

Pipe Dreams, part 1

Pipelines - a guided tour of the new IO API in .NET, part 1

(part 2 here)

About two years ago I blogged about an upcoming experimental IO API in the .NET world - at the time provisionally called "Channels"; at the end of May 2018, this finally shipped - under the name System.IO.Pipelines. I am hugely interested in the API, and over the last few weeks I've been consumed with converting StackExchange.Redis to use "pipelines", as part of our 2.0 library update.

My hope in this series, then, is to discuss:

  • what "pipelines" are
  • how to use them in terms of code
  • when you might want to use them

To help put this in concrete terms, after introducing "pipelines" I intend to draw heavily on the StackExchange.Redis conversion - and in particular by discussing which problems it solves for us in each scenario. Spoiler: in virtually all cases, the answer can be summarized as:

It perfectly fits a complex but common stumbling point in IO code; allowing us to replace an ugly kludge, workaround or compromise in our code - with a purpose-designed elegant solution that is in framework code.

I'm pretty sure that the pain points I'm going to cover below will be familiar to anyone who works at "data protocol" levels, and I'm equally sure that the hacks and messes that we'll be replacing with pipelines will be duplicated in a lot of code-bases.

What do pipelines replace / complement?

The starting point here has to be: what is the closest analogue in existing framework code? And that is simple: Stream. The Stream API will be familiar to anyone who has worked with serialization or data protocols. As an aside: Stream is actually a very ambiguous API - it works very differently in different scenarios:

  • some streams are read-only, some are write-only, some are read-write
  • the same concrete type can sometimes be read-only, and sometimes write-only (DeflateStream, for example)
  • when a stream is read-write, sometimes it works like a cassette tape, where read and write are operating on the same underlying data (FileStream, MemoryStream); and sometimes it works like two separate streams, where read and write are essentially completely separate streams (NetworkStream, SslStream) - a duplex stream
  • in many of the duplex cases, it is hard or impossible to express "no more data will be arriving, but you should continue to read the data to the end" - there's just Close(), which usually kills both halves of the duplex
  • sometimes streams are seekable and support concepts like Position and Length; often they're not
  • because of the progression of APIs over time, there are often multiple ways of performing the same operation - for example, we could use Read (synchronous), BeginRead/EndRead (asynchronous using the IAsyncResult pattern), or ReadAsync (asynchronous using the async/await pattern); calling code has no way in the general case of knowing which of these is the "intended" (optimal) API
  • if you use either of the asynchronous APIs, it is often unclear what the threading model is; will it always actually be synchronous? if not, what thread will be calling me back? does it use sync-context? thread-pool? IO completion-port threads?
  • and more recently, there are also extensions to allow Span<byte> / Memory<byte> to be used in place of byte[] - again, the caller has no way of knowing which is the "preferred" API
  • the nature of the API encourages copying data; need a buffer? that's a block-copy into another chunk of memory; need a backlog of data you haven't processed yet? block-copy into another chunk of memory; etc

So even before we start talking about real-world Stream examples and the problems that happen when using it, it is clear that there are a lot of problems in the Stream API itself. The first unsurprising news, then, is that pipelines sorts this mess out!

What are pipelines?

By "pipelines", I mean a set of 4 key APIs that between them implement decoupled and overlapped reader/writer access to a binary stream (not Stream), including buffer management (pooling, recycling), threading awareness, rich backlog control, and over-fill protection via back-pressure - all based around an API designed around non-contiguous memory. That's a heck of a word salad - but don't worry, I'll be talking about each element to explain what I mean.

Starting out simple: writing to, and reading from, a single pipe

Let's start with a Stream analogue, and write something simple to a stream, and read it back - sticking to just the Stream API. We'll use ASCII text so we don't need to worry about any complex encoding concerns, and our read/write code shouldn't assume anything about the underlying stream. We'll just write the data, and then read to the end of the stream to consume it.

We'll do this with Stream first - familiar territory. Then we'll re-implement it with pipelines, to see where the similarities and differences lie. After that, we'll investigate what is actually happening under the hood, so we understand why this is interesting to us!

Also, before you say it: yes, I'm aware of TextReader/TextWriter; I'm not using them intentionally - because I'm trying to talk about the Stream API here, so that the example extends to a wide range of data protocols and scenarios.

using (MemoryStream ms = new MemoryStream())
{
    // write something
    WriteSomeData(ms);
    // rewind - MemoryStream works like a tape
    ms.Position = 0;
    // consume it
    ReadSomeData(ms);
}

Now, to write to a Stream the caller needs to obtain and populate a buffer which they then pass to the Stream. We'll keep it simple for now by using the synchronous API and simply allocating a byte[]:

void WriteSomeData(Stream stream)
{
    byte[] bytes = Encoding.ASCII.GetBytes("hello, world!");
    stream.Write(bytes, 0, bytes.Length);
    stream.Flush();
}

Note: there are tons of things in the above I could do for efficiency; but that isn't the point yet. So if you're familiar with this type of code and are twitching at the above... don't panic; we'll make it uglier - er, I mean more efficient - later.

The reading code is typically more complex than the writing code, because the reading code can't assume that it will get everything in a single call to Read. A read operation on a Stream can return nothing (which indicates the end of the data), or it could fill our buffer, or it could return a single byte despite being offered a huge buffer. So read code on a Stream is almost always a loop:

void ReadSomeData(Stream stream)
{
    int bytesRead;
    // note that the caller usually can't know much about
    // the size; .Length is not usually usable
    byte[] buffer = new byte[256];
    do
    {
        bytesRead = stream.Read(buffer, 0, buffer.Length);
        if (bytesRead > 0)
        {   // note this only works for single-byte encodings
            string s = Encoding.ASCII.GetString(
                buffer, 0, bytesRead);
            Console.Write(s);
        }
    } while (bytesRead > 0);
}

Now let's translate that to pipelines. A Pipe is broadly comparable to a MemoryStream, except instead of being able to rewind it many times, the data is more simply a "first in first out" queue. We have a writer API that can push data in at one end, and a reader API that can pull the data out at the other. The Pipe is the buffer that sits between the two. Let's reproduce our previous scenario, but using a single Pipe instead of the MemoryStream (again not something we'd usually do in practice, but it is simple to illustrate):

Pipe pipe = new Pipe();
// write something
await WriteSomeDataAsync(pipe.Writer);
// signal that there won't be anything else written
pipe.Writer.Complete();
// consume it
await ReadSomeDataAsync(pipe.Reader);

First we create a pipe using the default options, then we write to it. Note that IO operations on pipes are usually asynchronous, so we'll need to await our two helper methods. Note also that we don't pass the Pipe to them - unlike Stream, pipelines have separate API surfaces for read and write operations, so we pass a PipeWriter to the helper method that does our writing, and a PipeReader to the helper method that does our reading. After writing the data, we call Complete() on the PipeWriter. We didn't have to do this with the MemoryStream because it automatically EOFs when it reaches the end of the buffered data - but on some other Stream implementations - especially one-way streams - we might have had to call Close after writing the data.

OK, so what does WriteSomeDataAsync look like? Note, I've deliberately over-annotated here:

async ValueTask WriteSomeDataAsync(PipeWriter writer)
{
    // use an oversized size guess
    Memory<byte> workspace = writer.GetMemory(20);
    // write the data to the workspace
    int bytes = Encoding.ASCII.GetBytes(
        "hello, world!", workspace.Span);
    // tell the pipe how much of the workspace
    // we actually want to commit
    writer.Advance(bytes);
    // this is **not** the same as Stream.Flush!
    await writer.FlushAsync();
}

The first thing to note is that when dealing with pipelines: you don't control the buffers: the Pipe does. Recall how in our Stream code, both the read and write code created a local byte[], but we don't have that here. Instead, we ask the Pipe for a buffer (workspace), via the GetMemory method (or its twin - GetSpan). As you might expect from the name, this gives us either a Memory<byte> or a Span<byte> - of size at least twenty bytes.

Having obtained this buffer, we encode our string into it. This means that we're writing directly into the pipe's memory, keeping track of how many bytes we actually used, so we can tell it in Advance. We are under no obligation to use the twenty that we asked for: we could write zero, one, twenty, or even fifty bytes. The last one may seem surprising, but it is actually actively encouraged! The emphasis previously was on "at least" - the writer can actually give us a much bigger buffer than we ask for. When dealing with larger data, it is common to make modest requests but expect greatness: ask for the minimum we can usefully utilize, but then check the size of the memory/span that it gives us before deciding how much to actually write.

The call to Advance is important; this completes a single write operation, making the data available in the pipe to be consumed by a reader. The call to FlushAsync is equally important, but much more nuanced. However, before we can adequately describe what it does, we need to take a look at the reader. So; here's our ReadSomeDataAsync method:

async ValueTask ReadSomeDataAsync(PipeReader reader)
{
    while (true)
    {
        // await some data being available
        ReadResult read = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = read.Buffer;
        // check whether we've reached the end
        // and processed everything
        if (buffer.IsEmpty && read.IsCompleted)
            break; // exit loop

        // process what we received
        foreach (Memory<byte> segment in buffer)
        {
            string s = Encoding.ASCII.GetString(
                segment.Span);
            Console.Write(s);
        }
        // tell the pipe that we used everything
        reader.AdvanceTo(buffer.End);
    }
}

Just like with the Stream example, we have a loop that continues until we've reached the end of the data. With Stream, that is defined as being when Read returns a non-positive result, but with pipelines there are two things to check:

  • read.IsCompleted tells us whether the write pipe has been signalled as completed and therefore no more data will be written (pipe.Writer.Complete(); in our earlier code did this)
  • buffer.IsEmpty tells us whether there is any data left to process in this iteration

If there's nothing in the pipe now and the writer has been completed, then there will never be anything in the pipe, and we can exit.

If we do have data, then we can look at buffer - which in the code is a ReadOnlySequence<byte>, a new type that combines a few roles:

  • describing non-contiguous memory, specifically a sequence of zero, one or many ReadOnlyMemory<byte> chunks
  • describing a logical position (SequencePosition) in such a data-stream - in particular via buffer.Start and buffer.End

The non-contiguous part is very important here. We'll look at where the data is actually going shortly, but in terms of reading: we need to be prepared to handle data that could be spread across multiple segments. In this case, we do this by a simple foreach over the buffer, decoding each segment in turn. Note that even though the API is designed to be able to describe multiple non-contiguous buffers, it is frequently the case that the data received is contiguous in a single buffer; and in that case, it is often possible to write an optimized implementation for a single buffer. You can do that by checking buffer.IsSingleSegment and accessing buffer.First.
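
For example, a decoder for our ASCII data might special-case the single-segment path like this sketch:

static string Decode(in ReadOnlySequence<byte> buffer)
{
    if (buffer.IsSingleSegment)
    {   // fast path: all the data is in one contiguous chunk
        return Encoding.ASCII.GetString(buffer.First.Span);
    }
    // slow path: decode and stitch each segment in turn
    // (safe here because ASCII is a single-byte encoding)
    var sb = new StringBuilder((int)buffer.Length);
    foreach (var segment in buffer)
    {
        sb.Append(Encoding.ASCII.GetString(segment.Span));
    }
    return sb.ToString();
}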

Finally, we call AdvanceTo, which tells the pipe how much data we actually used.

Key point: you don't need to take everything you are given!

Contrast this with Stream: when you call Read on a Stream, it puts data into the buffer you gave it. In most real-world scenarios, it isn't always possible to consume all the data yet - maybe it only makes sense to consider "commands" as "entire text lines", and you haven't yet seen a cr/lf in the data. With Stream: this is tough - once you've been given the data, it is your problem; if you can't use it yet, you need to store the backlog somewhere. However, with pipelines, you can tell it what you've consumed. In our case, we're telling it that we consumed everything we were given, which we do by passing buffer.End to AdvanceTo. That means we'll never see that data again, just like with Stream. However, we could also have passed buffer.Start, which would mean "we didn't use anything" - and even though we had the chance to inspect the data, it would remain in the pipe for subsequent reads. We can also get arbitrary SequencePosition values inside the buffer - if we read 20 bytes, for example - so we have full control over how much data is dropped from the pipe. There are two ways of getting a SequencePosition:

  • you can Slice(...) a ReadOnlySequence<byte> in the same way that you Slice(...) a Span<T> or Memory<T> - and access the .Start or .End of the resulting sub-range
  • you can use the .GetPosition(...) method of the ReadOnlySequence<byte>, which returns a relative position without actually slicing
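
For example, if we somehow knew that the first 20 bytes formed a complete message, a sketch (ProcessPayload being hypothetical):

// assuming 'buffer' is the ReadOnlySequence<byte> from a read
if (buffer.Length >= 20)
{
    ReadOnlySequence<byte> payload = buffer.Slice(0, 20);
    ProcessPayload(payload); // hypothetical handler
    // payload.End and buffer.GetPosition(20) identify the same place
    reader.AdvanceTo(payload.End);
}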

Even more subtle: we can tell it separately that we consumed some amount, but that we inspected a different amount. The most common example here is to express "you can drop this much - I'm done with that; but I looked at everything, I can't make any more progress at the moment - I need more data" - specifically:

reader.AdvanceTo(consumedToPosition, buffer.End);

This is where the subtle interplay of PipeWriter.FlushAsync() and PipeReader.ReadAsync() starts to come into play. I skipped over FlushAsync earlier, but it actually serves two different functions in one call:

  • if there is a ReadAsync call that is outstanding because it needs data, then it awakens the reader, allowing the read loop to continue
  • if the writer is out-pacing the reader, such that the pipe is filling up with data that isn't being cleared by the reader, it can suspend the writer (by not completing synchronously) - to be reactivated when there is more space in the pipe (the thresholds for writer suspend/resume can be optionally specified when creating the Pipe instance)
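
Those thresholds are supplied via PipeOptions when constructing the Pipe - a sketch, with purely illustrative numbers:

var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,  // suspend the writer at 64KiB of backlog
    resumeWriterThreshold: 32 * 1024, // resume it once drained to 32KiB
    minimumSegmentSize: 4096));       // minimum size of each buffer segment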

Obviously these concepts don't come into play in our example, but they are central ideas to how pipelines works. The ability to push data back into the pipe hugely simplifies a vast range of IO scenarios. Virtually every piece of protocol handling code I've seen before pipelines has masses of code related to handling the backlog of incomplete data - it is such a repeated piece of logic that I am incredibly happy to see it handled well in a framework library instead.

What does "awaken" or "reactivate" mean here?

You might have observed that I didn't really define what I meant here. At the obvious level, I mean that: an await operation of ReadAsync or FlushAsync had previously returned as incomplete, so now the asynchronous continuation gets invoked, allowing our async method to resume execution. Yeah, OK, but that's just re-stating what async/await mean. It is a bug-bear of mine that I care deeply (really, it is alarming how deeply) about which threads code runs on - for reasons that I'll talk about later in this series. So saying "the asynchronous continuation gets invoked" isn't enough for me. I want to understand who is invoking it, in terms of threads. The most common answers to this are:

  • it delegates via the SynchronizationContext (note: many systems do not have a SynchronizationContext)
  • the thread that triggered the state change gets used, at the point of the state change, to invoke the continuation
  • the global thread-pool is used to invoke the continuation

All of these can be fine in some cases, and all of these can be terrible in some cases! Sync-context is a well-established mechanism for getting from worker threads back to primary application threads (especially: the UI thread in desktop applications). However, it isn't necessarily the case that just because we've finished one IO operation, we're ready to jump back to an application thread; doing so can effectively push a lot of IO code and data processing code onto an application thread - usually the one thing we explicitly want to avoid. Additionally, it can be prone to deadlocks if the application code has used Wait() or .Result on an asynchronous call (which, to be fair, you're not meant to do). The second option (performing the callback "inline" on the thread that triggered it) can be problematic because it can steal a thread that you expected to be doing something else (and can lead to deadlocks as a consequence); and in some extreme cases it can lead to a stack-dive (and eventually a stack-overflow) when two asynchronous methods are essentially functioning as co-routines. The final option (global thread-pool) is immune to the problems of the other two - but can run into severe problems under some load conditions - something again that I'll discuss in a later part in this series.

However, the good news is that pipelines gives you control here. When creating the Pipe instance, we can supply PipeScheduler instances to use for the reader and writer (separately). The PipeScheduler is used to perform these activations. If not specified, it defaults first to checking for a SynchronizationContext, then using the global thread-pool; "inline" continuations (i.e. intentionally using the thread that caused the state change) are another option readily available. But: you can provide your own implementation of a PipeScheduler, giving you full control of the threading model.

Summary

So: we've looked at what a Pipe is when considered individually, and how we can write to a pipe with a PipeWriter, and read from a pipe with a PipeReader - and how to "advance" both reader and writer. We've looked at the similarity and differences with Stream, and we've discussed how ReadAsync() and FlushAsync() can interact to control how the writer and reader pieces execute. We looked at how responsibility for buffers is reversed, with the pipe providing all buffers - and how the pipe can simplify backlog management. Finally, we discussed the threading model that is active for continuations in the await operations.

That's probably enough for step 1; next, we'll look at how the memory model for pipelines works - i.e. where does the data live. We'll also look at how we can use pipelines in real scenarios to start doing interesting things.