Because I'm quite naive regarding C#'s Task Asynchronous Programming and concurrency in general, and because it's so difficult to test, I'm concerned about the safety of this code. It's likely that those topics, or something even more esoteric to me (e.g. processor-specific issues like volatility rules), caused me to overlook something obvious to you. Please help me understand what I missed here!
Intent
Given a Func<T> that does not support concurrency, we can safely initiate the function concurrently and receive a Task<T>. Concurrent initiations are "joined" to one of two iterations of the function: the active iteration or the single pending iteration. The caller may disallow joining an ongoing iteration, to avoid inconsistent outcomes.
Contract
- This ExclusiveFunc class requires a Func<T>, which is set at construction and cannot be changed.
- ExclusiveFunc has one public instance method, Task<T> Run(bool joinLate = false), used to initiate, or "trigger", the Func<T>.
- ExclusiveFunc is safe for multi-threaded use. Multiple instances of Func<T> will never run concurrently from any concurrent calls to Run. When proxied through ExclusiveFunc.Run, the Func<T> is protected from self-concurrency issues that might be caused by scheduled tasks or other concurrent use.
- There will also never be more than one pending Func<T>. That is, there will at most be one Func<T> running, and one waiting to run. The Func<T> must be implemented such that the side effects and return value of any single future iteration are sufficient for all pending callers.
- Callers receive Task<T> for the iteration they have joined, whether joining the running or the pending operation. T must be safe for concurrent use if concurrent callers will use the result.
- Because "joining a run late" may result in unacceptable dirty / stale information or missed processing, the default operation when Func<T> is already running is to instead set one iteration of Func<T> to run when the active operation is complete. In this default operation, the caller is guaranteed that Func<T> will begin some time after the request was made.
- Optionally, the caller may joinLate if knowledge of an ongoing iteration (including its return value and side effects) is sufficient, and initiating an additional iteration is not required.
- ExclusiveFunc.Run need not be awaited, if a "fire and forget" implementation is required. However, this implementation is not ideal for heavy usage as such, since every call to Run will internally await the result on its own thread.
- All callers joined on any iteration of Func<T> will receive exceptions thrown by the specific iteration they await.
- Awaiting callers will not wait unnecessarily (such as for subsequent runs), and specifically a queued request will not be forced onto the thread that was running Func<T> at the moment it was queued, but instead will run on the thread that queued it.
- The ExclusiveFunc class is not vulnerable to internal transition race conditions. It will not halt if Func<T> throws or the queue runs dry.
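For comparison, the contract above could probably also be met with a plain lock. The following is only a minimal sketch under that assumption: LockedExclusiveFunc<T> is a hypothetical name, it is not the lock-free implementation under review, and it chains the pending iteration with ContinueWith instead of running it on the thread that queued it.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical lock-based baseline for the same contract (assumption, for comparison only).
public class LockedExclusiveFunc<T>
{
    private readonly Func<T> func;
    private readonly object gate = new object();
    private Task<T> current; // the running (or next-to-run) iteration, null when idle
    private Task<T> pending; // the single queued iteration, null when nothing is queued

    public LockedExclusiveFunc(Func<T> func)
    {
        this.func = func;
    }

    public Task<T> Run(bool joinLate = false)
    {
        lock (gate)
        {
            if (current == null)
            {
                // Idle: start a fresh iteration on the thread pool.
                current = Task.Run(() => Iterate());
                return current;
            }
            if (joinLate)
            {
                // The caller accepts the outcome of the ongoing iteration.
                return current;
            }
            if (pending == null)
            {
                // Queue exactly one iteration, to begin once the current one ends,
                // whether it ends successfully or with an exception.
                pending = current.ContinueWith(_ => Iterate(), TaskScheduler.Default);
            }
            return pending;
        }
    }

    private T Iterate()
    {
        try
        {
            return func();
        }
        finally
        {
            // Promote the queued iteration (if any) before this task completes.
            lock (gate)
            {
                current = pending;
                pending = null;
            }
        }
    }
}
```

With this baseline, a first call to Run starts an iteration, Run(true) during that iteration returns the same task, and any number of plain Run() calls during that iteration share one queued task. Comparing its behaviour with ExclusiveFunc<T> under the same tests may help isolate which guarantees depend on the lock-free design.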
Specific Concerns
- Are there cross-platform issues caused by the lock-less implementation? For example, should queue be marked volatile?
- Are there exception-handling problems? For example, the caller does not receive exceptions, or exceptions crash the queue.
- Are there defects in where the work happens that cause unexpected results? For example, a Task never completes because its thread also runs pending requests.
- Is the lack of options passed to the Task methods a problem? For example, does this cause an incorrect SynchronizationContext?
- Is this likely to cause a thread pool problem? For example, using twice as many threads as the caller expects?
- Is there a problem with using one Task to synchronize multiple threads in this way?
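One way to probe the exception-handling and shared-Task concerns in isolation is a small check that several awaiters of a single faulted Task each observe the exception. This is only a sketch: SharedTaskFaultDemo and its members are hypothetical names, and the cold task is created with new Task<int>(...) and started once, mirroring what State.Acquire and Run do.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of ExclusiveFunc<T>.
public static class SharedTaskFaultDemo
{
    // Returns how many of the awaiters observed the exception from the shared task.
    public static async Task<int> CountObservedFaults(int awaiters)
    {
        // Cold task that always throws; started exactly once.
        var shared = new Task<int>(() =>
        {
            throw new InvalidOperationException("boom");
        });
        shared.Start();

        // Every observer awaits the same Task instance.
        var observers = new Task<bool>[awaiters];
        for (var i = 0; i < awaiters; i++)
        {
            observers[i] = Observe(shared);
        }

        var results = await Task.WhenAll(observers);
        return Array.FindAll(results, saw => saw).Length;
    }

    private static async Task<bool> Observe(Task<int> shared)
    {
        try
        {
            await shared;
            return false; // no exception reached this awaiter
        }
        catch (InvalidOperationException)
        {
            return true; // await rethrew the task's exception
        }
    }
}
```

If the returned count equals the number of awaiters, each joined caller saw the exception of the iteration it awaited. This says nothing about whether the queue itself recovers afterwards, which would need a separate test against ExclusiveFunc<T>.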
Code
using System;
using System.Threading;
using System.Threading.Tasks;
public class ExclusiveFunc<T>
{
    public ExclusiveFunc(Func<T> func)
    {
        queue = new Queue(() => {
            try {
                return func();
            }
            finally {
                queue = queue.Next();
            }
        });
    }
    public async Task<T> Run(bool joinLate = false)
    {
        var a = queue;
        if (a.Current.Acquire())
        {
            a.Current.Acquired.Start();
            return await a.Current.Acquired;
        }
        if (joinLate) {
            return await a.Current.Acquired;
        }
        if (a.Pending.Acquire()) {
            await a.Current.Acquired;
            a.Pending.Acquired.Start();
        }
        return await a.Pending.Acquired;
    }
    private Queue queue;
    private class Queue
    {
        public readonly State Current;
        public readonly State Pending;
        public Queue(Func<T> func) : this(func, new State(func)) { }
        public Queue Next()
        {
            return new Queue(func, Pending);
        }
        private readonly Func<T> func;
        private Queue(Func<T> func, State pending)
        {
            this.func = func;
            Current = pending;
            Pending = new State(func);
        }
    }
    private class State
    {
        public Task<T> Acquired;
        public State(Func<T> func)
        {
            this.func = func;
        }
        public bool Acquire()
        {
            return Interlocked.CompareExchange(ref Acquired, new Task<T>(func), null) == null;
        }
        private readonly Func<T> func;
    }
}
Tests
using System.Threading;
using System.Threading.Tasks;
using Xunit;
public class ExclusiveIncrementer {
    private int locked = 0;
    private int count = 0;
    public int Slow() {
        Assert.Equal(0, Interlocked.Exchange(ref locked, 1));
        Thread.Sleep(100);
        Assert.Equal(1, Interlocked.Exchange(ref locked, 0));
        return Interlocked.Increment(ref count);
    }
}
public class ExclusiveFuncTest_WithoutThreads {
    protected delegate Task<int> RunEf(bool joinLate = false);
    protected virtual RunEf GetRun() {
        return new ExclusiveFunc<int>(new ExclusiveIncrementer().Slow).Run;
    }
    [Fact]
    public async Task ConcurrentRequestCanJoinOngoing() {
        var run = GetRun();
        var master = run();
        var slave = run(true);
        Assert.Equal(1, await master);
        Assert.Equal(1, await slave);
    }
    [Fact]
    public async Task ConcurrentRequestCanQueueIfOngoing() {
        var run = GetRun();
        var immediate = run();
        var queued = run();
        Assert.Equal(1, await immediate);
        Assert.Equal(2, await queued);
    }
    [Fact]
    public async Task ProceedsAfterQueueEmpty() {
        var run = GetRun();
        var first = run();
        Assert.Equal(1, await first);
        var second = run();
        Assert.Equal(2, await second);
    }
    [Fact]
    public async Task FireAndForgetCompletes() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
    }
    [Fact]
    public async Task OrderDeterminedByCallNotAwait() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
        Assert.Equal(1, await first);
    }
    [Fact]
    public async Task MultiplePendingShareOperation() {
        var run = GetRun();
        var blocking = run();
        var firstPending = run();
        var secondPending = run();
        Assert.Equal(2, await firstPending);
        Assert.Equal(2, await secondPending);
    }
    [Fact]
    public async Task JoinWillStartIfRequired() {
        var run = GetRun();
        var only = run(true);
        Assert.Equal(1, await only);
    }
}
public class ExclusiveFuncTest_WithThreads : ExclusiveFuncTest_WithoutThreads {
    protected override RunEf GetRun() {
        var run = base.GetRun();
        return runThread;
        Task<int> runThread(bool joinLate = false) {
            // We enforce order with Sleep, to allow human-readable test outcomes
            Thread.Sleep(30);
            return Task.Run(() => run(joinLate));
        }
    }
}
Background (not the main question, but ofc comments welcome):
I would like to separate the locking logic from several schedulable tasks in my system (e.g. scheduled conference call setups, due e-mails). While unlikely, very closely-scheduled tasks may run concurrently. Rather than artificially restricting the scheduling resolution to an arbitrary "almost certainly safe" value, I want to ensure there is at most one running. The caller may determine whether joining an ongoing run is sufficient or not.
I understand domain-specific idempotency/synchronization is typically preferable.
Related: (for Node.js) Concurrent fire and forget async task queue

ContinueWith and then used its task to chain the next one and so on...? Alternatively, wouldn't the ConcurrentQueue<T> be easier to use here?