Monday, 30 July 2018

Pipe Dreams, part 3.1

Pipelines - a guided tour of the new IO API in .NET, part 3.1

(part 1, part 2, part 3)

After part 3, I got some great feedback - mostly requests to clarify things that I touched on, but could do with further explanation. Rather than make part 3 even longer, I want to address those here! Yay, more words!


Isn't ArrayPoolOwner<T> doing the same thing as MemoryPool<T>? Why don't you just use MemoryPool<T>?

Great question! I didn't actually mention MemoryPool<T>, so I'd better introduce it.

MemoryPool<T> is an abstract base type that offers an API of the form:

public abstract class MemoryPool<T> : IDisposable
{
    public abstract IMemoryOwner<T> Rent(int minBufferSize = -1);
    // not shown: a few unrelated details
}

As you can see, this Rent() method looks exactly like what we were looking for before - it takes a size and returns an IMemoryOwner<T> (to provide a Memory<T>), with it being returned from whence it came upon disposal.

MemoryPool<T> also has a default implementation (public static MemoryPool<T> Shared { get; }), which returns a MemoryPool<T> that is based on the ArrayPool<T> (i.e. ArrayMemoryPool<T>). The Rent() method returns an ArrayMemoryPoolBuffer, which looks remarkably like the thing that I called ArrayPoolOwner<T>.

So: a very valid question would be: "Marc, didn't you just re-invent the default memory pool?". The answer is "no", but it is for a very subtle reason that I probably should have expounded upon at the time.

The problem is in the name minBufferSize; well... not really the name, but the consequence. What this means is: when you Rent() from the default MemoryPool<T>.Shared, the .Memory that you get back will be over-sized. Often this isn't a problem, but in our case we really want the .Memory to represent the actual number of bytes that were sent (even if we are, behind the scenes, using a larger array from the pool to contain it).
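
To make that concrete, here's a quick sketch (the exact size you get back is an implementation detail of the pool, but expect rounding up):

using (var lease = MemoryPool<byte>.Shared.Rent(100))
{
    // .Memory exposes the entire pooled buffer, not just the 100
    // bytes we asked for; on current implementations this prints 128
    Console.WriteLine(lease.Memory.Length);
}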

We could use an extension method on arbitrary memory pools to wrap potentially oversized memory, i.e.

public static IMemoryOwner<T> RentRightSized<T>(
    this MemoryPool<T> pool, int size)
{
    var leased = pool.Rent(size);
    if (leased.Memory.Length == size)
        return leased; // already OK
    return new RightSizeWrapper<T>(leased, size);
}
class RightSizeWrapper<T> : IMemoryOwner<T>
{
    public RightSizeWrapper(
        IMemoryOwner<T> inner, int length)
    {
        _inner = inner;
        _length = length;
    } 
    private readonly IMemoryOwner<T> _inner;
    private readonly int _length;
    public void Dispose() => _inner.Dispose();
    public Memory<T> Memory
        => _inner.Memory.Slice(0, _length);
}

but... this would mean allocating two objects for most leases - one for the actual lease, and one for the thing that fixes the length. So, since we only really care about the array-pool here, it is preferable IMO to cut out the extra layer, and write our own right-sized implementation from scratch.

So: that's the difference in the reasoning and implementation. As a side note, though: it prompts the question as to whether I should refactor my API to actually implement the MemoryPool<T> API.


You might not want to complete with success if the cancellation token is cancelled

This is in relation to the while in the read loop:

while (!cancellationToken.IsCancellationRequested)
{...}

The more typical expectation for cancellation is for it to throw with a cancellation exception of some kind; therefore, if it is cancelled, I might want to reflect that.

This is very valid feedback! Perhaps the most practical fix here is simply to use while (true) and let the subsequent await reader.ReadAsync(cancellationToken) worry about what cancellation should look like.
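
In code, that simplification might look something like this (a sketch; if the token is cancelled, the await will throw an OperationCanceledException, which is the behaviour most callers expect):

while (true)
{
    // a cancelled token surfaces here as an exception, so the loop
    // no longer pretends cancellation is a successful completion
    var readResult = await reader.ReadAsync(cancellationToken);
    if (readResult.IsCanceled) break;
    // ... parse frames, advance the reader, etc ...
}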


You should clarify about testing the result in async "sync path" scenarios

In my aside about async uglification (optimizing when we expect it to be synchronous in most cases), I omitted to talk about getting results from the pseudo-awaited operation. Usually, this comes down to calling .Result on an awaitable (something like a Task<T> or ValueTask<T>), or .GetResult() on an awaiter (the thing you get from .GetAwaiter()). I haven't done it in the example because in async terms this would simply have been an await theThing; usage, not a var local = await theThing; usage; but you can if you need that.

I must, however, clarify a few points that perhaps weren't clear:

  • you should not (usually) try to access the .Result of a task unless you know that it has already completed
  • knowing that it has completed isn't enough to know that it has completed successfully; if you only test "is completed", you can use .GetResult() on the awaiter to check for exceptions while also fetching the result (which you can then discard if you like)
  • in my case, I'm taking a shortcut by checking IsCompletedSuccessfully; this exists on ValueTask[<T>] (and on Task[<T>] in .NET Core 2.0, else you can check .Status == TaskStatus.RanToCompletion) - which is only true in the "completed without an exception" case
  • because of expectations around how exceptions on async operations are wrapped and surfaced, it is almost always preferable to just switch into the async flow if you know a task has faulted, and just await it; the compiler knows how to get the exception out in the most suitable way, so: let it do the hard work
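
Putting those bullets together, a sketch of the overall pattern (DoThingAsync here is a hypothetical operation that is usually synchronous):

async ValueTask<int> ConsumeAsync()
{
    var pending = DoThingAsync(); // returns ValueTask<int>
    if (pending.IsCompletedSuccessfully)
    {
        // completed without fault: safe to take the result directly
        return pending.Result;
    }
    // incomplete *or* faulted: just await it, and let the compiler
    // surface any exception in the most appropriate way
    return await pending;
}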

You should explain more about ValueTask[<T>] vs Task[<T>] - not many people understand them well

OK! Many moons ago, Task<T> became a thing, and all was well. Task<T> actually happened long before C# had any kind of support for async/await, and the main scenarios it was concerned about were genuinely asynchronous - it was expected that the answer would not be immediately available. So, the overhead of allocating a placeholder object was fine and dandy, dandy and fine.

As the usage of Task<T> grew, and the language support came into effect, it started to become clear that there were many cases where:

  • the operation would often be available immediately (think: caches, buffered data, uncontested locking primitives, etc)
  • it was being used inside a tight loop, or just at high frequency - i.e. something that happens thousands of times a second (file IO, network IO, synchronization over a collection, etc)

When you put those two things together, you find yourself allocating large numbers of objects for something that was only rarely actually asynchronous (so: when there wasn't data available in the socket, or the file buffer was empty, or the lock was contested). For some scenarios, there are pre-completed reusable task instances available (such as Task.CompletedTask, and inbuilt handling for some low integers), but this doesn't help if the return value is outside this very limited set. To help avoid the allocations in the general case, ValueTask[<T>] was born. A ValueTask[<T>] is a struct that implements the "awaitable" pattern (a duck-typed pattern, like foreach, but that's a story for another day), that essentially contains two fields:

  • a T if the value was known immediately (obviously not needed for the untyped ValueTask case)
  • a Task<T> if the value is pending and the answer depends on the result of the incomplete operation

That means that if the value is known now, no Task[<T>] (and no corresponding TaskCompletionSource<T>) ever needs to be allocated - we just throw back the struct, it gets unwrapped by the async pattern, and life is good. Only in the case where the operation is actually asynchronous does an object need to be allocated.
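
As a sketch of the kind of API that benefits (all names here are hypothetical):

private int _cachedLength = -1;
public ValueTask<int> GetLengthAsync()
{
    var len = _cachedLength;
    if (len >= 0) // answer known now: no object allocation at all
        return new ValueTask<int>(len);
    // genuinely asynchronous: wrap the pending Task<int>
    return new ValueTask<int>(FetchLengthAsync());
}
private async Task<int> FetchLengthAsync()
{
    await Task.Delay(100); // simulate some real IO
    return _cachedLength = 42;
}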

Now, there are three common views on what we should do with this:

  1. always expose Task[<T>], regardless of whether it is likely to be synchronous
  2. expose Task[<T>] if we know it will be async, expose ValueTask[<T>] if we think it may be synchronous
  3. always expose ValueTask[<T>]

Frankly, the only valid reason to use 1 is because your API surface was baked and fixed back before ValueTask[<T>] existed.

The choice between 2 and 3 is interesting; what we're actually talking about there is an implementation detail, so a good case could be argued for 3, allowing you to amaze yourself later if you find a way of doing something synchronously (where it was previously asynchronous), without breaking the API. I went for 2 in the code shown, but it would be something I'd be willing to change without much prodding.

You should also note that there is actually a fourth option: use custom awaitables (meaning: a custom type that implements the "awaitable" duck-typed pattern). This is an advanced topic, and needs very careful consideration. I'm not even going to give examples of how to do that, but it is worth noting that ReadAsync and FlushAsync ("pipelines" methods that we've used extensively here) do return custom awaitables. You'd need to really, really understand your reasons before going down that path, though.


I spotted a bug in your "next message number" code

Yes, the code shown in the post can generate two messages with id 1, after 4-billion-something messages:

messageId = ++_nextMessageId;
if (messageId == 0) messageId = 1;

Note that I didn't increment _nextMessageId when I dodged the sentinel (zero). There's also a very small chance that a previous message from 4-billion-ago still hasn't been replied to. Both of these are fixed in the "real" code.
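
One way to fix the first issue (keeping the counter and the issued id in sync when dodging the sentinel) might be:

messageId = ++_nextMessageId;
if (messageId == 0) messageId = _nextMessageId = 1;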


You might be leaking your lease around the TrySetResult

In the original blog code, I had

tcs?.TrySetResult(payload.Lease());

If tcs is not null (via the "Elvis operator"), this allocates a lease and then invokes TrySetResult. However, TrySetResult can return false - meaning: it couldn't do that, because the underlying task was already completed in some other way (perhaps we added timeout code). The only time we should consider that we have successfully transferred ownership of the lease to the task is if it returns true. The real code fixes this, ensuring that it is disposed in all cases except where TrySetResult returns true.
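
A sketch of that fix:

if (tcs != null)
{
    var lease = payload.Lease();
    // only a "true" result transfers ownership of the lease to the
    // task; in every other case, we still own it and must dispose it
    if (!tcs.TrySetResult(lease)) lease.Dispose();
}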


What about incremental frame parsers?

In my discussion of handling the frame, I was using an approach that processed a frame either in its entirety, or not at all. This is not the only option, and you can consume any amount of the frame that you want, as long as you write code to track the internal state. For example, if you are parsing http, you could parse the http headers into some container as long as you have at least one entire http header name/value pair (without requiring all the headers to start parsing). Similarly, you could consume some of the payload (perhaps writing what you have so far to disk). In both cases, you would simply need to Advance past the bit that you consider consumed, and update your own state object.

So yes, that is absolutely possible - even highly desirable in some cases. In some cases it is highly desirable not to start until you've got everything. Remember that parsing often means taking the data from a streamed representation, and pushing it into a model representation - you might actually need more memory for the model representation (especially if the source data is compressed or simply stored in a dense binary format). An advantage of incremental parsing is that when the last few bytes dribble in, you might already have done most of the parsing work - allowing you to overlap pre-processing the data with data receive - rather than "buffer, buffer, buffer; right - now start parsing it".

However, in the case I was discussing: the header was 8 bytes, so there's not much point trying to over-optimize; if we don't have an entire header now, we'll most likely have a complete header when the next packet arrives, or we'll never have an entire header. Likewise, because we want to hand the entire payload to the consumer as a single chunk, we need all the data. We could actually lease the target array as soon as we know the size, and start copying data into that buffer and releasing the source buffers. We're not actually gaining much by this - we're simply exchanging data in one buffer for the same amount of data in another buffer; but we're actually exposing ourselves to an attack vector: a malicious (or buggy) client can send a message-header that claims to be sending a large amount of data (maybe 1GiB), then just ... keeps the socket open and doesn't send anything more. In this scenario, the client has sent almost nothing (maybe just 8 bytes!), but they've chewed up a lot of server memory. Now imagine they do this from 10, 100, or 1000 parallel connections - and you can see how they've achieved disproportionate harm to our server, for almost zero cost at the client. There are two pragmatic fixes for this:

  • Put an upper limit on the message size, and put an upper limit on the connections from a single endpoint
  • Make the client pay their dues: if they claim to be sending a large message (which may indeed have legitimate uses): don't lease any expensive resources until they've actually sent that much (which is what the code as-implemented achieves)

Emphasis: your choice of frame parsing strategy is entirely contextual, and you can play with other implementations.


So; that's the amendments. I hope they are useful. A huge "thanks" to the people who are keeping me honest here, including Shane Grüling, David Fowler, and Nick Craver.

Sunday, 29 July 2018

Pipe Dreams, part 3

Pipelines - a guided tour of the new IO API in .NET, part 3

Update: please also see part 3.1 for further clarifications on this post

Sorry, it has been longer than anticipated since part 2 (also: part 1). A large part of the reason for that is that I've been trying to think how best to explain some of the inner guts of StackExchange.Redis in a way that makes it easy to understand, and is useful for someone trying to learn about "pipelines", not StackExchange.Redis. I've also been thinking on ways to feed more practical "pipelines" usage guidance into the mix, which was something that came up a lot in feedback to parts 1/2.

In the end, I decided that the best thing to do was to step back from StackExchange.Redis, and use a completely different example, but one that faces almost all of the same challenges.

So, with your kind permission, I'd like to deviate from our previously advertised agenda, and instead talk about a library by my colleague David Haney - SimplSockets. What I hope to convey is both the reasoning behind preferring pipelines and practical guidance that the reader can directly transfer to their own IO-based needs. In particular, I hope to discuss:

  • different ways to pass chunks of data between APIs
  • working effectively with the array-pool
  • async/await optimization in the context of libraries
  • practical real-world examples of writing to and reading from pipelines
  • how to connect pipelines client and server types to the network
  • performance comparisons from pipelines, and tips on measuring performance

I'll be walking through a lot of code here, but I'll also be making the "real" code available for further exploration; this also includes some things I didn't have time to cover here, such as how to host a pipelines server inside the Kestrel server.

Sound good?

What is SimplSockets?

This is a network helper library designed to make it easier to implement basic client/server network comms over a socket:

  • it implements a simple framing protocol to separate messages
  • it allows for concurrent usage over a single client, with a message queuing mechanism
  • it embeds additional data in the framing data to allow responses to be tied back to requests, to complete operations
  • out-of-order and out-of-band replies are allowed - you might send requests A, B, C - and get the responses A, C, D, B - i.e. two of the responses came in the opposite order (presumably B took longer to execute), and D came from the server unsolicited (broadcasts, etc)
  • individual messages are always complete in a single frame - there is no frame splitting
  • in terms of API surface: everything is synchronous and byte[] based; for example the client has a byte[] SendReceive(byte[]) method that sends a payload and blocks until the corresponding response is received, and there is a MessageReceived event for unsolicited messages that exposes a byte[]
  • the server takes incoming requests via the same MessageReceived event, and can (if required, not always) post replies via a Reply(byte[], ...) method that also takes the incoming message (for pairing) - and has a Broadcast(byte[]) method for sending a message to all clients
  • there are some other nuances like heartbeats, but that's probably enough

So; we've probably got enough there to start talking about real-world - and very common - scenarios in network code, and we can use that to start thinking about how "pipelines" makes our life easier.

Also an important point: anything I say below is not meant to be critical of SimplSockets - rather, it is to acknowledge that it was written when a lot of pieces like "pipelines" and async/await didn't exist - so it is more an exploration into how we could implement this differently with today's tools.

First things first: we need to think about our exchange types

The first question I have here is - for received messages in particular: "how should we expose this data to consumers?". By this I mean: SimplSockets went with byte[] as the exchange type; can we improve on that? Unsurprisingly: yes. There are many approaches we can use here.

  1. at one extreme, we can stick with byte[] - i.e. allocate a standalone copy of the data, that we can hand to the user; simple to work with, and very safe (nobody else sees that array - no risk of confusion), but it comes at the cost of allocations and copy time.
  2. at the other extreme, we can use zero-copy - and stick with ReadOnlySequence<byte> - this means we're consuming the non-contiguous buffers in the pipe itself; this is fast, but somewhat limiting - we can't hand that out, because once we Advance the pipe: that data is going to be recycled. This might be a good option for strictly controlled server-side processing (where the data never escapes the request context)
  3. as an extension of 2, we could move the payload parsing code into the library (based on the live ReadOnlySequence<byte>), just exposing the deconstructed data, perhaps using custom structs that map to the scenario; efficient, but requires lots more knowledge of the contents than a general message-passing API allows; this might be a good option if you can pair the library with a serializer that accepts input as ReadOnlySequence<byte>, though - allowing the serializer to work on the data without any copies
  4. we could return a Memory<byte> to a copy of the data, perhaps using an oversized byte[] from the ArrayPool<byte>.Shared pool; but it isn't necessarily obvious to the consumer that they should return it to the pool (and indeed: getting a T[] array back from a Memory<T> is an advanced and "unsafe" operation - not all Memory<T> is based on T[] - so we really shouldn't encourage users to try)
  5. we could compromise by returning something that provides a Memory<byte> (or Span<byte> etc), but which makes it very obvious via a well-known API that the user is meant to do something when they're done with it - i.e. IDisposable / using - and have the exchange-type itself return things to the pool when Dispose() is called

In the context of a general purpose messaging API, I think that 5 is a reasonable option - it means the caller can store the data for some period while they work with it, without jamming the pipe, while still allowing us to make good use of the array pool. And if someone forgets the using, it is less efficient, but nothing will actually explode - it just means it'll tend to run a bit more like option 1. But: this decision of exchange types needs careful consideration for your scenario. The StackExchange.Redis client uses option 3, handing out deconstructed data; I also have a fake redis server using the StackExchange.Redis framing code, which uses option 2 - never allowing live data to escape a request context. You need to take time in considering your exchange types, because it is basically impossible to change this later!

As a pro tip for option 2 (using live ReadOnlySequence<byte> data and not letting it escape the context - zero-copy for maximum efficiency), one way to guarantee this is to wrap the buffer in a domain-specific ref struct before handing it to the code that needs to consume it. It is impossible to store a ref struct, which includes holding onto it in an async/await context, and includes basic reflection (since that requires "boxing", and you cannot "box" a ref struct) - so you have confidence that when the method completes, they no longer have indirect access to the data.
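
As an illustrative sketch of that idea (a hypothetical wrapper type; a real one would expose whatever parse helpers the consumer actually needs):

public readonly ref struct RequestBuffer
{
    private readonly ReadOnlySequence<byte> _payload;
    public RequestBuffer(ReadOnlySequence<byte> payload)
        => _payload = payload;

    public long Length => _payload.Length;
    // expose copy/parse helpers, but never the raw buffers themselves
    public void CopyTo(Span<byte> destination)
        => _payload.CopyTo(destination);
}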

But, let's assume we're happy with option 5 (for this specific scenario - there is no general "here's the option you should use", except: not 1 if you can help it). What might that look like? It turns out that this intent is already described in the framework, as System.Buffers.IMemoryOwner<T>:

public interface IMemoryOwner<T> : IDisposable
{
    Memory<T> Memory { get; }
}

We can then implement this to put our leased arrays back into the array-pool when disposed, taking care to be thread-safe so that if it is disposed twice, we don't put the array into the pool twice (very bad):

private sealed class ArrayPoolOwner<T> : IMemoryOwner<T>
{
    private readonly int _length;
    private T[] _oversized;

    internal ArrayPoolOwner(T[] oversized, int length)
    {
        _length = length;
        _oversized = oversized;                
    }

    public Memory<T> Memory => new Memory<T>(GetArray(), 0, _length);

    private T[] GetArray() =>
        Interlocked.CompareExchange(ref _oversized, null, null)
        ?? throw new ObjectDisposedException(ToString());

    public void Dispose()
    {
        var arr = Interlocked.Exchange(ref _oversized, null);
        if (arr != null) ArrayPool<T>.Shared.Return(arr);
    }
}

The key point here is in Dispose(), where it swaps out the array field (using Interlocked.Exchange), and puts the array back into the pool. Once we've done this, subsequent calls to .Memory will fail, and calls to Dispose() will do nothing.

Some important things to know about the array pool:

  1. the arrays it gives you are often oversized (so that it can give you a larger array if it doesn't have one in exactly your size, but it has a larger one ready to go). This means we need to track the expected length (_length), and use that when constructing .Memory.
  2. the array is not zeroed upon fetch - it can contain garbage. In our case, this isn't a problem because (below) we are immediately going to overwrite it with the data we want to represent, so the external caller will never see this, but in the general case, you might want to consider a: should I zero the contents on behalf of the receiver before giving it to them?, and b: is my data sensitive such that I don't want to accidentally leak it into the pool? (there is an existing "zero when returning to the pool" option in the array-pool, for this reason)
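
Both concerns map directly onto the array-pool API; for example (a sketch):

var arr = ArrayPool<byte>.Shared.Rent(len);
try
{
    // a: zero the usable region on behalf of the receiver, if needed
    Array.Clear(arr, 0, len);
    // ... fill and use the buffer ...
}
finally
{
    // b: scrub sensitive data before the array goes back into the pool
    ArrayPool<byte>.Shared.Return(arr, clearArray: true);
}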

As a side note, I wonder whether the above concept might be a worthy addition inside the framework itself, for usage directly from ArrayPool<T> - i.e. a method like IMemoryOwner<T> RentOwned(int length) alongside T[] Rent(int minimumLength) - perhaps with the additions of flags for "zero upon fetch" and "zero upon return".

The idea here is that passing an IMemoryOwner<T> expresses a transfer of ownership, so a typical usage might be:

void DoSomethingWith(IMemoryOwner<byte> data)
{
    using (data)
    {
        // ... other things here ...
        DoTheThing(data.Memory);
    }
    // ... more things here ...
}

The caller doesn't need to know about the implementation details (array-pool, etc). Note that we still have to allocate a small object to represent this, but that is still hugely preferable to allocating a large byte[] buffer each time.

As a caveat, we should note that a badly written consumer could store the .Memory somewhere, which would lead to undefined behaviour after it has been disposed; or they could use MemoryMarshal to get an array from the memory. If we really needed to prevent these problems, we could do so by implementing a custom MemoryManager<T> (most likely, by making ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>). We could then make .Span fail just like .Memory does above, and we could prevent MemoryMarshal from being able to obtain the underlying array. It is almost certainly overkill here, but it is useful to know that this option exists, for more extreme scenarios.
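
A sketch of what that more paranoid implementation could look like (not the code used here; note that MemoryManager<T> supplies the .Memory property for us, based on GetSpan()):

private sealed class PooledMemoryManager<T> : MemoryManager<T>
{
    private T[] _oversized;
    private readonly int _length;

    public PooledMemoryManager(T[] oversized, int length)
    {
        _oversized = oversized;
        _length = length;
    }

    public override Span<T> GetSpan() => new Span<T>(
        _oversized ?? throw new ObjectDisposedException(ToString()),
        0, _length);

    // this sketch doesn't support pinning; a fuller version could
    // pin the array via a GCHandle instead
    public override MemoryHandle Pin(int elementIndex = 0)
        => throw new NotSupportedException();
    public override void Unpin() { }

    // refuse to let MemoryMarshal.TryGetArray see the pooled array
    protected override bool TryGetArray(out ArraySegment<T> segment)
    {
        segment = default;
        return false;
    }

    protected override void Dispose(bool disposing)
    {
        var arr = Interlocked.Exchange(ref _oversized, null);
        if (arr != null) ArrayPool<T>.Shared.Return(arr);
    }
}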

At this point you're probably thinking "wow, Marc, you're really over-thinking this - just give them the data", but: getting the exchange types right is probably the single most important design decision you have to make, so: this bit matters!

OK, so how would we populate this? Fortunately, that is pretty simple, as ReadOnlySequence<T> has a very handy CopyTo method that does all the heavy lifting:

public static IMemoryOwner<T> Lease<T>(
    this ReadOnlySequence<T> source)
{
    if (source.IsEmpty) return Empty<T>();

    int len = checked((int)source.Length);
    var arr = ArrayPool<T>.Shared.Rent(len);
    source.CopyTo(arr);
    return new ArrayPoolOwner<T>(arr, len);
}

This shows how we can use ArrayPool<T> to obtain a (possibly oversized) array that we can use to hold a copy of the data; once we've copied it, we can hand the copy to a consumer to use however they need (and being a flat vector here makes it simple to consume), while the network code can advance the pipe and discard / re-use the buffers. When they Dispose() it, it goes back in the pool, and everyone is happy.

Starting the base API

There is a lot of overlap in the code between a client and server; both need thread-safe mechanisms to write data, and both need some kind of read-loop to check for received data; but what happens is different. So - it sounds like a base-class might be useful; let's start with a skeleton API that lets us hand in a pipe (or two: recall that an IDuplexPipe is actually the ends of two different pipes - .Input and .Output):

public abstract class SimplPipeline : IDisposable
{
    private IDuplexPipe _pipe;
    protected SimplPipeline(IDuplexPipe pipe)
        => _pipe = pipe;

    public void Dispose() => Close();
    public void Close() {/* burn the pipe*/}
}

The first thing we need after this is some mechanism to send a message in a thread-safe way that doesn't block the caller unduly. The way SimplSockets handles this (and also how StackExchange.Redis v1 works) is to have a message queue of messages that have not yet been written. When the caller calls Send, the message is added to the queue (synchronized, etc), and will at some point be dequeued and written to the socket. This helps with perceived performance and can help avoid packet fragmentation in some scenarios, but:

  • it has a lot of moving parts
  • it duplicates something that "pipelines" already provides

For the latter, specifically: the pipe is the queue; meaning: we already have a buffer of data between the actual output. Adding a second queue is just duplicating this and retaining complexity, so: the second major design change we can make is: throw away the unsent queue; just write to the pipe (synchronized, etc), and let the pipe worry about the rest. One slight consequence of this is that the v1 code had a concept of prioritising messages that are expecting a reply - essentially queue-jumping. By treating the pipe as the outbound queue we lose this ability, but in reality this is unlikely to make a huge difference, so I'm happy to lose it. For very similar reasons, StackExchange.Redis v2 loses the concept of CommandFlags.HighPriority, which is this exact same queue-jumping idea. I'm not concerned by this.

We also need to consider the shape of this API, to allow a server or client to add a message:

  • we don't necessarily want to be synchronous; we don't need to block while waiting to access to write to the pipe, or while waiting for a response from the server
  • we might want to expose alternate APIs for whether the caller is simply giving us memory to write (ReadOnlyMemory<byte>), or giving us ownership of the data, for us to clean up when we've written it (IMemoryOwner<byte>)
  • let's assume that write and read are decoupled - we don't want to worry about the issues of response messages here

So; putting that together, I quite like:

protected async ValueTask WriteAsync(
    IMemoryOwner<byte> payload, int messageId)
{
    using (payload)
    {
        await WriteAsync(payload.Memory, messageId);
    }
}
protected ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId);

Here we're giving the caller the convenience of passing us either an IMemoryOwner<byte> (which we then dispose correctly), or a ReadOnlyMemory<byte> if they don't need to convey ownership.

The ValueTask makes sense because a write to a pipe is often synchronous; we probably won't be contested for the single-writer access, and the only async part of writing to a pipe is flushing if the pipe is backed up (and even the flush is very often synchronous). The messageId is the additional metadata in the frame header that lets us pair replies later. We'll worry about what it is in a bit.

Writes and wrongs

So; let's implement that. The first thing we need is guaranteed single-writer access. It would be tempting to use a lock, but lock doesn't play well with async (even if you don't screw it up). Because the flush may be async, the continuation could come back on another thread, so we need an async-compatible locking primitive; SemaphoreSlim should suffice.

Next, I'm going to go off on one of my wild tangents. Premise:

In general, application code should be optimized for readability; library code should be optimized for performance.

You may or may not agree with this, but it is the general guidance that I code by. What I mean by this is that library code tends to have a single focused purpose, often being maintained by someone whose experience may be "deep but not necessarily wide"; your mind is focusing on that one area, and it is OK to go to bizarre lengths to optimize the code. Conversely, application code tends to involve a lot more plumbing of different concepts - "wide but not necessarily deep" (the depth being hidden in the various libraries). Application code often has more complex and unpredictable interactions, so the focus should be on maintainable and "obviously right".

Basically, my point here is that I tend to focus a lot on optimizations that you wouldn't normally put into application code, because I know from experience and extensive benchmarking that they really matter. So... I'm going to do some things that might look odd, and I want you to take that journey with me.

Let's start with the "obviously right" implementation:

private readonly SemaphoreSlim _singleWriter
    = new SemaphoreSlim(1);
protected async ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId)
{
    await _singleWriter.WaitAsync();
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        await writer.WriteAsync(payload);
    }
    finally
    {
        _singleWriter.Release();
    }
}

This awaits single-writer access to the pipe, writes the frame header using WriteFrameHeader (which we'll show in a bit), then drops the payload using the framework-provided WriteAsync method (writer here is simply the pipe's .Output), noting that this includes the FlushAsync as well. There's nothing wrong with this code, but... it does involve unnecessary state machine plumbing in the most likely case - i.e. where everything completes synchronously (the writer is not contested, and the pipe is not backed up). We can tweak this code by asking:

  • can I get the single-writer access uncontested?
  • was the flush synchronous?

Consider, instead - making the method we just wrote private and renaming it to WriteAsyncSlowPath, and adding a non-async method instead:

protected ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId)
{
    // try to get the conch; if not, switch to async
    if (!_singleWriter.Wait(0))
        return WriteAsyncSlowPath(payload, messageId);
    bool release = true;
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        var write = writer.WriteAsync(payload);
        if (write.IsCompletedSuccessfully) return default;
        release = false;
        return AwaitFlushAndRelease(write);
    }
    finally
    {
        if (release) _singleWriter.Release();
    }
}
async ValueTask AwaitFlushAndRelease(
    ValueTask<FlushResult> flush)
{
    try { await flush; }
    finally { _singleWriter.Release(); }
}

The Wait(0) returns true if and only if we can take the semaphore synchronously without delay. If we can't: all bets are off, just switch to the async version. Note once you've gone async, there's no point doing any more of these "hot path" checks - you've already built a state machine (and probably boxed it): the meal is already paid for, so you might as well sit and eat.

However, if we do get the semaphore for free, we can continue and do our writing for free. The header is synchronous anyway, so our next decision is: did the flush complete synchronously? If it did (IsCompletedSuccessfully), we're done - away we go (return default;). Otherwise, we'll need to await the flush. Now, we can't do that from our non-async method, but we can write a separate method (AwaitFlushAndRelease) that takes our incomplete flush, and awaits it. In particular, note that we only want the semaphore to be released after the flush has completed, hence the Release() in our helper method. This is also why we set release to false in the calling method, so it doesn't get released prematurely.

We can apply similar techniques to most async operations if we know they're going to often be synchronous, and it is a pattern you may wish to consider. Emphasis: it doesn't help you at all if the result is usually or always genuinely asynchronous - so: don't over-apply it.


Right; so - how do we write the header? What is the header? SimplSockets defines the header to be 8 bytes composed of two little-endian 32-bit integers. The first 4 bytes contains the payload length in bytes; the second 4 bytes is the messageId used to correlate requests and responses. Writing this is remarkably simple:

void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    var span = writer.GetSpan(8);
    BinaryPrimitives.WriteInt32LittleEndian(
        span, length);
    BinaryPrimitives.WriteInt32LittleEndian(
        span.Slice(4), messageId);
    writer.Advance(8);
}

You can ask a PipeWriter for "reasonable" sized buffers with confidence, and 8 bytes is certainly a reasonable size. The helpful framework-provided BinaryPrimitives type provides explicit-endian tools, perfect for network code. The first call writes length to the first 4 bytes of the span. After that, we need to Slice the span so that the second call writes to the next 4 bytes - and finally we call Advance(8) which commits our header to the pipe without flushing it. Normally, you might have to write lots of pieces manually, then call FlushAsync explicitly, but this particular protocol is a good fit for simply calling WriteAsync on the pipe to attach the payload, which includes the flush. So; putting those pieces together, we've successfully written our message to the pipe.

Using that from a client

We have a WriteAsync method in the base class; now let's add a concrete client class and start hooking pieces together. Consider:

public class SimplPipelineClient : SimplPipeline
{
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();
        int messageId;
        lock (_awaitingResponses)
        {
            messageId = ++_nextMessageId;
            if (messageId == 0) messageId = 1;
            _awaitingResponses.Add(messageId, tcs);
        }
        await WriteAsync(message, messageId);
        return await tcs.Task;
    }
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}

where _awaitingResponses is a dictionary of int message-ids to TaskCompletionSource<IMemoryOwner<byte>>. This code invents a new messageId (avoiding zero, which we'll use as a sentinel value), and creates a TaskCompletionSource<T> to represent our in-progress operation. Since this definitely will involve network access, there's no benefit in exposing it as ValueTask<T>, so this works well. Once we've added our placeholder for catching the reply we write our message (always do book-keeping first, to avoid race conditions). Finally, expose the incomplete task to the caller.

Note that I've implemented this the "obvious" way, but we can optimize this like we did previously, by checking if WriteAsync completed synchronously and simply returning the tcs.Task without awaiting it. Note also that SimplSockets used the calling thread-id as the message-id; this works fine in a blocking scenario, but it isn't viable when we're using async - but: the number is opaque to the "other end" anyway - all it has to do is return the same number.
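
That optimization might look something like this (a sketch, with AddAwaitingResponse being a hypothetical helper for the book-keeping shown above):

public Task<IMemoryOwner<byte>> SendReceiveAsync(
    ReadOnlyMemory<byte> message)
{
    var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();
    int messageId = AddAwaitingResponse(tcs); // book-keeping, as before
    var write = WriteAsync(message, messageId);
    // if the write completed synchronously (the likely case), hand
    // back the pending response without building a state machine
    if (write.IsCompletedSuccessfully) return tcs.Task;
    return AwaitWrite(write, tcs.Task);
}
private async Task<IMemoryOwner<byte>> AwaitWrite(
    ValueTask write, Task<IMemoryOwner<byte>> response)
{
    await write;
    return await response;
}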

Programmed to receive

That's pretty-much it for write; next we need to think about receive. As mentioned in the previous posts, there's almost always a receive loop - especially if we need to support out-of-band and out-of-order messages (so: we can't just read one frame immediately after writing). A basic read loop can be approximated by:

protected async Task StartReceiveLoopAsync(
   CancellationToken cancellationToken = default)
{
   try
   {
       while (!cancellationToken.IsCancellationRequested)
       {
           var readResult = await reader.ReadAsync(cancellationToken);
           if (readResult.IsCanceled) break;

           var buffer = readResult.Buffer;

           var makingProgress = false;
           while (TryParseFrame(ref buffer, out var payload, out var messageId))
           {
               makingProgress = true;
               await OnReceiveAsync(payload, messageId);
           }
           reader.AdvanceTo(buffer.Start, buffer.End);
           if (!makingProgress && readResult.IsCompleted) break;
       }
       try { reader.Complete(); } catch { }
   }
   catch (Exception ex)
   {
       try { reader.Complete(ex); } catch { }
   }
}
protected abstract ValueTask OnReceiveAsync(
   ReadOnlySequence<byte> payload, int messageId);

Note: since we are bound to have an async delay at some point (probably immediately), we might as well just jump straight to an "obvious" async implementation (reader, as before, is simply the pipe's .Input) - we'll gain nothing from trying to be clever here. Key points to observe:

  • we get data from the pipe (note that we might want to also consider TryRead here, but only if we are making progress - otherwise we could find ourselves in a hot loop)
  • read (TryParseFrame) and process (OnReceiveAsync) as many frames as we can
  • advance the reader to report our progress, noting that TryParseFrame will have updated buffer.Start, and since we're actively reading as many frames as we can, it is true to say that we've "inspected" to buffer.End
  • keep in mind that the pipelines code is dealing with all the back-buffer concerns re data that we haven't consumed yet (usually a significant amount of code repeated in lots of libraries)
  • check for exit conditions - if we aren't progressing and the pipe won't get any more data, we're done
  • report when we've finished reading - through success or failure

Unsurprisingly, TryParseFrame is largely the reverse of WriteAsync:

private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload, out int messageId)
{
    if (input.Length < 8)
    {   // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }

    int length;
    if (input.First.Length >= 8)
    {   // already 8 bytes in the first segment
        length = ParseFrameHeader(
            input.First.Span, out messageId);
    }
    else
    {   // copy 8 bytes into a local span
        Span<byte> local = stackalloc byte[8];
        input.Slice(0, 8).CopyTo(local);
        length = ParseFrameHeader(
            local, out messageId);
    }

    // do we have the "length" bytes?
    if (input.Length < length + 8)
    {
        payload = default;
        return false;
    }

    // success!
    payload = input.Slice(8, length);
    input = input.Slice(payload.End);
    return true;
}

First we check whether we have enough data for the frame header (8 bytes); if we don't have that - we certainly don't have a frame. Once we know we have enough bytes for the frame header, we can parse it out to find the payload length. This is a little subtle, because we need to recall that a ReadOnlySequence<byte> can be made up of multiple discontiguous buffers. Since we're only talking about 8 bytes, the simplest thing to do is:

  • check whether the first segment has 8 bytes; if so, parse from that
  • otherwise, stackalloc a span (note that this doesn't need unsafe), copy 8 bytes from input into that, and parse from there.

Once we know how much payload we're expecting, we can check whether we have that too; if we don't: cede back to the read loop. But if we do:

  • our actual payload is the length bytes after the header - i.e. input.Slice(8, length)
  • we want to update input by cutting off everything up to the end of the frame, i.e. input = input.Slice(payload.End)

This means that when we return true, payload now contains the bytes that were sent to us, as a discontiguous buffer.

We should also take a look at ParseFrameHeader, which is a close cousin to WriteFrameHeader:

static int ParseFrameHeader(
    ReadOnlySpan<byte> input, out int messageId)
{
    var length = BinaryPrimitives
            .ReadInt32LittleEndian(input);
    messageId = BinaryPrimitives
            .ReadInt32LittleEndian(input.Slice(4));
    return length;
}

Once again, BinaryPrimitives is helping us out, and we are slicing the input in exactly the same way as before to get the two halves.


So; we can parse frames; now we need to act upon them; here's our client implementation:

protected override ValueTask OnReceiveAsync(
    ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId != 0)
    {   // request/response
        TaskCompletionSource<IMemoryOwner<byte>> tcs;
        lock (_awaitingResponses)
        {
            if (_awaitingResponses.TryGetValue(messageId, out tcs))
            {
                _awaitingResponses.Remove(messageId);
            }
        }
        tcs?.TrySetResult(payload.Lease());
    }
    else
    {   // unsolicited
        MessageReceived?.Invoke(payload.Lease());
    }
    return default;
}

This code has two paths; it can be the request/response scenario, or it can be an out-of-band response message with no request. So; if we have a non-zero messageId, we check (synchronized) in our _awaitingResponses dictionary to see if we have a message awaiting completion. If we do, we use TrySetResult to complete the task (after exiting the lock), giving it a lease with the data from the message. Otherwise, we check whether the MessageReceived event is subscribed, and invoke that similarly. In both cases, the use of ?. here means that we don't populate a leased array if nobody is listening. It will be the receiver's job to ensure the lease is disposed, as only they can know the lifetime.

Service, please

We need to think a little about how we orchestrate this at the server. The SimplPipeline base type above relates to a single connection - it is essentially a proxy to a socket. But servers usually have many clients. Because of that, we'll create a server type that does the actual processing, that internally has a client-type that is our SimplPipeline, and a set of connected clients; so:

public abstract class SimplPipelineServer : IDisposable
{
    protected abstract ValueTask<IMemoryOwner<byte>> 
        OnReceiveForReplyAsync(IMemoryOwner<byte> message);
    
    public int ClientCount => _clients.Count;
    public Task RunClientAsync(IDuplexPipe pipe,
        CancellationToken cancellationToken = default)
        => new Client(pipe, this).RunAsync(cancellationToken);
    
    private class Client : SimplPipeline
    {
        public Task RunAsync(CancellationToken cancellationToken)
            => StartReceiveLoopAsync(cancellationToken);

        private readonly SimplPipelineServer _server;
        public Client(IDuplexPipe pipe, SimplPipelineServer server)
            : base(pipe) => _server = server;

        protected override async ValueTask OnReceiveAsync(
            ReadOnlySequence<byte> payload, int messageId)
        {
            using (var msg = payload.Lease())
            {
                var response = await _server.OnReceiveForReplyAsync(msg);
                await WriteAsync(response, messageId);
            }
        }
    }
}

So; our publicly visible server type, SimplPipelineServer has an abstract method for providing the implementation for what we want to do with messages: OnReceiveForReplyAsync - that takes a payload, and returns the response. Behind the scenes we have a set of clients, _clients, although the details of that aren't interesting.

We accept new clients via the RunClientAsync method; this might seem counter-intuitive, but the emerging architecture for pipelines servers (especially considering "Kestrel" hosts) is to let an external host deal with listening and accepting connections, and all we need to do is have something that accepts an IDuplexPipe and returns a Task. In this case, what that does is create a new Client and start the client's read loop, StartReceiveLoopAsync. When the client receives a message (OnReceiveAsync), it asks the server for a response (_server.OnReceiveForReplyAsync), and then writes that response back via WriteAsync. Note that the version of OnReceiveAsync shown has the consequence of meaning that we can't handle multiple overlapped messages on the same connection at the same time; the "real" version has been aggressively uglified, to check whether _server.OnReceiveForReplyAsync(msg) has completed synchronously; if it hasn't, then it schedules a continuation to perform the WriteAsync (also handling the disposal of msg), and yields to the caller. It also optimizes for the "everything is synchronous" case.

The only other server API we need is a broadcast:

public async ValueTask<int> BroadcastAsync(
    ReadOnlyMemory<byte> message)
{
    int count = 0;
    foreach (var client in _clients)
    {
        try
        {
            await client.Key.SendAsync(message);
            count++;
        }
        catch { } // ignore failures on specific clients
    }
    return count;
}

(again, possibly with an overload that takes IMemoryOwner<byte>)

where SendAsync is simply:

public ValueTask SendAsync(ReadOnlyMemory<byte> message)
    => WriteAsync(message, 0);

Putting it all together; implementing a client and server

So how can we use all of this? How can we get a working client and server? Let's start with the simpler of the two, the client:

using (var client = await SimplPipelineClient.ConnectAsync(
    new IPEndPoint(IPAddress.Loopback, 5000)))
{
    // subscribe to broadcasts
    client.MessageReceived += async msg => {
        if (!msg.Memory.IsEmpty)
            await WriteLineAsync('*', msg);
    };

    string line;
    while ((line = await Console.In.ReadLineAsync()) != null)
    {
        if (line == "q") break;

        using (var leased = line.Encode())
        {
            var response = await client.SendReceiveAsync(leased.Memory);
            await WriteLineAsync('<', response);
        }     
    }
}

SimplPipelineClient.ConnectAsync here just uses Pipelines.Sockets.Unofficial to spin up a client socket pipeline, and starts the StartReceiveLoopAsync() method. Taking an additional dependency on Pipelines.Sockets.Unofficial is vexing, but right now there is no framework-supplied client-socket API for pipelines, so: it'll do the job.

This code sets up a simple console client that takes keyboard input; if it receives a "q" it quits; otherwise it sends the message to the server (Encode, not shown, is just a simple text-encode into a leased buffer), and writes the response. The WriteLineAsync method here takes a leased buffer, decodes it, and writes the output to the console - then disposes the buffer. We also listen for unsolicited messages via MessageReceived, and write those to the console with a different prefix.
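
For illustration, Encode could be as simple as (a sketch, re-using the lease concept from earlier; the real helper is in the full code):

public static IMemoryOwner<byte> Encode(this string value)
{
    var encoding = Encoding.UTF8;
    int len = encoding.GetByteCount(value);
    var arr = ArrayPool<byte>.Shared.Rent(len);
    encoding.GetBytes(value, 0, value.Length, arr, 0);
    // hand back a right-sized, disposable lease over the pooled array
    return new ArrayPoolOwner<byte>(arr, len);
}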

The server is a little more involved; first we need to implement a server; in this case let's simply reverse the bytes we get:

class ReverseServer : SimplPipelineServer
{
    protected override ValueTask<IMemoryOwner<byte>>
        OnReceiveForReplyAsync(IMemoryOwner<byte> message)
    {
        // since the "message" outlives the response write,
        // we can do an in-place reverse and hand
        // the same buffer back
        var memory = message.Memory;
        Reverse(memory.Span); // details not shown
        return new ValueTask<IMemoryOwner<byte>>(message);
    }
}

All this does is respond to messages by returning the same payload, but backwards. And yes, I realize that since we're dealing with text, this could go horribly wrong for grapheme-clusters and/or multi-byte code-points! I never said it was a useful server...

Next up, we need a host. Kestrel (the "ASP.NET Core" server) is an excellent choice there, but implementing a Kestrel host requires introducing quite a few more concepts. But... since we already took a dependency on Pipelines.Sockets.Unofficial for the client, we can use that for the server host with a few lines of code:

class SimplPipelineSocketServer : SocketServer
{
    public SimplPipelineServer Server { get; }

    public SimplPipelineSocketServer(SimplPipelineServer server)
        => Server = server;

    protected override Task OnClientConnectedAsync(
        in ClientConnection client)
        => Server.RunClientAsync(client.Transport);

    public static SimplPipelineSocketServer For<T>()
        where T : SimplPipelineServer, new()
        => new SimplPipelineSocketServer(new T());

    protected override void Dispose(bool disposing)
    {
        if (disposing) Server.Dispose();
    }
}

The key line in here is our OnClientConnectedAsync method, which is how we accept new connections, simply by passing down the client.Transport (an IDuplexPipe). Hosting in Kestrel works very similarly, except you subclass ConnectionHandler instead of SocketServer, and override the OnConnectedAsync method - but there are a few more steps involved in plumbing everything together. Kestrel, however, has advantages such as supporting exotic socket APIs.

So, let's whack together a console that interacts with the server:

using (var socket =
    SimplPipelineSocketServer.For<ReverseServer>())
{
    socket.Listen(new IPEndPoint(IPAddress.Loopback, 5000));
    
    string line;
    while ((line = await Console.In.ReadLineAsync()) != null)
    {
        if (line == "q") break;

        int clientCount, len;
        using (var leased = line.Encode())
        {
            len = leased.Memory.Length;
            clientCount = await socket.Server.BroadcastAsync(leased.Memory);
        }
        await Console.Out.WriteLineAsync(
            $"Broadcast {len} bytes to {clientCount} clients");
    }
}

This works much like the client, except any input other than "q" is broadcast to all the clients.

Now race your horses

We're not just doing this for fun! The key objective of things like pipelines and the array-pool is that it makes it much simpler to write IO code that makes efficient use of memory; reducing allocations (and especially reducing large object allocations) significantly reduces garbage collection overhead, allowing our code to be much more scalable (useful for both servers, and high-throughput client scenarios). Our use of async/await makes it much simpler to make effective use of the CPU: instead of blocking for a while, we can make the thread available to do other useful work - increasing throughput, and once again: reducing memory usage (having lots of threads is not cheap - each thread has a quite significant stack space reserved for it).

Note that this isn't entirely free; fetching arrays from the pool (and remembering to return them) by itself has some overhead - but the general expectation is that the cost of checking the pool is, overall, lower than the cost associated from constant allocations and collections. Similarly, async: the hope is that the increased scalability afforded by freeing up threads more-than-offsets the cost of the additional work required by the plumbing involved.

But: there's only one way to find out. As Eric Lippert puts it:

If you have two horses and you want to know which of the two is the faster then race your horses

Setting up a good race-track for code can be awkward, because we need to try to reproduce a meaningful scenario. And it is amazingly easy to write bad performance tests. Rather than reinvent bad code, it is hugely advisable to lean on tools like BenchmarkDotNet. If you are even remotely performance minded, and you haven't used BenchmarkDotNet: sorry, but you're doing it wrong.

There are 4 combinations we can check here:

  • SimplSocketClient against SimplSocketServer
  • SimplSocketClient against SimplPipelineServer
  • SimplPipelineClient against SimplSocketServer
  • SimplPipelineClient against SimplPipelineServer

I won't list all of these, but for these tests I'll use a [GlobalSetup] method (a BenchmarkDotNet concept) to spin up both servers (on different ports), then we can test clients against each. Here's our "SimplSocketClient against SimplSocketServer" test (remembering that SimplSocketClient is synchronous):

[Benchmark(OperationsPerInvoke = Ops)]
public long c1_s1()
{
    long x = 0;
    using (var client = new SimplSocketClient(CreateSocket))
    {
        client.Connect(s1);
        for (int i = 0; i < Ops; i++)
        {
            var response = client.SendReceive(_data);
            x += response.Length;
        }
    }
    return AssertResult(x);
}

and here's our "SimplPipelineClient against SimplPipelineServer" test (using a Task this time, as SimplPipelineClient uses an async API):

[Benchmark(OperationsPerInvoke = Ops)]
public async Task<long> c2_s2()
{
    long x = 0;
    using (var client =
        await SimplPipelineClient.ConnectAsync(s2))
    {
        for (int i = 0; i < Ops; i++)
        {
            using (var response =
                await client.SendReceiveAsync(_data))
            {
                x += response.Memory.Length;
            }
        }
    }
    return AssertResult(x);
}

Note that we're performing multiple operations (Ops) per run here, so we're not just measuring overheads like connect. Other than that, we'll just let BenchmarkDotNet do the hard work. We run our tests, and we get (after some time; benchmarking isn't always fast, although you can make suggestions on the iterations etc to speed it up if you want):

Method | Runtime |     Mean |     Error |    StdDev |  Gen 0 |  Gen 1
------ | ------- | -------- | --------- | --------- | ------ | ------
 c1_s1 |     Clr |       NA |        NA |        NA |    N/A |    N/A
 c1_s2 |     Clr |       NA |        NA |        NA |    N/A |    N/A
 c2_s1 |     Clr |       NA |        NA |        NA |    N/A |    N/A
 c2_s2 |     Clr | 45.99 us | 0.4275 us | 0.2544 us | 0.3636 | 0.0909
 c1_s1 |    Core |       NA |        NA |        NA |    N/A |    N/A
 c1_s2 |    Core |       NA |        NA |        NA |    N/A |    N/A
 c2_s1 |    Core |       NA |        NA |        NA |    N/A |    N/A
 c2_s2 |    Core | 29.87 us | 0.2294 us | 0.1518 us | 0.1250 |      -

Now, you're probably looking at that table and thinking "huh? most of the data is missing - how can I interpret that?" - and: you wouldn't be wrong! It turns out that the c1 (SimplSocketClient) and s1 (SimplSocketServer) implementations are simply unreliable. Ultimately, it was painfully hard to write reliable socket code before pipelines, and it looks like the legacy implementation simply has bugs and race conditions that don't show up in casual usage (it works fine in the REPL client), but which manifest pretty quickly when BenchmarkDotNet runs it aggressively. Our "pipelines" implementation simply used the "obvious" thing, and it works reliably first time. All of the complex pieces that IO authors previously had to worry about have now moved to the framework code, which enables programmers to focus on the interesting thing that they're trying to do (rather than spending most of their time fighting with IO intrinsics), and benefit from a reliable well-tested implementation of the ugly IO code.

A major advantage of moving to pipelines is getting rid of the gnarly IO bugs that you didn't even know you had.

I will be more than happy to update this table with updated numbers if SimplSockets can find the things that are stalling it.

Of the numbers that we do have, we can see that it behaves well on Clr (.NET Framework) but works much better on Core (.NET Core). .NET Core 2.1 is frankly amazing (and 3.0 looks even better) - with lots of advantages. If you're serious about performance, migrating to .NET Core should definitely be on your roadmap.

Summary

This has been a long read, but I hope I've conveyed some useful practical advice and tips for working with pipelines in real systems, in a way that is directly translatable to your own requirements. If you want to play with the code in more depth, or see it in action, you can see my fork here.

Update: please also see part 3.1 for further clarifications on this post

Enjoy!