
Based on the solution I got in Running multiple sockets using asyncio in Python,

I tried to also add the computation part using asyncio.

Setup: Python 3.7.4

import msgpack
import threading
import os
import asyncio
import concurrent.futures
import numpy as np
import functools
import nest_asyncio
nest_asyncio.apply()

class ThreadSafeElem(bytes):
    def __init__(self, *p_arg, **n_arg):
        self._lock = threading.Lock()
    def __enter__(self):
        self._lock.acquire()
        return self
    def __exit__(self, type, value, traceback):
        self._lock.release()

elem = ThreadSafeElem()

async def serialize(data):
   return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
   return msgpack.packb(data1, use_bin_type=True)

async def process_data(data,data1):
   loop = asyncio.get_event_loop()
   future = await loop.run_in_executor(None, functools.partial(serialize, data))
   future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
   return   await asyncio.gather(future,future1)

################ Calculation ################
def calculate_data():
  global elem
  while True:
      try:
... data is calculated (some dictionary) ...
          elem, elem1= asyncio.run(process_data(data, data1))
      except:
          pass
#####################################################################
def get_data():
  return elem
def get_data1():
  return elem1
########### START SERVER and get data continuously ################
async def client_thread(reader, writer):
  while True:
    try:
        bytes_received = await reader.read(100) 
        package_type = np.frombuffer(bytes_received, dtype=np.int8)
        if package_type ==1 :
           nn_output = get_data1()
        if package_type ==2 :
           nn_output = get_data()               
        writer.write(nn_output)
        await writer.drain()
    except:
        pass

async def start_servers(host, port):
  server = await asyncio.start_server(client_thread, host, port)
  await server.serve_forever()

async def start_calculate():
  await asyncio.run(calculate_data())

def enable_sockets():
    try:
        host = '127.0.0.1'
        port = 60000
        sockets_number = 6
        loop = asyncio.get_event_loop()
        for i in range(sockets_number):
            loop.create_task(start_servers(host, port + i))
        loop.create_task(start_calculate())
        loop.run_forever()
    except:
        print("weird exceptions")
##############################################################################

enable_sockets()   

The issue is that when I make a call from a client, the server does not give me anything.

I tested the program with dummy data and without asyncio on the calculation part, i.e. without loop.create_task(start_calculate()), and the server responded correctly.

I also ran the calculation without adding it to enable_sockets and it worked. The calculation also works with this implementation, but the problem is that the server is not returning anything.

I did it like this because I need the calculation part to run continuously, and when one of the clients calls, to return the data computed at that point.


2 Answers


An asyncio event loop cannot be nested inside another, and there is no point in doing so: asyncio.run (and similar) blocks the current thread until done. This does not increase parallelism, and merely disables any outer event loop.

If you want to nest another asyncio task, directly run it in the current event loop. If you want to run a non-cooperative, blocking task, run it in the event loop executor.

async def start_calculate():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, calculate_data)
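
For illustration, here is a minimal runnable sketch of this pattern, with a hypothetical blocking_work function standing in for calculate_data:

```python
import asyncio
import time

def blocking_work():
    # stands in for calculate_data: a plain, blocking function
    time.sleep(0.1)
    return 42

async def start_calculate():
    loop = asyncio.get_running_loop()
    # run the blocking function in the default (thread) executor,
    # so the event loop stays free to serve clients meanwhile
    return await loop.run_in_executor(None, blocking_work)

print(asyncio.run(start_calculate()))  # 42
```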

The default executor uses threads – this allows running blocking tasks, but does not increase parallelism. Use a custom ProcessPoolExecutor to use additional cores:

import concurrent.futures

async def start_calculate():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        await loop.run_in_executor(pool, calculate_data)

6 Comments

I tried the first approach suggested and modified start_calculate, but I received RuntimeWarning: coroutine 'process_data' was never awaited @MisterMiyagi
@cUser calculate_data as shown is incomplete, and it catches and discards all errors – I cannot help with something I know nothing about. However, it seems you dispatch in it using asyncio.run(process_data(data)) – as mentioned nesting asyncio.run inside asyncio.run is not a good idea. Either submit process_data to the outer event loop, or discard the wrapper and run serialize directly inside calculate_data.
You are perfectly right, asyncio.run inside asyncio.run is far from a good idea. However, I realized that I will have 5 things to serialize that I need to do at the same time. calculate_data just does some irrelevant calculation and adds the results to dictionaries; that's why I did not include it. So if I do the serialization inside calculate_data, it will make everything very slow @MisterMiyagi. So then how can I still do the serialization in parallel even if I am running it via loop.run_in_executor(None, calculate_data)? Tell me if I made it somehow clear.
With the code shown in the question, each calculate_data eventually calls serialize once: calculate_data blocking in asyncio.run does not add concurrency, process_data submitting exactly one executor task does not add concurrency, asyncio.gather of a single task does not add concurrency. As such, asyncio.run(process_data(data)) could be replaced by serialize(data) (and serialize should be a def not async def function).
@cUser I'm still not following why you want to run serialize and serialize1 using asyncio. They don't use any async functionality. Either way, I've answered how to run multiple tasks in asyncio. If you want to debug a specific bug that turns up now, please ask a new question including all the information required to actually debug it. Adding information piecewise via comments to an answer is not efficient for either of us.
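
To illustrate the advice in this comment thread, here is a sketch (names are hypothetical, and pickle stands in for msgpack, which may not be installed) of running several plain-def serializations in parallel on a process pool from the outer event loop:

```python
import asyncio
import concurrent.futures
import pickle  # stand-in for msgpack.packb

def serialize(data):
    # a plain def: serialization does no async work
    return pickle.dumps(data)

async def serialize_many(items):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, serialize, item) for item in items]
        # gather now actually adds concurrency: one executor task per item
        return await asyncio.gather(*futures)

if __name__ == "__main__":
    blobs = asyncio.run(serialize_many([{"a": 1}, {"b": 2}, {"c": 3}]))
    print(len(blobs))  # 3
```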

Why do you call asyncio.run() multiple times?

This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

I would advise you to read the docs.
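
As a small illustration of the single-entry-point pattern (worker coroutines are hypothetical):

```python
import asyncio

async def worker(name):
    # each long-running job becomes a task inside the one event loop
    await asyncio.sleep(0.01)
    return name

async def main():
    # asyncio.run is called exactly once, at the program's entry point;
    # everything else is scheduled inside main()
    return await asyncio.gather(worker("a"), worker("b"))

print(asyncio.run(main()))  # ['a', 'b']
```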

1 Comment

You can use multiprocessing; I have personally done this.
