Skip to content
Prev Previous commit
Next Next commit
Switch to ConcurrentStack
  • Loading branch information
MihaZupan committed Mar 21, 2024
commit 5db7ecd6c021fd5f73a49ae610cd3a3e5ab7c6ab
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -69,8 +70,8 @@ internal sealed class HttpConnectionPool : IDisposable

// HTTP/1.1 connection pool

/// <summary>Queue of currently available HTTP/1.1 connections stored in the pool.</summary>
private readonly ConcurrentQueue<HttpConnection> _http11Connections = new();
/// <summary>Stack of currently available HTTP/1.1 connections stored in the pool.</summary>
private readonly ConcurrentStack<HttpConnection> _http11Connections = new();
/// <summary>Controls whether we can use a fast path when returning connections to the pool and skip calling into <see cref="ProcessHttp11RequestQueue(HttpConnection?)"/>.</summary>
private bool _http11RequestQueueIsEmptyAndNotDisposed;
/// <summary>The maximum number of HTTP/1.1 connections allowed to be associated with the pool.</summary>
Expand Down Expand Up @@ -559,7 +560,7 @@ private void CheckForHttp11ConnectionInjection()
/// <summary>
/// This method is called:
/// <br/>- When returning a connection and observing that the request queue is not empty (<see cref="_http11RequestQueueIsEmptyAndNotDisposed"/> is <see langword="false"/>).
/// <br/>- After adding a request to the queue if we fail to obtain a connection from the queue.
/// <br/>- After adding a request to the queue if we fail to obtain a connection from <see cref="_http11Connections"/>.
/// <br/>- After scavenging or disposing the pool to ensure that any pending requests are handled or connections disposed.
/// <para>The method will attempt to match one request from the <see cref="_http11RequestQueue"/> to an available connection.
/// The <paramref name="connection"/> can either be provided as an argument (when returning a connection to the pool), or one will be rented from <see cref="_http11Connections"/>.
Expand All @@ -579,7 +580,7 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
lock (SyncObj)
{
#if DEBUG
// Other threads may still interact with the connections queue. Read the count once to keep the assert message accurate.
// Other threads may still interact with the connections stack. Read the count once to keep the assert message accurate.
int connectionCount = _http11Connections.Count;
Debug.Assert(_associatedHttp11ConnectionCount >= connectionCount + _pendingHttp11ConnectionCount,
$"Expected {_associatedHttp11ConnectionCount} >= {connectionCount} + {_pendingHttp11ConnectionCount}");
Expand All @@ -591,7 +592,7 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)

if (_http11RequestQueue.Count != 0)
{
if (connection is not null || _http11Connections.TryDequeue(out connection))
if (connection is not null || _http11Connections.TryPop(out connection))
{
// TryDequeueWaiter will prune completed requests from the head of the queue,
// so it's possible for it to return false even though we checked that Count != 0.
Expand Down Expand Up @@ -623,7 +624,11 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
// and set the _http11RequestQueueIsEmptyAndNotDisposed flag to false, followed by multiple
// returning connections observing the flag and calling into this method before we clear the flag.
// This should be a relatively rare case, so the added contention should be minimal.
_http11Connections.Enqueue(connection);
_http11Connections.Push(connection);
}
else
{
CheckForHttp11ConnectionInjection();
}

break;
Expand All @@ -649,7 +654,7 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
{
// The pool is being disposed and there are no more requests to handle.
// Clean up any idle connections still waiting in the pool.
while (_http11Connections.TryDequeue(out connection))
while (_http11Connections.TryPop(out connection))
{
connection.Dispose();
}
Expand All @@ -658,7 +663,7 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)

private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out HttpConnection? connection, [NotNullWhen(false)] out HttpConnectionWaiter<HttpConnection>? waiter)
{
while (_http11Connections.TryDequeue(out connection))
while (_http11Connections.TryPop(out connection))
{
if (CheckExpirationOnGet(connection))
{
Expand All @@ -684,10 +689,11 @@ private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async

waiter = new HttpConnectionWaiter<HttpConnection>();

// Technically this block under the lock could be a part of ProcessHttp11RequestQueue to avoid taking the lock twice.
// It is kept separate to simplify that method (avoid extra arguments that are only relevant for this caller).
lock (SyncObj)
{
_http11RequestQueue.EnqueueRequest(request, waiter);
CheckForHttp11ConnectionInjection();

// Disable the fast path and force connections returned to the pool to check the request queue first.
_http11RequestQueueIsEmptyAndNotDisposed = false;
Expand Down Expand Up @@ -1941,7 +1947,7 @@ public void InvalidateHttp11Connection(HttpConnection connection, bool disposing
lock (SyncObj)
{
Debug.Assert(_associatedHttp11ConnectionCount > 0);
Debug.Assert(!disposing || Array.IndexOf([.. _http11Connections], connection) < 0);
Debug.Assert(!disposing || Array.IndexOf(_http11Connections.ToArray(), connection) < 0);

_associatedHttp11ConnectionCount--;

Expand Down Expand Up @@ -2042,11 +2048,11 @@ private void ReturnHttp11Connection(HttpConnection connection)
// The fast path when there are enough connections and no pending requests
// is that we'll see _http11RequestQueueIsEmptyAndNotDisposed being true both
// times, and all we'll have to do as part of returning the connection is
// an enqueue call on the concurrent queue.
// a Push call on the concurrent stack.

if (Volatile.Read(ref _http11RequestQueueIsEmptyAndNotDisposed))
{
_http11Connections.Enqueue(connection);
_http11Connections.Push(connection);

// When we add a connection to the pool, we must ensure that there are
// either no pending requests waiting, or that _something_ will pair those
Expand All @@ -2070,7 +2076,7 @@ private void ReturnHttp11Connection(HttpConnection connection)
// ProcessHttp11RequestQueue is responsible for handing the connection to a pending request,
// or to return it back to the pool if there aren't any.

// We hand over the connection directly instead of enqueuing it first to ensure
// We hand over the connection directly instead of pushing it on the stack first to ensure
// that pending requests are processed in a fair (FIFO) order.
ProcessHttp11RequestQueue(connection);
}
Expand Down Expand Up @@ -2369,7 +2375,7 @@ public bool CleanCacheAndDisposeIfUnused()
// will be purged next time around.
_usedSinceLastCleanup = false;

ScavengeConnectionQueue(_http11Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout);
ScavengeConnectionStack(this, _http11Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout);

if (_availableHttp2Connections is not null)
{
Expand All @@ -2389,31 +2395,48 @@ public bool CleanCacheAndDisposeIfUnused()
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

// We were interacting with the connections queue.
// Make sure any waiting requests that saw no connections in a race still get processed.
ProcessHttp11RequestQueue(null);

// Pool is active. Should not be removed.
return false;

static void ScavengeConnectionQueue(ConcurrentQueue<HttpConnection> connections, ref List<HttpConnectionBase>? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout)
static void ScavengeConnectionStack(HttpConnectionPool pool, ConcurrentStack<HttpConnection> connections, ref List<HttpConnectionBase>? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout)
{
// We can't simply enumerate the connections queue as other threads may still be adding and removing entries.
// If we want to check the state of a connection, we must dequeue it first to ensure we own it.
int initialCount = connections.Count;
// We can't simply enumerate the connections stack as other threads may still be adding and removing entries.
// If we want to check the state of a connection, we must take it from the stack first to ensure we own it.

while (--initialCount >= 0 && connections.TryDequeue(out HttpConnection? connection))
// We're about to starve the connection pool of all available connections for a moment.
// We must be holding the lock while doing so to ensure that any new requests that
// come in during this time will be blocked waiting in ProcessHttp11RequestQueue.
// If this were not the case, requests would repeatedly call into CheckForHttp11ConnectionInjection
// and trigger new connection attempts, even if we have enough connections in our copy.
Debug.Assert(pool.HasSyncObjLock);
Debug.Assert(connections.Count <= pool._associatedHttp11ConnectionCount);

HttpConnection[] stackCopy = ArrayPool<HttpConnection>.Shared.Rent(pool._associatedHttp11ConnectionCount);
int usableConnections = 0;

while (connections.TryPop(out HttpConnection? connection))
{
if (IsUsableConnection(connection, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
{
connections.Enqueue(connection);
stackCopy[usableConnections++] = connection;
}
else
{
toDispose ??= new List<HttpConnectionBase>();
toDispose.Add(connection);
}
}

if (usableConnections > 0)
{
// Add them back in reverse to maintain the LIFO order.
Span<HttpConnection> usable = stackCopy.AsSpan(0, usableConnections);
usable.Reverse();
connections.PushRange(stackCopy, 0, usableConnections);
usable.Clear();
}

ArrayPool<HttpConnection>.Shared.Return(stackCopy);
}

static int ScavengeConnectionList<T>(List<T> list, ref List<HttpConnectionBase>? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout)
Expand Down