
I'm processing a list of 200k to 300k items, and processing each item takes between 2 and 8 seconds. To save time, I can process this list in parallel. Since I'm in an async context, I use something like this:

public async Task<List<Keyword>> DoWord(List<string> keyword)
{
    var keywordResults = new List<Keyword>();
    if (keyword.Count > 0)
    {
        // One Work() call per keyword; all of them run concurrently.
        Task<Keyword[]> allTasks = Task.WhenAll(keyword.Select(kw => Work(kw)));
        try
        {
            keywordResults.AddRange(await allTasks.ConfigureAwait(false));
        }
        catch (Exception ex)
        {
            // 'await' rethrows only the first failure (not an AggregateException);
            // allTasks.Exception holds all of them (it is null if the tasks were canceled).
            IEnumerable<Exception> failures = allTasks.Exception?.InnerExceptions;
            foreach (Exception innerEx in failures ?? new[] { ex })
            {
                log.ErrorFormat("Core threads exception: {0}", innerEx);
            }
        }
    }
    return keywordResults;
}

The keyword list always contains 8 elements (it comes from the caller), so I effectively process my list 8 at a time. In this case, I think that if 7 keywords are processed in 3 seconds and the 8th takes 10 seconds, the whole batch of 8 keywords will take 10 seconds (correct me if I'm wrong). How can I get behavior closer to Parallel.ForEach, then? I mean: start 8 keywords and, whenever one of them finishes, start one more, so that 8 are always being processed. Any ideas?
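On the timing question: yes, awaiting Task.WhenAll over a fixed batch only completes when the slowest item in that batch completes, so in the example the batch takes about 10 seconds while the other 7 slots sit idle for most of that time. The "always keep 8 running" behavior can be sketched without extra packages by using SemaphoreSlim as a throttle. This is a minimal sketch, not the asker's or any answerer's code: the class Throttled and its ProcessAsync method are hypothetical names, and the using declaration assumes C# 8 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the question or any library).
public static class Throttled
{
    // Runs 'work' for every item while keeping at most 'maxConcurrency' calls in flight:
    // whenever one call finishes, it releases its slot and the next waiting item starts.
    public static async Task<List<TResult>> ProcessAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> work,
        int maxConcurrency)
    {
        using var slots = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        // ToList() starts every wrapper task exactly once; each one waits for a free slot.
        List<Task<TResult>> tasks = items.Select(async item =>
        {
            await slots.WaitAsync().ConfigureAwait(false);
            try
            {
                return await work(item).ConfigureAwait(false);
            }
            finally
            {
                slots.Release(); // free the slot even if work(item) throws
            }
        }).ToList();

        // Task.WhenAll returns the results in the same order as the input items.
        TResult[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.ToList();
    }
}
```

With a list of keyword strings and the question's Work method, the call would be Throttled.ProcessAsync(keywords, Work, 8). One trade-off: this creates one waiting task per item up front, which is cheap but not free for 200k to 300k items; a bounded pipeline such as TPL Dataflow avoids that.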

  • Have you considered using TPL DataFlow to set up a pipeline to process the items? Commented Jul 12, 2016 at 13:08
  • it sounds like this is what you're looking for msdn.microsoft.com/en-us/library/… Commented Jul 12, 2016 at 13:09
  • @MatthewWatson, I just learned of its existence; I will check it out, thanks! Commented Jul 12, 2016 at 13:25
  • @TheBeardedLlama, it's impossible to use Parallel.ForEach in an async context. Commented Jul 12, 2016 at 13:27
  • It is not clear from your example if the work is CPU-bound or you have something else like database call in the "work" Commented Jul 12, 2016 at 13:41
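For readers on newer runtimes: the comment about Parallel.ForEach holds for Parallel.ForEach itself, whose body is synchronous. .NET 6 added Parallel.ForEachAsync, which awaits an async body and honors ParallelOptions.MaxDegreeOfParallelism, so a new item starts as soon as one finishes. A minimal sketch, assuming .NET 6 or later; ParallelAsyncExample and ProcessAsync are hypothetical names:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper; Parallel.ForEachAsync itself requires .NET 6 or later.
public static class ParallelAsyncExample
{
    public static async Task<List<TResult>> ProcessAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> work,
        int maxConcurrency)
    {
        var results = new ConcurrentBag<TResult>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        // At most 'maxConcurrency' bodies run at once; a new item starts when one completes.
        await Parallel.ForEachAsync(items, options, async (item, _) =>
        {
            results.Add(await work(item).ConfigureAwait(false));
        }).ConfigureAwait(false);

        // ConcurrentBag does not preserve the input order.
        return results.ToList();
    }
}
```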

2 Answers


An easier way to do this is to use the AsyncEnumerator NuGet package:

using System;
using System.Collections.Async; // from the AsyncEnumerator NuGet package
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public async Task<List<Keyword>> DoWord(List<string> keywords)
{
    var keywordResults = new ConcurrentBag<Keyword>();
    // Keeps up to 8 Work() calls running at once.
    await keywords.ParallelForEachAsync(async keyword =>
    {
        try
        {
            var result = await Work(keyword);
            keywordResults.Add(result);
        }
        catch (Exception ex)
        {
            // 'await' rethrows the original exception, not an AggregateException.
            log.ErrorFormat("Core threads exception: {0}", ex);
        }
    }, maxDegreeOfParallelism: 8);
    return keywordResults.ToList();
}

1 Comment

You should mention your affiliation with this NuGet package. Otherwise the answer is considered spam - which would be a shame because it is a very good and helpful answer.

Here's some sample code showing how you could approach this using TPL Dataflow.

Note that to compile this, you will need to add TPL Dataflow to your project via NuGet (the System.Threading.Tasks.Dataflow package).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Keyword // Dummy test class.
    {
        public string Name;
    }

    class Program
    {
        static void Main()
        {
            // Dummy test data.
            var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList();

            var result = DoWork(keywords).Result;

            Console.WriteLine("---------------------------------");

            foreach (var item in result)
                Console.WriteLine(item.Name);
        }

        public static async Task<List<Keyword>> DoWork(List<string> keywords)
        {
            var input = new TransformBlock<string, Keyword>
            (
                s => Work(s),
                // Maximum number of Work() calls in progress at the same time
                // (for async work this limits concurrent operations, not threads).
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }
            );

            var result = new List<Keyword>();

            var output = new ActionBlock<Keyword>
            (
                item => result.Add(item),   // Output only 1 item at a time, because List.Add() is not thread-safe.
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }
            );

            input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

            foreach (string s in keywords)
                await input.SendAsync(s);

            input.Complete();
            await output.Completion;

            return result;
        }

        public static async Task<Keyword> Work(string s) // Stubbed test method.
        {
            Console.WriteLine("Processing " + s);

            int delay;
            lock (rng) { delay = rng.Next(10, 1000); }
            await Task.Delay(delay); // Simulate load.

            Console.WriteLine("Completed " + s);
            return new Keyword { Name = s }; // No Task.Run needed; the method is already async.
        }

        static Random rng = new Random();
    }
}
