MySqlCloudStackCoreConnectionFactory:
public sealed class MySqlCloudStackCoreConnectionFactory : ICloudStackCoreConnectionFactory
{
private sealed record QTaskInfo<TDbItem>(Region Region, Task<int> CountTask, Task<IEnumerable<TDbItem>> DataTask);
private readonly CloudStackDBOptions _configuration;
private readonly ILogger<MySqlCloudStackCoreConnectionFactory> _logger;
private static readonly ConcurrentDictionary<Region, bool> _connections = new ConcurrentDictionary<Region, bool>();
public MySqlCloudStackCoreConnectionFactory(IOptions<CloudStackDBOptions> options, ILogger<MySqlCloudStackCoreConnectionFactory> logger)
{
_configuration = options.Value;
_logger = logger;
}
public DbConnection CreateConnection(Region region)
{
return new MySqlConnection(GetConnString(region));
}
public async Task<(int Count, TOut[] Data)> QuerySearchAsync<TDbItem, TOut>(IRegionRepository regionRepository, string fetchDataQuery, string? countDataQuery = null, Dictionary<string, string>? columnModelModel = null, Paging? paging = null, ColumnName[]? order = null, string[]? excludedOrder = null, Func<Region, TDbItem, TOut>? transform = null, object? param = null, Func<Region, bool>? filter = null, bool skipDbPaging = false, CancellationToken ct = default(CancellationToken)) where TOut : class
{
string[] excludedOrder2 = excludedOrder;
int total = 0;
List<TOut> response = new List<TOut>();
List<DbConnection> connections = new List<DbConnection>();
StringBuilder stringBuilder = new StringBuilder(fetchDataQuery);
if (order != null && excludedOrder2 != null && excludedOrder2.Length != 0)
{
order = order.Where((ColumnName o) => !excludedOrder2.Contains(o.Name)).ToArray();
}
if (order != null && order.Length != 0)
{
stringBuilder.Append(" ORDER BY");
ColumnName[] array = order;
foreach (ColumnName columnName in array)
{
string value = ((columnModelModel != null) ? columnModelModel[columnName.Name] : columnName.Name);
stringBuilder.Append(' ').Append(value).Append(columnName.Asc ? " ASC," : " DESC,");
}
stringBuilder.Remove(stringBuilder.Length - 1, 1);
}
if (paging != null && !skipDbPaging)
{
stringBuilder.Append(" LIMIT ").Append((paging.Page + 1) * paging.PageSize).Append(';');
}
fetchDataQuery = stringBuilder.ToString();
QTaskInfo<TDbItem>[] array2 = ParallelizeTask<TDbItem>(fetchDataQuery, countDataQuery, param, connections, await GetOnlineRegion(regionRepository, filter, ct));
Task[] array3 = new Task[2 * array2.Length];
for (int j = 0; j < array3.Length; j += 2)
{
int num = j / 2;
array3[j] = array2[num].CountTask;
array3[j + 1] = array2[num].DataTask;
}
Task.WaitAll(array3, ct);
ValueTask[] array4 = CloseConnection(connections);
for (int k = 0; k < array2.Length; k++)
{
var (region2, task3, task4) = array2[k];
try
{
IEnumerable<TDbItem> result2 = task4.Result;
int result3 = task3.Result;
total += result3;
foreach (TDbItem item in result2)
{
TOut val = ((transform != null) ? transform(region2, item) : null) ?? (item as TOut);
if (val != null)
{
response.Add(val);
}
}
}
catch (Exception exception)
{
regionRepository.SetStatus(region2, online: false);
_logger.LogError(exception, "Error request region: {Region}", region2);
}
}
IQueryable<TOut> result = response.AsQueryable().ApplySearch(paging, order);
_logger.LogInformation("Dispose all connection created");
ValueTask[] array5 = array4;
for (int l = 0; l < array5.Length; l++)
{
ValueTask valueTask = array5[l];
await valueTask;
}
TOut[] array6 = result.ToArray();
return ((countDataQuery == null) ? array6.Length : total, array6);
}
public async Task<int> ExecuteAsync(IRegionRepository regionRepository, string sql, object? @params = null, Func<Region, bool>? filter = null, CancellationToken ct = default(CancellationToken))
{
string sql2 = sql;
object params2 = @params;
Region[] array = await GetOnlineRegion(regionRepository, filter, ct);
List<DbConnection> connections = new List<DbConnection>();
Region[] array2 = new Region[array.Length];
Task<int>[] array3 = new Task<int>[array.Length];
for (int i = 0; i < array3.Length; i++)
{
Region region = array[i];
_logger.LogInformation("Creating connection for: {Region}", region);
DbConnection connection = CreateConnection(region);
Task<int> task = Task.Run(async delegate
{
try
{
_logger.LogDebug("Creating connection for: {Region}", region);
return await connection.ExecuteAsync(sql2, params2);
}
catch (Exception exception2)
{
_logger.LogWarning(exception2, "Error query {Region}", region);
return 0;
}
});
array3[i] = task;
array2[i] = region;
}
Task[] tasks = array3;
Task.WaitAll(tasks, ct);
ValueTask[] array4 = CloseConnection(connections);
int total = 0;
for (int j = 0; j < array3.Length; j++)
{
Task<int> task2 = array3[j];
Region region2 = array2[j];
try
{
int result = task2.Result;
total += result;
}
catch (Exception exception)
{
regionRepository.SetStatus(region2, online: false);
_logger.LogError(exception, "Error request region: {Region}", region2);
}
}
_logger.LogInformation("Dispose all connection created");
ValueTask[] array5 = array4;
for (int k = 0; k < array5.Length; k++)
{
ValueTask valueTask = array5[k];
await valueTask;
}
return total;
}
private static ValueTask[] CloseConnection(List<DbConnection> connections)
{
return connections.Select((DbConnection s) => s.DisposeAsync()).ToArray();
}
private string GetConnString(Region region)
{
return region switch
{
Region.Europe => _configuration.Europe,
Region.Asia => _configuration.Asia,
Region.NA => _configuration.NA,
Region.Lab => _configuration.Lab,
_ => throw new NotSupportedException("Region is not supported"),
};
}
private static async Task<Region[]> GetOnlineRegion(IRegionRepository regionRepository, Func<Region, bool>? filter = null, CancellationToken ct = default(CancellationToken))
{
Func<Region, bool> filter2 = filter;
return (from p in await regionRepository.GetOnlineAsync(ct)
where p != Region.Lab
where filter2?.Invoke(p) ?? true
select p).ToArray();
}
private QTaskInfo<TDbItem>[] ParallelizeTask<TDbItem>(string fetchDataQuery, string? countDataQuery, object? param, List<DbConnection> connections, Region[] onlineRegions)
{
string fetchDataQuery2 = fetchDataQuery;
object param2 = param;
string countDataQuery2 = countDataQuery;
QTaskInfo<TDbItem>[] array = new QTaskInfo<TDbItem>[onlineRegions.Length];
for (int i = 0; i < array.Length; i++)
{
Region region = onlineRegions[i];
_logger.LogInformation("Creating connection for: {Region}", region);
DbConnection dataConnection = CreateConnection(region);
if (!_connections.GetOrAdd(region, value: false))
{
lock (_connections)
{
if (!_connections.GetValueOrDefault(region))
{
dataConnection.Open();
_connections[region] = true;
}
}
}
Task<IEnumerable<TDbItem>> dataTask = Task.Run(async delegate
{
try
{
_logger.LogDebug("Run Query {Query} with {Args}", fetchDataQuery2, param2);
return await dataConnection.QueryAsync<TDbItem>(fetchDataQuery2, param2);
}
catch (Exception exception2)
{
_logger.LogWarning(exception2, "Error query {Region}", region);
return Array.Empty<TDbItem>();
}
});
Task<int> countTask;
if (!string.IsNullOrEmpty(countDataQuery2))
{
DbConnection countConnection = CreateConnection(region);
countTask = Task.Run(async delegate
{
try
{
_logger.LogDebug("Run Query {Query} with {Args}", countDataQuery2, param2);
return await countConnection.ExecuteScalarAsync<int>(countDataQuery2, param2);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Error query {Region}", region);
return 0;
}
});
connections.Add(countConnection);
}
else
{
countTask = Task.FromResult(0);
}
connections.Add(dataConnection);
array[i] = new QTaskInfo<TDbItem>(region, countTask, dataTask);
}
return array;
}
}