Thursday, 6 December 2018

A Thanksgiving Carol

Normally I write about programming topics (usually .NET); today I'm going to veer very far from that track - and talk about society, mental health, individual and corporate responsibility, and personal relationships. I genuinely hope you hear me out, but if that isn't your thing ... well, then you probably need to read it more than most. I could try a clever reverse psychology trick to oblige you to see it through, but you'd see straight through it... or would you?

My apologies in advance if I seem to be on a negative tone through much of this - I'm pulling no punches in something that has been a quite deep - and painful - personal journey and realisation. I assure you that it ends much more positively than the body might suggest. Maybe for me this is mostly cathartic self-indulgence and rambling, but.. it's my personal blog and I get to do that if I want. But if it makes even one person think for a few minutes, it has been well worth my time.

So; on with the real title:

Technology is Outpacing our Individual and Societal Health

This week, I've identified hugely with that famous (infamous?) festive favorite: Ebenezer Scrooge (humbug!). Not the usury part - but instead:

  • the familiar story of spending a long time making choices that cause harm
  • having some catastrophic event or events bring everything into focus
  • having a genuine yet painful inspection of those past (and present) choices
  • consideration of what those choices mean for the future
  • undergoing a fundamental transformation, a realignment of priorities and thinking, that should lead to a much happier future
  • actively walking that path with actions, not just hollow words

See, I got heavy and personal! Let's see how deep this rabbit hole goes. How to start...


Recently I nearly destroyed my marriage and a relationship of nearly 25 years.

As opening lines go, it isn't quite up there with "Marley was dead: to begin with.", but it's all I've got. It wasn't anything huge and obvious like an affair or a huge violent argument. What I did was to make - over an extended period of time - a series of bad choices about my relationship with technology.

The reality of the era is that we are absolutely surrounded by technology - it encroaches and invades on every aspect of our lives, and it has progressed so fast that we haven't really had time to figure out where "healthy" lies. I must immediately stress that I don't say this to absolve myself of responsibility; we're adults, and we must own the choices that we make, even if we make those choices in an environment that normalises them. So what do I mean?

Ultimately, the heart of my personal failings here stem from how easy - and tempting - it can be to lose ourselves in a digital world. We live in such a hyper-connected time, surrounded by flashing instant updates at every turn. It is alarmingly easy to confuse the signals that this electronic phantom universe provides, prioritising them over the real world in front of us. I'm sure we can all relate to seeing a group of people out together, whether at a bar, a meal, or some other social gathering - and seeing the mobile phones come out regularly. Don't get me started on the idiots who think they can drive while distracted by a phone. I'm certainly guilty of occasionally "parenting" by observing the digitial-tablet-infused face of one of my children, by half-watching them over the top of a mobile. And I'd be lying if I said I'd never treated my marriage with the same over-familiarity bordering on contempt.

The digital world is so easy and tempting. Everything is immediate and easy. The real world takes effort, work, and time. When I was growing up, "allow 28 days for delivery" was a mantra; today, if something physical won't arrive within 28 hours we look at alternative vendors; for purely virtual items, we'd get twitchy and worried if it took 28 minutes.

I've reached the conclusion that among other things, I was - for want of a better word - in an addictive and unhealthy relationship with the internet. The internet is amazing and brilliant - and I'm not proposing we need to nuke it from orbit, but it is at our great peril that we think that it is always (or ever) without harm. We have grown complacent, when we should be treating it with respect and, yes, at times: fear - or at least concern.

We build a global platform for communicating data - all the shared collective knowledge and wisdom of the world past and present, and how do we choose to use it? If only it was "sharing cat pics", maybe the world would be a better place. Instead, as people, we mostly seem to use it for either validating ourselves in echo chambers (tip: nothing useful is ever achieved by listening to people you already agree with), or getting into angry anonymous rows with strangers. Either triggers a spurt of rewarding chemicals to the brain, but they're both usually entirely empty of any real achievement. If only that was the only mine to avoid.

Perverse Incentives and Eroded Psychological Walls

Again, I want to keep emphasizing that no personal responsibility is voided, but we haven't arrived at this place in isolation. At risk of sounding like a radical anti-capitalist (I'm not - really), corporate interests are actively averse to us having a healthy relationship with the internet. One way this materializes is in the notion of "engagement". Now; "engagement" by itself isn't an unreasonable measure, but as with most measures: the moment that we start treating it as a target, all hell breaks loose.

Because all genuine inspections should start at home, I'll start by talking about Stack Overflow. We have a measure there, on a user's profile page: consecutive days visited. We're not monsters, so we only display this on your own profile, but: I can only see negative things about this number. On its own, it adds nothing (not least: you can't compare it to anything), but: I know that at some point in history I cared about that number. I would try to do something, anything to not lose this number, including checking in while on family holidays. And here's the thing: the more you maintain it, the more it feels to lose. It is purely a psychological thing, but... when thinking about it, I can't think of a single positive use of this number. The only thing it does is encourage wholly harmful behaviours. I love our users, and I want them to be happy, rested, and healthy. Making users not want to go even 24 hours without checking in with us - well, that doesn't seem good to me. If anything, it sounds like a great way to cause burnout and frustration. I would love to start a conversation internally about whether we should just nuke that number entirely - or if anything, use it to prompt a user "hey, we really love you, but ... maybe take a day off? we'll see you next week!". As a counterpoint to that: we actively enforce a daily "rep cap", which I think is hugely positive thing towards sensible and healthy usage; I just want to call that out for balance and fairness.

Now consider: in the grand scheme of things: we're cuddly kittens. Just think what the Facebooks, Googles, etc are doing with psychological factors to drive "engagement". We've already seen the disclosures about Facebook's manipulation of feeds to drive specific responses. Corporations are often perversely incentivized to be at odds with healthy engagement. We can see this most clearly in sectors like gambling, pornography, gaming (especially in-game/in-app purchases, "pay to win"), drugs (whether legal or illicit), "psychics" (deal with the air-quotes) etc. Healthy customers are all well and good, but you make most of your money from the customers with unhealthy relationships. The idea of fast-eroding virtual "credit" is rife. If I can pick another example: I used to play quite a bit of Elite: Dangerous; I stopped playing around the time of the "Powerplay" update, which involved a mechanic around "merits" with a steep decay cycle: if you didn't play significant amounts of grind every week (without fail): you'd basically always have zero merits. This is far from unusual in today's games, especially where an online component exists. I've seen YouTube content creators talking about how they strongly feel that if they don't publish on a hard schedule, their channel tanks - and it doesn't even matter whether they're right: their behaviour is driven by the perception, not cold reality (whatever it may be).

I now accept that I had developed some unhealthy relationships with the internet. It hugely impacted my relationships at home, both in quality and quantity. I would either be unavailable, or when I was available, I would be... distracted. Checking my phone way too often - essentially not really present, except in the "meat" sense. Over time, this eroded things. Important things.

And yet as a society we've normalized it.

Let's look at some of the worst examples from above - gambling, pornography, drugs, etc: it used to be that if you had a proclivity in those directions, there would be some psychological or physical barrier: you'd need to go to the book-maker or casino, or that seedy corner-shop, or find a dealer. Now we have all of those things in our pocket, 24/7, offering anonymous instant access to the best and worst of everything the world has to offer. How would you know that your colleague has a gambling problem, when placing a bet looks identical to responding to a work email? As if that wasn't enough, we've even invented new ways of paying - "crypto-currency" - the key purposes of which are (in no particular order) "to ensure we don't get caught" and "to burn electricity pointlessly". There is possibly some third option about "decentralization" (is that just another word for "crowd-sourced money-laundering"? I can't decide), but I think we all know that in reality for most regular crypto-currency users this is a very far third option; it is probably more important for the organised criminals using it, but... that's another topic.

We Need to Maintain Vigilance

I wouldn't be saying all this if I thought it was all doom. I do think we've reached a fundamentally unhealthy place with technology; maybe we've been over-indulging in an initial excited binge, but: we really need to get over it and see where we're harming and being harmed. We don't need to obsess over our phones - those little notifications mean almost nothing. I'm absolutely not saying that I'm detaching myself from the internet, but I am treating it with a lot more respect - and caution. I'm actively limiting the times that I engage to times that I am comfortable with. There are very few things that are important enough to need your constant attention; things can wait. For most things: if it is genuinely urgent, someone will simply call you. I've completely and irrevocably blocked my access to a range of locations that (upon introspection) I found myself over-using, but which weren't helping me as a person - again, hollow validation like echo-chambers and empty arguments. I can limit my usage of things like "twitter" to useful professional interactions, not the uglier side of twitter politics. And I can ensure that in the time I spend with my family: I'm actually there. In mind and person, not just body. I've completely removed technology from the bedroom - and no, I'm not being crude there - there is a lot of important and useful discussion and just closeness time to be had there, without touching on more ... "intimate" topics. You really, really don't need to check your inbox while on the toilet - nobody deserves that; just leave the phone outside.

I got lucky; whatever problems I had, I was able to identify, isolate, and work through before they caused total destruction - and I need to be thankful for the support and patience of my wife. But it was genuinely close, and I need to acknowledge that. I'm happier today - and closer to my wife - than I have been in a long long time, mostly through my own casual fault. I'm cautious that the next person might not be so lucky. I'm also terrified about the upcoming generation of children who have very little baseline to compare to. What, for them, is "normal"? How much time at school and home are we dedicating to teaching these impressionable youths successful tactics for navigating the internet, and what that means for their "real" life? I think we can agree that when we hear of "Fortnite", "kids" and "rehab" being used in the same sentence: something is wrong somewhere.

Maybe somewhere along the line we (culture) threw the baby out with the bathwater. I'm not at all a religious person, but if I look at most established religions with that atheistic lens, I have to acknowledge that among the superstition: there are some good wisdoms about leading a good and healthy life - whether by way of moral codes (that vary hugely by religion), or by instilling a sense of personal accountability and responsibility, or by the simple act of finding time to sit quietly - regularly - and be honestly contemplative. To consider the consequences of our actions, even - perhaps especially - when we haven't had to do so directly. Humility, patience, empathy. I know in the past I've been somewhat dismissive of even non-theistic meditation, but: I suspect that it is something that I might now be in a position to appreciate.

To re-state: I'm OK; I am (and in terms of my marriage: we are) in a much better, stronger, healthier place than I (we) have been in a long time. I've had my Thanksgiving Miracle, and I've come out the other side with a renewed energy, and some fundamentally altered opinions. I'm interested in your thoughts here, but I'm not opening comments; again - we've made it too easy and anonymous! If you want to email me on this, please do (marc.gravell at gmail.com - if you could use "Thanksgiving Carol" in the subject, that'd really help me organize my inbox); I may respond, but I won't guarantee it, and I certainly won't guarantee an immediate response. I'm also deliciously conscious of the apparent irony of my blogging about the harms of the internet. But: if - as Joel assures me - "Developers are Writing the Script for the Future" - we need to start being a bit more outspoken about what that script says, and calling out when some measure of "success" of a product or service is likely impactful to healthy usage.

Closing: technology is great, the internet is great; but: we need to treat them with respect, and use them in sensible moderation. And pay lots more attention to the real world.

Saturday, 8 September 2018

Monotoolism

One Tool To Rule Them All

A recent twitter thread reminded me of a trope that I see frequently as a library author (and just as a general observer) - let’s call it “monotoolism”.

Examples of this might be examples like:

  • “wait, you’re still using ‘LINQ to SQL’? I thought you were using ‘Dapper’?”
  • “Google’s protobuf impementation provides opinionated JSON parsing, but my JSON doesn’t fit that layout - how do I get the library to parse my layout?”
  • “how do I parse HTML with a regular expression?”
  • etc

The common theme here being the expectation that once you have one tool in a codebase that fits a particular category: that’s it - there is one and only one tool against each category; one “data access tool”, one “string parsing tool”, etc.

This has always irked me. I understand where people are coming from - they don’t want an explosion of different tools to have to worry about:

  • they don’t want an overly complex dependency tree
  • they don’t want to have to check licensing / compliance etc against a huge number of libraries
  • they don’t want to have to train everyone to use a plethora of tools
  • etc

It absolutely makes sense to minimize the dependency count, and to remove unnecessary library overlap. But the key word in that sentence: “unnecessary” - and I mean that in a fairly loose sense: you can use the handle of a screwdriver to drive in a nail if you try hard enough, but it is much easier (and you get a better outcome) if you use a hammer. I think I’d include a hammer as a “necessary” tool alongside a set of screwdrivers if you’re doing any form of construction (but is that a metric or imperial hammer?).

I often see people either expressing frustration that their chosen “one tool to rule them all” can’t do tangentially-related-feature-X, or bending their code massively out of shape to try to make it do it; sometimes they even succeed, which is even scarier as a library author - because now there’s some completely undesigned-for, unspecified, undocumented and just unknown usage in the wild (quite possibly abusing reflection to push buttons that aren’t exposed) that the library author is going to get yelled at when it breaks.

XKCD: Workflow

It is OK to use more than one tool!

Yes, it is desirable to minimize the number of unnecessary tools. But: it is OK to use more than one tool. Expected, even. You absolutely should be wary of uncontrolled tool propogation, but I strongly advocate against being too aggressive with rebukes along the lines of:

We already have a tool that does something kinda like that; can you just torture the tool and the domain model a bit and see if it works well enough to just about work?

Remember, the options here are:

  1. two (or more) different tools, each used in their intended way, closely following their respective documented examples in ways that are “obviously right” and which it is easy to ask questions of the library authors or the community
  2. one single tool, tortured and warped beyond recognition, looking nothing like… anything, where even the tool’s authors can’t understand what you’re doing (let alone why, and they’re probably too afraid to ask), where you’re the only usage like that, ever, and where your “elegant hack” might stop working in the next minor revision, because it wasn’t a tested scenario

I prefer “1”. It’ll keep your model cleaner. It’ll keep you relationship with the tool more successful. Yes, it will mean that you occasionally need more than one tool listed in a particular box. Deal with it! If the tool really is complex enough that this is problematic, just move the ugly complexity behind some abstraction, then only a limited number of people need to worry about how it works.

Always use the right tool for the job.

Thursday, 2 August 2018

protobuf-net, August 2018 update

An update on what's happening with protobuf-net

Headline: .proto processing now works directly from dotnet build and MSBuild, without any need for DSL processing steps; and - new shiny things in the future.


I haven't spoken about protobuf-net for quite a while, but: it is very much alive and active. However, I really should do a catch-up, and I'm really excited about where we are.

Level 100 primer, if you don't know what "protobuf" is

"protobuf" is Protocol Buffers, Google's cross-platform/language/OS/etc serialization format (and associated tools). It is primarily a dense binary format, but a JSON variant also exists. A lot of Google's public and private APIs are protobuf, but it is used widely outside of Google too.

The data/schema is often described via a custom DSL, .proto - which comes in 2 versions (proto2 and proto3). They both describe the same binary format.

Google provide implementations for a range of platforms including C# (note: "proto3" only), but ... I kinda find the "DSL first, always" approach limiting (I like the flexibility of "code first"), and ... the Google implementation is "Google-idiomatic", rather than ".NET idiomatic".

Hence protobuf-net exists; it is a fast/dense binary serializer that implements the protobuf specifiction, but which is .NET-idiomatic, and allows either code-first or DSL-first. I use it a lot.

Historically, it was biased towards "code first", with the "DSL first" tools a viable but more awkward option.

What's changed lately?

Bespoke managed DSL parser

Just over a year ago now, back in 2.3.0, I released a new set of DSL parsing tools. In the past, protobuf-net's tooling (protogen) made use of Google's protoc tool - a binary executable that processes .proto files, but this was incredibly akward to deploy between platforms. Essentially, the tools would probably work on Windows, but that was about it. This wasn't a great option going forward, so I implemented a completely bespoke 100% managed-code parser and code-generator that didn't depend on protoc at all. protogen was reborn (and it works with both "proto2" and "proto3"), but it lacked a good deployment route.

Playground website

Next, I threw together protogen.marcgravell.com. This is an ASP.NET Core web app that uses the same library code as protogen, but in an interactive web app. This makes for a pretty easy way to play with .proto files, including a code-editor and code generator. It also hosts protoc, if you prefer that - and includes a wide range of Google's API definitions available as imports. This is a very easy way of working with casual .proto usage, and it provides a download location for the standalone protogen tools. It isn't going to win any UI awards, but it works. It even includes a decoder, if you want to understand serialized protobuf data.

Global tools

Having a download for the command-line tools is a great step forward, but ... it is still a lot of hassle. If only there were a way of installing managed-code developer tools in a convenient way. Well, there is: .NET "global tools"; so, a few months ago I added protobuf-net.Protogen. As a "global tool", this can be installed once via

dotnet tool install --global protobuf-net.Protogen

and then protogen will be available anywhere, as a development tool. Impressively, "global tools" work between operating systems, so the exact same package will also work on linux (and presumably Mac). This starts to make .proto very friendly to work with, as a developer.

Build tools

I'm going to be frank and honest: MSBuild scares the bejeezus out of me. I don't understand .targets files, etc. It is a huge blind-spot of mine, but I've made my peace with that reality. So... I was genuinely delighted to receive a pull request from Mark Pflug that fills in the gaps! What this adds is protobuf-net.MSBuild - tools that tweak that build process from dotnet build and MSBuild. What this means is that you can just install protobuf-net.MSBuild into a project, and it automatically runs the .proto → C# code-generation steps as part of build. This means you can just maintain your .proto files without any need to generate the C# as a separate step. You can still extend the partial types in the usualy way. All you need to do is make sure the .proto files are in the project. It even includes the common Google import additions for free (without any extra files required), so: if you know what a .google.protobuf.timestamp.Timestamp is - know that it'll work without you having to add the relevant .proto file manually (although you still need the import statement).

I can't understate how awesome I think these tools are, and how much friendlier it makes the "DSL first" scenario. Finally, protobuf-net can use .proto as a truly first class experience. Thanks again, Mark Pflug!

What next?

That's where we are today, but : to give an update on my plans and priorities going forwards...

Spans and Pipelines

You might have noticed me talking about these a little lately; I've done lots of research to look at what protobuf-net might do with these, but it is probably time to start looking at doing it "for real". The first step there is getting some real timings on the performance difference between a few different approaches

AOT

In particular, platforms that don't allow IL-emit. This helps consumers like UWP, Unity, iOS, etc. They usually currently work with protobuf-net, but via huge compromises. To do better, we need radically overhaul how we approach those platforms. I see two viable avenues to explore there.

  1. we can enhance the .proto codegen (the bits that protobuf-net.MSBuild just made tons better), to include generation of the actual serialization code

  2. we can implement Roslyn-based tools that pull apart code-first usage to understand the model, and emit the serialization code at build time

All of these are going to keep me busy into the foreseeable!

Monday, 30 July 2018

Pipe Dreams, part 3.1

Pipelines - a guided tour of the new IO API in .NET, part 3.1

(part 1, part 2, part 3)

After part 3, I got some great feedback - mostly requests to clarify things that I touched on, but could do with further explanation. Rather than make part 3 even longer, I want to address those here! Yay, more words!


Isn't ArrayPoolOwner<T> doing the same thing as MemoryPool<T>? Why don't you just use MemoryPool<T> ?

Great question! I didn't actually mention MemoryPool<T>, so I'd better introduce it.

MemoryPool<T> is an abstract base type that offers an API of the form:

public abstract class MemoryPool<T> : IDisposable
{
    public abstract IMemoryOwner<T> Rent(int minBufferSize = -1);
    // not shown: a few unrelated details
}

As you can see, this Rent() method looks exactly like what we were looking for before - it takes a size and returns an IMemoryOwner<T> (to provide a Memory<T>), with it being returned from whence it came upon disposal.

MemoryPool<T> also has a default implementation (public static MemoryPool<T> Shared { get; }), which returns a MemoryPool<T> that is based on the ArrayPool<T> (i.e. ArrayMemoryPool<T>). The Rent() method returns an ArrayMemoryPoolBuffer, which looks remarkably like the thing that I called ArrayPoolOwner<T>.

So: a very valid question would be: "Marc, didn't you just re-invent the default memory pool?". The answer is "no", but it is for a very subtle reason that I probably should have expounded upon at the time.

The problem is in the name minBufferSize; well... not really the name, but the consequence. What this means is: when you Rent() from the default MemoryPool<T>.Shared, the .Memory that you get back will be over-sized. Often this isn't a problem, but in our case we really want the .Memory to represent the actual number of bytes that were sent (even if we are, behind the scenes, using a larger array from the pool to contain it).

We could use an extension method on arbitrary memory pools to wrap potentially oversized memory, i.e.

public static IMemoryOwner<T> RentRightSized<T>(
    this MemoryPool<T> pool, int size)
{
    var leased = pool.Rent(size);
    if (leased.Memory.Length == size)
        return leased; // already OK
    return new RightSizeWrapper<T>(leased, size);
}
class RightSizeWrapper<T> : IMemoryOwner<T>
{
    public RightSizeWrapper(
        IMemoryOwner<T> inner, int length)
    {
        _inner = inner;
        _length = length;
    } 
    IMemoryOwner<T> _inner;
    int _length;
    public void Dispose() => _inner.Dispose();
    public Memory<T> Memory
        => _inner.Memory.Slice(0, _length);
}

but... this would mean allocating two objects for most leases - one for the actual lease, and one for the thing that fixes the length. So, since we only really care about the array-pool here, it is preferable IMO to cut out the extra layer, and write our own right-sized implementation from scratch.

So: that's the difference in the reasoning and implementation. As a side note, though: it prompts the question as to whether I should refactor my API to actually implement the MemoryPool<T> API.


You might not want to complete with success if the cancellation token is cancelled

This is in relation to the while in the read loop:

while (!cancellationToken.IsCancellationRequested)
{...}

The more typical expectation for cancellation is for it to throw with a cancellation exception of some kind; therefore, if it is cancelled, I might want to reflect that.

This is very valid feedback! Perhaps the most practical fix here is simply to use while (true) and let the subsequent await reader.ReadAsync(cancellationToken) worry about what cancellation should look like.


You should clarify about testing the result in async "sync path" scenarios

In my aside about async uglification (optimizing when we expect it to be synchronous in most cases), I ommitted to talk about getting results from the pseudo-awaited operation. Usually, this comes down to calling .Result on an awaitable (something like a Task<T>, ValueTask<T>, or .GetResult() on an awaiter (the thing you get from .GetAwaiter()). I haven't done it in the example because in async terms this would simply have been an await theThing; usage, not a var local = await theThing; usage; but you can if you need that.

I must, however, clarify a few points that perhaps weren't clear:

  • you should not (usually) try to access the .Result of a task unless you know that it has already completed
  • knowing that it has completed isn't enough to know that it has completed successfully; if you only test "is completed", you can use .GetResult() on the awaiter to check for exceptions while also fetching the result (which you can then discard if you like)
  • in my case, I'm taking a shortcut by checking IsCompletedSuccessfully; this exists on ValueTask[<T>] (and on Task[<T>] in .NET Core 2.0, else you can check .Status == TaskStatus.RanToCompletion) - which is only true in the "completed without an exception" case
  • because of expectations around how exceptions on async operations are wrapped and surfaced, it is almost always preferable to just switch into the async flow if you know a task has faulted, and just await it; the compiler knows how to get the exception out in the most suitable way, so: let it do the hard work

You should explain more about ValueTask[<T>] vs Task[<T>] - not many people understand them well

OK! Many moons ago, Task<T> became a thing, and all was well. Task<T> actually happened long before C# had any kind of support for async/await, and the main scenarios it was concerned about were genuinely asynchronous - it was expected that the answer would not be immediately available. So, the ovehead of allocating a placeholder object was fine and dandy, dandy and fine.

As the usage of Task<T> grew, and the language support came into effect, it started to become clear that there were many cases where:

  • the operation would often be available immediately (think: caches, buffered data, uncontested locking primitives, etc)
  • it was being used inside a tight loop, or just at high frequency - i.e. something that happens thousands of times a second (file IO, network IO, synchronization over a collection, etc)

When you put those two things together, you find yourself allocating large numbers of objects for something that was only rarely actually asynchronous (so: when there wasn't data available in the socket, or the file buffer was empty, or the lock was contested). For some scenarios, there are pre-completed reusable task instances available (such as Task.CompletedTask, and inbuilt handling for some low integers), but this doesn't help if the return value is outside this very limited set. To help avoid the allocations in the general case, ValueTask[<T>] was born. A ValueTask[<T>] is a struct that implements the "awaitable" pattern (a duck-typed pattern, like foreach, but that's a story for another day), that essentially contains two fields:

  • a T if the value was known immediately (obviously not needed for the untyped ValueTask case)
  • a Task<T> if the value is pending and the answer depends on the result of the incomplete operation

That means that if the value is known now, no Task[<T>] (and no corresponding TaskCompletionSource<T>) ever needs to be allocated - we just throw back the struct, it gets unwrapped by the async pattern, and life is good. Only in the case where the operation is actually asynchronous does an object need to be allocated.

Now, there are three common views on what we should do with this:

  1. always expose Task[<T>], regardless of whether it is likely to be synchronous
  2. expose Task[<T>] if we know it will be async, expose ValueTask[<T>] if we think it may be synchronous
  3. always expose ValueTask[<T>]

Frankly, the only valid reason to use 1 is because your API surface was baked and fixed back before ValueTask[<T>] existed.

The choice between 2 and 3 is interesting; what we're actually talking about there is an implementation detail, so a good case could be argued for 3, allowing you to amaze yourself later if you find a way of doing something synchronously (where it was previously asynchronous), without breaking the API. I went for 2 in the code shown, but it would be something I'd be willing to change without much prodding.

You should also note that there is actually a fourth option: use custom awaitables (meaning: a custom type that implements the "awaitable" duck-typed pattern). This is an advanced topic, and needs very careful consideration. I'm not even going to give examples of how to do that, but it is worth noting that ReadAsync and FlushAsync ("pipelines" methods that we've used extensively here) do return custom awaitables. You'd need to really, really understand your reasons before going down that path, though.


I spotted a bug in your "next message number" code

Yes, the code shown in the post can generate two messages with id 1, after 4-billion-something messages:

messageId = ++_nextMessageId;
if (messageId == 0) messageId = 1;

Note that I didn't increment _nextMessageId when I dodged the sentinel (zero). There's also a very small chance that a previous message from 4-billion-ago still hasn't been replied to. Both of these are fixed in the "real" code.


You might be leaking your lease around the TrySetResult

In the original blog code, I had

tcs?.TrySetResult(payload.Lease());

If tcs is not null (via the "Elvis operator"), this allocates a lease and then invokes TrySetResult. However, TrySetResult can return false - meaning: it couldn't do that, because the underlying task was already completed in some other way (perhaps we added timeout code). The only time we should consider that we have successfully transferred ownership of the lease to the task is if it returns true. The real code fixes this, ensuring that it is disposed in all cases except where TrySetResult returns true.


What about incremental frame parsers?

In my discussion of handling the frame, I was using an approach that processed a frame either in it's entirety, or not at all. This is not the only option, and you can consume any amount of the frame that you want, as long as you write code to track the internal state. For example, if you are parsing http, you could parse the http headers into some container as long as you have at least one entire http header name/value pair (without requiring all the headers to start parsing). Similarly, you could consume some of the payload (perhaps writing what you have so far to disk). In both cases, you would simply need to Advance past the bit that you consider consumed, and update your own state object.

So yes, that is absolutely possible - even highly desirable in some cases. In some cases it is highly desirable not to start until you've got everything. Remember that parsing often means taking the data from a streamed representation, and pushing it into a model representation - you might actually need more memory for the model representation (especially if the source data is compressed or simply stored in a dense binary format). An advantage of incremental parsing is that when the last few bytes dribble in, you might already have done most of the parsing work - allowing you to overlap pre-processing the data with data receive - rather than "buffer, buffer, buffer; right - now start parsing it".

However, in the case I was discussing: the header was 8 bytes, so there's not much point trying to over-optimize; if we don't have an entire header now, we'll mostly likely have a complete header when the next packet arrives, or we'll never have an entire header. Likewise, because we want to hand the entire payload to the consumer as a single chunk, we need all the data. We could actually lease the target array as soon as we know the size, and start copying data into that buffer and releasing the source buffers. We're not actually gaining much by this - we're simply exchanging data in one buffer for the same amount of data in another buffer; but we're actually exposing ourselves to an attack vector: a malicious (or buggy) client can sent a message-header that claims to be sending a large amount of data (maybe 1GiB), then just ... keeps the socket open and doesn't send anything more. In this scenario, the client has sent almost nothing (maybe just 8 bytes!), but they've chewed up a lot of server memory. Now imagine they do this from 10, 100, or 1000 parallel connections - and you can see how they've achieved disproportionate harm to our server, for almost zero cost at the client. There are two pragmatic fixes for this:

  • Put an upper limit on the message size, and put an upper limit on the connections from a single endpoint
  • Make the client pay their dues: if they claim to be sending a large message (which may indeed have legitimate uses): don't lease any expensive resources until they've actually sent that much (which is what the code as-implemented achieves)

Emphasis: your choice of frame parsing strategy is entirely contextual, and you can play with other implementations.


So; that's the amendments. I hope they are useful. A huge "thanks" to the people who are keeping me honest here, including Shane GrĂ¼ling, David Fowler, and Nick Craver.

Sunday, 29 July 2018

Pipe Dreams, part 3

Pipelines - a guided tour of the new IO API in .NET, part 3

Update: please also see part 3.1 for further clarifications on this post

Sorry, it has been longer than anticipated since part 2 (also: part 1). A large part of the reason for that is that I've been trying to think how best to explain some of the inner guts of StackExchange.Redis in a way that makes it easy to understand, and is useful for someone trying to learn about "pipelines", not StackExchange.Redis. I've also been thinking on ways to feed more practical "pipelines" usage guidance into the mix, which was something that came up a lot in feedback to parts 1/2.

In the end, I decided that the best thing to do was to step back from StackExchange.Redis, and use a completely different example, but one that faces almost all of the same challenges.

So, with your kind permission, I'd like to deviate from our previously advertised agenda, and instead talk about a library by my colleague David Haney - SimplSockets. What I hope to convey is a range of both the reasoning behind prefering pipelines, but also practical guidance that the reader can directly transfer to their own IO-based needs. In particular, I hope to discuss:

  • different ways to pass chunks of data between APIs
  • working effectively with the array-pool
  • async/await optimization in the context of libraries
  • practical real-world examples of writing to and reading from pipelines
  • how to connect pipelines client and server types to the network
  • performance comparisons from pipelines, and tips on measuring performance

I'll be walking through a lot of code here, but I'll also be making the "real" code available for further exploration; this also includes some things I dodn't have time to cover here, such as how to host a pipelines server inside the Kestrel server.

Sound good?

What is SimplSockets?

This is a network helper library designed to make it easier to implement basic client/server network comms over a socket:

  • it implements a simple framing protocol to separate messages
  • it allows for concurrent usage over a single client, with a message queuing mechanism
  • it embeds additional data in the framing data to allow responses to be tied back to requests, to complete operations
  • out-of-order and out-of-band replies are allowed - you might send requests A, B, C - and get the responses A, C, D, B - i.e. two of the responses came in the opposite order (presumably B took longer to execute), and D came from the server unsolicited (broadcasts, etc)
  • individual messages are always complete in a single frame - there is no frame splitting
  • in terms of API surface: everything is synchronous and byte[] based; for example the client has a byte[] SendReceive(byte[]) method that sends a payload and blocks until the corresponding response is received, and there is a MessageReceived event for unsolicited messages that exposes a byte[]
  • the server takes incoming requests via the same MessageReceived event, and can (if required, not always) post replies via a Reply(byte[], ...) method that also takes the incoming message (for pairing) - and has a Broadcast(byte[]) method for sending a message to all clients
  • there are some other nuances like heartbeats, but; that's probably enough

So; we've probably got enough there to start talking about real-world - and very common - scenarios in network code, and we can use that to start thinking about how "pipelines" makes our life easier.

Also an important point: anything I say below is not meant to be critical of SimplSockets - rather, it is to acknowledge that it was written when a lot of pieces like "pipelines" and async/await didn't exist - so it is more an exploration into how we could implement this differently with today's tools.

First things first: we need to think about our exchange types

The first question I have here is - for received messages in particular: "how should we expose this data to consumers?". By this I mean: SimplSockets went with byte[] as the exchange type; can we improve on that? Unsurprisingly: yes. There are many approaches we can use here.

  1. at one extreme, we can stick with byte[] - i.e. allocate a standalone copy of the data, that we can hand to the user; simple to work with, and very safe (nobody else sees that array - no risk of confusion), but it comes at the cost of allocations and copy time.
  2. at the other extreme, we can use zero-copy - and stick with ReadOnlySequence<byte> - this means we're consuming the non-contiguous buffers in the pipe itself; this is fast, but somewhat limiting - we can't hand that out, because once we Advance the pipe: that data is going to be recycled. This might be a good option for strictly controlled server-side processing (where the data never escapes the request context)
  3. as an extension of 2, we could move the payload parsing code into the library (based on the live ReadOnlySequence<byte>), just exposing the deconstructed data, perhaps using custom structs that map to the scenario; efficient, but requires lots more knowledge of the contents than a general message-passing API allows; this might be a good option if you can pair the library with a serializer that accepts input as ReadOnlySequence<byte>, though - allowing the serializer to work on the data without any copies
  4. we could return a Memory<byte> to a copy of the data, perhaps using an oversized byte[] from the ArrayPool<byte>.Shared pool; but it isn't necessarily obvious to the consumer that they should return it to the pool (and indeed: getting a T[] array back from a Memory<T> is an advanced and "unsafe" operation - not all Memory<T> is based on T[] - so we really shouldn't encourage users to try)
  5. we could compromise by returning something that provides a Memory<byte> (or Span<byte> etc), but which makes it very obvious via a well-known API that the user is meant to do something when they're done with it - i.e. IDisposable / using - and have the exchange-type itself return things to the pool when Dispose() is called

In the context of a general purpose messaging API, I think that 5 is a reasonable option - it means the caller can store the data for some period while they work with it, without jamming the pipe, while still allowing us to make good use of the array pool. And if someone forgets the using, it is less efficient, but nothing will actually explode - it just means it'll tend to run a bit more like option 1. But: this decision of exchange types needs careful consideration for your scenario. The StackExchange.Redis client uses option 3, handing out deconstructed data; I also have a fake redis server using the StackExchange.Redis framing code, which uses option 2 - never allowing live escape a request context. You need to take time in considering your exchange types, because it is basically impossible to change this later!

As a pro tip for option 2 (using live ReadOnlySequence<byte> data and not letting it escape the context - zero-copy for maxiumum efficiency), one way to guarantee this is to wrap the buffer in a domain-specific ref struct before handing it to the code that needs to consume it. It is impossible to store a ref struct, which includes holding onto it in an async/await context, and includes basic reflection (since that requires "boxing", and you cannot "box" a ref struct) - so you have confidence that when the method completes, they no longer have indirect access to the data.

But, let's assume we're happy with option 5 (for this specific scenario - there is no general "here's the option you should use", except: not 1 if you can help it). What might that look like? It turns out that this intent is already desribed in the framework, as System.Buffers.IMemoryOwner<T>:

public interface IMemoryOwner<T> : IDisposable
{
    Memory<T> Memory { get; }
}

We can then implement this to put our leased arrays back into the array-pool when disposed, taking care to be thread-safe so that if it is disposed twice, we don't put the array into the pool twice (very bad):

private sealed class ArrayPoolOwner<T> : IMemoryOwner<T>
{
    private readonly int _length;
    private T[] _oversized;

    internal ArrayPoolOwner(T[] oversized, int length)
    {
        _length = length;
        _oversized = oversized;                
    }

    public Memory<T> Memory => new Memory<T>(GetArray(), 0, _length);

    private T[] GetArray() =>
        Interlocked.CompareExchange(ref _oversized, null, null)
        ?? throw new ObjectDisposedException(ToString());

    public void Dispose()
    {
        var arr = Interlocked.Exchange(ref _oversized, null);
        if (arr != null) ArrayPool<T>.Shared.Return(arr);
    }
}

The key point here is in Dispose(), where it swaps out the array field (using Interlocked.Exchange), and puts the array back into the pool. Once we've done this, subsequent calls to .Memory will fail, and calls to Dispose() will do nothing.

Some important things to know about the array pool:

  1. the arrays it gives you are often oversized (so that it can give you a larger array if it doesn't have one in exactly your size, but it has a larger one ready to go). This means we need to track the expected length (_length), and use that when constructing .Memory.
  2. the array is not zeroed upon fetch - it can contain garbage. In our case, this isn't a problem because (below) we are immediately going to overwrite it with the data we want to represent, so the external caller will never see this, but in the general case, you might want to consider a: should I zero the contents on behalf of the receiver before giving it to them?, and b: is my data sensitive such that I don't want to accidentally leak it into the pool? (there is an existing "zero when returning to the pool" option in the array-pool, for this reason)

As a side note, I wonder whether the above concept might be a worthy addition inside the framework itself, for usage directly from ArrayPool<T> - i.e. a method like IMemoryOwner<T> RentOwned(int length) alongside T[] Rent(int minimumLength) - perhaps with the additions of flags for "zero upon fetch" and "zero upon return".

The idea here is that passing an IMemoryOwner<T> expresses a transfer of ownership, so a typical usage might be:

void DoSomethingWith(IMemoryOwner<byte> data)
{
    using (data)
    {
        // ... other things here ...
        DoTheThing(data.Memory);
    }
    // ... more things here ...
}

The caller doesn't need to know about the implementation details (array-pool, etc). Note that we still have to allocate a small object to represent this, but this is still hugely preferable to allocating a large byte[] buffer each time, for our safety.

As a caveat, we should note that a badly written consumer could store the .Memory somewhere, which would lead to undefined behaviour after it has been disposed; or they could use MemoryMarshal to get an array from the memory. If we really needed to prevent these problems, we could do so by implementing a custom MemoryManager<T> (most likely, by making ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>). We could then make .Span fail just like .Memory does above, and we could prevent MemoryMarshal from being able to obtain the underlying array. It is almost certainly overkill here, but it is useful to know that this option exists, for more extreme scenarios.

At this point you're probably thinking "wow, Marc, you're really over-thinking this - just give them the data", but: getting the exchange types right is probably the single most important design decision you have to make, so: this bit matters!

OK, so how would we populate this? Fortunately, that is pretty simple, as ReadOnlySequence<T> has a very handy CopyTo method that does all the heavy lifting:

public static IMemoryOwner<T> Lease<T>(
    this ReadOnlySequence<T> source)
{
    if (source.IsEmpty) return Empty<T>();

    int len = checked((int)source.Length);
    var arr = ArrayPool<T>.Shared.Rent(len);
    source.CopyTo(arr);
    return new ArrayPoolOwner<T>(arr, len);
}

This shows how we can use ArrayPool<T> to obtain a (possibly oversized) array that we can use to hold a copy of the data; once we've copied it, we can hand the copy to a consumer to use however they need (and being a flat vector here makes it simple to consume), while the network code can advance the pipe and discard / re-use the buffers. When they Dispose() it, it goes back in the pool, and everyone is happy.

Starting the base API

There is a lot of overlap in the code between a client and server; both need thread-safe mechanisms to write data, and both need some kind of read-loop to check for received data; but what happens is different. So - it sounds like a a base-class might be useful; let's start with a skeleton API that let's us hand in a pipe (or two: recall that an IDuplexPipe is actually the ends of two different pipes - .Input and .Output):

public abstract class SimplPipeline : IDisposable
{
    private IDuplexPipe _pipe;
    protected SimplPipeline(IDuplexPipe pipe)
        => _pipe = pipe;

    public void Dispose() => Close();
    public void Close() {/* burn the pipe*/}
}

The first thing we need after this is some mechanism to send a message in a thread-safe way that doesn't block the caller unduly. The way SimplSockets handles this (and also how StackExchange.Redis v1 works) is to have a message queue of messages that have not yet been written. When the caller calls Send, the messages is added to the queue (synchronized, etc), and will at some point be dequeued and written to the socket. This helps with perceived performance and can help avoid packet fragmentation in some scenarios, but:

  • it has a lot of moving parts
  • it duplicates something that "pipelines" already provides

For the latter, specifically: the pipe is the queue; meaning: we already have a buffer of data between the actual output. Adding a second queue is just duplicating this and retaining complexity, so: the second major design change we can make is: throw away the unsent queue; just write to the pipe (synchronized, etc), and let the pipe worry about the rest. One slight consequence of this is that the v1 code had a concept of prioritising messages that are expecting a reply - essentially queue-jumping. By treating the pipe as the outbound queue we lose this ability, but in reality this is unlikely to make a huge difference, so I'm happy to lose it. For very similar reasons, StackExchange.Redis v2 loses the concept of CommandFlags.HighPriority, which is this exact same queue-jumping idea. I'm not concerned by this.

We also need to consider the shape of this API, to allow a server or client to add a messagee

  • we don't necessarily want to be synchronous; we don't need to block while waiting to access to write to the pipe, or while waiting for a response from the server
  • we might want to expose alternate APIs for whether the caller is simply giving us memory to write (ReadOnlyMember<byte>), or giving us owneship of the data, for us to clean up when we've written it (IMemoryOwner<byte>)
  • let's assume that write and read are decoupled - we don't want to worry about the issues of response messages here

So; putting that together, I quite like:

protected async ValueTask WriteAsync(
    IMemoryOwner<byte> payload, int messageId)
{
    using (payload)
    {
        await WriteAsync(payload.Memory, messageId);
    }
}
protected ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId);

Here we're giving the caller the conveninence of passing us either an IMemoryOwner<byte> (which we then dispose correctly), or a ReadOnlyMemory<byte> if they don't need to convery ownership.

The ValueTask makes sense because a write to a pipe is often synchronous; we probably won't be contested for the single-writer access, and the only async part of writing to a pipe is flushing if the pipe is backed up (flushing is very often always synchronous). The messageId is the additional metadata in the frame header that lets us pair replies later. We'll worry about what it is in a bit.

Writes and wrongs

So; let's implement that. The first thing we need is guaranteed single-writer access. It would be tempting to use a lock, but lock doesn't play well with async (even if you don't screw it up). Because the flush may be async, the continuation could come back on another thread, so we need an async-compatible locking primitive; SemaphoreSlim should suffice.

Next, I'm going to go off on one of my wild tangents. Premise:

In general, application code should be optimized for readability; library code should be optimized for performance.

You may or may not agree with this, but it is the general guidance that I code by. What I mean by this is that library code tends to have a single focused purpose, often being maintained by someone whose experience may be "deep but not necessarily wide"; your mind is focusing on that one area, and it is OK to go to bizarre lengths to optimize the code. Conversely, application code tends to involve a lot more plumbing of different concepts - "wide but not necessarily deep" (the depth being hidden in the various libraries). Application code often has more complex and unpredictable interactions, so the focus should be on maintainable and "obviously right".

Basically, my point here is that I tend to focus a lot on optimizations that you wouldn't normally put into application code, because I know from experience and extensive benchmarking that they really matter. So... I'm going to do some things that might look odd, and I want you to take that journey with me.

Let's start with the "obviously right" implementation:

private readonly SemaphoreSlim _singleWriter
    = new SemaphoreSlim(1);
protected async ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId)
{
    await _singleWriter.WaitAsync();
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        await writer.WriteAsync(payload);
    }
    finally
    {
        _singleWriter.Release();
    }
}

This awaits single-writer access to the pipe, writes the frame header using WriteFrameHeader (which we'll show in a bit), then drops the payload using the framework-provided WriteAsync method, noting that this includes the FlushAsync as well. There's nothing wrong with this code, but... it does involve unnecessary state machine plumbing in the most likely case - i.e. where everything completes synchronously (the writer is not contested, and the pipe is not backed up). We can tweak this code by asking:

  • can I get the single-writer access uncontested?
  • was the flush synchronous?

Consider, instead - making the method we just wrote private and renaming it to WriteAsyncSlowPath, and adding a non-async method instead:

protected ValueTask WriteAsync(
    ReadOnlyMemory<byte> payload, int messageId)
{
    // try to get the conch; if not, switch to async
    if (!_singleWriter.Wait(0))
        return WriteAsyncSlowPath(payload, messageId);
    bool release = true;
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        var write = writer.WriteAsync(payload);
        if (write.IsCompletedSuccessfully) return default;
        release = false;
        return AwaitFlushAndRelease(write);
    }
    finally
    {
        if (release) _singleWriter.Release();
    }
}
async ValueTask AwaitFlushAndRelease(
    ValueTask<FlushResult> flush)
{
    try { await flush; }
    finally { _singleWriter.Release(); }
}

The Wait(0) returns true if and only if we can take the semaphore synchronously without delay. If we can't: all bets are off, just switch to the async version. Note once you've gone async, there's no point doing any more of these "hot path" checks - you've already built a state machine (and probably boxed it): the meal is already paid for, so you might as well sit and eat.

However, if we do get the semaphore for free, we can continue and do our writing for free. The header is synchronous anyway, so our next decision is: did the flush complete synchronously? If it did (IsCompletedSuccessfully), we're done - away we go (return default;). Otherwise, we'll need to await the flush. Now, we can't do that from our non-async method, but we can write a separate method (AwaitFlushAndRelease) that takes our incomplete flush, and awaits it. In particular, note that we only want the semaphore to be released after the flush has completed, hence the Release() in our helper method. This is also why we set release to false in the calling method, so it doesn't get released prematurely.

We can apply similar techniques to most async operations if we know they're going to often be synchronous, and it is a pattern you may wish to consider. Emphasis: it doesn't help you at all if the result is usually or always genuinely asynchronous - so: don't over-apply it.


Right; so - how do we write the header? What is the header? SimplSockets defines the header to be 8 bytes composed of two little-endian 32-bit integers. The first 4 bytes contains the payload length in bytes; the second 4 bytes is the messageId used to correlate requests and responses. Writing this is remarkably simple:

void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    var span = writer.GetSpan(8);
    BinaryPrimitives.WriteInt32LittleEndian(
        span, length);
    BinaryPrimitives.WriteInt32LittleEndian(
        span.Slice(4), messageId);
    writer.Advance(8);
}

You can ask a PipeWriter for "reasonable" sized buffers with confidence, and 8 bytes is certainly a reasonable size. The helpful framework-provided BinaryPrimitives type provides explicit-endian tools, perfect for network code. The first call writes length to the first 4 bytes of the span. After that, we need to Slice the span so that the second call writes to the next 4 bytes - and finally we call Advance(8) which commits our header to the pipe without flushing it. Normally, you might have to write lots of pieces manually, then call FlushAsync explicitly, but this particular protocol is a good fit for simply calling WriteAsync on the pipe to attach the payload, which includes the flush. So; putting those pieces together, we've successfully written our message to the pipe.

Using that from a client

We have a WriteAsync method in the base class; now let's add a concrete client class and start hooking pieces together. Consider:

public class SimplPipelineClient : SimplPipeline
{
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();
        int messageId;
        lock (_awaitingResponses)
        {
            messageId = ++_nextMessageId;
            if (messageId == 0) messageId = 1;
            _awaitingResponses.Add(messageId, tcs);
        }
        await WriteAsync(message, messageId);
        return await tcs.Task;
    }
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}

where _awaitingResponses is a dictionary of int message-ids to TaskCompletionSource<IMemoryOwner<byte>>. This code invents a new messageId (avoiding zero, which we'll use as a sentinel value), and creates a TaskCompletionSource<T> to represent our in-progress operation. Since this definitely will involve network access, there's no benefit in exposing it as ValueTask<T>, so this works well. Once we've added our placeholder for catching the reply we write our message (always do book-keeping first, to avoid race conditions). Finally, expose the incomplete task to the caller.

Note that I've implemented this the "obvious" way, but we can optimize this like we did previously, by checking if WriteAsync completed synchronously and simply returning the tcs.Task without awaiting it. Note also that SimplSockets used the calling thread-id as the message-id; this works fine in a blocking scenario, but it isn't viable when we're using async - but: the number is opaque to the "other end" anyway - all it has to do is return the same number.

Programmed to receive

That's pretty-much it for write; next we need to think about receive. As mentioned in the previous posts, there's almost always a receive loop - especially if we need to support out-of-band and out-of-order messages (so: we can't just read one frame immediately after writing). A basic read loop can be approximated by:

protected async Task StartReceiveLoopAsync(
   CancellationToken cancellationToken = default)
{
   try
   {
       while (!cancellationToken.IsCancellationRequested)
       {
           var readResult = await reader.ReadAsync(cancellationToken);
           if (readResult.IsCanceled) break;

           var buffer = readResult.Buffer;

           var makingProgress = false;
           while (TryParseFrame(ref buffer, out var payload, out var messageId))
           {
               makingProgress = true;
               await OnReceiveAsync(payload, messageId);
           }
           reader.AdvanceTo(buffer.Start, buffer.End);
           if (!makingProgress && readResult.IsCompleted) break;
       }
       try { reader.Complete(); } catch { }
   }
   catch (Exception ex)
   {
       try { reader.Complete(ex); } catch { }
   }
}
protected abstract ValueTask OnReceiveAsync(
   ReadOnlySequence<byte> payload, int messageId);

Note: since we are bound to have an async delay at some point (probably immediately), we might as well just jump straight to an "obvoious" async implementation - we'll gain nothing from trying to be clever here. Key points to observe:

  • we get data from the pipe (note that we might want to also consider TryRead here, but only if we are making progress - otherwise we could find ourselves in a hot loop)
  • read (TryParseFrame) and process (OnReceiveAsync) as many frames as we can
  • advance the reader to report our progress, noting that TryParseFrame will have updated buffer.Start, and since we're actively reading as many frames as we can, it is true to say that we've "inspected" to buffer.End
  • keep in mind that the pipelines code is dealing with all the back-buffer concerns re data that we haven't consumed yet (usually a significant amount of code repeated in lots of libraries)
  • check for exit conditions - if we aren't progressing and the pipe won't get any more data, we're done
  • report when we've finished reading - through success or failure

Unsurprisingly, TryParseFrame is largely the reverse of WriteAsync:

private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload, out int messageId)
{
    if (input.Length < 8)
    {   // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }

    int length;
    if (input.First.Length >= 8)
    {   // already 8 bytes in the first segment
        length = ParseFrameHeader(
            input.First.Span, out messageId);
    }
    else
    {   // copy 8 bytes into a local span
        Span<byte> local = stackalloc byte[8];
        input.Slice(0, 8).CopyTo(local);
        length = ParseFrameHeader(
            local, out messageId);
    }

    // do we have the "length" bytes?
    if (input.Length < length + 8)
    {
        payload = default;
        return false;
    }

    // success!
    payload = input.Slice(8, length);
    input = input.Slice(payload.End);
    return true;
}

First we check whether we have enough data for the frame header (8 bytes); if we don't have that - we certainly don't have a frame. Once we know we have enough bytes for the frame header, we can parse it out to find the payload length. This is a little subtle, because we need to recall that ReadOnlySequence<byte> can be discontiguous multiple buffers. Since we're only talking about 8 bytes, the simplest thing to do is:

  • check whether the first segment has 8 bytes; if so, parse from that
  • otherwise, stackalloc a span (note that this doesn't need unsafe), copy 8 bytes from input into that, and parse from there.

Once we know how much payload we're expecting, we can check whether we have that too; if we don't: cede back to the read loop. But if we do:

  • our actual payload is the length bytes after the header - i.e. input.Slice(8, length)
  • we want to update input by cutting off everything up to the end of the frame, i.e. input = input.Slice(payload.End)

This means that when we return true, payload now contains the bytes that were sent to us, as a discontiguous buffer.

We should also take a look at ParseFrameHeader, which is a close cousin to WriteFrameHeader:

static int ParseFrameHeader(
    ReadOnlySpan<byte> input, out int messageId)
{
    var length = BinaryPrimitives
            .ReadInt32LittleEndian(input);
    messageId = BinaryPrimitives
            .ReadInt32LittleEndian(input.Slice(4));
    return length;
}

Once again, BinaryPrimitives is helping us out, and we are slicing the input in exactly the same way as before to get the two halves.


So; we can parse frames; now we need to act upon them; here's our client implementation:

protected override ValueTask OnReceiveAsync(
    ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId != 0)
    {   // request/response
        TaskCompletionSource<IMemoryOwner<byte>> tcs;
        lock (_awaitingResponses)
        {
            if (_awaitingResponses.TryGetValue(messageId, out tcs))
            {
                _awaitingResponses.Remove(messageId);
            }
        }
        tcs?.TrySetResult(payload.Lease());
    }
    else
    {   // unsolicited
        MessageReceived?.Invoke(payload.Lease());
    }
    return default;
}

This code has two paths; it can be the request/response scenario, or it can be an out-of-band response message with no request. So; if we have a non-zero messageId, we check (synchronized) in our _awaitingResponses dictionary to see if we have a message awaiting completion. If we do, we use TrySetResult to complete the task (after exiting the lock), giving it a lease with the data from the message. Otherwise, we check whether the MessageReceived event is subscribed, and invoke that similarly. In both cases, the use of ?. here means that we don't populate a leased array if nobody is listening. It will be the receiver's job to ensure the lease is disposed, as only they can know the lifetime.

Service, please

We need to think a little about how we orchestrate this at the server. The SimplPipeline base type above relates to a single connection - it is essentially a proxy to a socket. But servers usually have many clients. Because of that, we'll create a server type that does the actual processing, that internally has a client-type that is our SimplPipeline, and a set of connected clients; so:

public abstract class SimplPipelineServer : IDisposable
{
    protected abstract ValueTask<IMemoryOwner<byte>> 
        OnReceiveForReplyAsync(IMemoryOwner<byte> message);
    
    public int ClientCount => _clients.Count;
    public Task RunClientAsync(IDuplexPipe pipe,
        CancellationToken cancellationToken = default)
        => new Client(pipe, this).RunAsync(cancellationToken);
    
    private class Client : SimplPipeline
    {
        public Task RunAsync(CancellationToken cancellationToken)
            => StartReceiveLoopAsync(cancellationToken);

        private readonly SimplPipelineServer _server;
        public Client(IDuplexPipe pipe, SimplPipelineServer server)
            : base(pipe) => _server = server;

        protected override async ValueTask OnReceiveAsync(
            ReadOnlySequence<byte> payload, int messageId)
        {
            using (var msg = payload.Lease())
            {
                var response = await _server.OnReceiveForReplyAsync(msg);
                await WriteAsync(response, messageId);
            }
        }
    }
}

So; our publicly visible server type, SimplPipelineServer has an abstract method for providing the implementation for what we want to do with messages: OnReceiveForReplyAsync - that takes a payload, and returns the response. Behind the scenes we have a set of clients, _clients, although the details of that aren't interesting.

We accept new clients via the RunClientAsync method; this might seem counter-intuitive, but the emerging architecture for pipelines servers (especially considering "Kestrel" hosts) is to let an external host deal with listening and accepting connections, and all we need to do is have something that accepts an IDuplexPipe and returns a Task. In this case, what that does is create a new Client and start the client's read loop, StartReceiveLoopAsync. When the client receives a message (OnReceiveAsync), it asks the server for a response (_server.OnReceiveForReplyAsync), and then writes that response back via WriteAsync. Note that the version of OnReceiveAsync shown has the consequence of meaning that we can't handle multiple overlapped messages on the same connection at the same time; the "real" version has been aggressively uglified, to check whether _server.OnReceiveForReplyAsync(msg) has completed synchronously; if it hasn't, then it schedules a continuation to perform the WriteAsync (also handling the disposal of msg), and yields to the caller. It also optimizes for the "everything is synchronous" case.

The only other server API we need is a broadcast:

public async ValueTask<int> BroadcastAsync(
    ReadOnlyMemory<byte> message)
{
    int count = 0;
    foreach (var client in _clients)
    {
        try
        {
            await client.Key.SendAsync(message);
            count++;
        }
        catch { } // ignore failures on specific clients
    }
    return count;
}

(again, possibly with an overload that takes IMemoryOwner<byte>)

where SendAsync is simply:

public ValueTask SendAsync(ReadOnlyMemory<byte> message)
    => WriteAsync(message, 0);

Putting it all together; implementing a client and server

So how can we use all of this? How can we get a working client and server? Let's start with the simpler of the two, the client:

using (var client = await SimplPipelineClient.ConnectAsync(
    new IPEndPoint(IPAddress.Loopback, 5000)))
{
    // subscribe to broadcasts
    client.MessageReceived += async msg => {
        if (!msg.Memory.IsEmpty)
            await WriteLineAsync('*', msg);
    };

    string line;
    while ((line = await Console.In.ReadLineAsync()) != null)
    {
        if (line == "q") break;

        using (var leased = line.Encode())
        {
            var response = await client.SendReceiveAsync(leased.Memory);
            await WriteLineAsync('<', response);
        }     
    }
}

SimplPipelineClient.ConnectAsync here just uses Pipelines.Sockets.Unofficial to spin up a client socket pipeline, and starts the StartReceiveLoopAsync() method. Taking an additional dependency on Pipelines.Sockets.Unofficial is vexing, but right now there is no framework-supplied client-socket API for pipelines, so: it'll do the job.

This code sets up a simple console client that takes keyboard input; if it receives a "q" it quits; otherwise it sends the message to the server (Encode, not shown, is just a simple text-encode into a leased buffer), and writes the response. The WriteLineAsync method here takes a leased buffer, decodes it, and writes the output to the console - then disposes the buffer. We also listen for unsolicited messages via MessageReceived, and write those to the console with a different prefix.

The server is a little more involved; first we need to implement a server; in this case let's simply reverse the bytes we get:

class ReverseServer : SimplPipelineServer
{
    protected override ValueTask<IMemoryOwner<byte>>
        OnReceiveForReplyAsync(IMemoryOwner<byte> message)
    {
        // since the "message" outlives the response write,
        // we can do an in-place reverse and hand
        // the same buffer back
        var memory = message.Memory;
        Reverse(memory.Span); // details not shown
        return new ValueTask<IMemoryOwner<byte>>(memory);
    }
}

All this does is respond to messages by returning the same payload, but backwards. And yes, I realize that since we're dealing with text, this could go horribly wrong for grapheme-clusters and/or multi-byte code-points! I never said it was a useful server...

Next up, we need a host. Kestrel (the "ASP.NET Core" server) is an excellent choice there, but implementing a Kestrel host requires introducing quite a few more concepts. But... since we already took a dependency on Pipelines.Sockets.Unofficial for the client, we can use that for the server host with a few lines of code:

class SimplPipelineSocketServer : SocketServer
{
    public SimplPipelineServer Server { get; }

    public SimplPipelineSocketServer(SimplPipelineServer server)
        => Server = server;

    protected override Task OnClientConnectedAsync(
        in ClientConnection client)
        => Server.RunClientAsync(client.Transport);

    public static SimplPipelineSocketServer For<T>()
        where T : SimplPipelineServer, new()
        => new SimplPipelineSocketServer(new T());

    protected override void Dispose(bool disposing)
    {
        if (disposing) Server.Dispose();
    }
}

The key line in here is our OnClientConnectedAsync method, which is how we accept new connections, simply by passing down the client.Transport (an IDuplexPipe). Hosting in Kestrel works very similarly, except you subclass ConnectionHandler instead of SocketServer, and override the OnConnectedAsync method - but there are a few more steps involved in plumbing everything together. Kestrel, however, has advantages such as supporting exotic socket APIs.

So, let's whack together a console that interacts with the server:

using (var socket =
    SimplPipelineSocketServer.For<ReverseServer>())
{
    socket.Listen(new IPEndPoint(IPAddress.Loopback, 5000));
    
    string line;
    while ((line = await Console.In.ReadLineAsync()) != null)
    {
        if (line == "q") break;

        int clientCount, len;
        using (var leased = line.Encode())
        {
            len = leased.Memory.Length;
            clientCount = await socket.Server.BroadcastAsync(leased.Memory);
        }
        await Console.Out.WriteLineAsync(
            $"Broadcast {len} bytes to {clientCount} clients");
    }
}

This works much like the client, except any input other than "q" is broadcast to all the clients.

Now race your horses

We're not just doing this for fun! The key obective of things like pipelines and the array-pool is that it makes it much simpler to write IO code that makes efficient use of memory; reducing allocations (and especially reducing large object allocations) signficantly reduces garbage collection overhead, allowing our code to be much more scalable (useful for both servers, and high-throughput client scenarios). Our use of async/await makes it much simpler to make effective use of the CPU: instead of blocking for a while, we can make the thread available to do other useful work - increasing throughput, and once again: reducing memory usage (having lots of threads is not cheap - each thread has a quite significant stack space reserved for it).

Note that this isn't entirely free; fetching arrays from the pool (and remembering to return them) by itself has some overhead - but the general expectation is that the cost of checking the pool is, overall, lower than the cost associated from constant allocations and collections. Similarly, async: the hope is that the increased scalability afforded by freeing up threads more-than-offsets the cost of the additional work required by the plumbing involved.

But: there's only one way to find out. As Eric Lippert puts it:

If you have two horses and you want to know which of the two is the faster then race your horses

Setting up a good race-track for code can be awkward, because we need to try to reproduce a meaningful scenario. And it is amazingly easy to write bad performnce tests. Rather than reinvent bad code, it is hugely adviseable to lean on tools like BenchmarkDotNet. If you are even remotely performance minded, and you haven't used BenchmarkDotNet: sorry, but you're doing it wrong.

There are 4 combinations we can check here:

  • SimplSocketClient against SimplSocketServer
  • SimplSocketClient against SimplPipelineServer
  • SimplPipelineClient against SimplSocketServer
  • SimplPipelineClient against SimplPipelineServer

I won't list all of these, but for these tests I'll use a [GlobalSetup] method (a BenchmarkDotNet concept) to spin up both servers (on different ports), then we can test clients against each. Here's our "SimplSocketClient against SimplSocketServer" test (remembering that SimplSocketClient is synchronous):

[Benchmark(OperationsPerInvoke = Ops)]
public long c1_s1()
{
    long x = 0;
    using (var client = new SimplSocketClient(CreateSocket))
    {
        client.Connect(s1);
        for (int i = 0; i < Ops; i++)
        {
            var response = client.SendReceive(_data);
            x += response.Length;
        }
    }
    return AssertResult(x);
}

and here's our "SimplPipelineClient against SimplPipelineServer" test (using a Task this time, as SimplPipelineClient uses an async API):

[Benchmark(OperationsPerInvoke = Ops)]
public async Task<long> c2_s2()
{
    long x = 0;
    using (var client =
        await SimplPipelineClient.ConnectAsync(s2))
    {
        for (int i = 0; i < Ops; i++)
        {
            using (var response =
                await client.SendReceiveAsync(_data))
            {
                x += response.Memory.Length;
            }
        }
    }
    return AssertResult(x);
}

Note that we're performing multiple operations (Ops) per run here, so we're not just measing overheads like connect. Other than that, we'll just let BenchmarkDotNet do the hard work. We run our tests, and we get (after some time; benchmarking isn't always fast, although you can make suggestions on the iterations etc to speed it up if you want):

Method Runtime Mean Error StdDev Gen 0 Gen 1
c1_s1 Clr NA NA NA N/A N/A
c1_s2 Clr NA NA NA N/A N/A
c2_s1 Clr NA NA NA N/A N/A
c2_s2 Clr 45.99us 0.4275us 0.2544us 0.3636 0.0909
c1_s1 Core NA NA NA N/A N/A
c1_s2 Core NA NA NA N/A N/A
c2_s1 Core NA NA NA N/A N/A
c2_s2 Core 29.87us 0.2294us 0.1518us 0.1250 -

Now, you're probaly looking at that table and thinking "huh? most of the data is missing - how can interpret that?" - and: you wouldn't be wrong! It turns out that the c1 (SimplSocketClient) and s1 (SimplSocketServer) implementations are simply unreliable. Ultimately, it was painfully hard to write reliable socket code before pipelines, and it looks like the legacy implementation simply has bugs and race conditions that don't show up in casual usage (it works fine in the REPL client), but which manifest pretty quickly when BenchmarkDotNet runs it aggressively. Our "pipelines" implementation simply used the "obvious" thing, and it works reliably first time. All of the complex pieces that IO authors previously had to worry about have now moved to the framework code, which enables programmers to focus on the interesting thing that they're trying to do (rather than spending most of their time fighting with IO intrinsics), and benefit from a reliable well-tested implementation of the ugly IO code.

A major advantage of moving to pipelines is getting rid of the gnarly IO bugs that you didn't even know you had.

I will be more than happy to update this table with updated numbers if SimplSockets can find the things that are stalling it.

Of the numbers that we do have, we can see that it behaves well on Clr (.NET Framework) but works much better on Core (.NET Core). .NET Core 2.1 is frankly amazing (and 3.0 looks even better) - with lots of advantages. If you're serious about performance, migrating to .NET Core should definitely be on your roadmap.

Summary

This has been a long read, but I hope I've conveyed some useful practical advice and tips for working with pipelines in real systems, in a way that is directly translatable to your own requirements. If you want to play with the code in more depth, or see it in action, you can see my fork here.

Update: please also see part 3.1 for further clarifications on this post

Enjoy!