
I have an application where I have 1000+ small parts of one large file.

I have to upload a maximum of 16 parts at a time.

I used the Task Parallel Library of .NET.

I used Parallel.For to divide the work into multiple parts and assigned one method to be executed for each part, setting MaxDegreeOfParallelism to 16.

I need to execute one method with the checksum values generated by the different part uploads, so I have to set up a mechanism where I wait for all part uploads (say 1000) to complete. The issue I am facing with the TPL is that it randomly executes any 16 of the 1000 parts.

I want some mechanism by which I can run the first 16 threads initially, and as soon as the 1st or 2nd or any of the 16 threads completes its task, the 17th part should be started.

How can I achieve this?


  • If @usr's answer doesn't work, have a look at my answer here which might apply. Otherwise, if you can use TPL's DataflowBlock classes, that might be better (I'm thinking you can't, because you specify C# 4). Commented Oct 28, 2015 at 14:59
  • Is it a client-side UI app or a server-side app? Commented Oct 29, 2015 at 10:04
  • Please accept and use the approach by usr. I think you had a partitioning problem; look at the problem described here. Commented Nov 23, 2015 at 12:21

4 Answers


One possible candidate for this is TPL Dataflow. Here is a demonstration that takes in a stream of integers and prints them to the console. You set MaxDegreeOfParallelism to however many threads you wish to run in parallel:

void Main()
{
    // Requires the System.Threading.Tasks.Dataflow package.
    var actionBlock = new ActionBlock<int>(
            i => Console.WriteLine(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

    foreach (var i in Enumerable.Range(0, 200))
    {
        actionBlock.Post(i);
    }

    // Signal that no more items will arrive, then wait for the block
    // to drain; otherwise Main can return before the work finishes.
    actionBlock.Complete();
    actionBlock.Completion.Wait();
}

This can also scale well if you want to have multiple producers/consumers.
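For example, here is a minimal sketch of several producers feeding one bounded block. The BoundedCapacity value is an assumption to cap memory use; SendAsync (unlike Post) waits asynchronously whenever the buffer is full:

static async Task RunAsync()
{
    var block = new ActionBlock<int>(
        i => Console.WriteLine(i),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 16, // at most 16 items processed at once
            BoundedCapacity = 32         // assumption: cap the queued backlog
        });

    // Two producers posting into the same block.
    var producers = Enumerable.Range(0, 2).Select(p => Task.Run(async () =>
    {
        for (int i = 0; i < 100; i++)
            await block.SendAsync(p * 100 + i); // waits while the block is full
    })).ToArray();

    await Task.WhenAll(producers);
    block.Complete();       // no more input
    await block.Completion; // wait for in-flight items to drain
}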


1 Comment

I think this is the best answer

Here is the manual way of doing this.

You need a queue. The queue is the sequence of pending tasks. You dequeue items and put them into a list of working tasks. Whenever a task is done, remove it from the list of working tasks and take another from the queue. The main thread controls this process. Here is a sample of how to do this.

For the test I used a list of integers, but it should work for other types because it uses generics.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    ParallelQueue(items, DoWork);
}

private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (pending.Count != 0 && working.Count < 16)  // maximum concurrent tasks
        {
            var item = pending.Dequeue();              // get the next item from the queue
            working.Add(Task.Run(() => action(item))); // start a task for it
        }
        else
        {
            Task.WaitAny(working.ToArray());       // block until any task finishes
            working.RemoveAll(x => x.IsCompleted); // remove finished tasks
        }
    }
}

private static void DoWork(int i) // do your work here.
{
    // this is just an example
    Task.Delay(i).Wait(); 
    Console.WriteLine(i);
}

Please let me know if you run into problems implementing DoWork yourself, because if you change the method signature you may need to make some changes.

Update

You can also do this with async/await, without blocking the main thread.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    Task t = ParallelQueue(items, DoWork);

    // able to do other things.

    t.Wait();
}

private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (working.Count < 16 && pending.Count != 0)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item))); // start the async work
        }
        else
        {
            await Task.WhenAny(working);           // yield until any task finishes
            working.RemoveAll(x => x.IsCompleted); // drop completed tasks
        }
    }
}

private static async Task DoWork(int i)
{
    await Task.Delay(i);
}
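The question also mentions running one final method on the checksums of all the parts. Here is a minimal sketch of how the async variant could collect per-part results; ParallelQueueWithResults, UploadPartAsync and ComputeFinalChecksum are hypothetical names for illustration:

// Hypothetical usage:
//   var sums = await ParallelQueueWithResults(parts, UploadPartAsync);
//   ComputeFinalChecksum(sums);
private static async Task<List<string>> ParallelQueueWithResults<T>(
    List<T> items, Func<T, Task<string>> func)
{
    Queue<T> pending = new Queue<T>(items);
    var working = new List<Task<string>>();
    var checksums = new List<string>();

    while (pending.Count + working.Count != 0)
    {
        if (working.Count < 16 && pending.Count != 0)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item)));
        }
        else
        {
            await Task.WhenAny(working);
            // Harvest the finished tasks; results arrive in completion
            // order, not in part order.
            foreach (var done in working.Where(x => x.IsCompleted).ToList())
            {
                checksums.Add(done.Result);
                working.Remove(done);
            }
        }
    }

    return checksums;
}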

3 Comments

Why are you using new Task and not Task.Run? Also, blocking with Wait in server code is typically a bad idea.
I just showed the general way. Task.Delay(...).Wait() was just an example; you can do anything you want inside DoWork. I appreciate the help; I fixed the code and it is better now (?). I'm not experienced in multi-threading and I'm new to programming (only 1 year), so bear with me ;) @avo
I still think there's a non-blocking way of doing it (without Wait), but I'm retracting my negative vote.
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/;

// SingleItemPartitioner (from the ParallelExtensionsExtras samples)
// hands items out one at a time instead of in range chunks.
SingleItemPartitioner.Create(workitems)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(16)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .ForAll(i => { Thread.Sleep(1000); Console.WriteLine(i); });

This should be all you need. I forget exactly how the methods are named... Look at the documentation.

Test this by printing to the console after sleeping for 1 second (which this sample code does).
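If you cannot take the ParallelExtensionsExtras dependency, the built-in Partitioner can approximate the same single-item handout. This is a sketch; note that EnumerablePartitionerOptions.NoBuffering requires .NET 4.5, which may not fit the C# 4 constraint mentioned in the comments:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

class Demo
{
    static void Main()
    {
        var workitems = Enumerable.Range(0, 1000);

        // NoBuffering hands out one item at a time, so a worker that
        // finishes early immediately picks up the next pending part.
        Partitioner.Create(workitems, EnumerablePartitionerOptions.NoBuffering)
            .AsParallel()
            .WithDegreeOfParallelism(16)
            .ForAll(i => { Thread.Sleep(1000); Console.WriteLine(i); });
    }
}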

2 Comments

My answer was accepted, but I think it shouldn't have been. I think the OP had the problem which is described here: stackoverflow.com/questions/33869830/… . workitems.Select(x => x) is required to fix this for a List of items. Your approach will split the list into chunks, and that wasn't what the OP wanted, so changing the list into an IEnumerable will fix his problem.
@M.kazemAkhgary That's actually a good point; I forgot about partitioning. This really is a bad TPL default, another one.

Another option would be to use a BlockingCollection<T> as a queue between your file-reader thread and your 16 uploader threads. Each uploader thread would just loop, consuming the blocking collection until it is complete.

And, if you want to limit memory consumption in the queue, you can set an upper limit on the blocking collection so that the file-reader thread will pause when the buffer has reached capacity. This is particularly useful in a server environment where you may need to limit the memory used per user/API call.

// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);

// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = ...

File reader thread

...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();

Sending threads

for (int i = 0; i < 16; i++)
{
    Task.Run(() =>
    {
        foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
        {
            // ... do the upload
        }
    });
}
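Putting the pieces together, here is a minimal end-to-end sketch. The Chunk type and the ReadChunks, Upload and FinalizeUpload methods are hypothetical placeholders for your own reader and uploader code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Chunk { public int Index; public byte[] Data; }

static class UploaderDemo
{
    static void Main()
    {
        var queue = new BlockingCollection<Chunk>(4); // buffer at most 4 chunks
        var cts = new CancellationTokenSource();
        var checksums = new ConcurrentBag<string>();

        // 16 sender tasks, each draining the queue until CompleteAdding is called.
        var senders = Enumerable.Range(0, 16).Select(n => Task.Run(() =>
        {
            foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
                checksums.Add(Upload(chunk));
        })).ToArray();

        // Reader: Add blocks while the buffer is full, throttling file I/O.
        foreach (var chunk in ReadChunks())
            queue.Add(chunk, cts.Token);
        queue.CompleteAdding(); // lets the consuming enumerables finish

        Task.WaitAll(senders);     // wait for all 16 uploaders to drain the queue
        FinalizeUpload(checksums); // one final call with every part's checksum
    }

    static string Upload(Chunk c)
    {
        return "checksum-" + c.Index; // placeholder for the real upload
    }

    static IEnumerable<Chunk> ReadChunks()
    {
        for (int i = 0; i < 100; i++)
            yield return new Chunk { Index = i, Data = new byte[0] }; // placeholder reader
    }

    static void FinalizeUpload(IEnumerable<string> sums)
    {
        Console.WriteLine(sums.Count() + " parts uploaded");
    }
}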

