
I'm trying to implement a self-hosted web service using ASP.NET Core 2.1 and got stuck on implementing long-running background tasks.

Because each call to the ProcessSingle method (in the code snippet below) is CPU-heavy and time-consuming, I would like to limit the number of tasks executing simultaneously. But as far as I can see, all tasks in Parallel.ForEach start almost immediately, despite the fact that I set MaxDegreeOfParallelism = 3.

Here is my code (a simplified version):

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait when it finishes
    Task.Factory.StartNew(async () => {
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            async x => await ProcessSingle(x));
    });

    // return created id immediately
    return id;
}

public static async Task ProcessSingle(MyInputData inputData)
{
    var dbData = await GetDataFromDb(); // get data from DB async using Dapper
    // some lasting processing (sync)
    await SaveDataToDb(); // async save processed data to DB using Dapper
}

If I understand correctly, the problem is in async x => await ProcessSingle(x) inside Parallel.ForEach, isn't it?

Could someone please describe how this should be implemented correctly?

Update

Since my question was somewhat ambiguous, here are the main aspects to focus on:

  1. There are three parts in ProcessSingle method:

    • getting data from DB async

    • making long-running, CPU-heavy math calculations

    • save results to DB async

  2. The problem consists of two separate parts:

    • How to decrease CPU usage (for example, by running no more than three math calculations simultaneously)?

    • How to keep the structure of the ProcessSingle method, i.e. keep it async because of the async DB calls.

I hope this is clearer now.

P.S. A suitable answer has already been given, and it works (special thanks to @MatrixTai). This update has been written for general clarification.

  • You don't need async/await inside your foreach loop. That causes further threads to start. I would just make ProcessSingle an ordinary, synchronous function. Commented Sep 14, 2018 at 6:33
  • @PMF, @MickyD but the problem is that I can't get rid of the async calls to the DB inside ProcessSingle. So is the only way to stop using Parallel.ForEach and start using something like Task.WhenAll(/* call all ProcessSingle */)? But in that case, is there no built-in way to limit concurrent threads (as I remember, I've seen some custom implementations)? Commented Sep 14, 2018 at 6:45
  • You would be better off using TPL Dataflow. There you can not only use async/await but also throttle. ;) Commented Sep 14, 2018 at 6:47
  • Don't use Parallel.ForEach with async/await. It's meant for data parallelism: its job is to partition the incoming data and assign each partition to a single task. What you ask for, though, is already built into the ActionBlock<T> class. Commented Sep 14, 2018 at 6:54
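For reference, the ActionBlock<T> approach suggested in the last comment could look roughly like the sketch below. It assumes the System.Threading.Tasks.Dataflow library is available (on some target frameworks it has to be added as a NuGet package). DataflowThrottling and ProcessSingleAsync are hypothetical names: ProcessSingleAsync simulates the DB calls with Task.Delay and the math with Thread.SpinWait, and the Peak/Processed counters exist only to make the throttling visible.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // may need the System.Threading.Tasks.Dataflow NuGet package

public static class DataflowThrottling
{
    private static readonly object Gate = new object();
    private static int _running, _peak, _processed;

    // Illustration only: highest concurrency observed and number of items finished.
    public static int Peak { get { lock (Gate) return _peak; } }
    public static int Processed { get { lock (Gate) return _processed; } }

    // Hypothetical stand-in for the question's async ProcessSingle.
    private static async Task ProcessSingleAsync(int item)
    {
        lock (Gate) { _running++; if (_running > _peak) _peak = _running; }
        try
        {
            await Task.Delay(10);       // stands in for GetDataFromDb()
            Thread.SpinWait(100_000);   // stands in for the CPU-heavy math
            await Task.Delay(10);       // stands in for SaveDataToDb()
        }
        finally
        {
            lock (Gate) { _running--; _processed++; }
        }
    }

    public static async Task RunAsync(IEnumerable<int> items, CancellationToken token)
    {
        var block = new ActionBlock<int>(
            item => ProcessSingleAsync(item), // async delegate: the block waits for its Task
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 3,   // at most 3 items in flight at any time
                CancellationToken = token
            });

        foreach (var item in items)
            await block.SendAsync(item, token); // queue the item

        block.Complete();       // signal that no more items will arrive
        await block.Completion; // wait until every queued item has been processed
    }
}
```

Because the block waits for the Task returned by the async delegate, MaxDegreeOfParallelism = 3 limits the whole ProcessSingleAsync call, awaited DB calls included, while the method itself stays async.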

2 Answers


Update

As I just noticed you mentioned in a comment, the problem is caused by the math calculations.

It is better to separate the calculation part from the DB update part.

For the calculation part, use Parallel.ForEach() so that you can parallelize the CPU work and control the number of threads.

Only after all of these calculations have finished, use async-await to write the results to the DB, without the SemaphoreSlim I mentioned below.

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    _ = Task.Run(async () => {

        //Calculation Part: synchronous, CPU-bound, at most 3 at a time
        ConcurrentBag<int> data = new ConcurrentBag<int>();
        Parallel.ForEach(
            listOfData,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            x => { data.Add(calculationPart(x)); });

        //Update DB part: async I/O, starts only after all calculations are done
        int[] data_arr = data.ToArray();
        List<Task> worker = new List<Task>();
        foreach (var i in data_arr)
        {
            worker.Add(DBPart(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

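They certainly all start together, because you use async-await inside Parallel.ForEach: the async lambda is converted to an Action<T>, i.e. an async void method, so each iteration counts as finished at its first await and MaxDegreeOfParallelism no longer limits the actual work.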
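A minimal sketch that makes this visible (AsyncLambdaPitfall is a hypothetical name, and Task.Delay stands in for the async DB call): Parallel.ForEach returns while all of the awaited work is still pending.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaPitfall
{
    // Returns how many items had finished when Parallel.ForEach returned.
    public static int CompletedWhenForEachReturned()
    {
        int completed = 0;

        Parallel.ForEach(
            Enumerable.Range(0, 10),
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            // This async lambda becomes an Action<int> ("async void"): each iteration
            // returns to Parallel.ForEach at its first await, so the limit of 3 only
            // applies to the synchronous part before the await.
            async i =>
            {
                await Task.Delay(200); // stands in for an async DB call
                Interlocked.Increment(ref completed);
            });

        // Parallel.ForEach has already returned, but the delays are still running.
        return Volatile.Read(ref completed);
    }
}
```

CompletedWhenForEachReturned() is expected to return a value below 10 (typically 0), since every Task.Delay(200) is still pending when Parallel.ForEach returns.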

First, read both the first and second answers to this question. Combining the two is meaningless.

async-await already makes efficient use of the available threads (a thread is released while the I/O is being awaited), so simply use it:

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    _ = Task.Run(async () => {
        List<Task> worker = new List<Task>();
        foreach (var i in listOfData)
        {
            worker.Add(ProcessSingle(i));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}

But then there is another problem: in this case the tasks still all start together, eating up your CPU.

To avoid this, use SemaphoreSlim:

public static async Task<int> Work()
{
    var id = await CreateIdInDB(); // async create record in DB

    // run background task, don't wait for it to finish
    _ = Task.Run(async () => {
        List<Task> worker = new List<Task>();
        //To limit the number of Tasks running at once (use 3 to match the question).
        var throttler = new SemaphoreSlim(initialCount: 20);
        foreach (var i in listOfData)
        {
            await throttler.WaitAsync();
            worker.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessSingle(i);
                }
                finally
                {
                    // release the slot even if ProcessSingle throws
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(worker);
    });

    // return created id immediately
    return id;
}
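The same SemaphoreSlim pattern can be pulled out into a small reusable helper. This is only a sketch: Throttled and ForEachAsync are hypothetical names, not a built-in API, and in the question's case the body passed in would be ProcessSingle.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Hypothetical helper: runs body for every item, with at most
    // maxConcurrency bodies in flight at the same time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> body)
    {
        var throttler = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();
        foreach (var item in items)
        {
            await throttler.WaitAsync(); // wait for a free slot
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await body(item);
                }
                finally
                {
                    throttler.Release(); // free the slot even if body throws
                }
            }));
        }
        await Task.WhenAll(tasks); // rethrows if any body failed
    }
}
```

For the question, this would be called as `await Throttled.ForEachAsync(listOfData, 3, ProcessSingle);` inside the background Task.Run.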

Read more: How to limit the amount of concurrent async I/O operations?

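Also, do not use Task.Factory.StartNew() when a simple Task.Run() is enough for what you want (with an async lambda, StartNew returns a Task<Task> whose outer task completes at the first await); read this excellent article by Stephen Cleary.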
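To illustrate that difference, a minimal sketch (StartNewVsRun is a hypothetical name and the 300 ms delay is an arbitrary stand-in for real work): with an async lambda, Task.Factory.StartNew returns a Task<Task> whose outer task completes as soon as the lambda reaches its first await, while Task.Run has a Func<Task> overload that returns a task for the whole asynchronous operation.

```csharp
using System.Threading.Tasks;

public static class StartNewVsRun
{
    public static async Task<(bool startNewDone, bool runDone)> CompareAsync()
    {
        // StartNew<Task>(Func<Task>) is chosen, so the result is Task<Task>.
        // The outer task completes when the lambda reaches its first await.
        Task<Task> outer = Task.Factory.StartNew(async () => await Task.Delay(300));
        Task inner = await outer;
        bool startNewDone = inner.IsCompleted; // expected false: the delay is still running

        // Task.Run's Func<Task> overload unwraps the inner task for you.
        Task run = Task.Run(async () => await Task.Delay(300));
        await run;
        bool runDone = run.IsCompleted;        // true: the whole delay has elapsed

        await inner; // also wait for the StartNew work before returning
        return (startNewDone, runDone);
    }
}
```

If StartNew really is needed (for example, to pass TaskCreationOptions), calling .Unwrap() on its result gives a task that represents the inner operation.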


6 Comments

There's a built-in class for executing jobs in parallel with a DOP (degree of parallelism): ActionBlock<T>. In any case, executing multiple slow DB queries in parallel is more likely to reduce performance.
@PanagiotisKanavos, I agree that if the problem is related to DB queries, running them in parallel is more likely to reduce performance. If I were doing this task myself, I would more likely do it synchronously.
@MatrixTai, no no, there are no problems with db communications at all :) You've answered exactly what I was looking for. Thanks)
@user1820686, wait, I just noticed you wrote in a comment that the problem is caused by the math calculations. Then it is mostly a CPU-bound task. In that case, it may be better to use the method PMF mentioned: make it synchronous and run it with Parallel.ForEach, and finally collect all the data for writing to the DB with async-await. I will update my answer.
@user1820686 what is your actual question? Parallel.ForEach with a DOP of 3 will run ONLY 3 tasks at the same time. It will split the data 3 ways and pass each partition to a single task. Your question's bug is that by making ProcessSingle async, you start another task for each individual data item.

If you're more familiar with the "traditional" parallel processing concept, rewrite your ProcessSingle() method like this:

public static void ProcessSingle(MyInputData inputData)
{
    var dbData = GetDataFromDb(); // get data from DB synchronously using Dapper
    // some lasting processing (sync)
    SaveDataToDb(); // save processed data to DB synchronously using Dapper
}

Of course, you would then preferably also change the Work() method in a similar fashion.
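A sketch of this synchronous approach, using hypothetical blocking stand-ins (Thread.Sleep for the DB calls, Thread.SpinWait for the math; SyncParallel is an illustrative name). Because the body no longer returns at an await, MaxDegreeOfParallelism = 3 really caps the number of concurrent ProcessSingle calls; the trade-off, as the comments below point out, is that a thread-pool thread is blocked during each DB call.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SyncParallel
{
    private static readonly object Gate = new object();
    private static int _running, _peak;

    // Hypothetical synchronous ProcessSingle: blocking DB read, CPU work, blocking DB write.
    private static void ProcessSingle(int item)
    {
        lock (Gate) { _running++; if (_running > _peak) _peak = _running; }
        try
        {
            Thread.Sleep(10);          // stands in for a synchronous GetDataFromDb()
            Thread.SpinWait(100_000);  // stands in for the CPU-heavy calculation
            Thread.Sleep(10);          // stands in for a synchronous SaveDataToDb()
        }
        finally
        {
            lock (Gate) _running--;
        }
    }

    // Returns the highest number of ProcessSingle calls observed running at once.
    public static int Run(IEnumerable<int> items, CancellationToken token)
    {
        Parallel.ForEach(
            items,
            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
            item => ProcessSingle(item)); // synchronous body: occupies a worker until done
        lock (Gate) return _peak;
    }
}
```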

7 Comments

Looks suspiciously like you will be mixing Task with Parallel.ForEach iteration.
@MickyD: Yes, I will. That is not a problem, though. Mixing Parallel.ForEach and async / await is. Parallel.ForEach itself uses tasks.
It is a problem. Refer to the link above.
@MickyD: Sorry, which link? stackoverflow.com/questions/11564506/… doesn't mention anything about mixing Tasks and Parallel.ForEach. It's all about mixing Tasks and async/await. I would not be using any async/await at all.
Incorrect. You've already indicated you will be using Task. The actual await is irrelevant. Read this. Additionally, even if you purely used Parallel.ForEach, to use a thread-pool thread with a blocking I/O task such as GetDataFromDb() and SaveDataToDb() is a waste of a perfectly good thread. You need to use IOCP which is nowhere in your example. async/await is perfect for I/O but you can't use that with Parallel.ForEach.
