I have a scenario where multiple threads send data over a single socket. Each request message carries a unique ID, and that ID is echoed back in the response message. Everything works as expected when the socket is used by a single client. Now I'm looking for an async / await pattern for multiple threads, where each client waits for its own specific response.

Some code to demonstrate:

using (var ns = new NetworkStream(_socket))
{
    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, requestBuffer.Length);
    byte[] responseBuffer = await ReceiveMessageAsync(ns);

    // process response message
}

The above example does not work in a multithreaded scenario because messages can come back in any order, so the next message off the wire may not belong to the current client. My thought was that the client would register a delegate or Task using its unique ID (storing that in a Dictionary) and when a message came back with that unique ID, the delegate or task would be 'completed' with the response bytes. I'm guessing this would be fairly easy to implement with an EventHandler but I'm looking for a way to await the response.

For example:

using (var ns = new CustomNetworkStream(_socket))
{
    // Task<byte[]> rather than Task, so that awaiting it yields the response bytes.
    Task<byte[]> waitTask = ns.RegisterTask(this.UniqueId);

    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, requestBuffer.Length);

    byte[] responseBuffer = await waitTask;

    // process response message
}

I don't know what the "RegisterTask" method would look like, how to store the task, or how to make the task 'wait' and later 'complete' it with the response bytes as its Result.

Any ideas? I've researched Toub's Async Coordination Primitives series but I'm not certain if that's the right approach or not.

5
  • I think all you would need is TaskCompletionSource<T>. However, multithreaded asynchronous APIs are extremely complex for socket communications, far more than your sample code shows. I'd recommend using a higher-level abstraction such as SignalR. Commented Jan 27, 2015 at 15:40
  • The networking part of my library Griffin.Framework has built in support for that (with an async client). griffinframework.net Commented Jan 27, 2015 at 15:57
  • Thanks @jgauffin, I'll have a look at it. Commented Jan 27, 2015 at 16:34
  • "stream" implies TCP, but "multiple clients on single socket" implies UDP. Either way, I would say that while you might be able to get this to work, you have a more fundamental design issue here. The question is very difficult to fully understand without the detail that would explain how you got yourself into this situation. Commented Jan 28, 2015 at 5:06
  • @PeterDuniho - I'm creating a persistent socket and allowing multiple threads to send / receive data over the connection. Commented Jan 28, 2015 at 13:38

2 Answers

3

So you will want to wrap all of this into a new class, because you're going to need to share state between the places that you read and the places that you write.

Every time you go and write to the stream you'll need to accept the unique ID, and add an entry into a lookup that maps the id to a TaskCompletionSource. The write method can then return the Task from that TCS.

You can then have a separate reader that just sits there reading from your stream, finds the dictionary entry associated with the ID of that response, and sets its result.

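using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public class MyNetworkStream : IDisposable
{
    private readonly NetworkStream stream;
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
    private readonly CancellationTokenSource disposalCTS = new CancellationTokenSource();
    // Serializes writers so bytes from concurrent requests cannot interleave.
    private readonly SemaphoreSlim writeLock = new SemaphoreSlim(1, 1);

    public MyNetworkStream(Socket socket)
    {
        stream = new NetworkStream(socket);
        KeepReading();
    }
    public void Dispose()
    {
        disposalCTS.Cancel();
        stream.Dispose();
    }

    public async Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset,
        int count, int uniqueID)
    {
        // Register before writing so a fast response cannot arrive unmatched.
        // RunContinuationsAsynchronously (.NET 4.6+) keeps awaiters off the reader loop.
        var tcs = new TaskCompletionSource<byte[]>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!lookup.TryAdd(uniqueID, tcs))
            throw new InvalidOperationException("A request with this ID is already pending.");
        try
        {
            await writeLock.WaitAsync(disposalCTS.Token);
            try { await stream.WriteAsync(buffer, offset, count, disposalCTS.Token); }
            finally { writeLock.Release(); }
        }
        catch
        {
            // The request never went out, so nobody should keep waiting on it.
            lookup.TryRemove(uniqueID, out tcs);
            throw;
        }
        return await tcs.Task;
    }

    private async void KeepReading()
    {
        try
        {
            //TODO choose a buffer size and message framing that fit your protocol.
            //As written, each read is treated as one whole message, which TCP
            //does not guarantee.
            byte[] buffer = new byte[4096];
            while (!disposalCTS.IsCancellationRequested)
            {
                int read = await stream.ReadAsync(buffer, 0, buffer.Length, disposalCTS.Token);
                if (read == 0)
                    throw new EndOfStreamException("The connection was closed.");
                // Copy so the next read cannot overwrite bytes handed to a caller.
                byte[] message = new byte[read];
                Buffer.BlockCopy(buffer, 0, message, 0, read);
                int id = GetUniqueIdFromBlock(message);
                TaskCompletionSource<byte[]> tcs;
                if (lookup.TryRemove(id, out tcs))
                    tcs.TrySetResult(message);
                else
                {
                    //TODO figure out what to do here
                }
            }
        }
        catch (Exception e)
        {
            // Fail every pending request so that no caller waits forever.
            foreach (var tcs in lookup.Values)
                tcs.TrySetException(e);
            Dispose();
            //TODO consider any other necessary cleanup
        }
    }

    private int GetUniqueIdFromBlock(byte[] buffer)
    {
        throw new NotImplementedException();
    }
}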
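
The TODOs about buffer size and reading "a block of the appropriate size" come down to message framing, which the question does not specify. As a rough sketch only, assuming a hypothetical wire format of a 4-byte little-endian length followed by the payload, the reader could pull whole messages off the stream as shown below. The `Framing` class and its method names are illustrative and appear in neither the question nor the answer.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Assumed wire format (not from the question):
// [4-byte little-endian payload length][payload bytes].
public static class Framing
{
    // Reads exactly `count` bytes, looping because ReadAsync may return fewer.
    public static async Task<byte[]> ReadExactlyAsync(
        Stream stream, int count, CancellationToken token)
    {
        var result = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = await stream.ReadAsync(result, offset, count - offset, token);
            if (read == 0)
                throw new EndOfStreamException("Connection closed mid-message.");
            offset += read;
        }
        return result;
    }

    // Reads the length header, then the payload it announces.
    public static async Task<byte[]> ReadMessageAsync(Stream stream, CancellationToken token)
    {
        byte[] header = await ReadExactlyAsync(stream, 4, token);
        int length = header[0] | (header[1] << 8) | (header[2] << 16) | (header[3] << 24);
        if (length < 0)
            throw new InvalidDataException("Negative message length.");
        return await ReadExactlyAsync(stream, length, token);
    }

    // Prepends the length header to a payload before it is written.
    public static byte[] Frame(byte[] payload)
    {
        int n = payload.Length;
        var framed = new byte[4 + n];
        framed[0] = (byte)n;
        framed[1] = (byte)(n >> 8);
        framed[2] = (byte)(n >> 16);
        framed[3] = (byte)(n >> 24);
        Buffer.BlockCopy(payload, 0, framed, 4, n);
        return framed;
    }
}
```

With something like this in place, the read loop would await `Framing.ReadMessageAsync(stream, token)` instead of a single `ReadAsync`, and each returned array would already be a separate copy that is safe to hand to a waiting caller.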

3

What you need is a TaskCompletionSource<byte[]> to use as a synchronization construct, a ConcurrentDictionary to map between an id and a TCS and a listener:

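// _socket, ReceiveMessageAsync and UniqueId come from the question; ExtractId is
// assumed to read the echoed ID out of a response.
ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary =
    new ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>>();

async Task Listen(CancellationToken token)
{
    // One stream for the lifetime of the listener, not one per message.
    using (var ns = new NetworkStream(_socket))
    {
        while (!token.IsCancellationRequested)
        {
            byte[] responseBuffer = await ReceiveMessageAsync(ns);
            var id = ExtractId(responseBuffer);
            TaskCompletionSource<byte[]> tcs;
            if (_dictionary.TryRemove(id, out tcs))
            {
                tcs.SetResult(responseBuffer);
            }
            else
            {
                // error: a response arrived that nobody registered for
            }
        }
    }
}

// Returns Task<byte[]> so the caller can await the response bytes.
Task<byte[]> RegisterTask(UniqueId id)
{
    var tcs = new TaskCompletionSource<byte[]>();
    if (!_dictionary.TryAdd(id, tcs))
    {
        // error: this ID is already waiting for a response
    }
    return tcs.Task;
}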
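
To see the register/complete handshake on its own, here is a small socket-free sketch of the same idea. `PendingRequests` and `Demo` are illustrative names rather than part of any library, and the reader is simulated by calling `Complete` directly. Responses are delivered in the opposite order to the requests, yet each awaiter still receives the bytes for its own ID.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks requests that are still awaiting a response.
public class PendingRequests
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> pending =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();

    // Called by a sender before it writes its request.
    public Task<byte[]> Register(int id)
    {
        var tcs = new TaskCompletionSource<byte[]>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!pending.TryAdd(id, tcs))
            throw new InvalidOperationException("ID " + id + " is already pending.");
        return tcs.Task;
    }

    // Called by the single reader when a response arrives.
    // Returns false when nobody was waiting for that ID.
    public bool Complete(int id, byte[] response)
    {
        TaskCompletionSource<byte[]> tcs;
        return pending.TryRemove(id, out tcs) && tcs.TrySetResult(response);
    }
}

public static class Demo
{
    public static async Task<string> RunAsync()
    {
        var requests = new PendingRequests();
        Task<byte[]> first = requests.Register(1);
        Task<byte[]> second = requests.Register(2);

        // Simulated reader: responses arrive out of order.
        requests.Complete(2, new byte[] { 20 });
        requests.Complete(1, new byte[] { 10 });

        byte[] a = await first;
        byte[] b = await second;
        return a[0] + "," + b[0];   // "10,20"
    }
}
```

Registering before the request is written matters: if the entry were added after the write, a fast response could reach the reader before anyone was listed as waiting for it.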

However, as Stephen Cleary suggested, you probably want to use an existing solution for that.

1 Comment

Looks like a reasonable solution. Normally, I would use a pub/sub or SignalR, but the request/response is all happening on the server as part of an outer request/response and I need to eliminate as many server hops as possible.
