Tuesday, 3 July 2018

Pipe Dreams, part 2

Pipelines - a guided tour of the new IO API in .NET, part 2

In part 1, we discussed some of the problems that exist in the familiar Stream API, and we had an introduction to the Pipe, PipeWriter and PipeReader APIs, looking at how to write to a single Pipe and then consume the data from that Pipe; we also discussed how FlushAsync() and ReadAsync() work together to keep both sides of the machinery working, dealing with "empty" and "full" scenarios - suspending the reader when there is nothing to do, and resuming it when data arrives; and suspending the writer when it is out-pacing the reader (over-filling the pipe), and resuming when the reader has caught up; and we discussed what it means to "resume" here, in terms of the threading model.

In this part, we're going to discuss the memory model of pipelines: where does the data actually live? We'll also start looking at how we can use pipelines in realistic scenarios to fulfil real needs.

The memory model: where are all my datas?

In part 1, we spoke about how the pipe owns all the buffers, allowing the writer to request a buffer via GetMemory() and GetSpan(), with the committed data later being exposed to the reader via the .Buffer on the result of ReadAsync() - which is a ReadOnlySequence<byte>, i.e. some number of segments of data.

So what actually happens?

Each Pipe instance has a reference to a MemoryPool<byte> - a new device in System.Memory for, unsurprisingly, creating a memory pool. You can specify a specific MemoryPool<byte> in the options when creating a Pipe, but by default (and, I imagine, almost always) - a shared application-wide pool (MemoryPool<byte>.Shared) is used.
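For illustration, here's a minimal sketch of supplying the pool explicitly via PipeOptions when constructing a pipe - since the default is MemoryPool<byte>.Shared anyway, this particular example behaves the same as plain new Pipe():

// explicitly specifying the memory pool; the first PipeOptions parameter is the pool,
// and passing MemoryPool<byte>.Shared matches what new Pipe() gives you by default
var pipe = new Pipe(new PipeOptions(pool: MemoryPool<byte>.Shared));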

The MemoryPool<byte> concept is very open-ended. The default implementation simply makes use of ArrayPool<byte>.Shared (the application-wide array pool), renting arrays as needed, and returning them when done. This ArrayPool<T> is implemented using WeakReference, so pooled arrays are collectible if memory pressure demands it. However, when you call GetMemory(someSize) or GetSpan(someSize), the pipe doesn't simply ask the memory pool for that amount; instead, it tracks a "segment" internally. A new "segment" will be (by default; this is configurable) the larger of someSize or 2048 bytes. Requesting a non-trivial amount of memory means that we aren't filling the system with tiny arrays, which would significantly impact garbage collection. When you Advance(bytesWritten) in the writer, it:

  • moves an internal counter that is how much of the current segment has been used
  • updates the end of the "available to be read" chain for the reader; if we've just written the first bytes of an empty segment, this will mean adding a new segment to the chain, otherwise it'll mean increasing the end marker of the final segment of the existing chain

It is this "available to be read" chain that we fetch in ReadAsync(); and as we AdvanceTo in the reader - when entire segments are consumed, the pipe hands those segments back to the memory pool. From there, they can be reused many times. And as a direct consequence of the two points above, we can see that most of the time, even with multiple calls to Advance in the writer, we may end up with a single segment in the reader, with multiple segments happening either at segment boundaries, or where the reader is falling behind the writer, and data is starting to accumulate.

What this achieves just using the default pool is:

  • we don't need to keep allocating every time we call GetMemory() / GetSpan()
  • we don't need a separate array per GetMemory() / GetSpan() - we'll often just get a different range of the same "segment"
  • a relatively small number of non-trivial buffer arrays are used
  • they are automatically recycled without needing lots of library code
  • when not being used, they are available for garbage collection

This also explains why the approach of requesting a very small amount in GetMemory() / GetSpan() and then checking the size can be so successful: we have access to the rest of the unused part of the current segment. Meaning: with a segment size of 2048, of which 200 bytes were already used by previous writes - even if we only ask for 5 bytes, we'll probably find we have 1848 bytes available to play with. Or possibly more - remember that obtaining an array from ArrayPool<T>.Shared is also an "at least this big" operation.
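In code, that "ask small, then check what you actually got" pattern might look like the following sketch - writer here is assumed to be a PipeWriter, and the sizes are purely illustrative:

// ask for at least 5 bytes...
var span = writer.GetSpan(5);
// ...but we'll often be given far more - typically the rest of the current segment
int available = span.Length;
// write up to 'available' bytes into span, then commit only what we actually used:
// writer.Advance(bytesActuallyWritten);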

Zero copy buffers

Something else to notice in this setup is that we get data buffering without any data copying. The writer asked for a buffer, and wrote the data to where it needed to be the first time, on the way in. This then acted as a buffer between the writer and the reader without any need to copy data around. And if the reader couldn't process all the data yet, it was able to push data back into the pipe simply by saying explicitly what it did consume. There was no need to maintain a separate backlog of data for the reader, something that is very common in protocol processing code using Stream.

It is this combination of features that makes the memory aspect of pipeline code so friendly. You could do all of this with Stream, but it is an excruciating amount of error-prone code to do it, and even more if you want to do it well - and you'd pretty much have to implement it separately for each scenario. Pipelines makes good memory handling the default simple path - the pit of success.

More exotic memory pools

You aren't limited to the memory model discussed; you can implement your own custom memory pool! The advantage of the default pool is that it is simple. In particular, it doesn't really matter if we aren't 100% perfect about returning every segment - if we somehow drop a pipe on the floor, the worst that can happen is that the garbage collector collects the abandoned segments at some point. They won't go back into the pool, but that's fine.

You can, however, do much more interesting things. Imagine, for example, a MemoryPool<byte> that takes huge slabs of memory - either managed memory via a number of very large arrays, or unmanaged memory via Marshal.AllocHGlobal (note that Memory<T> and Span<T> are not limited to arrays - all they require is some kind of contiguous memory), leasing blocks of this larger chunk as required. This has great potential, but it becomes increasingly important to ensure that segments are reliably returned. Most systems shouldn't need this, but it is good that the flexibility is offered.
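To make that concrete, here's a hedged sketch (not production code - the type names are hypothetical, and a real pool would lease from pre-allocated slabs and recycle blocks rather than allocate and free per rent) of what an unmanaged-memory-backed MemoryPool<byte> might look like, using MemoryManager<byte> to expose raw memory as Memory<byte>:

// requires: System, System.Buffers, System.Runtime.InteropServices
public sealed class UnmanagedMemoryPool : MemoryPool<byte>
{
    public override int MaxBufferSize => int.MaxValue;

    public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
    {
        if (minBufferSize <= 0) minBufferSize = 4096;
        // note: a real pool would lease a block from a larger slab here
        return new UnmanagedOwner(minBufferSize);
    }

    protected override void Dispose(bool disposing) { }

    // MemoryManager<byte> is the bridge that lets non-array memory
    // be presented as Memory<byte>
    private sealed unsafe class UnmanagedOwner : MemoryManager<byte>
    {
        private IntPtr _ptr;
        private readonly int _length;

        public UnmanagedOwner(int length)
        {
            _ptr = Marshal.AllocHGlobal(length);
            _length = length;
        }

        public override Span<byte> GetSpan()
            => new Span<byte>((void*)_ptr, _length);

        public override MemoryHandle Pin(int elementIndex = 0)
            => new MemoryHandle((byte*)_ptr + elementIndex);

        public override void Unpin() { } // nothing to do: unmanaged memory never moves

        protected override void Dispose(bool disposing)
        {
            if (_ptr != IntPtr.Zero)
            {
                Marshal.FreeHGlobal(_ptr);
                _ptr = IntPtr.Zero;
            }
        }
    }
}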

Useful pipes in real systems

The example that we used in part 1 was of a single Pipe that was written and read by the same code. That's clearly not a realistic scenario (unless we're trying to mock an "echo" server), so what can we do for more realistic scenarios? First, we need to connect our pipelines to something. We don't usually want a Pipe in isolation; we want a pipe that integrates with a common system or API. So; let's start by seeing what this would look like.

Here we need a bit of a caveat and disclaimer: the pipelines released in .NET Core 2.1 do not include any endpoint implementations. Meaning: the Pipe machinery is there, but nothing is shipped inside the box that actually connects pipes with any other existing systems - like shipping the abstract Stream base-type, but without shipping FileStream, NetworkStream, etc. Yes, that sounds frustrating, but it was a pragmatic reality of time constraints. Don't panic! There are... "lively" conversations going on right now about which bindings to implement with which priority; and there are a few community offerings to bridge the most obvious gaps for today.

Since we find ourselves in that position, we might naturally ask: "what does it take to connect pipelines to another data backend?".

Perhaps a good place to start would be connecting a pipe to a Stream. I know what you're thinking: "Marc, but in part 1 you went out of your way to say how terrible Stream is!". I haven't changed my mind; it isn't necessarily ideal - for any scenario-specific Stream implementation (such as NetworkStream or FileStream) we could have a dedicated pipelines-based endpoint that talked directly to that service with minimal indirection; but it is a useful first step:

  • it gives us immediate access to a huge range of API surfaces - anything that can expose data via Stream, and anything that can act as a middle-layer via wrapped streams (encryption, compression, etc)
  • it hides all the wrinkly bits of the Stream API behind a clear unambiguous surface
  • it gives us almost all of the advantages that we have mentioned so far

So, let's get started! The first thing we need to think about is: what is the direction here? As previously mentioned, a Stream is ambiguous - and could be read-only, write-only, or read-write. Let's assume we want to deal with the most general case: a read-write stream that acts in a duplex manner - this will give us access to things like sockets (via NetworkStream). This means we're actually going to want two pipes - one for the input, one for the output. Pipelines helps clear this up for us, by declaring an interface expressly for this: IDuplexPipe. This is a very simple interface, and being handed an IDuplexPipe is analogous to being handed the ends of two pipes - one marked "in", one marked "out":

interface IDuplexPipe
{
    PipeReader Input { get; }
    PipeWriter Output { get; }
}

What we want to do, then, is create a type that implements IDuplexPipe, but using 2 Pipe instances internally:

  • one Pipe will be the output buffer (from the consumer's perspective), which will be filled by caller-code writing to Output - and we'll have a loop that consumes this Pipe and pushes the data into the underlying Stream (to be written to the network, or whatever the stream does)
  • one Pipe will be the input buffer (from the consumer's perspective); we'll have a loop that reads data from the underlying Stream (from the network, etc) and pushes it into the Pipe, where it will be drained by caller-code reading from Input

This approach immediately solves a wide range of problems that commonly affect people using Stream:

  • we now have input/output buffers that decouple stream access from the read/write caller-code, without having to add BufferedStream or similar to prevent packet fragmentation (for the writing code), and to make it very easy to continue receiving more data while we process it (for the reading code especially, so we don't have to keep pausing while we ask for more data)
  • if the caller-code is writing faster than the stream Write can process, the back-pressure feature will kick in, throttling the caller-code so we don't end up with a huge buffer of unsent data
  • if the stream Read is out-pacing the caller-code that is consuming the data, the back-pressure will kick in here too, throttling our stream read loop so we don't end up with a huge buffer of unprocessed data
  • both the read and write implementations benefit from all the memory pool goodness that we discussed above
  • the caller-code doesn't ever need to worry about backlog of data (incomplete frames), etc - the pipe deals with it

So what might that look like?

Essentially, all we need to do, is something like:

class StreamDuplexPipe : IDuplexPipe
{
    Stream _stream;
    Pipe _readPipe, _writePipe;

    public PipeReader Input => _readPipe.Reader;
    public PipeWriter Output => _writePipe.Writer;
    
    // ... more here
}

Note that we have two different pipes; the caller gets one end of each pipe - and our code will act on the other end of each pipe.

Pumping the pipe

So what does the code look like to interact with the stream? We need two methods, as discussed above. The first - and simplest - has a loop that reads data from the _stream and pushes it to _readPipe, to be consumed by the calling code; the core of this method could be something like:

while (true)
{
    // note we'll usually get *much* more than we ask for
    var buffer = _readPipe.Writer.GetMemory(1);
    int bytes = await _stream.ReadAsync(buffer);
    _readPipe.Writer.Advance(bytes);
    if (bytes == 0) break; // source EOF
    var flush = await _readPipe.Writer.FlushAsync();
    if (flush.IsCompleted || flush.IsCanceled) break;
}

This loop asks the pipe for a buffer, then uses the new netcoreapp2.1 overload of Stream.ReadAsync that accepts a Memory<byte> to populate that buffer - we'll discuss what to do if you don't have an API that takes Memory<byte> shortly. When the read is complete, it commits that-many bytes to the pipe using Advance, then it invokes FlushAsync() on the pipe to (if needed) awaken the reader, or pause the write loop while the back-pressure eases. Note we should also check the outcome of the Pipe's FlushAsync() - it could tell us that the pipe's consumer has signalled that they've finished reading the data they want (IsCompleted), or that the pipe itself was shut down (IsCanceled).

Note that in both cases, we want to ensure that we tell the pipe when this loop has exited - however it exits - so that we don't end up with the calling code awaiting forever on data that will never come. Accidents happen, and sometimes the call to _stream.ReadAsync (or any other method) might throw an exception, so a good way to do this is with a try/finally block:

Exception error = null;
try
{
    // our loop from the previous sample
}
catch(Exception ex) { error = ex; }
finally { _readPipe.Writer.Complete(error); }

If you prefer, you could also use two calls to Complete - one at the end of the try (for success) and one inside the catch (for failure).
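That alternative shape might look something like this sketch (using the same field names as above):

try
{
    // our loop from the previous sample
    _readPipe.Writer.Complete(); // success
}
catch (Exception ex)
{
    _readPipe.Writer.Complete(ex); // failure - propagate the fault to the reader
}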

The second method we need is a bit more complex; we need a loop that consumes data from _writePipe and pushes it to _stream. The core of this could be something like:

while (true)
{
    var read = await _writePipe.Reader.ReadAsync();
    if (read.IsCanceled) break;
    var buffer = read.Buffer;
    if (buffer.IsEmpty && read.IsCompleted) break;

    // write everything we got to the stream
    foreach (var segment in buffer)
    {
        await _stream.WriteAsync(segment);
    }
    _writePipe.Reader.AdvanceTo(buffer.End);
    await _stream.FlushAsync();
}

This awaits some data (which could be in multiple buffers), and checks some exit conditions; as before, we can give up if IsCanceled, but the next check is more subtle: we don't want to stop writing just because the producer indicated that they've written everything they wanted to (IsCompleted), or we might not write the last few segments of their data - we need to continue until we've written all their data, hence the additional buffer.IsEmpty check. This is simplified in this case because we're always writing everything - we'll see a more complex example shortly. Once we have data, we write each of the non-contiguous buffers to the stream sequentially - because Stream can only write one buffer at a time (again, I'm using the netcoreapp2.1 overload here that accepts ReadOnlyMemory<byte>, but we aren't restricted to this). Once we have written the buffers, we tell the pipe that we have consumed the data, and flush the underlying Stream.

In "real" code we might want to be a bit more aggressive about optimizing to reduce flushing the underlying stream until we know there is no more data readily available, perhaps using the _writePipe.Reader.TryRead(...) method in addition to _writePipe.Reader.ReadAsync() method; this method works similarly to ReadAsync() but is guaranteed to always return synchronously - useful for testing "did the writer append something while I was busy?". But the above illustrates the point.

Additionally, as before we would want to add a try/finally, so that we always call _writePipe.Reader.Complete(); when we exit.

We can use the PipeScheduler to start these two pumps, which will ensure that they run in the intended context, and our loops start pumping data. We'd have a little more house-keeping to add (we'd probably want a mechanism to Close()/Dispose() the underlying stream, etc) - but as you can see, it doesn't have to be a huge task to connect an IDuplexPipe to a source that wasn't designed with pipelines in mind.
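For example, starting the two pumps might look something like the following sketch - ReadFromStreamLoopAsync and WriteToStreamLoopAsync are hypothetical names for the two loops we wrote above:

// schedule both pumps; the discard (_) acknowledges that we deliberately
// don't await the returned tasks here - errors are handled inside the loops
PipeScheduler.ThreadPool.Schedule(
    state => _ = ((StreamDuplexPipe)state).ReadFromStreamLoopAsync(), this);
PipeScheduler.ThreadPool.Schedule(
    state => _ = ((StreamDuplexPipe)state).WriteToStreamLoopAsync(), this);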

Here's one I made earlier...

I've simplified the above a little (not too much, honest) to make it concise for discussion, but you still probably don't want to start copying/pasting chunks from here to try and get it to work. I'm not claiming they are the perfect solution for all situations, but as part of the 2.0 work for StackExchange.Redis, we have implemented a range of bindings for pipelines that we are making available on nuget - unimaginatively titled Pipelines.Sockets.Unofficial (nuget, github); this includes:

  • converting a duplex Stream to an IDuplexPipe (like the above)
  • converting a read-only Stream to a PipeReader
  • converting a write-only Stream to a PipeWriter
  • converting an IDuplexPipe to a duplex Stream
  • converting a PipeReader to a read-only Stream
  • converting a PipeWriter to a write-only Stream
  • converting a Socket to an IDuplexPipe directly (without going via NetworkStream)

The first six are all available via static methods on StreamConnection; the last is available via SocketConnection.

StackExchange.Redis is very involved in Socket work, so we are very interested in how to connect pipelines to sockets; for redis connections without TLS, we can connect our Socket directly to the pipeline:

  • Socket ⇄ SocketConnection

For redis connections with TLS (in particular: cloud redis providers), we can connect the pieces thusly:

  • Socket ⇄ NetworkStream ⇄ SslStream ⇄ StreamConnection

Both of these configurations give us a Socket at one end, and an IDuplexPipe at the other, and it begins to show how we can orchestrate pipelines as part of a more complex system. Perhaps more importantly, it gives us room in the future to change the implementation. As examples of future possibilities:

  • Tim Seaward has been working on Leto, which provides TLS capability as an IDuplexPipe directly, without requiring SslStream (and thus: no stream inverters)
  • between Tim Seaward, David Fowler and Ben Adams, there are a range of experimental or in-progress network layers directly implementing pipelines without using managed sockets, including "libuv", "RIO" (Registered IO), and most recently, "magma" - which pushes the entire TCP stack into user code to reduce syscalls.

It'll be interesting to see how this space develops!

But my existing API doesn't talk in Span<byte> or Memory<byte>!

When writing code to pump data from a pipe to another system (such as a Socket), it is very likely you'll bump into APIs that don't take Memory<byte> or Span<byte>. Don't panic, all is not lost! You still have multiple ways of breaking out of that world into something more ... traditional.

The first trick, for when you have a Memory<T> or ReadOnlyMemory<T>, is MemoryMarshal.TryGetArray(...). This takes in a memory and attempts to get an ArraySegment<T> that describes the same data in terms of a T[] vector and an int offset/count pair. Obviously this can only work if the memory was based on a vector, which is not always the case. So this can fail on exotic memory pools. Our second escape hatch is MemoryMarshal.GetReference(...). This takes in a span and returns a reference (actually a "managed pointer", aka ref T) to the start of the data. Once we have a ref T, we can use unsafe C# to get an unmanaged pointer to the data, useful for APIs that talk in such:

Span<byte> span = ...
fixed(byte* ptr = &MemoryMarshal.GetReference(span))
{
    // ...
}

GetReference can still do this even if the length of the span is zero, returning a reference to where the zeroth item would have been, and it even works for a default span where there never was any backing memory. This last one requires a slight word of caution, because a ref T is not usually expected to be null, but that's exactly what you get here. Essentially, as long as you don't ever try to dereference this kind of null reference: you'll be fine. If you use fixed to convert it to an unmanaged pointer, you get back a null (zero) pointer, which is more expected (and can be useful in some P/Invoke scenarios). MemoryMarshal is essentially synonymous with unsafe code, even if the method you're calling doesn't require the unsafe keyword. It is perfectly valid to use it, but if you use it incorrectly, it reserves the right to hurt you - so just be careful.
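Going back to the first escape hatch: a minimal sketch of MemoryMarshal.TryGetArray in use - someLegacyApi is a hypothetical stand-in for an API that only accepts the classic byte[]/offset/count triplet:

ReadOnlyMemory<byte> memory = ...
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
    // array-backed memory: hand the underlying array straight over, no copy
    someLegacyApi(segment.Array, segment.Offset, segment.Count);
}
else
{
    // exotic (non-array-backed) memory pool: fall back to a copy
    byte[] copy = memory.ToArray();
    someLegacyApi(copy, 0, copy.Length);
}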

What about the app-code end of the pipe?

OK, we've got our IDuplexPipe, and we've seen how to connect the "business end" of both pipes to your backend data service of choice. Now; how do we use it in our app code?

As in our example from part 1, we're going to hand the PipeWriter from IDuplexPipe.Output to our outbound code, and the PipeReader from IDuplexPipe.Input to our inbound code.

The outbound code is typically very simple, and is usually a very direct port to get from Stream-based code to PipeWriter-based. The key difference, once again, is that you don't control the buffers. A typical implementation might look something like:

ValueTask<bool> Write(SomeMessageType message, PipeWriter writer)
{
    // (this may be multiple GetSpan/Advance calls, or a loop,
    // depending on what makes sense for the message/protocol)
    var span = writer.GetSpan(...);
    // TODO: ... actually write the message
    int bytesWritten = ... // from writing
    writer.Advance(bytesWritten);

    return FlushAsync(writer);
}

private static async ValueTask<bool> FlushAsync(PipeWriter writer)
{
    // apply back-pressure etc
    var flush = await writer.FlushAsync();
    // tell the calling code whether any more messages
    // should be written
    return !(flush.IsCanceled || flush.IsCompleted);
}

The first part of Write is our business code - we do whatever we need to write the data to the buffers from writer; typically this will include multiple calls to GetSpan(...) and Advance(). When we've written our message, we can flush it to ensure the pump is active, and apply back-pressure. For very large messages we could also flush at intermediate points, but for most simple scenarios: flushing once per message is fine.

If you're wondering why I split the FlushAsync code into a separate method: that's because I want to await the result of FlushAsync to check the exit conditions, so it needs to be in an async method. The most efficient way to access memory here is via the Span<byte> API, and Span<byte> is a ref struct type; as a consequence we cannot use a Span<byte> local variable in an async method. A pragmatic solution is to simply split the methods, so one method deals with the Span<byte> work, and another method deals with the async aspect.

Random aside: async code, hot synchronous paths, and async machinery overhead

The machinery involved in async / await is pretty good, but it can still be a surprising amount of stack work - you can see this on sharplab.io - take a look at the generated machinery for the OurCode.FlushAsync method - and the entirety of struct <FlushAsync>d__0. Now, this code is not terrible - it tries hard to avoid allocations in the synchronous path - but it is unnecessary. There are two ways to significantly improve this; one is to not await at all, which is often possible if the await is the last line in a method and we don't need to process the results: don't await - just remove the async and return the task - complete or incomplete. We can't do that here, because we need to check the state of the result, but we can optimize for success by checking whether the task is already complete (via .IsCompletedSuccessfully - if it has completed but faulted, we still want to use the await to make sure the exception behaves correctly). If it is successfully completed, we're allowed to access the .Result; so we could also write our FlushAsync method as:

private static ValueTask<bool> Flush(PipeWriter writer)
{
    bool GetResult(FlushResult flush)
        // tell the calling code whether any more messages
        // should be written
        => !(flush.IsCanceled || flush.IsCompleted);

    async ValueTask<bool> Awaited(ValueTask<FlushResult> incomplete)
        => GetResult(await incomplete);

    // apply back-pressure etc
    var flushTask = writer.FlushAsync();

    return flushTask.IsCompletedSuccessfully
        ? new ValueTask<bool>(GetResult(flushTask.Result))
        : Awaited(flushTask);
}

This completely avoids the async/await machinery in the most common case: synchronous completion - as we can see again on sharplab.io. I should emphasize: there's absolutely no point doing this if the code is usually (or exclusively) going to actually be asynchronous; it only helps when the result is usually (or exclusively) going to be available synchronously.

And what about the reader?

As we've seen many times, the reader is often slightly more complicated - we can't know that a single "read" operation will contain exactly one inbound message. We may need to loop until we have all the data we need, and we may have additional data that we need to push back. So let's assume we want to consume a single message of some kind:

async ValueTask<SomeMessageType> GetNextMessage(
    PipeReader reader,
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        var read = await reader.ReadAsync(cancellationToken);
        if (read.IsCanceled) ThrowCanceled();

        // can we find a complete frame?
        var buffer = read.Buffer;
        if (TryParseFrame(
            buffer,
            out SomeMessageType nextMessage,
            out SequencePosition consumedTo))
        {
            reader.AdvanceTo(consumedTo);
            return nextMessage;
        }
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (read.IsCompleted) ThrowEOF();        
    }
}

Here we obtain some data from the pipe, checking exit conditions like cancellation. Next, we try to find a message; what this means depends on your exact code - this could mean:

  • looking through the buffer for some sentinel value such as an ASCII line-ending, then treating everything up to that point as a message (discarding the line ending)
  • parsing a well-defined binary frame header, obtaining the payload length, checking that we have that much data, and processing it
  • or anything else you want!

If we do manage to find a message, we can tell the pipe to discard the data that we've consumed - by AdvanceTo(consumedTo), which uses whatever our own frame-parsing code told us that we consumed. If we don't manage to find a message, the first thing to do is tell the pipe that we consumed nothing despite trying to read everything - by reader.AdvanceTo(buffer.Start, buffer.End). At this point, there are two possibilities:

  • we haven't got enough data yet
  • the pipe is dead and there will never be enough data

Our check on read.IsCompleted tests this, reporting failure in the latter case; otherwise we continue the loop, and await more data. What is left, then, is our frame parsing - we've reduced complex IO management down to simple operations; for example, if our messages are separated by line-feed sentinels:

private static bool TryParseFrame(
    ReadOnlySequence<byte> buffer,
    out SomeMessageType nextMessage,
    out SequencePosition consumedTo)
{
    // find the end-of-line marker
    var eol = buffer.PositionOf((byte)'\n');
    if (eol == null)
    {
        nextMessage = default;
        consumedTo = default;
        return false;
    }

    // read past the line-ending
    consumedTo = buffer.GetPosition(1, eol.Value);
    // consume the data
    var payload = buffer.Slice(0, eol.Value);
    nextMessage = ReadSomeMessageType(payload);
    return true;
}

Here PositionOf tries to find the first location of a line-feed. If it can't find one, we give up. Otherwise, we set consumedTo to be "the line-feed plus one" (so we consume the line-feed), and we slice our buffer to create a sub-range that represents the payload without the line-feed, which we can then parse however we choose. Finally, we report success, and can rejoice at the simplicity of parsing linux-style line-endings.

What's the point here?

With minimal code that is very similar to the most naïve and simple Stream version (without any nice features), our app code now has a reader and writer chain that automatically exploits a wide range of capabilities to ensure efficient and effective processing. Again, you can do all these things with Stream, but it is really, really hard to do well and reliably. By pushing all these features into the framework, multiple code-bases can benefit from a single implementation. It also gives future scope for interesting custom pipeline endpoints and decorators that work directly on the pipeline API.

Summary

In this section, we looked at the memory model used by pipelines, and how it helps us avoid allocations. Then we looked at how we might integrate pipelines into existing APIs and systems such as Stream - and we introduced Pipelines.Sockets.Unofficial as an available utility library. We looked at the options available for integrating span/memory code with APIs that don't offer those options, and finally we looked at what the actual calling code might look like when talking to pipelines (taking a brief side step into how to optimize async code that is usually synchronous) - showing what our application code might look like. In the third and final part, we'll look at how we combine all these learning points when looking at a real-world library such as StackExchange.Redis - discussing what complications the code needed to solve, and how pipelines made it simple to do so.

Monday, 2 July 2018

Pipe Dreams, part 1

Pipelines - a guided tour of the new IO API in .NET, part 1

(part 2 here)

About two years ago I blogged about an upcoming experimental IO API in the .NET world - at the time provisionally called "Channels"; at the end of May 2018, this finally shipped - under the name System.IO.Pipelines. I am hugely interested in the API, and over the last few weeks I've been consumed with converting StackExchange.Redis to use "pipelines", as part of our 2.0 library update.

My hope in this series, then, is to discuss:

  • what "pipelines" are
  • how to use them in terms of code
  • when you might want to use them

To help put this in concrete terms, after introducing "pipelines" I intend to draw heavily on the StackExchange.Redis conversion - and in particular by discussing which problems it solves for us in each scenario. Spoiler: in virtually all cases, the answer can be summarized as:

It perfectly fits a complex but common stumbling point in IO code; allowing us to replace an ugly kludge, workaround or compromise in our code - with a purpose-designed elegant solution that is in framework code.

I'm pretty sure that the pain points I'm going to cover below will be familiar to anyone who works at "data protocol" levels, and I'm equally sure that the hacks and messes that we'll be replacing with pipelines will be duplicated in a lot of code-bases.

What do pipelines replace / complement?

The starting point here has to be: what is the closest analogue in existing framework code? And that is simple: Stream. The Stream API will be familiar to anyone who has worked with serialization or data protocols. As an aside: Stream is actually a very ambiguous API - it works very differently in different scenarios:

  • some streams are read-only, some are write-only, some are read-write
  • the same concrete type can sometimes be read-only, and sometimes write-only (DeflateStream, for example)
  • when a stream is read-write, sometimes it works like a cassette tape, where read and write are operating on the same underlying data (FileStream, MemoryStream); and sometimes it works like two separate streams, where read and write are essentially completely separate streams (NetworkStream, SslStream) - a duplex stream
  • in many of the duplex cases, it is hard or impossible to express "no more data will be arriving, but you should continue to read the data to the end" - there's just Close(), which usually kills both halves of the duplex
  • sometimes streams are seekable and support concepts like Position and Length; often they're not
  • because of the progression of APIs over time, there are often multiple ways of performing the same operation - for example, we could use Read (synchronous), BeginRead/EndRead (asynchronous using the IAsyncResult pattern), or ReadAsync (asynchronous using the async/await pattern); calling code has no way in the general case of knowing which of these is the "intended" (optimal) API
  • if you use either of the asynchronous APIs, it is often unclear what the threading model is; will it always actually be synchronous? if not, what thread will be calling me back? does it use sync-context? thread-pool? IO completion-port threads?
  • and more recently, there are also extensions to allow Span<byte> / Memory<byte> to be used in place of byte[] - again, the caller has no way of knowing which is the "preferred" API
  • the nature of the API encourages copying data; need a buffer? that's a block-copy into another chunk of memory; need a backlog of data you haven't processed yet? block-copy into another chunk of memory; etc

So even before we start talking about real-world Stream examples and the problems that happen when using it, it is clear that there are a lot of problems in the Stream API itself. The first unsurprising news, then, is that pipelines sorts this mess out!

What are pipelines?

By "pipelines", I mean a set of 4 key APIs that between them implement decoupled and overlapped reader/writer access to a binary stream (not Stream), including buffer management (pooling, recycling), threading awareness, rich backlog control, and over-fill protection via back-pressure - all based around an API designed around non-contiguous memory. That's a heck of a word salad - but don't worry, I'll be talking about each element to explain what I mean.

Starting out simple: writing to, and reading from, a single pipe

Let's start with a Stream analogue, and write something simple to a stream, and read it back - sticking to just the Stream API. We'll use ASCII text so we don't need to worry about any complex encoding concerns, and our read/write code shouldn't assume anything about the underlying stream. We'll just write the data, and then read to the end of the stream to consume it.

We'll do this with Stream first - familiar territory. Then we'll re-implement it with pipelines, to see where the similarities and differences lie. After that, we'll investigate what is actually happening under the hood, so we understand why this is interesting to us!

Also, before you say it: yes, I'm aware of TextReader/TextWriter; I'm not using them intentionally - because I'm trying to talk about the Stream API here, so that the example extends to a wide range of data protocols and scenarios.

using (MemoryStream ms = new MemoryStream())
{
    // write something
    WriteSomeData(ms);
    // rewind - MemoryStream works like a tape
    ms.Position = 0;
    // consume it
    ReadSomeData(ms);
}

Now, to write to a Stream the caller needs to obtain and populate a buffer which they then pass to the Stream. We'll keep it simple for now by using the synchronous API and simply allocating a byte[]:

void WriteSomeData(Stream stream)
{
    byte[] bytes = Encoding.ASCII.GetBytes("hello, world!");
    stream.Write(bytes, 0, bytes.Length);
    stream.Flush();
}

Note: there are tons of things in the above I could do for efficiency; but that isn't the point yet. So if you're familiar with this type of code and are twitching at the above... don't panic; we'll make it uglier - er, I mean more efficient - later.

The reading code is typically more complex than the writing code, because the reading code can't assume that it will get everything in a single call to Read. A read operation on a Stream can return nothing (which indicates the end of the data), or it could fill our buffer, or it could return a single byte despite being offered a huge buffer. So read code on a Stream is almost always a loop:

void ReadSomeData(Stream stream)
{
    int bytesRead;
    // note that the caller usually can't know much about
    // the size; .Length is not usually usable
    byte[] buffer = new byte[256];
    do
    {
        bytesRead = stream.Read(buffer, 0, buffer.Length);
        if (bytesRead > 0)
        {   // note this only works for single-byte encodings
            string s = Encoding.ASCII.GetString(
                buffer, 0, bytesRead);
            Console.Write(s);
        }
    } while (bytesRead > 0);
}

Now let's translate that to pipelines. A Pipe is broadly comparable to a MemoryStream, except instead of being able to rewind it many times, the data is more simply a "first in first out" queue. We have a writer API that can push data in at one end, and a reader API that can pull the data out at the other. The Pipe is the buffer that sits between the two. Let's reproduce our previous scenario, but using a single Pipe instead of the MemoryStream (again not something we'd usually do in practice, but it is simple to illustrate):

Pipe pipe = new Pipe();
// write something
await WriteSomeDataAsync(pipe.Writer);
// signal that there won't be anything else written
pipe.Writer.Complete();
// consume it
await ReadSomeDataAsync(pipe.Reader);

First we create a pipe using the default options, then we write to it. Note that IO operations on pipes are usually asynchronous, so we'll need to await our two helper methods. Note also that we don't pass the Pipe to them - unlike Stream, pipelines have separate API surfaces for read and write operations, so we pass a PipeWriter to the helper method that does our writing, and a PipeReader to the helper method that does our reading. After writing the data, we call Complete() on the PipeWriter. We didn't have to do this with the MemoryStream because it automatically EOFs when it reaches the end of the buffered data - but on some other Stream implementations - especially one-way streams - we might have had to call Close after writing the data.

OK, so what does WriteSomeDataAsync look like? Note, I've deliberately over-annotated here:

async ValueTask WriteSomeDataAsync(PipeWriter writer)
{
    // use an oversized size guess
    Memory<byte> workspace = writer.GetMemory(20);
    // write the data to the workspace
    int bytes = Encoding.ASCII.GetBytes(
        "hello, world!", workspace.Span);
    // tell the pipe how much of the workspace
    // we actually want to commit
    writer.Advance(bytes);
    // this is **not** the same as Stream.Flush!
    await writer.FlushAsync();
}

The first thing to note is that when dealing with pipelines: you don't control the buffers: the Pipe does. Recall how in our Stream code, both the read and write code created a local byte[], but we don't have that here. Instead, we ask the Pipe for a buffer (workspace), via the GetMemory method (or its twin - GetSpan). As you might expect from the name, this gives us either a Memory<byte> or a Span<byte> - of size at least twenty bytes.

Having obtained this buffer, we encode our string into it. This means that we're writing directly into the pipe's memory, keeping track of how many bytes we actually used, so we can tell it in Advance. We are under no obligation to use the twenty that we asked for: we could write zero, one, twenty, or even fifty bytes. The last one may seem surprising, but it is actually actively encouraged! The emphasis previously was on "at least" - the writer can actually give us a much bigger buffer than we ask for. When dealing with larger data, it is common to make modest requests but expect greatness: ask for the minimum we can usefully utilize, but then check the size of the memory/span that it gives us before deciding how much to actually write.

The call to Advance is important; this completes a single write operation, making the data available in the pipe to be consumed by a reader. The call to FlushAsync is equally important, but much more nuanced. However, before we can adequately describe what it does, we need to take a look at the reader. So; here's our ReadSomeDataAsync method:

async ValueTask ReadSomeDataAsync(PipeReader reader)
{
    while (true)
    {
        // await some data being available
        ReadResult read = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = read.Buffer;
        // check whether we've reached the end
        // and processed everything
        if (buffer.IsEmpty && read.IsCompleted)
            break; // exit loop

        // process what we received
        foreach (ReadOnlyMemory<byte> segment in buffer)
        {
            string s = Encoding.ASCII.GetString(
                segment.Span);
            Console.Write(s);
        }
        // tell the pipe that we used everything
        reader.AdvanceTo(buffer.End);
    }
}

Just like with the Stream example, we have a loop that continues until we've reached the end of the data. With Stream, that is defined as being when Read returns a non-positive result, but with pipelines there are two things to check:

  • read.IsCompleted tells us whether the write pipe has been signalled as completed and therefore no more data will be written (pipe.Writer.Complete(); in our earlier code did this)
  • buffer.IsEmpty tells us whether there is any data left to process in this iteration

If there's nothing in the pipe now and the writer has been completed, then there will never be anything in the pipe, and we can exit.

If we do have data, then we can look at buffer. So first - let's talk about buffer; in the code it is a ReadOnlySequence<byte>, which is a new type - this concept combines a few roles:

  • describing non-contiguous memory, specifically a sequence of zero, one or many ReadOnlyMemory<byte> chunks
  • describing a logical position (SequencePosition) in such a data-stream - in particular via buffer.Start and buffer.End

The non-contiguous part is very important here. We'll look at where the data is actually going shortly, but in terms of reading: we need to be prepared to handle data that could be spread across multiple segments. In this case, we do this by a simple foreach over the buffer, decoding each segment in turn. Note that even though the API is designed to be able to describe multiple non-contiguous buffers, it is frequently the case that the data received is contiguous in a single buffer; and in that case, it is often possible to write an optimized implementation for a single buffer. You can do that by checking buffer.IsSingleSegment and accessing buffer.First.
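For example, a sketch of that single-segment fast path applied to our decode loop:

if (buffer.IsSingleSegment)
{
    // all of the data is contiguous - decode it in one go
    Console.Write(Encoding.ASCII.GetString(buffer.First.Span));
}
else
{
    // fall back to walking the segments
    foreach (ReadOnlyMemory<byte> segment in buffer)
    {
        Console.Write(Encoding.ASCII.GetString(segment.Span));
    }
}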

Finally, we call AdvanceTo, which tells the pipe how much data we actually used.

Key point: you don't need to take everything you are given!

Contrast to Stream: when you call Read on a Stream, it puts data into the buffer you gave it. In most real-world scenarios, it isn't always possible to consume all the data yet - maybe it only makes sense to consider "commands" as "entire text lines", and you haven't yet seen a cr/lf in the data. With Stream: this is tough - once you've been given the data, it is your problem; if you can't use it yet, you need to store the backlog somewhere. However, with pipelines, you can tell it what you've consumed. In our case, we're telling it that we consumed everything we were given, which we do by passing buffer.End to AdvanceTo. That means we'll never see that data again, just like with Stream. However, we could also have passed buffer.Start, which would mean "we didn't use anything" - and even though we had the chance to inspect the data, it would remain in the pipe for subsequent reads. We can also get arbitrary SequencePosition values inside the buffer - if we read 20 bytes, for example - so we have full control over how much data is dropped from the pipe. There are two ways of getting a SequencePosition (illustrated in a short sketch after this list):

  • you can Slice(...) a ReadOnlySequence<byte> in the same way that you Slice(...) a Span<T> or Memory<T> - and access the .Start or .End of the resulting sub-range
  • you can use the .GetPosition(...) method of the ReadOnlySequence<byte>, which returns a relative position without actually slicing
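For example (assuming the buffer holds at least 20 bytes):

// two equivalent ways of describing "20 bytes from the start of the buffer"
SequencePosition viaSlice = buffer.Slice(0, 20).End;
SequencePosition viaGetPosition = buffer.GetPosition(20); // offset is relative to buffer.Start
// drop those 20 bytes from the pipe
reader.AdvanceTo(viaGetPosition);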

Even more subtle: we can tell it separately that we consumed some amount, but that we inspected a different amount. The most common example here is to express "you can drop this much - I'm done with that; but I looked at everything, I can't make any more progress at the moment - I need more data" - specifically:

reader.AdvanceTo(consumedToPosition, buffer.End);

This is where the subtle interplay of PipeWriter.FlushAsync() and PipeReader.ReadAsync() starts to come into play. I skipped over FlushAsync earlier, but it actually serves two different functions in one call:

  • if there is a ReadAsync call that is outstanding because it needs data, then it awakens the reader, allowing the read loop to continue
  • if the writer is out-pacing the reader, such that the pipe is filling up with data that isn't being cleared by the reader, it can suspend the writer (by not completing synchronously) - to be reactivated when there is more space in the pipe (the thresholds for writer suspend/resume can be optionally specified when creating the Pipe instance)
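The thresholds mentioned in that last bullet are supplied via PipeOptions when constructing the pipe; a minimal sketch (the numbers here are arbitrary examples, not recommendations):

var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 512 * 1024,    // suspend the writer's FlushAsync once 512KiB is unread
    resumeWriterThreshold: 256 * 1024)); // resume it once the reader drains below 256KiB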

Obviously these concepts don't come into play in our example, but they are central ideas to how pipelines works. The ability to push data back into the pipe hugely simplifies a vast range of IO scenarios. Virtually every piece of protocol handling code I've seen before pipelines has masses of code related to handling the backlog of incomplete data - it is such a repeated piece of logic that I am incredibly happy to see it handled well in a framework library instead.

What does "awaken" or "reactivate" mean here?

You might have observed that I didn't really define what I meant here. At the obvious level, I mean that: an await operation of ReadAsync or FlushAsync had previously returned as incomplete, so now the asynchronous continuation gets invoked, allowing our async method to resume execution. Yeah, OK, but that's just re-stating what async/await mean. It is a bug-bear of mine that I care deeply (really, it is alarming how deeply) about which threads code runs on - for reasons that I'll talk about later in this series. So saying "the asynchronous continuation gets invoked" isn't enough for me. I want to understand who is invoking it, in terms of threads. The most common answers to this are:

  • it delegates via the SynchronizationContext (note: many systems do not have a SynchronizationContext)
  • the thread that triggered the state change gets used, at the point of the state change, to invoke the continuation
  • the global thread-pool is used to invoke the continuation

All of these can be fine in some cases, and all of these can be terrible in some cases! Sync-context is a well-established mechanism for getting from worker threads back to primary application threads (especially: the UI thread in desktop applications). However, it isn't necessarily the case that just because we've finished one IO operation, we're ready to jump back to an application thread; and doing so can effectively push a lot of IO code and data processing code onto an application thread - usually the one thing we explicitly want to avoid. Additionally, it can be prone to deadlocks if the application code has used Wait() or .Result on an asynchronous call (which, to be fair, you're not meant to do). The second option (performing the callback "inline" on the thread that triggered it) can be problematic because it can steal a thread that you expected to be doing something else (and can lead to deadlocks as a consequence); and in some extreme cases it can lead to a stack-dive (and eventually a stack-overflow) when two asynchronous methods are essentially functioning as co-routines. The final option (global thread-pool) is immune to the problems of the other two - but can run into severe problems under some load conditions - something again that I'll discuss in a later part in this series.

However, the good news is that pipelines gives you control here. When creating the Pipe instance, we can supply PipeScheduler instances to use for the reader and writer (separately). The PipeScheduler is used to perform these activations. If not specified, then it defaults first to checking for SynchronizationContext, then using the global thread-pool, with "inline" continuations (i.e. intentionally using the thread that caused the state change) as another option readily available. But: you can provide your own implementation of a PipeScheduler, giving you full control of the threading model.
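Purely to show the shape of that API (most code should not need this), here's a hedged sketch of a custom PipeScheduler that pushes every continuation onto a single dedicated thread - no shutdown handling, just enough to illustrate the idea:

// requires: System, System.Collections.Concurrent, System.IO.Pipelines, System.Threading
public sealed class DedicatedThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue
        = new BlockingCollection<(Action<object>, object)>();

    public DedicatedThreadPipeScheduler()
    {
        var thread = new Thread(() =>
        {
            // drain the queue forever, invoking each continuation in turn
            foreach (var (action, state) in _queue.GetConsumingEnumerable())
                action(state);
        }) { IsBackground = true, Name = nameof(DedicatedThreadPipeScheduler) };
        thread.Start();
    }

    public override void Schedule(Action<object> action, object state)
        => _queue.Add((action, state));
}

You would then pass an instance of this as the readerScheduler and/or writerScheduler when creating the PipeOptions.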

Summary

So: we've looked at what a Pipe is when considered individually, and how we can write to a pipe with a PipeWriter, and read from a pipe with a PipeReader - and how to "advance" both reader and writer. We've looked at the similarity and differences with Stream, and we've discussed how ReadAsync() and FlushAsync() can interact to control how the writer and reader pieces execute. We looked at how responsibility for buffers is reversed, with the pipe providing all buffers - and how the pipe can simplify backlog management. Finally, we discussed the threading model that is active for continuations in the await operations.

That's probably enough for step 1; next, we'll look at how the memory model for pipelines works - i.e. where does the data live. We'll also look at how we can use pipelines in real scenarios to start doing interesting things.

Thursday, 12 April 2018

Having a serious conversation about open source

Having a serious conversation about open source

A certain topic has been surfacing a lot lately on places like twitter; one that I've quietly tried to ignore, but which has been gnawing away at me slowly. It is seemingly one of the most taboo, dirty and avoided topics.

Open source and money

See, I said it was taboo and dirty

Talking openly about money is always hard, but when you combine money and open source, it very quickly devolves into a metaphorical warzone, complete with entrenched camps, propaganda, etc.

This is a complex area, and if I mis-speak I hope that you'll afford me some generosity in interpreting my words as meant constructively and positively. This is largely a bit of a brain-dump; I don't intend it to be too ranty or preachy, but I'm probably not the best judge of that.

I absolutely love open source and the open source community. I love sharing ideas, being challenged by requirements and perspectives outside of my own horizon, benefitting from the contributions and wisdom of like-minded folks etc. I love that packages I've created (usually originally because I needed to solve a problem that vexed me) have been downloaded and used to help people tens of millions of times - that's a great feeling! I love that I have benefitted indirectly from community recognition (including things like an MVP award), and professionally (I doubt I'd have got my job at Stack Overflow if the team hadn't been using protobuf-net from a very early date).

But: the consumers of open source (and I very much include myself in this) have become... for want of a better word: entitled. We've essentially reinforced that software is free (edit: I mean in the "beer" sense, not in the "Stallman" sense). That our efforts - as an open source community: have no value beyond the occasional pat on a back. Perhaps worse, it undermines the efforts of those developers trying to earn an honest buck by offering professional products in the same area... or maybe it just forces them to offer very clear advantages and extra features, which perhaps is a good thing for them? Or is that just me trying to suppress a sense of guilt at cutting off someone else's customer base?

Yes it is true that some open source projects get great community backing from companies benefitting from the technology, but that seems to be the minority. Most open source projects... just don't.

  • maybe this is because the project isn't popular
  • maybe it is popular, but not in a way that helps "enterprise" customers (the people most likely to pay)
  • maybe the project team simply haven't made a pay option available, which could be lack of confidence, or lack of know-how, or legal issues - or it could be the expectation that the software is completely free
  • maybe people like it, but not enough to pay anything towards it
  • maybe anything other than the "free to use and open licensing" is massively disruptive to the dominant tool-chain for accessing libraries in a particular ecosystem (npm, nuget, etc)
  • maybe with multiple contributors of different-sized efforts, it would become massively confusing as to who receives what money, if any was made
  • (added) maybe your daytime employment contract prohibits taking payment for additional work

But whatever the reason; most open source libraries don't get financially supported. Sometimes this might not be a problem; maybe a library is sponsored internally by a company that has then made that software available for other people to benefit from. But: the moment a library hits the public, it has to deal with all the scenarios and problems and edge cases that the originating company didn't need to worry about. For example, you'd be amazed at how much trouble SQLite causes dapper due to the data types, or how much complexity things like "sentinel", "cluster" and "modules" make for redis clients. But: the originating company (Stack Overflow in the case of dapper, etc) doesn't use those features, so they don't get fixed on company time. This is a recurring theme in many such projects - and now you're in an even more complex place where the people maintaining and advancing something are doing a lot of that work on their own time, but it is now even more awkward to ask the simple question: "this thing that I'm doing to benefit real users: am I getting paid for this? can I even accept contributions other than PRs?".

Is this a problem?

Perhaps, perhaps not. I'm certainly not bitter; I love working on this stuff - I do it for hobby and pleasure reasons too (I love solving challenging problems), and it has hugely advanced my knowledge, but I have to be honest and admit that there's a piece of me that thinks an opportunity has been missed. Take protobuf-net: if I sat down and added up the hours that I've spent on that, it would be horrifying. And I know people are succeeding with it, and using it for commercial gain - I get the support emails from people using it in incredibly interesting ways.

Quite a while back I tried adding small and discreet contribution options for protobuf-net (think: "buy me a beer"). It wasn't entirely unsuccessful: to date I've received a little over (edit: incorrectly stated USD) GBP 100 in direct contributions; most of that was in one very much appreciated donation - that I can't find the details of because "pledgie" closed down. But overall, almost all of the work done has been completely for free. Again, I don't resent this, but it feels that there's a huge imbalance in terms of who is doing the work, versus who is benefiting from the work. There is very little motivation for companies benefiting from open source to contribute back to it - even when they're using it in commercial ways that are helping them create profits from successful products or services.

In my view, this is just as bad for the consuming company as it is for the author: if the developer isn't motivated to improve and maintain a library that you depend on for your successful product, then: that sounds a lot like a supply-chain risk. But then, I guess you can just move onto the next competing free tool if one author burns out.

I'm not sure I have a solution here, but I do think there's a very real conversation that we shouldn't be afraid of having, about how we - as an industry - avoid open source being treated simply as free contractors.

I'm toying with ideas

For protobuf-net, I'm aware that a good number of my users are doing things like games, which tend to run on limited runtimes that don't tend to have runtime-emit support (they are "AOT" runtimes). This really hurts the performance of reflection-based libraries. I've been toying for ages with new ideas to make protobuf-net work much better on those platforms by having much richer build tooling that does all of the emit code up-front as part of the build, and one of the ideas I'm playing with is to:

  • keep the core protobuf-net runtime library "as is" (and continue to make it available for free)
  • add an additional separate package that adds the AOT features
  • but make this package dual-licensed: GPL or purchase non-GPL

But I'm very very unsure about this. Philosophically, I kinda hate the GPL. I just do. But the stickiness of the GPL might be the thing that actually gets some customers - the ones who care about compliance - to pay a little for it. It doesn't have to be much; just enough to make it feel justified to spend such a vast amount of time developing these complex features. As for the people that don't care about compliance: they weren't going to pay anything anyway, so frankly it isn't worth worrying about what they do.

Is this a terrible idea? Is this just me exploiting the fact that I know some users have AOT requirements? Am I just being greedy in my middle-age? Is this just going to make a nightmare of accounting and legal problems? Am I just being grumpy? Should I just accept that open source is free work? I genuinely don't know, and I haven't made my mind up on what to do. I'm genuinely interested in what people think though; comments should be open below (edit: comments were open, but... blog spam, so much blog spam; maybe tweet me @marcgravell).

Tuesday, 30 January 2018

Sorting myself out, extreme edition

...where I go silly with optimization

Yes, I’m still talking about sorting... ish. I love going deep on problems - it isn’t enough to just have a solution - I like knowing that I have the best solution that I am capable of. This isn’t always possible in a business setting - deadlines etc - but this is my own time.

In part 1, we introduced the scenario - and discussed how to build a composite unsigned value that we could use to sort multiple properties as a single key.

In part 2, we looked a little at radix sort, and found that it was a very compelling candidate.

In this episode, we’re going to look at some ways to significantly improve what we’ve done so far. In particular, we’ll look at:

  • using knowledge of how signed data works to avoid having to transform between them
  • performing operations in blocks rather than per value to reduce calls
  • using Span<T> as a replacement for unsafe code and unmanaged pointers, allowing you to get very high performance even in 100% managed/safe code
  • investigating branch removal as a performance optimization of critical loops
  • vectorizing critical loops to do the same work with significantly fewer CPU operations

Hopefully, as a follow-up after this one, I’ll look at practical guidance on parallelizing this same work to spread the load over available cores.

Key point: the main purpose of these words is not to discuss how to implement a radix sort - in fact, we don’t even do that. Instead, we use one small part of radix sort as an example problem with which to discuss much broader concepts of performance optimization in C# / .NET.

Obviously I can’t cover the entirety of radix sort here, so I’m going to focus on one simple part: composing the radix for sorting. To recall, a naive implementation of radix sort requires unsigned keys, so that the data is naturally sortable in its binary representation. Signed integers and floating point numbers don’t follow this layout, so in part 1 we introduced some basic tools to convert between them:

uint Sortable(int value)
{
    // re-base everything upwards, so anything
    // that was the min-value is now 0, etc
    var val = unchecked((uint)(value - int.MinValue));
    return val;
}
unsafe uint Sortable (float value)
{
    const int MSB = 1 << 31;
    int raw = *(int*)(&value);
    if ((raw & MSB) != 0) // IEEE first bit is the sign bit
    {
        // is negative; should interpret as -(the value without the MSB) - not the same as just
        // dropping the bit, since integer math is twos-complement
        raw = -(raw & ~MSB);
    }
    return Sortable(raw);
}

These two simple transformations - applied to our target values - will form the central theme of this entry.

To measure performance, I’ll be using the inimitable BenchmarkDotNet, looking at multiple iterations of transforming 2 million random float values taken from a seeded Random(), with varying signs etc. The method above will be our baseline, and at each step we’ll add a new row to our table at the bottom:

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00

This gives us a good starting point.

A negative sign of the times

What’s faster than performing a fast operation? Not performing a fast operation. The way radix sort works is by looking at the sort values r bits at a time (commonly 4, 8, 10, but any number is valid) and for that block of bits: counting how many candidates are in each of the 1 << r possible buckets. So if r is 3, we have 8 possible buckets. From that it computes target offsets for each group: if there are 27 values in bucket 0, 12 in bucket 1, 3 in bucket 2, etc - then when sorted bucket 0 will start at offset 0, bucket 1 at offset 27, bucket 2 at offset 39, bucket 3 at offset 41, and so on - just by accumulating the counts. But this breaks if we have signed numbers.
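
To make that counting/offset step concrete, here’s a minimal sketch for plain unsigned keys - the helper name and shape are mine, for illustration only (a real radix sort does this once per r-bit chunk):

static int[] ComputeOffsets(uint[] keys, int r, int shift)
{
    int buckets = 1 << r;           // e.g. r=3 gives 8 buckets
    var counts = new int[buckets];
    foreach (var key in keys)
    {
        // which bucket does this key fall into, for this chunk of bits?
        int bucket = (int)((key >> shift) & (uint)(buckets - 1));
        counts[bucket]++;
    }
    // accumulate the counts into start offsets: bucket 0 starts at 0,
    // bucket 1 starts after bucket 0’s items, and so on
    var offsets = new int[buckets];
    int running = 0;
    for (int b = 0; b < buckets; b++)
    {
        offsets[b] = running;
        running += counts[b];
    }
    return offsets;
}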

Why?

First, let’s remind ourselves of the various ways that signed and unsigned data can be represented in binary, using a 4 bit number system and integer representations:

Binary Unsigned 2s-complement 1s-complement Sign bit
0000 0 0 +0 +0
0001 1 1 1 1
0010 2 2 2 2
0011 3 3 3 3
0100 4 4 4 4
0101 5 5 5 5
0110 6 6 6 6
0111 7 7 7 7
1000 8 -8 -7 -0
1001 9 -7 -6 -1
1010 10 -6 -5 -2
1011 11 -5 -4 -3
1100 12 -4 -3 -4
1101 13 -3 -2 -5
1110 14 -2 -1 -6
1111 15 -1 -0 -7

We’re usually most familiar with unsigned and 2s-complement representations, because that is what most modern processors use to represent integers. 1s-complement is where -x ≡ ~x - i.e. to negate something we simply invert all the bits. This works fine but has two zeros, which is one more than we usually need - hence we usually use 2s-complement which simply adds an off-by-one step; this makes zero unambiguous (very useful for false as we’ll see later) and (perhaps less important) gives us an extra negative value to play with.
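
A quick sanity check of that off-by-one relationship in C#:

int x = 5;
Console.WriteLine(~x);           // -6: 1s-complement negation (invert all the bits)
Console.WriteLine(~x + 1);       // -5: add one to get the 2s-complement negation
Console.WriteLine(-x == ~x + 1); // True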

The final option is to use a sign bit; to negate a number we flip the most significant bit, so -x ≡ x ^ 0b1000. IEEE754 floating point numbers (float and double) are implemented using a sign bit, which is why floating point numbers have +0 and -0. Due to clever construction, the rest of the value can be treated as naturally/bitwise sortable - even without needing to understand the “mantissa”, “exponent” and “bias”. This means that to convert a negative float (or any other sign-bit number) to a 1s-complement representation, we simply flip all the bits except the most significant bit. Or we flip all the bits and put the most significant bit back again, since we know it should be a 1.


So: armed with this knowledge, we can see that signed data in 1s-complement or 2s-complement is almost “naturally sortable” in binary, but simply: the negative values are sorted in increasing numerical value, but come after the positive values (we can happily assert that -0 < +0). This means that we can educate radix sort about 1s-complement and 2s-complement signed data simply by being clever when processing the final left-most chunk: based on r and the bit-width, calculate which bit is the most-significant bit (which indicates sign), and simply process the negative buckets first (still in the same order) when calculating the offsets; then calculate the offsets of the non-negative buckets. If we were using the 4-bit system above and r=4, we would have 16 buckets, and would calculate offsets in the order (of unsigned buckets) 8-15 then 0-7.
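
As a concrete illustration of that re-ordering, here’s a hypothetical helper for the 4-bit, r=4 case, assuming a counts array from the counting pass:

// compute start offsets for the final chunk of 2s-complement data:
// negative buckets (8..15) first, then non-negative buckets (0..7)
static int[] SignedOffsets(int[] counts) // counts.Length == 16
{
    var offsets = new int[16];
    int running = 0;
    for (int b = 8; b < 16; b++) { offsets[b] = running; running += counts[b]; }
    for (int b = 0; b < 8; b++)  { offsets[b] = running; running += counts[b]; }
    return offsets;
}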

By doing this, we can completely remove any need to do any pre-processing when dealing with values like int. We could perhaps wish that the IEEE754 committee had preferred 1s-complement so we could skip all of this for float too, but a: I think it is fair to assume that there are good technical reasons for the choice (presumably relating to fast negation, and fast access to the mantissa/exponent), and b: it is moot: IEEE754 is implemented in CPU architectures and is here to stay.

So we’re still left with an issue for float: we can’t use the same trick for values with a sign bit, because the sign bit changes the order throughout the data - making grouping impossible. We can make our lives easier though: since the algorithm can now cope with 1s-complement and 2s-complement data, we can switch to 1s-complement rather than to fully unsigned, which as discussed above: is pretty easy:

(Aside: actually, there is a related trick we can do to avoid having to pre-process floating point data, but: it would make this entire blog redundant! So for the purposes of a problem to investigate, we’re going to assume that we need to do this transformation.)

unsafe int ToRadix (float value)
{
    const int MSB = 1 << 31;
    int raw = *(int*)(&value);
    // if sign bit set: flip all bits except the MSB
    return (raw & MSB) == 0 ? raw : ~raw | MSB;
}

A nice side-effect of this is that it is self-reversing: we can apply the exact same bit operations to convert from 1s-complement back to a sign bit.
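
We can convince ourselves of that with a tiny check (Twiddle here is a hypothetical helper that just mirrors the ternary above):

static int Twiddle(int raw)
{
    const int MSB = 1 << 31;
    return (raw & MSB) == 0 ? raw : ~raw | MSB;
}
...
int raw = unchecked((int)0x80001234); // any bit pattern with the sign bit set
Console.WriteLine(Twiddle(Twiddle(raw)) == raw); // True - the operation is its own inverse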

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00
ToRadixPerValue 10,120.5 us 0.97

We’ve made a slight but measurable improvement - nothing drastic, but the code is nicer.

Blocking ourselves out

We have a large chunk of data, and we want to perform a transformation on each value. So far, we’ve looked at a per-value transformation function (Sortable), but that means the overhead of a call per-value (which may or may not get inlined, depending on the complexity, and how we resolve the method - i.e. does it involve a virtual call to a type that isn’t reliably known). Additionally, it makes it very hard for us to apply more advanced optimizations! Blocks good.

So; let’s say we have our existing loop:

float[] values = ...
int[] radix = ...
for(int i = 0 ; i < values.Length; i++)
{
    radix[i] = someHelper.Sortable(values[i]);
}

and we want to retain the ability to swap in per-type implementations of someHelper.Sortable; we can significantly reduce the call overhead by performing a block-based transformation. Consider:

float[] values = ...
int[] radix = ...
someHelper.ToRadix(values, radix);
...
unsafe void ToRadix(float[] source, int[] destination)
{
    const int MSB = 1 << 31;
    for(int i = 0 ; i < source.Length; i++)
    {
        var val = source[i];
        int raw = *(int*)(&val);
        // if sign bit set: flip all bits except the MSB
        destination[i] = (raw & MSB) == 0 ? raw : ~raw | MSB;
    }
}

How much of a speed improvement this makes depends a lot on whether the JIT managed to inline the IL from the original version. It is usually a good win by itself, but more importantly: it is a key stepping stone to further optimizations.

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00
ToRadixPerValue 10,120.5 us 0.97
ToRadixBlock 10,080.0 us 0.96

Another small improvement; I was hoping for more, but I suspect that the JIT was already doing a good job of inlining the method we're calling, making it almost the same loop at runtime. This is not always the case, though - especially if you have multiple different transformations to apply through a single API.

Safely spanning the performance chasm

You’ll notice that in the code above I’ve made use of unsafe code. There are a few things that make unsafe appealing, but one of the things it does exceptionally well is allow us to reinterpret chunks of data as other types, which is what this line does:

int raw = *(int*)(&val);

Actually, there are some methods on BitConverter to do exactly this, but only the 64-bit (double/long) versions exist in the “.NET Framework” (“.NET Core” has both 32-bit and 64-bit) - and that only helps us with this single example, rather than the general case.
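
For reference, the 64-bit helpers look like this; the 32-bit equivalents (SingleToInt32Bits / Int32BitsToSingle) are - at the time of writing - “.NET Core” only:

long bits = BitConverter.DoubleToInt64Bits(-1.5);   // reinterpret the double’s bits as a long
double back = BitConverter.Int64BitsToDouble(bits); // and back again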

For example, if we are talking in pointers, we can tell the compiler to treat a float* as though it were an int*. One way we might be tempted to rewrite our ToRadix method could be to move this coercion earlier:

unsafe void ToRadix(float[] source, int[] destination)
{
    const int MSB = 1 << 31;
    fixed(float* fPtr = source)
    {
        int* ptr = (int*)fPtr;
        for(int i = 0 ; i < source.Length; i++)
        {
            int raw = *ptr++;
            destination[i] = (raw & MSB) == 0 ? raw : ~raw | MSB;
        }
    }
}

Now we’re naturally reading values out as int, rather than performing any reinterpretation per value. This is useful, but it requires us to use unsafe code (always a great way to get hurt), and it doesn’t work with generics - you cannot use T* for some <T>, even with the where T : struct constraint.

I’ve spoken more than a few times about Span<T>; quite simply: it rocks. To recap, Span<T> (and its heap-friendly cousin, Memory<T>) is a general purpose, efficient, and versatile representation of contiguous memory - which includes things like arrays (float[]), but more exotic things too.

One of the most powerful (but simple) features of Span<T> is that it allows us to do type coercion in fully safe managed code. For example, instead of a float[], let’s say that we have a Span<float>. We can reinterpret that as int very simply:

Span<float> values = ...
var asIntegers = values.NonPortableCast<float, int>();

Note: this API is likely to change - by the time it hits general availability, it’ll probably be:

var asIntegers = values.Cast<int>();

What this does is:

  • look at the sizes of the original and target type
  • calculate how many of the target type fit into the original data
  • round down (so we never go out of range)
  • hand us back a span of the target type, of that (possibly reduced) length

Since float and int are the same size, we’ll find that asIntegers has the same length as values.
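
To see the length rounding in action, here’s a hypothetical example using types of different sizes:

Span<byte> bytes = new byte[10];
var asInts = bytes.NonPortableCast<byte, int>();
Console.WriteLine(asInts.Length); // 2 - only two complete 4-byte ints fit in 10 bytes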

What is especially powerful here is that this trick works with generics. It does something that unsafe code will not do for us. Note that a lot of love has been shown to Span<T> in the runtime and JIT - essentially all of the same tricks that make T[] array performance largely indistinguishable from raw T* pointer performance apply to Span<T> too.

This means we could simplify a lot of things by converting from our generic T even earlier (so we only do it once for the entire scope of our radix code), and having our radix converter just talk in terms of the raw bits (usually: uint).

// our original data
float[] values = ...
// recall that radix sort needs some extra space to work in
float[] workspace = ... 
// implicit <float> and implicit conversion to Span<float>
RadixSort32(values, workspace); 

...
void RadixSort32<T>(Span<T> typedSource, Span<T> typedWorkspace)
    where T : struct
{
    // note that the JIT can inline this *completely* and remove impossible
    //  code paths, because for struct T, the JIT is per-T
    if (Unsafe.SizeOf<T>() != Unsafe.SizeOf<uint>()) throw new InvalidOperationException("RadixSort32 requires 32-bit data");

    var source = typedSource.NonPortableCast<T, uint>();
    var workspace = typedWorkspace.NonPortableCast<T, uint>();

    // convert the radix if needed (into the workspace)
    var converter = GetConverter<T>();
    converter?.ToRadix(source, workspace);

    // ... more radix sort details not shown
}
...
// our float converter
public void ToRadix(Span<uint> values, Span<uint> destination)
{
    const uint MSB = 1U << 31;
    for(int i = 0 ; i < values.Length; i++)
    {
        uint raw = values[i];
        destination[i] = (raw & MSB) == 0 ? raw : ~raw | MSB;
    }
}

The code is getting simpler, while retaining performance and becoming more generic-friendly; and we haven’t needed to use a single unsafe. You’ll have to excuse the Unsafe.SizeOf<T>() - despite the name, this isn’t really an “unsafe” operation in the usual sense - this is simply a wrapper to the sizeof IL instruction that is perfectly well defined for all T that are usable in generics. It just isn’t directly available in safe C#.
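
For illustration only (not part of the real code), the kind of values it reports:

using System.Runtime.CompilerServices;
...
Console.WriteLine(Unsafe.SizeOf<uint>());   // 4
Console.WriteLine(Unsafe.SizeOf<double>()); // 8
Console.WriteLine(Unsafe.SizeOf<Guid>());   // 16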

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00
ToRadixPerValue 10,120.5 us 0.97
ToRadixBlock 10,080.0 us 0.96
ToRadixSpan 7,976.3 us 0.76

Now we’re starting to make decent improvements - Span<T> is really useful for large operations where type coercion is necessary.

Taking up tree surgery: prune those branches

Something that gnaws at my soul about where we’ve got to is that it includes a branch - an if test, essentially - in the inner part of the loop. Actually, there are two, and they’re both hidden. The first is in the for loop, but the one I’m talking about here is the one hidden in the ternary conditional operation, a ? b : c. CPUs are very clever about branching, with branch prediction and other fancy things - but it can still stall the instruction pipeline, especially if the prediction is wrong. If only there was a way to rewrite that operation to not need a branch. I’m sure you can see where this is going.

Branching: bad. Bit operations: good. A common trick we can use to remove branches is to obtain a bit-mask that is either all 0s (000…000) or all 1s (111…111) - so: 0 and -1 in 2s-complement terms. There are various ways we can do that (although it also depends on the actual value of true in your target system, which is a surprisingly complex question). Obviously one way to do that would be:

// -1 if negative, 0 otherwise
var mask = (raw & MSB) == 0 ? 0 : ~0;

but that just adds another branch. If we were using C, we could use the knowledge that a comparison returns an integer of either 0 or 1, and just negate that to get 0 or -1:

// -1 if negative, 0 otherwise
var mask = -((raw & MSB) != 0);

But no such trick is available in C#. What we can do, though, is use knowledge of arithmetic shift. Left-shift (<<) is simple; we shift our bits n places to the left, filling in with 0s on the right. So binary 11001101 << 3 becomes 01101000 (we lose the 110 from the left).

Right-shift is more subtle, as there are two of them: logical and arithmetic, which are essentially unsigned and signed. The logical shift (used with uint etc) moves our bits n places to the right, filling in with 0s on the left, so 11001101 >> 3 gives 00011001 (we lose the 101 from the right). The arithmetic shift behaves differently depending on whether the most significant bit (which tells us the sign of the value) is 0 or 1. If it is 0, it behaves exactly like the logical shift; however, if it is 1, it fills in with 1s on the left; so 11001101 >> 3 gives 11111001. Using this, we can use >> 31 (or >> 63 for 64-bit data) to create a mask that matches the sign of the original data:

// -1 if negative, 0 otherwise
var mask = (uint)(((int)raw) >> 31);

Don’t worry about the extra conversions: as long as we’re in an unchecked context (which we are by default), they simply don’t exist. All they do is tell the compiler which shift operation to emit. If you’re curious, you can see this here, but in IL terms, this is just:

(push the value onto the stack)
ldc.i4.s 31 // push 31 onto the stack
shr         // arithmetic right shift

In IL terms, there’s really no difference between signed and unsigned integers, other than whether the compiler emits the signed or unsigned opcodes for operations. In this case we’ve told it to emit shr - the signed/arithmetic opcode, instead of shr.un - the unsigned/logical opcode.
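
A quick demonstration of the difference between the two shifts, and of the mask we get from >> 31:

int signedBits = unchecked((int)0xFFFFFF80);  // -128: sign bit set
uint unsignedBits = 0xFFFFFF80;

Console.WriteLine(signedBits >> 4);   // -8        (arithmetic: fills with 1s)
Console.WriteLine(unsignedBits >> 4); // 268435448 (logical: fills with 0s)

Console.WriteLine(signedBits >> 31);  // -1: our all-ones mask for negative values
Console.WriteLine(123 >> 31);         // 0: all-zeros for non-negative values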

OK; so we’ve got a mask that is either all zeros or all ones. Now we need to use it to avoid a branch, but how? Consider:

var condition = // all zeros or all ones
var result = (condition & trueValue) | (~condition & falseValue);

If condition is all zeros, then the condition & trueValue gives us zero; the ~condition becomes all ones, and therefore ~condition & falseValue gives us falseValue. When we “or” (|) those together, we get falseValue.

Likewise, if condition is all ones, then condition & trueValue gives us trueValue; the ~condition becomes all zeros, and therefore ~condition & falseValue gives us zero. When we “or” (|) those together, we get trueValue.

So our branchless operation becomes:

public void ToRadix(Span<uint> values, Span<uint> destination)
{
    const uint MSB = 1U << 31;
    for(int i = 0 ; i < values.Length; i++)
    {
        uint raw = values[i];
        var ifNeg = (uint)(((int)raw) >> 31);
        destination[i] =
            (ifNeg & (~raw | MSB)) // true
            | (~ifNeg & raw);      // false
    }
}

This might look more complicated, but it is very CPU-friendly: it pipelines very well, and doesn’t involve any branches for the CPU to worry about. Doing a few extra bit operations is nothing to a CPU - especially if they can be pipelined. A long run of simple, branch-free operations is something a CPU handles very happily - certainly when compared to a mis-predicted branch or something that might involve a cache miss.

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00
ToRadixPerValue 10,120.5 us 0.97
ToRadixBlock 10,080.0 us 0.96
ToRadixSpan 7,976.3 us 0.76
Branchless 2,507.0 us 0.24

By removing the branches, we’re down to less than a quarter of the original run-time; that’s a huge win, even if the code is slightly more complex.

Why do one thing at a time?

OK, so we’ve got a nice branchless version, and the world looks great. We’ve made significant improvements. But we can still do much better. At the moment we’re processing each value one at a time, but as it happens, this is a perfect scenario for vectorization via SIMD (“Single instruction, multiple data”).

We have a pipelineable operation without any branches; if we can execute it for one value, we can probably execute it for multiple values at the same time - magic, right? Many modern CPUs include support for performing basic operations like the above on multiple values at a time, using super-wide registers. Right now we’re using 32-bit values, but most current CPUs will have support for AVX (mostly: 128-bit) or AVX2 (mostly: 256-bit) operations. If you’re on a very expensive server, you might have more (AVX512). But let’s assume AVX2: that means we can handle 8 32-bit values at a time. That means 1/8th of the main operations, and also 1/8th of the if branches hidden in the for loop.
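
You can check what is available at runtime very cheaply; the numbers below are just examples - they depend entirely on the CPU you run on:

using System.Numerics;
...
Console.WriteLine(Vector.IsHardwareAccelerated); // True on most modern hardware
Console.WriteLine(Vector<uint>.Count);           // e.g. 8 with AVX2, 4 with SSE2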

Some languages have automatic vectorization during compilation; C# doesn’t have that, and neither does the JIT. But, we still have access to a range of vectorized operations (with support for the more exotic intrinsics being added soon). Until recently, one of the most awkward things about working with vectorization has been loading the values. This might sound silly, but it is surprisingly difficult to pull the values in efficiently when you don’t know how wide the vector registers are on the target CPU. Fortunately, our amazing new friend Span<T> jumps to the rescue here - making it almost embarrassingly easy!

First, let’s look at what the shell loop might look like, without actually doing the real work in the middle:

public void ToRadix(Span<uint> values, Span<uint> destination)
{
    const uint MSB = 1U << 31;

    int i = 0;
    if (Vector.IsHardwareAccelerated)
    {                               
        var vSource = values.NonPortableCast<uint, Vector<uint>>();
        var vDest = destination.NonPortableCast<uint, Vector<uint>>();
        
        for (int j = 0; j < vSource.Length; j++)
        {
            var vec = vSource[j];
            vDest[j] = vec; // TODO: the real transformation goes here
        }
        // change our root offset for the remainder of the values
        i = vSource.Length * Vector<uint>.Count;
    }
    for( ; i < values.Length; i++)
    {
        uint raw = values[i];
        var ifNeg = (uint)(((int)raw) >> 31);
        destination[i] =
            (ifNeg & (~raw | MSB)) // true
            | (~ifNeg & raw);      // false
    }
}

First, look at the bottom of the code; here we see that our regular branchless code still persists. This is for two reasons:

  • the target CPU might not be capable of vectorization
  • our input data might not be a nice multiple of the register-width, so we might need to process a final few items the old way

Note that we’ve changed the for loop so that it doesn’t reset the position of i - we don’t necessarily start at 0.

Now look at the if (Vector.IsHardwareAccelerated); this checks that suitable vectorization support is available. Note that the JIT can optimize this check away completely (and remove all of the inner code if it won’t be reached). If we do have support, we cast the span from a Span<uint> to a Span<Vector<uint>>. Note that Vector<T> is recognized by the JIT, and will be reshaped to match the size of the available vectorization support on the running computer. That means that when using Vector<T> we don’t need to worry about whether the target computer has SSE vs AVX vs AVX2 etc - or what the available widths are; simply: “give me what you can”, and the JIT worries about the details.

We can now loop over the vectors available in the cast spans - loading an entire vector at a time simply using our familiar: var vec = vSource[j];. This is a huge difference to what loading vectors used to look like. We then do some operation (not shown) on vec, and assign the result again as an entire vector to vDest[j]. On my machine with AVX2 support, vec is a block of 8 32-bit values.

Next, we need to think about that // TODO - what are we actually going to do here? If you’ve already re-written your inner logic to be branchless, there’s actually a very good chance that it will be a like-for-like translation of your branchless code. In fact, it turns out that the ternary conditional scenario we’re looking at here is so common that there are vectorized operations precisely to do it; the “conditional select” vectorized CPU instruction can essentially be stated as:

// result conditionalSelect(condition, left, right)
result = (condition & left) | (~condition & right);

Where condition is usually either all-zeros or all-ones (but it doesn’t have to be; if you want to pull different bits from each value, you can do that too).

This intrinsic is exposed directly on Vector, so our missing code becomes simply:

var vMSB = new Vector<uint>(MSB);
var vNOMSB = ~vMSB;
for (int j = 0; j < vSource.Length; j++)
{
    var vec = vSource[j];
    vDest[j] = Vector.ConditionalSelect(
        condition: Vector.GreaterThan(vec, vNOMSB),
        left: ~vec | vMSB, // when true
        right: vec // when false
    );
}

Note that I’ve pre-loaded a vector with the MSB value (which creates a vector with that value in every cell), and I’ve switched to using a > test instead of a bit test and shift. Partly, this is because the vectorized equality / inequality operations expect this kind of usage, and very kindly return -1 as their true value - making the result ideal to feed directly into “conditional select”.

Method Mean Scaled
SortablePerValue 10,483.8 us 1.00
ToRadixPerValue 10,120.5 us 0.97
ToRadixBlock 10,080.0 us 0.96
ToRadixSpan 7,976.3 us 0.76
Branchless 2,507.0 us 0.24
Vectorized 930.0 us 0.09

As you can see, the effect of vectorization on this type of code is just amazing - with us now getting more than an order-of-magnitude improvement on the original data. That’s why I’m so excited about how easy (relatively speaking) Span<T> makes vectorization, and why I can’t wait for Span<T> to hit production.

A reasonable range of common operations are available on Vector and Vector<T>. If you need exotic operations like “gather”, you might need to wait until System.Runtime.Intrinsics lands. One key difference here is that Vector<T> exposes the common intersection of operations that might be available (with different widths) against different CPU instruction sets, whereas System.Runtime.Intrinsics aims to expose the underlying intrinsics - giving access to the full range of instructions, but forcing you to code specifically to a chosen instruction set (or possibly having two implementations - one for AVX and one for AVX2). This is simply because there isn’t a uniform API surface between generations and vendors - it isn’t simply that you get the same operations with different widths: you get different operations too. So you’d typically be checking Aes.IsSupported, Avx2.IsSupported, etc. Being realistic: Vector<T> is what we have today, and it works damned well.
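
When System.Runtime.Intrinsics does land, the typical shape is expected to be explicit per-instruction-set dispatch - something like this sketch (the branch bodies are left empty because the exact API surface is still moving):

using System.Runtime.Intrinsics.X86;
...
if (Avx2.IsSupported)
{
    // 256-bit AVX2 implementation goes here
}
else if (Sse2.IsSupported)
{
    // 128-bit SSE2 implementation goes here
}
else
{
    // scalar fallback - the branchless loop from earlier
}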

Summary

We’ve looked at a range of advanced techniques for improving performance of critical loops of C# code, including (to repeat the list from the start):

  • using knowledge of how signed data works to avoid having to transform between them
  • performing operations in blocks rather than per value to reduce calls
  • using Span<T> as a replacement for unsafe code and unmanaged pointers, allowing you to get very high performance even in 100% managed/safe code
  • investigating branch removal as a performance optimization of critical loops
  • vectorizing critical loops to do the same work with significantly fewer CPU operations

And we’ve seen dramatic improvements to the performance. Hopefully, some or all of these techniques will be applicable to your own code. Either way, I hope it has been an interesting diversion.

Next time: practical parallelization

Addendum

For completeness: yes I also tried a val ^ ~MSB approach for both branchless and vectorized; it wasn't an improvement.

And for the real implementation (the "aside" mentioned above): what the code actually does for sign-bit data (IEEE754) is: sort just on the sign bit first, use the count data to find where the sign changes (without scanning over the data an extra time), and then sort the two halves separately ignoring the MSB, with the first chunk in descending order and the second chunk in ascending order. By doing this, we avoid the need for the transform - again, by using knowledge of the bit layout of the data.