0

I want to new a thread to do a io task with consumer/producer pattern in a asyncio task:

import asyncio
import threading
import time
import queue


q = queue.Queue()


def consumer():
    while True:
        v = q.get()
        time.sleep(v)
        print(f"log {v}")


async def work(v):
    await asyncio.sleep(0.1)
    print(f"work {v}")


async def main():
    t = threading.Thread(target=consumer, daemon=True)
    t.start()
    for i in range(1, 5):
        print(f"start {i}")
        q.put(i)
        loop.create_task(work(i))
        print(f"done {i}")
    t.join()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

But the asyncio task cannot be execute because of consumer thread.

12
  • 1
    You don't use threads for IO in asyncio. Why the example from the asyncio documentation doesn't work for you? Commented Mar 8, 2021 at 8:02
  • @RomanKonoval actually threads in Python are still worth to use with asyncio if you need read/write files in disk or if you have some blocking functions. Commented Mar 8, 2021 at 8:10
  • @RomanKonoval just like ArtiomKozyrev say, I want to gzip a big file, but I don't want to block the main event loop. Commented Mar 8, 2021 at 8:14
  • You are not doing IO in this case. You are invoking a blocking function that does many things internally (including some IO). The question is misleading. The correct answer to question how to use thread for doing IO in asyncio? is don't do this. The question how to use threads for blocking operations in asyncio is legit. Commented Mar 8, 2021 at 8:17
  • 1
    gzip it's CPU-bound task, so threads in Python will not help you because of GIL. You need asyncio.loop.run_in_executor, combined with ProcessPoolExecutor Commented Mar 8, 2021 at 8:19

1 Answer 1

2

Instead of explicitly managing the worker thread and a queue, you can use the thread pool built into asyncio (or create your own if you wish) and call run_in_executor to submit tasks to it. For example:

import asyncio, time

def blocking_work(v):
    # this function is not async, it will be run in a separate thread
    time.sleep(v)
    print(f"log {v}")

async def work(v):
    # native asyncio work
    await asyncio.sleep(0.1)
    # followed by blocking IO wrapped in thread
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, blocking_work, v)
    print(f"work {v}")

async def main():
    await asyncio.gather(*[work(i) for i in range(1, 5)])

asyncio.run(main())
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.