2

I have a requirement for a project to use several hundred threads and each thread should run an asynchronous function. At the end, I need to collect results from all the threads. I have following example but it works synchronously.

import asyncio
import concurrent.futures

async def count():
    print("One")
    # here I need to use an asynchronous function
    await asyncio.sleep(2)
    print("Two")
    return "some result"

async def main():
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for _ in range(5):
            future = executor.submit(count)
            futures.append(future)

        results = [await f.result() for f in concurrent.futures.as_completed(futures)]
        print(results)


if __name__ == "__main__":
    asyncio.run(main())

Code above outputs:

One
Two
One
Two
One
Two
One
Two
One
Two
['some result', 'some result', 'some result', 'some result', 'some result']

Expected output:

One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']

2 Answers 2

1

Your code does not use eventloop correctly, note that asyncio.run runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool. see more details in https://docs.python.org/3/library/asyncio-task.html#asyncio.run

do it below will properly reuse the same eventloop.

import asyncio
import concurrent.futures

async def count():
    print("One")
    # here I need to use an asynchronous function
    await asyncio.sleep(2)
    print("Two")
    return "some result"

async def main():
    loop = asyncio.get_running_loop()
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # this in fact return coroutine object
        coros = [await loop.run_in_executor(executor, count) for _ in range(5)]
        results = [await f for f in asyncio.as_completed(coros)]
        print(results)


if __name__ == "__main__":
    asyncio.run(main())

output

One
One
One
One
One
Two
Two
Two
Two
Two
['some result', 'some result', 'some result', 'some result', 'some result']
Sign up to request clarification or add additional context in comments.

Comments

0

Use could use event loop's run_in_executor() to schedule the tasks and asyncio.as_completed() to loop over the results:

async def main():
    futures = []
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for _ in range(5):
            futures.append(await loop.run_in_executor(executor, count))

        results = [await f for f in asyncio.as_completed(futures)]
        print(results)

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.