
I have a block of code which takes a long time to execute and is CPU-intensive. I want to run that block several times and use the full power of my CPU for that. Looking at asyncio, I understood that it is mainly for asynchronous communication, but that it is also a general tool for asynchronous tasks.

In the following example, time.sleep(y) is a placeholder for the code I want to run. Every coroutine is executed one after the other, and the whole run takes about 8 seconds.

import asyncio
import logging
import time


async def _do_compute_intense_stuff(x, y, logger):
    logger.info('Getting it started...')
    for i in range(x):
        time.sleep(y)  # placeholder for the CPU-bound work; blocks the whole event loop
    logger.info('Almost done')
    return x * y

logging.basicConfig(format='[%(name)s, %(levelname)s]: %(message)s', level='INFO')
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
co_routines = [  # schedule four instances on the same (single-threaded) event loop
    asyncio.ensure_future(_do_compute_intense_stuff(2, 1, logger.getChild(str(i)))) for i in range(4)]
logger.info('Made the co-routines')
responses = loop.run_until_complete(asyncio.gather(*co_routines))
logger.info('Loop is done')
print(responses)

When I replace time.sleep(y) with asyncio.sleep(y), it returns nearly immediately. With await asyncio.sleep(y), it takes about 2 seconds.

Is there a way to parallelize my code using this approach or should I use multiprocessing or threading? Would I need to put the time.sleep(y) into a Thread?

  • You don't use asyncio for that. Asyncio is great when you have a problem that waits for I/O to happen; intense computation is not such a problem. Use multiprocessing instead (see the sketch after these comments). Only use threading if you are using some C-extension-backed library that releases the GIL when doing heavy computations. Commented Jul 10, 2018 at 17:56
  • Asyncio also requires all your code to cooperate. Each await is a point where your task tells the event loop that it is willing to let other tasks run. time.sleep() is the very opposite of cooperating: it blocks everything, so the event loop can't switch tasks. Commented Jul 10, 2018 at 17:57
  • asyncio.sleep() produces a coroutine. If you don't await it, it won't do anything, so yes, you'd see an instant return. Commented Jul 10, 2018 at 17:58
  • Thanks @MartijnPieters, that clears up some confusion! Commented Jul 10, 2018 at 18:28
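
To illustrate the advice in these comments: the usual bridge between asyncio and CPU-bound work is loop.run_in_executor with a process pool. This is a minimal sketch of my own (not from the original question), reusing the question's numbers, so four tasks of roughly two seconds each finish in about two seconds:

import asyncio
import concurrent.futures
import time


def cpu_bound(x, y):
    # Stand-in for the real CPU-intensive block.
    for _ in range(x):
        time.sleep(y)
    return x * y


async def main(loop):
    # Hand the blocking work to worker processes; the event loop stays responsive.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, cpu_bound, 2, 1) for _ in range(4)]
        return await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(main(loop)))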

2 Answers


Executors use multithreading to accomplish this (or multiprocessing, if you prefer). Asyncio is used to optimize code that frequently waits on input/output operations, such as writing to files or loading websites.

However, with CPU-heavy operations that don't just wait on I/O, it's recommended to hand the work to threads or processes, and, in my opinion, concurrent.futures provides a very nice wrapper for that, similar in spirit to asyncio's.

The reason asyncio.sleep appears to make your code run faster is that it immediately yields control back to the event loop, which then checks whether any other coroutine is ready to continue. This doesn't help CPU-heavy operations, as there is no I/O to wait for.
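
To make the scheduling point concrete, here is a tiny sketch of my own (not part of the original answer): four coroutines that each await asyncio.sleep(1) finish in about one second in total, because every await hands control back to the event loop. That is also why the question's version takes about 2 seconds instead of 8: each coroutine awaits two one-second sleeps, and all four run concurrently.

import asyncio
import time

async def napper(n):
    # The await yields to the event loop, so all the sleeps overlap.
    await asyncio.sleep(1)
    return n

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(asyncio.gather(*(napper(i) for i in range(4))))
print('%.1fs' % (time.perf_counter() - start))  # ~1.0s, not 4s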

To change the following example from multiprocessing to multithreading, simply change ProcessPoolExecutor to ThreadPoolExecutor.

Here is a multiprocessing example:

import concurrent.futures
import time

def a(z):
    time.sleep(1)  # stand-in for the CPU-heavy work
    return z * 12

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # Submit five jobs; each runs in its own worker process.
        futures = {executor.submit(a, i) for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            data = future.result()
            print(data)

This is a simplified version of the example provided in the documentation for executors.
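
If you don't need results in completion order, executor.map is an even shorter equivalent; this is a sketch under the same assumptions (results come back in submission order):

import concurrent.futures
import time

def a(z):
    time.sleep(1)  # stand-in for the CPU-heavy work
    return z * 12

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # map blocks for each result in turn and preserves input order.
        for data in executor.map(a, range(5)):
            print(data)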


5 Comments

Thanks so far. It seems to work somehow. The functions are executed concurrently, but not in parallel (I guess). When executing my function once, it takes up a whole core and 20 seconds. When executing 2 functions, it takes 1 minute and none of the cores goes to 100%. Any ideas?
The processor won't go to 100% because you're on a single core. Use multiprocessing if you wish to fully load the CPU. Please keep in mind that you might want to pre-stage your data prior to starting the process, as communication between queues can sometimes be tricky while the second process is running.
@Benjamin Updated the answer to use multiprocessing.
Coming from Java, where threads each run on a core, this question (stackoverflow.com/a/3046201/5119485) helped me understand the difference between processes and threads in Python. My actual code, however, didn't want to run in a process because it used classes that could not be pickled. Since I don't need inter-process communication, I ditched the nice libs and just have a wrapper starting my script with Popen and reading back from stdout within a ThreadPoolExecutor (roughly like the sketch below).
Since your answer addresses the asked question I will accept it.
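
For reference, a rough sketch of the workaround described in the last comment; worker.py is a hypothetical stand-in for the commenter's actual script:

import concurrent.futures
import subprocess
import sys

def run_script(arg):
    # Each thread blocks on its own child process; the GIL doesn't matter here
    # because the heavy work happens inside the subprocess.
    proc = subprocess.Popen([sys.executable, 'worker.py', str(arg)],
                            stdout=subprocess.PIPE)
    out, _ = proc.communicate()
    return out.decode().strip()

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for output in executor.map(run_script, range(4)):
            print(output)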

A simple example, taken from https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/.

It helped me a lot. There is also a "bad example" there, which helped me even more ^^

import aiohttp
import asyncio
import async_timeout
import os

async def download_coroutine(session, url):
    with async_timeout.timeout(10):  # older async_timeout API; newer versions need 'async with'
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()

async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
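
Note that this example uses the older async_timeout and pre-3.7 event-loop APIs. On a current Python and aiohttp, the same download would more likely look like the following sketch (my adaptation, not from the linked blog post):

import asyncio
import os

import aiohttp

async def download(session, url):
    async with session.get(url) as response:
        # Stream the body to disk in 1 KiB chunks.
        with open(os.path.basename(url), 'wb') as f_handle:
            async for chunk in response.content.iter_chunked(1024):
                f_handle.write(chunk)

async def main(urls):
    timeout = aiohttp.ClientTimeout(total=10)  # replaces async_timeout
    async with aiohttp.ClientSession(timeout=timeout) as session:
        await asyncio.gather(*(download(session, url) for url in urls))

if __name__ == '__main__':
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf"]
    asyncio.run(main(urls))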
