- open a new database connection inside
handle_message, or - indent the subsequent code such that
clientis not closed until after yourwhile True:loop exits (presumably by a KeyboardInterruptException`KeyboardInterrupt)
Here is a toy demonstration of the issue:
class Client:
def __init__(self):
self.opened = False
def __enter__(self):
self.opened = True
print("Client opened")
return self
def __exit__(self, *args):
self.opened = False
print("Client closed")
def write_api(self):
return WriteApi(self)
class WriteApi():
def __init__(self, client):
self.client = client
def use_api(self):
print("Using API. Client open =", self.client.opened)
def trade_stream(handler):
handler(None)
print("About to enter with-statement")
with Client() as client:
print("Entered with-statement")
write_api = client.write_api()
def handle_message(msg):
write_api.use_api()
print("Exiting with-statement")
print("Exited with-statement")
trade_stream(handle_message)
Output:
About to enter with-statement
Client opened
Entered with-statement
Exiting with-statement
Client closed
Exited with-statement
Using API. Client open = False
Note that using the API happens after the client has been closed.
So why do you appear to have working code?
From the documentation ...
from influxdb_client import InfluxDBClient
# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
with client.write_api() as write_api:
pass
... it appears client.write_api() returns an object which itself can behave as a context manager, and so should be used in a with-statement.
If the write_api is "open", and then the client is closed, whether or not the write_api is invalid will depend on how the library is written. It may internally hold the client open, until the write_api is explicitly closed (which you are not doing), or it may be Undefined Behaviour and might appear to work at the moment, and start failing in the future.
As stated earlier, the fix could be to indent the subsequent code, as in:
with InfluxDBClient(
url="http://localhost:8086",
token="fake_token_id",
org="organization") as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
def handle_message(msg: dict) -> None:
...
# still indented, so still inside the `with client`
ws_linear.trade_stream(
handle_message, "BTCUSDT"
)
# still indented, so still inside the `with client`
while True:
sleep(1)
# with statement ends here
# client will be closed here