
I have a method that runs multiple async methods within it. I have to iterate over a list of devices, and pass the device to this method. I am noticing that this is taking a long time to complete so I am thinking of using Parallel.ForEach so it can run this process against multiple devices at the same time.

Let's say this is my method.

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

Then DoSomething2 also calls an async method.

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

The list of devices continuously gets larger over time, so the more this list grows, the longer it takes the program to finish running ProcessDevice() against each device. I would like to process more than one device at a time. So I have been looking into using Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

It appears that the program finishes before the devices are fully processed. I have also tried building a list of tasks: for each device, I add a task that runs ProcessDevice to the list, and then I await Task.WhenAll(listOfTasks):

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

But it appears that the task is marked as completed before ProcessDevice() is actually finished running.

Please excuse my ignorance on this issue as I am new to parallel processing and not sure what is going on. What is happening to cause this behavior and is there any documentation that you could provide that could help me better understand what to do?


4 Answers

7

You can't mix async with Parallel.ForEach. Since your underlying operation is asynchronous, you'd want to use asynchronous concurrency, not parallelism. Asynchronous concurrency is most easily expressed with WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
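
To see the effect of WhenAll in isolation, here is a minimal console sketch. It assumes .NET 6 or later (top-level statements), and ProcessDeviceAsync is a hypothetical stand-in that simulates the I/O in ProcessDevice with Task.Delay; the device IDs are made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for ProcessDevice: simulates I/O with a delay.
async Task<string> ProcessDeviceAsync(string deviceId)
{
    await Task.Delay(200); // pretend this is a web request or database call
    return deviceId + ":done";
}

var devices = new[] { "dev-1", "dev-2", "dev-3", "dev-4", "dev-5" };

var stopwatch = Stopwatch.StartNew();

// Select starts one call per device; ToList forces the lazy query to run now,
// so every task is already in flight before anything is awaited.
var tasks = devices.Select(ProcessDeviceAsync).ToList();

// WhenAll completes when every task has completed; results keep the input order.
string[] results = await Task.WhenAll(tasks);

stopwatch.Stop();
Console.WriteLine($"Processed {results.Length} devices in {stopwatch.ElapsedMilliseconds} ms");
```

Because the delays overlap, the elapsed time should be close to one delay rather than the sum of all five, although the exact figure will vary by machine.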

2 Comments

When you say "You can't mix async with Parallel.ForEach", we agree it's just in his case, not in general?
@Thibaut In general. I just wrote up an answer to explain in more detail why.
1

In your last example there are a few problems:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

Doing await Task.Run(async () => await ProcessDevice(device)); means you are not moving to the next iteration of the foreach loop until the previous one is done. Essentially, you're still doing them one at a time.

Additionally, you aren't adding any tasks to listOfTasks, so it remains empty, and Task.WhenAll(listOfTasks) completes instantly because there are no tasks to await.

Try this:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

5 Comments

I don't know if it's a good idea to add every device to a list of tasks and run them all at the same time. If there are too many, this will be slower than all the other solutions.
It will depend greatly on what ProcessDevice() is really doing and how many devices there are, but yes, this should be tested to see how it performs. A different but similar approach is to split the devices into batches and run, say, 5 batches in parallel while processing the devices within each batch one at a time.
@WSC - Sorry, I updated my question. I did have listOfTasks.Add(task); but forgot it in the question.
@JaronJohnson Your updated code doesn't make sense. You're not setting task to anything. Look at the code in this answer more carefully.
@GabrielLuci I should have just copied and pasted my code instead of writing it out again in the question. Updated it again.
1

I can explain the problem with Parallel.ForEach. An important thing to understand is that when the await keyword acts on an incomplete Task, it returns. It will return its own incomplete Task if the method signature allows (if it's not void). Then it is up to the caller to use that Task object to wait for the job to finish.

But the second parameter in Parallel.ForEach is an Action<T>, which is a void method, which means no Task can be returned, which means the caller (Parallel.ForEach in this case) has no way to wait until the job has finished.

So in your case, as soon as it hits await ProcessDevice(device), it returns and nothing waits for it to finish so it starts the next iteration. By the time Parallel.ForEach is finished, all it has done is started all the tasks, but not waited for them.

So don't use Parallel.ForEach with asynchronous code.
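
The behavior described above can be reproduced with a small console sketch. It assumes .NET 6 or later (top-level statements); ProcessDeviceAsync is a hypothetical stand-in for ProcessDevice, and the counter exists only to make the timing visible.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// Hypothetical stand-in for ProcessDevice: simulates I/O, then records completion.
async Task ProcessDeviceAsync(int deviceId)
{
    await Task.Delay(500);
    Interlocked.Increment(ref completed);
}

var devices = new[] { 1, 2, 3, 4 };

// The async lambda is converted to Action<int>, so it behaves like an async void method:
// it returns to Parallel.ForEach at the first incomplete await and no Task is handed back.
Parallel.ForEach(devices, async device =>
{
    await ProcessDeviceAsync(device);
});

// Parallel.ForEach has returned, but the simulated work is still in progress.
int completedWhenForEachReturned = Volatile.Read(ref completed);
Console.WriteLine($"Completed when Parallel.ForEach returned: {completedWhenForEachReturned}");

// Give the orphaned operations time to finish so the difference is visible.
await Task.Delay(1500);
Console.WriteLine($"Completed after waiting: {Volatile.Read(ref completed)}");
```

On a typical run the first count is lower than the number of devices, usually zero, because the loop only started the work. In a real program nothing would be waiting at that point, which matches the symptom in the question.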

Stephen's answer is more appropriate. You can also use WSC's answer, but that can be dangerous with larger lists. Queuing hundreds or thousands of Task.Run work items onto the thread pool all at once will not help your performance.

7 Comments

Thank you for the explanation, that makes sense.
Would using a SemaphoreSlim to limit the level of concurrency, in combination with either of these answers, help maintain a balance? Again, sorry for the ignorance, I am new to this subject. I really need to educate myself on the topic but I'm not sure where to start.
You could, but it might be over-complicating things for your situation. Is ProcessDevice() doing something processor-intensive? Or is it making I/O requests (reading files, making network requests, etc)?
It is more I/O than processor-intensive. There are a few calculations, but they are pretty light. It makes a few web requests and updates database records.
Then Stephen's answer is the best way to do it. Once the I/O request is sent, await will return and the next one in the list will be started. So they all get started at once, then at Task.WhenAll, it will wait for them all to finish. It may even all happen on the same thread.
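
For reference, the SemaphoreSlim idea raised in the comments above could look roughly like the sketch below if the device list ever grows large enough to overwhelm the database or web service. It assumes .NET 6 or later (top-level statements); ProcessDeviceAsync, the limit of 3, and the in-flight counters are all hypothetical and exist only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxConcurrency = 3; // assumed limit; tune it for your own services

var gate = new object();
int inFlight = 0;
int peakInFlight = 0;

// Hypothetical stand-in for ProcessDevice that also tracks how many calls overlap.
async Task ProcessDeviceAsync(int deviceId)
{
    lock (gate) { inFlight++; peakInFlight = Math.Max(peakInFlight, inFlight); }
    await Task.Delay(100); // pretend this is a web request or database update
    lock (gate) { inFlight--; }
}

var throttle = new SemaphoreSlim(maxConcurrency);

// Wraps one device: waits asynchronously for a free slot, then always releases it.
async Task ProcessThrottledAsync(int deviceId)
{
    await throttle.WaitAsync();
    try
    {
        await ProcessDeviceAsync(deviceId);
    }
    finally
    {
        throttle.Release();
    }
}

var devices = Enumerable.Range(1, 10).ToList();

// All wrappers start immediately, but only maxConcurrency of them pass the semaphore at a time.
await Task.WhenAll(devices.Select(ProcessThrottledAsync));

Console.WriteLine($"Peak concurrent operations: {peakInFlight} (limit {maxConcurrency})");
```

WaitAsync does not block a thread while waiting for a slot, so this stays consistent with the asynchronous approach in Stephen's answer; it only caps how many operations are in flight at once.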
-1

I'm not sure if this is what you are asking for, but I can give an example of how we start an async process:

private readonly Func<Worker> _worker;

private void StartWorkers(IEnumerable<Props> props) {
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
}

I would recommend reading about Parallel.ForEach, as it will handle part of this for you.
