
Is there a way to go through a huge database and run jobs in parallel on batches of entries? I tried ExecutorService, but I have to call shutdown() in order to know when the pool has finished its work...

So far, my best solution is:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 100;
    int MYTHREADS = 10;
    int loopIndex = 0;
    boolean bContinue=true;
    Runnable worker;



    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;
        ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }
}



public static class MyRunnable implements Runnable {

    private final String id;

    MyRunnable(String id) {
        this.id = id;
    }

    @Override
    public void run()
    {
        System.out.println("Thread '"+id+"' started");
        try {
            TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread '"+id+"' stopped");
    }
}
}

This works fine, but the drawback is that at the end of every loop iteration I have to wait for the last threads to finish.

For example, when only 3 threads are still running, the other threads of the pool sit idle...

I did the following to solve this problem, but is it "safe"/correct?

BTW: is there a way to know how many tasks/threads are in the queue?

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;
    boolean bContinue = true;
    Runnable worker;

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // HERE IT WOULD BE A GLOBAL VARIABLE
    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }

    executor.shutdown();
    // Wait until all tasks have finished
    while (!executor.isTerminated()) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

EDIT:

I need to launch 1 to 10 million tasks, so (I assume) I cannot put them all in the queue... That's why I use a global executor, so that there are always some tasks in the queue (which means I cannot shut the executor down, otherwise it is no longer usable).

Latest code version:

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int MYCORES = MYTHREADS; // assumed: MYCORES is used below but is not declared elsewhere in this snippet
    int loopIndex = 0;
    boolean bContinue = true;
    Runnable worker;

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // HERE IT WOULD BE A GLOBAL VARIABLE
    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executorPool.execute(worker);
        }

        while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size() > Math.max(1, MYTHREADS -2))
        {
            System.out.println("Pool size is now " + executorPool.getActiveCount()+
                    " - queue size: "+ executorPool.getQueue().size()
            );

            if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
                System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
                break;
            }

            TimeUnit.MILLISECONDS.sleep(2000);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }

    executorPool.shutdown();
    // Wait until all tasks have finished
    while (!executorPool.isTerminated()) {
        System.out.println("Pool size is now " + executorPool.getActiveCount()+
                " - queue size: "+ executorPool.getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

Thanks in advance


3 Answers


Update

Now it's clear to me that your main concern is that you can't submit 10 million tasks at once.

Don't worry, you can submit all of them to the executor. The actual number of tasks run in parallel is limited by the underlying thread pool size. That is, if you have a pool size of 2, only two tasks are executed at a time; the rest sit in the queue and wait for a free thread.

Executors.newFixedThreadPool() creates an executor backed by an unbounded LinkedBlockingQueue (its capacity is Integer.MAX_VALUE), so your millions of tasks will fit there.
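A minimal sketch of the two claims above, assuming OpenJDK (where Executors.newFixedThreadPool() returns a ThreadPoolExecutor, the same cast the question already uses). The class and method names are illustrative only. It submits every task up front and then reports how many tasks ran, the largest number of threads the pool ever had, and the queue capacity before anything was submitted:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolQueueDemo {

    // Submits every task up front to a pool of 2 threads and returns
    // {tasks completed, largest number of threads ever in the pool, initial queue capacity}.
    static int[] run(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Assumes OpenJDK, where newFixedThreadPool returns a ThreadPoolExecutor
        // backed by an unbounded LinkedBlockingQueue.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
        int initialCapacity = pool.getQueue().remainingCapacity();

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // Only 2 tasks run at a time; the rest wait in the queue.
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown(); // only to end the demo
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return new int[] {completed.get(), pool.getLargestPoolSize(), initialCapacity};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(1_000_000);
        System.out.println("completed: " + r[0] + ", threads: " + r[1] + ", queue capacity: " + r[2]);
    }
}
```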


You can use the ExecutorService.submit() method, which returns a Future. You can then examine the state of each task through its Future (e.g. with the isDone() and isCancelled() methods).

An executor is typically something you don't shut down explicitly; it exists throughout your application's lifecycle. With this approach you don't need to shut it down in order to know how many tasks are pending.

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
    Future<?> task = executorService.submit(() -> {
        // do work
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " tasks are still pending");
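Here is a self-contained version of the snippet above, as a sketch. The CountDownLatch that holds every task is an addition for the demo only (not part of this answer) so that the pending count is predictable: no Future can be done until the latch is released. FuturePendingDemo, run() and countPending() are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FuturePendingDemo {

    static long countPending(List<Future<?>> tasks) {
        return tasks.stream().filter(future -> !future.isDone()).count();
    }

    // Returns {pending while the tasks are blocked, pending after they are released}.
    static long[] run(List<String> ids) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1); // demo only: holds every task until released

        List<Future<?>> tasks = new ArrayList<>();
        for (String id : ids) {
            Future<?> task = executorService.submit(() -> {
                release.await(); // stand-in for the real work on this id
                return id;
            });
            tasks.add(task);
        }
        long pendingBefore = countPending(tasks); // no task can finish yet

        release.countDown();
        executorService.shutdown(); // only to end the demo
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        long pendingAfter = countPending(tasks);
        return new long[] {pendingBefore, pendingAfter};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(Arrays.asList("1", "2", "3", "4", "5"));
        System.out.println(r[0] + " tasks pending, then " + r[1]); // 5 tasks pending, then 0
    }
}
```

Keep in mind that with millions of tasks, the list of Futures itself also grows to millions of entries.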

Moreover, please note that tasks and threads are not interchangeable terms. In your case, the executor has a fixed number of threads. You can submit more tasks than that, but the rest will sit in the executor's queue until there's a free thread to run them on.
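To see the difference between threads and tasks, and to answer the question's "how many tasks are in the queue?", ThreadPoolExecutor exposes getPoolSize() and getQueue().size(). A minimal sketch with illustrative names, where every task blocks until it is released: with 2 threads and 10 tasks, 2 tasks occupy the threads and the other 8 wait in the queue (a ThreadPoolExecutor prefers starting a new thread over queuing while it has fewer than corePoolSize threads).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueStatsDemo {

    // Returns {threads in the pool, tasks waiting in the queue} while all tasks are blocked.
    static int[] snapshot(int threads, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate a long-running task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // The first `threads` tasks were handed straight to new worker threads;
        // every later task is waiting in the queue.
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};

        release.countDown();
        pool.shutdown(); // only to end the demo
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = snapshot(2, 10);
        System.out.println("threads: " + r[0] + ", queued tasks: " + r[1]); // threads: 2, queued tasks: 8
    }
}
```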


Comments:

Might be a good idea... Then I just need to add a "waiting loop" in order to assign more tasks when fewer than X tasks are running...
Same question as for @Pavan: what would be the benefit of your solution compared to mine (see the latest code after the EDIT)?
Well, now I get your point and have updated the answer. I think you're putting significant effort into simulating something executors already provide: the queue.
Thank you David, but if I submit 10 million tasks, won't the executor become huge (i.e. take a large amount of RAM)?
Of course I did, and your update showed me that the queue is able to handle such a large number of tasks! Thanks!
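On the memory question in the comments: each queued task is an object the executor keeps until a thread takes it, so 10 million queued tasks cost memory in proportion to their size. One common alternative (a different technique from the one in this answer) is a bounded queue with ThreadPoolExecutor.CallerRunsPolicy: when the queue is full, the submitting thread runs the task itself, which slows the producer loop down. A sketch with illustrative names and an arbitrary capacity:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedQueueDemo {

    // Returns {tasks completed, largest queue size seen by the producer}.
    static int[] run(int taskCount, int queueCapacity) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity), // at most queueCapacity tasks held in memory
                new ThreadPoolExecutor.CallerRunsPolicy());      // queue full: the submitting thread runs the task
        AtomicInteger completed = new AtomicInteger();
        int maxQueued = 0;
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet); // stand-in for new MyRunnable(id)
            maxQueued = Math.max(maxQueued, pool.getQueue().size());
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return new int[] {completed.get(), maxQueued};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(1_000_000, 1_000);
        System.out.println("completed: " + r[0] + ", max queued: " + r[1]);
    }
}
```

In the question's loop, this would make the loop that fetches ids from the database slow down by itself instead of polling getActiveCount().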

ExecutorService allows you to invoke a list of tasks, which are run in parallel, and to get each result when it is available.

In your code you are using

worker = new MyRunnable(id);
executor.execute(worker);

Instead of Runnable, it's better to use Callable in this case; then you can submit the whole list of Callables for execution with a single API call instead of a for loop.

List<Callable<Boolean>> workers = new ArrayList<>();
workers.add(new MyCallable(id)); // this is just for example
workers.add(new MyCallable(id));
workers.add(new MyCallable(id));

List<Future<Boolean>> futures = executor.invokeAll(workers); // executes all worker tasks in parallel and returns once they have all completed; each Future holds that task's return value (or its exception)

Note that the get() method on a Future is a blocking call.
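A runnable sketch of the invokeAll() approach, where MyCallable is a hypothetical Callable&lt;Boolean&gt; standing in for the real per-entry job and the other names are illustrative. Per the ExecutorService Javadoc, invokeAll() returns only when all tasks have completed, so every returned Future reports isDone() and get() returns immediately:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Hypothetical Callable: "processes" one id and reports success.
    static class MyCallable implements Callable<Boolean> {
        private final String id;

        MyCallable(String id) {
            this.id = id;
        }

        @Override
        public Boolean call() {
            return !id.isEmpty(); // stand-in for the real work on this id
        }
    }

    // Returns how many tasks reported success.
    static int run(List<String> ids) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Boolean>> workers = new ArrayList<>();
            for (String id : ids) {
                workers.add(new MyCallable(id));
            }
            // Blocks until every task has completed, then returns their Futures.
            List<Future<Boolean>> futures = executor.invokeAll(workers);
            int ok = 0;
            for (Future<Boolean> future : futures) {
                if (!future.isDone()) {
                    throw new IllegalStateException("invokeAll returned an unfinished task");
                }
                if (future.get()) { // does not block here: the task is already done
                    ok++;
                }
            }
            return ok;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(Arrays.asList("1", "2", "", "4")) + " tasks succeeded"); // 3 tasks succeeded
    }
}
```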

Comments:

invokeAll is also blocking, so the initial problem (having to wait for the last thread in every loop iteration) is not solved. :)
@Bast - As per my understanding, invokeAll is not a blocking call. docs.oracle.com/javase/7/docs/api/java/util/concurrent/…
You're right, only future.get() is blocking... I could probably use your solution, as well as the one proposed by @DavidSiro...
Yes, please try it and post your feedback, as it might help others as well. Thanks
What would be the benefit of your solution compared to mine (see the latest code after the EDIT)?

You don't need to know the thread pool size to check whether the tasks in an ExecutorService have completed. You can remove the polling code that follows the task submission.

Option 1:

  1. Replace ThreadPoolExecutor with newWorkStealingPool from Executors.

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

    It lets idle threads steal queued tasks from busy ones, which can improve thread utilization in the ExecutorService.

    ExecutorService executor = Executors.newWorkStealingPool();
    
  2. Use invokeAll().
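Option 1 applied to the question's batch loop could look like the sketch below. getIds() is a hypothetical in-memory stand-in for the database query, the per-entry job is a placeholder, and the class name is illustrative. Because invokeAll() waits for each batch, the pool can still sit partly idle at the end of every batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkStealingBatchDemo {

    // Hypothetical stand-in for the question's getIds(offset, rows):
    // a fake table of `total` rows that returns an empty list when exhausted.
    static List<String> getIds(int offset, int rows, int total) {
        List<String> ids = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + rows, total); i++) {
            ids.add(String.valueOf(i + 1));
        }
        return ids;
    }

    // Processes the table batch by batch; returns the number of entries handled.
    static int processAll(int total, int batchSize) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newWorkStealingPool(); // parallelism = available processors
        int processed = 0;
        try {
            for (int offset = 0; ; offset += batchSize) {
                List<String> ids = getIds(offset, batchSize, total);
                if (ids.isEmpty()) {
                    break;
                }
                List<Callable<String>> batch = new ArrayList<>();
                for (String id : ids) {
                    batch.add(() -> id); // placeholder for the real job on this entry
                }
                // invokeAll waits for the whole batch before the next query runs.
                for (Future<String> future : executor.invokeAll(batch)) {
                    future.get(); // rethrows (wrapped) any exception from the task
                    processed++;
                }
            }
        } finally {
            executor.shutdown();
        }
        return processed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(29, 5) + " entries processed"); // 29 entries processed
    }
}
```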

Option 2 (useful if you know the number of tasks in advance):

Use a CountDownLatch and initialize its count to the number of tasks to be submitted; each task calls countDown() when it finishes, and the main thread waits with await().
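A minimal sketch of Option 2 with illustrative names, counting down in a finally block so that a failing task cannot leave await() waiting forever:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    // Runs taskCount tasks and returns how many of them did their work.
    static int run(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(taskCount); // one count per task
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    processed.incrementAndGet(); // stand-in for the real work
                } finally {
                    done.countDown(); // always count down, even if the work throws
                }
            });
        }
        done.await(); // blocks until every task has called countDown()
        int result = processed.get();
        executor.shutdown(); // the executor stays usable up to this point
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1_000) + " tasks finished"); // 1000 tasks finished
    }
}
```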

Further references:

wait until all threads finish their work in java

How to properly shutdown java ExecutorService

Comments:

Yes, but because it's a while loop, I wanted to dynamically add new threads in order to always have some in the "queue"... Actually, using getActiveCount() is more correct (code updated). I have now even switched to ThreadPoolExecutor in my local code.
Please note that in my second code snippet (i.e. the "solution"), the ExecutorService is global, so it cannot be shut down; otherwise it is no longer usable.
Outside the while loop, you can keep the shutdown code, calling the shutdown(), awaitTermination() and shutdownNow() APIs in sequence as described in the post linked above.
Yes, shutdown() is already outside the loop in my code... The problem is that I need to launch 1 to 10 million tasks, so (I assume) I cannot put them all in the queue... And I thought that using a global executor might be the way to always have some tasks in the queue... I'll edit my post and add the latest code that I am running.
Use invokeAll() and replace ThreadPoolExecutor with newWorkStealingPool().
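For reference, the two-phase shutdown pattern from the ExecutorService Javadoc, adapted as a sketch (the class name is illustrative). It is meant to run once, after the while loop has submitted everything:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    // Adapted from the usage example in the ExecutorService Javadoc.
    static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks; already queued tasks still run
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt running tasks and drop the queued ones
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            final int id = i;
            executor.execute(() -> System.out.println("task " + id));
        }
        shutdownAndAwaitTermination(executor);
        System.out.println("terminated: " + executor.isTerminated()); // terminated: true
    }
}
```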
