I have a generator function that iterates over a large number of parameters and yields the result of another function called with each parameter. The inner function may take quite a long time to execute, so I would like to use multiprocessing to speed up the process. It may also be important that I would like to be able to stop this generator in the middle of execution. I'm not sure what the right way to implement such logic is. I need something like a queue that lets me add new tasks after old ones have finished and yields results as soon as they are ready. I've looked at multiprocessing.Queue, but at first glance it doesn't seem suitable for my case. Can somebody advise what I should use in such a scenario?

Here is the approximate code for my task:

def gen(**kwargs):
    for param in get_params():
        yield inner_func(param)
  • What is the reason the inner function takes a long time? Is it doing heavy calculations or fetching data over the network? Commented Jan 29, 2022 at 23:48
  • Is it okay if the generator does not yield values in the same order as the parameters come? Commented Jan 29, 2022 at 23:51
  • The inner function solves mixed-integer programming problems, i.e. it's CPU-bound. The order of the generator's results doesn't matter. Commented Jan 30, 2022 at 0:50
  • "ability to add new tasks after old ones have been finished" - Do you mean that you might add more tasks than one for each param? Commented Jan 30, 2022 at 11:41
  • Is there any reason why you don't just use `map` from Python's built-in concurrency modules? The generator does not seem to add anything. Commented Jan 30, 2022 at 12:28

1 Answer

Use the multiprocessing.pool.Pool class for multiprocessing, since its terminate method cancels both the running tasks and those scheduled to run (the shutdown method of the concurrent.futures executors will not cancel tasks that are already running). And as @MisterMiyakgi indicated, it should not be necessary to use a generator. However, you should use the imap_unordered method, which returns an iterable that delivers results as they are produced by your inner_function, whereas with map you would not get the first value until all values had been generated.

from multiprocessing import Pool

def get_params():
    """ Generator function. """
    # For example:
    for next_param in range(10):
        yield next_param

def inner_function(param):
    """ Long running function. """
    # For example:
    return param ** 2

def gen():
    pool = Pool()
    # Use imap_unordered if we do not care about the order of results else imap:
    iterable = pool.imap_unordered(inner_function, get_params())
    # The iterable can be iterated as if it were a generator
    # Add terminate method to iterable:
    def terminate():
        pool.terminate()  # stop the workers; running and pending tasks are dropped
        pool.join()       # wait for the worker processes to exit
    iterable.terminate = terminate
    return iterable


# Usage:
# Required for Windows
if __name__ == '__main__':
    iterable = gen()
    # iterable.terminate() should be called when done iterating the iterable
    # but it can be called any time to kill all running tasks and scheduled tasks.
    # After calling terminate() do not further iterate the iterable.
    for result in iterable:
        print(result)
        if result == 36:
            iterable.terminate() # kill all remaining tasks, if any
            break

Prints (the order may vary, since imap_unordered yields results as they complete):

0
1
4
9
16
25
36
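
If a real generator is preferred, as in the question, one possible variation wraps the pool in a generator and relies on try/finally so that closing the generator also terminates the pool. This is only a minimal sketch under stated assumptions, not part of the original answer: the `params` and `processes` arguments and the use of `contextlib.closing` are illustrative choices, and `inner_function` is the same toy stand-in as above.

```python
from contextlib import closing
from multiprocessing import Pool


def inner_function(param):
    """Stand-in for the long-running function."""
    return param ** 2


def gen(params, processes=None):
    """Yield results as workers finish; terminate the pool when closed."""
    pool = Pool(processes)
    try:
        yield from pool.imap_unordered(inner_function, params)
    finally:
        # Runs on exhaustion, on an exception, or when the generator is closed.
        pool.terminate()
        pool.join()


# Required for Windows
if __name__ == '__main__':
    # closing() calls results.close() on exit, which triggers the finally block.
    with closing(gen(range(10))) as results:
        for result in results:
            print(result)
            if result == 36:
                break
```

Breaking out of the loop alone does not close the generator straight away, which is why the sketch closes it explicitly through `closing`.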

2 Comments

Your answer helped me a lot. Thanks. But I'm still wondering about a few things. Does imap_unordered have a limit when iterating through params, or does it keep taking them until RAM runs out? And is the chunksize parameter somehow related to this limit?
I don't know of any limit (you could exhaust memory). As far as chunksize is concerned, if you do not specify this parameter, imap_unordered uses a chunksize of 1. For very large iterables this may not yield optimal performance. On the other hand, specifying a chunksize value N greater than 1 means that results are also delivered back in chunks of the same size, so you do not get results back immediately as they are generated but only after N results have been produced by a single process.
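
The chunking behaviour described above can be observed with a small timing experiment. The following is a rough sketch, not a benchmark: `slow_square` and `arrival_times` are hypothetical helpers introduced here, and the sleep only simulates a slow task. With a chunksize greater than 1, the results from one chunk should show up with nearly identical timestamps.

```python
import time
from multiprocessing import Pool


def slow_square(param):
    """Hypothetical stand-in for a slow function."""
    time.sleep(0.05)
    return param ** 2


def arrival_times(chunksize, n=8, processes=2):
    """Return (seconds since start, result) pairs in the order results arrive."""
    start = time.perf_counter()
    arrivals = []
    with Pool(processes) as pool:
        for result in pool.imap_unordered(slow_square, range(n), chunksize=chunksize):
            arrivals.append((round(time.perf_counter() - start, 2), result))
    return arrivals


# Required for Windows
if __name__ == '__main__':
    for chunksize in (1, 4):
        print(f'chunksize={chunksize}:', arrival_times(chunksize))
```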
