7,934 questions
1
vote
0
answers
82
views
pytest-xdist fails with session-scoped async fixtures only when worker count is multiple of CPU cores
Summary
I've discovered a reproducible bug where pytest-xdist + pytest-asyncio fails only when the number of workers is a multiple of 4 on my 16-core system (4, 8, 12, 16 fail; 3, 5, 6, 7, 9, 10, 11, ...
-2
votes
0
answers
51
views
Impact of GIL on multi-threaded async performance in Python 3.11 vs 3.12 [closed]
I am developing a Python application that combines asyncio coroutines with multi-threading to perform CPU-bound tasks concurrently. While testing performance, I noticed that the execution time differs ...
5
votes
2
answers
173
views
Why does multiprocess with "fork" fail under Python 3.14 but work in 3.13 (works only with "spawn" and "forkserver")?
The following code works fine on Python 3.13, but fails on Python 3.14 with a RuntimeError related to asyncio tasks.
If I switch the multiprocessing start method from "fork" to "spawn&...
1
vote
1
answer
50
views
FastAPI + pytest: RuntimeError: Event loop is closed when testing logout with AsyncClient
I'm testing a FastAPI application with asynchronous endpoints using pytest and httpx.AsyncClient. I have an endpoint /logout that depends on a JWT token from cookies and uses some async dependencies.
...
-2
votes
1
answer
38
views
Weird errors while trying to connect to Mega.NZ from Python via https://pypi.org/project/mega.py/ [closed]
I'm trying to connect to Mega.NZ via module https://pypi.org/project/mega.py/ with this code:
#!/usr/bin/env python3
from mega import Mega
mega = Mega()
m = mega.login("my mail", "my ...
-2
votes
1
answer
111
views
Use decorator to remove await prefix
I am working with Asyncio in an app I am making. However, I find it daunting to prefix my function calls with await.
So I want to encapsulate it in a decorator that adds the wait to the function call.
...
0
votes
1
answer
106
views
python REPL: is it possible to combine -i and -m asyncio?
I was trying to start the asyncio REPL (python -m asyncio) and to execute a script before starting the interactive REPL (python -i ./script.py). I tried re-ordering of options, the end-of-options ...
2
votes
2
answers
73
views
Why is asyncio.gather returning results in a different order than my tasks?
I’m learning about asynchronous programming in Python and testing with asyncio.
import asyncio
import random
async def worker(name):
delay = random.randint(1, 3)
await asyncio.sleep(delay)
...
0
votes
2
answers
72
views
asyncio.run with SQLAlchemy asyncio sessions works only once, then raises "Future attached to a different loop" and "another operation is in progress"
I’m running into an issue when using asyncio.run together with SQLAlchemy (async) inside a Celery task.
When I call the function the first time, it works fine.
On the second call, I get:
RuntimeError: ...
0
votes
1
answer
40
views
How to make Open FGA Async Client work with Flask + Gunicorn?
class FGAClientWrapper:
"""
A synchronous, gevent-friendly wrapper for the async OpenFGA SDK.
"""
def __init__(self, call_timeout: float | None = 10.0):
...
2
votes
2
answers
124
views
How to safely check a multiprocessing.Event in an asyncio loop without blocking?
I'm working on an asynchronous server using asyncio, but I need to monitor a multiprocessing.Event (used to signal termination from another process) inside my async event loop. Here's the simplified ...
4
votes
3
answers
229
views
Pytest in FastAPI + Postgres results in: <sys>:0: RuntimeWarning: coroutine 'Connection._cancel' was never awaited
I'm writing tests for my fastapi application that uses asynchronous posgtres connection:
# backend/database/session.py
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import ...
1
vote
2
answers
59
views
coroutine was never awaited with asyncio.TaskGroup
In the following code snippet, runner takes an optional semaphore. If this semaphore is provided, then all coroutines will be wrapped to handle entering the semaphore's context manager.
If I raise an ...
1
vote
4
answers
89
views
Why only creating a task will run the coroutine in python?
There is something I can't understand in this code
import asyncio
async def fetch_data(param):
print(f"Do something with {param}...")
await asyncio.sleep(param)
print(f"Done ...
2
votes
1
answer
101
views
Threading or asyncio for serial communications?
I'm running some serial commands to test connected devices under test (DUTs), & we have a need to run these in parallel/concurrently to speed up our tests. I would like some feedback on which ...