
I'm working on an asynchronous server using asyncio, but I need to monitor a multiprocessing.Event (used to signal termination from another process) inside my async event loop. Here's the simplified version of my server loop:

self.__terminateExchange: Event = multiprocessing.Event()
server = await asyncio.start_server(
    self.handle_client,
    self.ipAddress,
    self.port,
    backlog=self.maxSocketConnections
)

async with server:
    while not self.__terminateExchange.is_set():
        await asyncio.sleep(0.01)

self.__terminateExchange is a multiprocessing.Event. The idea is that another process can call .set() on it, and I want the server to shut down gracefully when that happens.

Problem: This setup is unreliable — sometimes the server exits as expected when the event is set, and sometimes it doesn't. I suspect that checking .is_set() inside the asyncio loop is the issue, possibly due to it being a blocking or non-async-safe call.

  1. What is the correct way to wait for or monitor a multiprocessing.Event in an asyncio event loop?

  2. Is there a non-blocking or async-compatible way to integrate multiprocessing.Event with asyncio?

Would using a thread to bridge the multiprocessing.Event to an asyncio.Event be a better approach?

Any help would be deeply appreciated.

I tried stopping async code using a multiprocessing.Event. When I set the event, the code sometimes stops and sometimes doesn't. I also used process.is_alive() to check whether the process is still alive and kill it, but it still keeps running sometimes.

I want to be able to stop the code once the event is set.

2 Answers

3

I think the issue you're experiencing stems from multiprocessing.Event.is_set() not being truly async-safe: the problem is that you're polling a blocking call in your asyncio loop, which can create timing issues.

Instead, you can move the wait into a thread with loop.run_in_executor() (or asyncio.to_thread()):

import asyncio
import multiprocessing

async def wait_for_termination(terminate_event):
    """I believe this approach properly isolates the blocking call"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, terminate_event.wait)

# In your server code:
async with server:
    termination_task = asyncio.create_task(
        wait_for_termination(self.__terminateExchange)
    )
    await termination_task
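
On Python 3.9+ you can write the same thing more compactly with asyncio.to_thread(); a minimal sketch, equivalent to the run_in_executor call above:

async def wait_for_termination(terminate_event):
    # asyncio.to_thread (Python 3.9+) runs the blocking wait() in the default thread pool
    await asyncio.to_thread(terminate_event.wait)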

Alternatively, you could bridge it to an asyncio.Event:

import asyncio
import threading

class EventBridge:
    def __init__(self, mp_event):
        self.mp_event = mp_event
        self.async_event = asyncio.Event()

    def start_monitoring(self):
        # Capture the running loop here, while we are still on the event-loop thread
        loop = asyncio.get_running_loop()

        def monitor():
            self.mp_event.wait()  # Block in the worker thread, not the event loop
            # Set the asyncio.Event from the worker thread in a loop-safe way
            loop.call_soon_threadsafe(self.async_event.set)

        threading.Thread(target=monitor, daemon=True).start()

    async def wait(self):
        await self.async_event.wait()
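
A minimal usage sketch, reusing the names from the question (server and self.__terminateExchange come from the original post, so this assumes it runs inside the same class):

# Inside the original server coroutine
async with server:
    bridge = EventBridge(self.__terminateExchange)
    bridge.start_monitoring()   # spawn the watcher thread while the loop is running
    await bridge.wait()         # wakes only when the other process calls .set()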

The key insight is: never call blocking multiprocessing primitives directly in asyncio. I think your original tight polling loop with await asyncio.sleep(0.01) creates race conditions because is_set() isn't guaranteed to be non-blocking.


5 Comments

What do you mean by "blocking" and "non-blocking" here? Isn't is_set() just a simple synchronous getter? It doesn't wait for anything afaict.
I think they mean blocking the server's main thread (the main event loop): multiprocessing.Event.wait() is implemented using OS-level synchronization primitives (like condition variables). It isn't aware of asyncio, so it doesn't yield back to the event loop and will block it if called directly. To avoid blocking, you can run wait() in a different thread (separate from the event loop). This way the event loop stays responsive, which is essential for an async server.
Note that the OP is not using multiprocessing.Event.wait(), but multiprocessing.Event.is_set() (in a loop interspersed with sleeps), which should indeed be totally non-blocking. The code in this answer is not incorrect, but doesn't explain why the OP's code doesn't work, nor is that at all obvious.
Ah, you're both right. I just re-read the answer above; is_set() is a simple function and doesn't wait for anything. My bad.
@VietDinh Yes, but there is no multiprocessing.Event.wait() call in the question
3

Looking at your code, the loop is essentially polling every 10ms, which introduces two main problems:

  • Delay / race risk → if the event is set in another process, the loop won’t notice until the next wake-up. If the event loop is busy (e.g. a long-running task that doesn’t yield), the delay can stretch even further and feel unreliable. (This shows why the event loop works well for I/O-bound tasks that yield, but struggles with blocking work.)

  • Wasteful → waking up 100 times per second just to check a flag wastes CPU cycles. Async code is designed to react to events, not spin on checks.

Normally, you wouldn’t need another process just to trigger a stop event — a keyboard interrupt or a signal handler is often enough. But if you want to stick with the design where another process sets the stop flag, then bridging the multiprocessing.Event to an asyncio.Event is the better option.
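
For reference, here is a minimal sketch of that signal-handler approach (assuming a POSIX system, where loop.add_signal_handler is available; the address 127.0.0.1:8888 and the trivial client handler are just placeholders):

import asyncio
import signal

async def handle_client(reader, writer):
    # Placeholder handler: immediately close the connection
    writer.close()
    await writer.wait_closed()

async def main():
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Wake the stop event when SIGINT (Ctrl+C) or SIGTERM is delivered
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    async with server:
        await stop.wait()  # no polling; wakes only when a signal arrives

asyncio.run(main())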

This keeps the event loop responsive and eliminates the wasteful polling, since it only wakes up when the event is actually set. However, note that it mainly solves the wasteful polling problem — the delay/race risk can still occur if the loop is blocked by CPU-heavy work.

Here is a simple AI-written demo of the bridge solution, with both a non-blocking and a blocking variant so you can compare them:

import asyncio
import multiprocessing
import time


# Non-blocking version: uses bridge event
async def wait_for_termination(terminate_event: multiprocessing.Event):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, terminate_event.wait)


# Blocking version: this will freeze the event loop
async def block_wait_for_termination(terminate_event: multiprocessing.Event):
    terminate_event.wait()


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info("peername")
    print(f"New connection from {addr}")
    try:
        while data := await reader.read(100):
            writer.write(data)
            await writer.drain()
    except asyncio.CancelledError:
        pass
    finally:
        print(f"Closing {addr}")
        writer.close()
        await writer.wait_closed()


async def background_task(name: str):
    """Some async task that keeps running until cancelled."""
    try:
        while True:
            print(f"{name} is working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"{name} stopped.")


async def main(mpevent: multiprocessing.Event, use_blocking: bool = False):
    # Start server
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
    print("Server started on 127.0.0.1:8888")

    task1 = asyncio.create_task(background_task("Task1"))
    task2 = asyncio.create_task(background_task("Task2"))

    if use_blocking:
        task3 = asyncio.create_task(block_wait_for_termination(mpevent))
    else:
        task3 = asyncio.create_task(wait_for_termination(mpevent))

    async with server:  # ensures server closes automatically
        print("Main loop running, waiting for terminate_event...")
        await task3   # wait until event is set

    print("Terminate signal received!")

    # Cancel background tasks
    task1.cancel()
    task2.cancel()
    await asyncio.gather(task1, task2, return_exceptions=True)

    print("All tasks stopped, exiting main loop.")


def worker(event: multiprocessing.Event):
    print("Worker process started")
    time.sleep(3)  # simulate some external trigger
    event.set()
    print("Worker set terminate_event.")


if __name__ == "__main__":
    terminate_event = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(terminate_event,))
    p.start()

    # Set use_blocking=True to see how the blocking wait freezes the whole server
    asyncio.run(main(terminate_event, use_blocking=False))

    p.join()

1 Comment

The reason I had to keep the sleep here is that the function I'm running in the server needs some time to execute, and I'm providing that time with the sleep.
