Skip to content

CSHARP-5608: CSOT: Command Execution #1716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void Authenticate(IConnection connection, ConnectionDescription descripti
try
{
var protocol = CreateAuthenticateProtocol();
protocol.Execute(connection, cancellationToken);
// TODO: CSOT: implement operationContext support for Auth.
protocol.Execute(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection);
}
catch (MongoCommandException ex)
{
Expand All @@ -79,7 +80,8 @@ public async Task AuthenticateAsync(IConnection connection, ConnectionDescriptio
try
{
var protocol = CreateAuthenticateProtocol();
await protocol.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
// TODO: CSOT: implement operationContext support for Auth.
await protocol.ExecuteAsync(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection).ConfigureAwait(false);
}
catch (MongoCommandException ex)
{
Expand Down
6 changes: 4 additions & 2 deletions src/MongoDB.Driver/Authentication/SaslAuthenticator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void Authenticate(IConnection connection, ConnectionDescription descripti
try
{
var protocol = CreateCommandProtocol(command);
result = protocol.Execute(connection, cancellationToken);
// TODO: CSOT: implement operationContext support for Auth.
result = protocol.Execute(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection);
conversationId ??= result?.GetValue("conversationId").AsInt32;
}
catch (MongoException ex)
Expand Down Expand Up @@ -172,7 +173,8 @@ public async Task AuthenticateAsync(IConnection connection, ConnectionDescriptio
try
{
var protocol = CreateCommandProtocol(command);
result = await protocol.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
// TODO: CSOT: implement operationContext support for Auth.
result = await protocol.ExecuteAsync(new OperationContext(Timeout.InfiniteTimeSpan, cancellationToken), connection).ConfigureAwait(false);
conversationId ??= result?.GetValue("conversationId").AsInt32;
}
catch (MongoException ex)
Expand Down
21 changes: 8 additions & 13 deletions src/MongoDB.Driver/Core/Bindings/ChannelChannelSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,26 @@ internal sealed class ChannelChannelSource : IChannelSource
private readonly IChannelHandle _channel;
private bool _disposed;
private readonly IServer _server;
private readonly TimeSpan _roundTripTime;
private readonly ICoreSessionHandle _session;

// constructors
public ChannelChannelSource(IServer server, IChannelHandle channel, ICoreSessionHandle session)
public ChannelChannelSource(IServer server, TimeSpan roundTripTime, IChannelHandle channel, ICoreSessionHandle session)
{
_server = Ensure.IsNotNull(server, nameof(server));
_roundTripTime = Ensure.IsGreaterThanZero(roundTripTime, nameof(roundTripTime));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_session = Ensure.IsNotNull(session, nameof(session));
}

// properties
public IServer Server
{
get { return _server; }
}
public IServer Server => _server;

public ServerDescription ServerDescription
{
get { return _server.Description; }
}
public ServerDescription ServerDescription => _server.Description;

public ICoreSessionHandle Session
{
get { return _session; }
}
public TimeSpan RoundTripTime => _roundTripTime;

public ICoreSessionHandle Session => _session;

// methods
public void Dispose()
Expand Down
16 changes: 6 additions & 10 deletions src/MongoDB.Driver/Core/Bindings/ChannelReadBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,21 @@ internal sealed class ChannelReadBinding : IReadBinding
private bool _disposed;
private readonly ReadPreference _readPreference;
private readonly IServer _server;
private readonly TimeSpan _roundTripTime;
private readonly ICoreSessionHandle _session;

public ChannelReadBinding(IServer server, IChannelHandle channel, ReadPreference readPreference, ICoreSessionHandle session)
public ChannelReadBinding(IServer server, TimeSpan roundTripTime, IChannelHandle channel, ReadPreference readPreference, ICoreSessionHandle session)
{
_server = Ensure.IsNotNull(server, nameof(server));
_roundTripTime = Ensure.IsGreaterThanZero(roundTripTime, nameof(roundTripTime));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
_session = Ensure.IsNotNull(session, nameof(session));
}

public ReadPreference ReadPreference
{
get { return _readPreference; }
}
public ReadPreference ReadPreference => _readPreference;

public ICoreSessionHandle Session
{
get { return _session; }
}
public ICoreSessionHandle Session => _session;

public void Dispose()
{
Expand Down Expand Up @@ -81,7 +77,7 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext ope

private IChannelSourceHandle GetReadChannelSourceHelper()
{
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork()));
return new ChannelSourceHandle(new ChannelChannelSource(_server, _roundTripTime, _channel.Fork(), _session.Fork()));
}

private void ThrowIfDisposed()
Expand Down
16 changes: 6 additions & 10 deletions src/MongoDB.Driver/Core/Bindings/ChannelReadWriteBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@ internal sealed class ChannelReadWriteBinding : IReadWriteBinding
private readonly IChannelHandle _channel;
private bool _disposed;
private readonly IServer _server;
private readonly TimeSpan _serverRoundTripTime;
private readonly ICoreSessionHandle _session;

public ChannelReadWriteBinding(IServer server, IChannelHandle channel, ICoreSessionHandle session)
public ChannelReadWriteBinding(IServer server, TimeSpan roundTripTime, IChannelHandle channel, ICoreSessionHandle session)
{
_server = Ensure.IsNotNull(server, nameof(server));
_serverRoundTripTime = Ensure.IsGreaterThanZero(roundTripTime, nameof(roundTripTime));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_session = Ensure.IsNotNull(session, nameof(session));
}

public ReadPreference ReadPreference
{
get { return ReadPreference.Primary; }
}
public ReadPreference ReadPreference => ReadPreference.Primary;

public ICoreSessionHandle Session
{
get { return _session; }
}
public ICoreSessionHandle Session => _session;

public void Dispose()
{
Expand Down Expand Up @@ -121,7 +117,7 @@ public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext op

private IChannelSourceHandle GetChannelSourceHelper()
{
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork()));
return new ChannelSourceHandle(new ChannelChannelSource(_server, _serverRoundTripTime, _channel.Fork(), _session.Fork()));
}

private void ThrowIfDisposed()
Expand Down
18 changes: 5 additions & 13 deletions src/MongoDB.Driver/Core/Bindings/ChannelSourceHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,13 @@ private ChannelSourceHandle(ReferenceCounted<IChannelSource> reference)
}

// properties
public IServer Server
{
get { return _reference.Instance.Server; }
}
public IServer Server => _reference.Instance.Server;

public ServerDescription ServerDescription
{
get { return _reference.Instance.ServerDescription; }
}
public ServerDescription ServerDescription => _reference.Instance.ServerDescription;

public ICoreSessionHandle Session
{
get { return _reference.Instance.Session; }
}
public TimeSpan RoundTripTime => _reference.Instance.RoundTripTime;

public ICoreSessionHandle Session => _reference.Instance.Session;

// methods
public IChannelHandle GetChannel(OperationContext operationContext)
Expand All @@ -72,7 +65,6 @@ public void Dispose()
{
_reference.DecrementReferenceCount();
_disposed = true;
GC.SuppressFinalize(this);
}
}

Expand Down
22 changes: 13 additions & 9 deletions src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* limitations under the License.
*/

using System;
using MongoDB.Bson;
using MongoDB.Driver.Core.Servers;

Expand All @@ -27,6 +28,7 @@ public class CoreTransaction
private bool _isEmpty;
private IChannelHandle _pinnedChannel = null;
private IServer _pinnedServer;
private TimeSpan _pinnedServerRoundTripTime;
private BsonDocument _recoveryToken;
private CoreTransactionState _state;
private readonly long _transactionNumber;
Expand Down Expand Up @@ -64,10 +66,7 @@ public CoreTransaction(long transactionNumber, TransactionOptions transactionOpt
/// </value>
public CoreTransactionState State => _state;

internal IChannelHandle PinnedChannel
{
get => _pinnedChannel;
}
internal IChannelHandle PinnedChannel => _pinnedChannel;

/// <summary>
/// Gets or sets pinned server for the current transaction.
Expand All @@ -76,11 +75,9 @@ internal IChannelHandle PinnedChannel
/// <value>
/// The pinned server for the current transaction.
/// </value>
internal IServer PinnedServer
{
get => _pinnedServer;
set => _pinnedServer = value;
}
internal IServer PinnedServer => _pinnedServer;

internal TimeSpan PinnedServerRoundTripTime => _pinnedServerRoundTripTime;

/// <summary>
/// Gets the transaction number.
Expand Down Expand Up @@ -120,6 +117,12 @@ internal void PinChannel(IChannelHandle channel)
}
}

internal void PinServer(IServer server, TimeSpan roundTripTime)
{
_pinnedServer = server;
_pinnedServerRoundTripTime = roundTripTime;
}

internal void SetState(CoreTransactionState state)
{
_state = state;
Expand All @@ -135,6 +138,7 @@ internal void UnpinAll()
{
_pinnedChannel?.Dispose();
_pinnedChannel = null;
_pinnedServerRoundTripTime = default;
_pinnedServer = null;
}
}
Expand Down
1 change: 0 additions & 1 deletion src/MongoDB.Driver/Core/Bindings/IBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Servers;

Expand Down
10 changes: 5 additions & 5 deletions src/MongoDB.Driver/Core/Bindings/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.IO;
Expand All @@ -31,8 +30,10 @@ internal interface IChannel : IDisposable
{
IConnectionHandle Connection { get; }
ConnectionDescription ConnectionDescription { get; }
TimeSpan RoundTripTimeout { get; }

TResult Command<TResult>(
OperationContext operationContext,
ICoreSession session,
ReadPreference readPreference,
DatabaseNamespace databaseNamespace,
Expand All @@ -43,10 +44,10 @@ TResult Command<TResult>(
Action<IMessageEncoderPostProcessor> postWriteAction,
CommandResponseHandling responseHandling,
IBsonSerializer<TResult> resultSerializer,
MessageEncoderSettings messageEncoderSettings,
CancellationToken cancellationToken);
MessageEncoderSettings messageEncoderSettings);

Task<TResult> CommandAsync<TResult>(
OperationContext operationContext,
ICoreSession session,
ReadPreference readPreference,
DatabaseNamespace databaseNamespace,
Expand All @@ -57,8 +58,7 @@ Task<TResult> CommandAsync<TResult>(
Action<IMessageEncoderPostProcessor> postWriteAction,
CommandResponseHandling responseHandling,
IBsonSerializer<TResult> resultSerializer,
MessageEncoderSettings messageEncoderSettings,
CancellationToken cancellationToken);
MessageEncoderSettings messageEncoderSettings);
}

internal interface IChannelHandle : IChannel
Expand Down
1 change: 1 addition & 0 deletions src/MongoDB.Driver/Core/Bindings/IChannelSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal interface IChannelSource : IDisposable
{
IServer Server { get; }
ServerDescription ServerDescription { get; }
TimeSpan RoundTripTime { get; }
ICoreSessionHandle Session { get; }

IChannelHandle GetChannel(OperationContext operationContext);
Expand Down
14 changes: 4 additions & 10 deletions src/MongoDB.Driver/Core/Bindings/ReadPreferenceBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,9 @@ public ReadPreferenceBinding(IClusterInternal cluster, ReadPreference readPrefer
_serverSelector = new ReadPreferenceServerSelector(readPreference);
}

public ReadPreference ReadPreference
{
get { return _readPreference; }
}
public ReadPreference ReadPreference => _readPreference;

public ICoreSessionHandle Session
{
get { return _session; }
}
public ICoreSessionHandle Session => _session;

public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext)
{
Expand All @@ -75,9 +69,9 @@ public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationConte
return GetChannelSourceHelper(server);
}

private IChannelSourceHandle GetChannelSourceHelper(IServer server)
private IChannelSourceHandle GetChannelSourceHelper((IServer Server, TimeSpan RoundTripTime) server)
{
return new ChannelSourceHandle(new ServerChannelSource(server, _session.Fork()));
return new ChannelSourceHandle(new ServerChannelSource(server.Server, server.RoundTripTime, _session.Fork()));
}

public void Dispose()
Expand Down
Loading