I wrote this code to build a non-blocking manager with pipeline operations using asyncio. My main concern is collecting the items received from the producer and, once the receive operation is complete, returning them all together, merged when keys match. However, I am unsure whether I should join the data at the end of the producer or in the consumer. My current workflow is the following:
1 - scrape all databases ( multiples clients) (simulate)
2 - push to the manager (a proxy-level server), where multiple clients send their data to the manager
3 - merge the multiple data sources into one list as the data arrives (no DB operations yet), for example {"ID-2002-0201": {"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}} > maybe producer
4 - use a get_or_create (check whether the database already has a record with that data; otherwise create it) > consumer
5 - create a bulk of data (maybe chunk the data into smaller chunks so it stays scalable when the data sources grow from 2 to 100+) > consumer
server.py
# #!/usr/bin/env python3
import asyncio
import logging
import random
from pipeline_async import Pipeline
class A:
    """Simulated data source: yields one record keyed by its ID."""

    def run(self):
        """Return a single scraped record as ``{record_id: record_dict}``."""
        record = {
            "id": "ID-2002-0201",
            "updated_at": "2018-05-14T22:25:51Z",
            "html_url": "xxxxxxxxxxxx",
        }
        return {record["id"]: record}
class B:
    """Second simulated data source, shaped identically to class A."""

    def run(self):
        """Return a single scraped record as ``{record_id: record_dict}``."""
        record = {
            "id": "ID-2002-0202",
            "updated_at": "2018-05-14T22:25:51Z",
            "html_url": "xxxxxxxxxxxx",
        }
        return {record["id"]: record}
class Manager:
    """Coordinates a producer and a consumer task over a shared Pipeline queue."""

    async def producer(self, pipeline, data_sources):
        """Pull one payload from each data source and push it onto the pipeline.

        Each entry in ``data_sources`` must expose a synchronous ``run()``
        method returning a dict payload (see classes A and B).
        """
        for data_stream in data_sources:
            # run() is synchronous here; a real scraper would await an I/O call.
            message = data_stream.run()
            await pipeline.set_message(message, "Producer")
            # Log the payload itself, not the source object, so the log is useful.
            logging.info("Producer got message: %s", message)

    async def consumer(self, pipeline):
        """Drain messages from the pipeline until cancelled.

        Runs forever; start() cancels this coroutine once every queued item
        has been marked done.
        """
        while True:
            # Wait for an item from the producer.
            message = await pipeline.get_message("Consumer")
            logging.info("Consumer storing message: %s", message)
            # Simulate an I/O-bound store (e.g. a DB write) with a sleep.
            await asyncio.sleep(random.random())
            # Mark this item fully processed so pipeline.join() can return.
            pipeline.task_done()

    async def start(self):
        """Wire up the pipeline, run the producer to completion, then shut down."""
        pipeline = Pipeline()
        data_sources = [A(), B()]
        # Schedule the consumer as a background task.
        consume = asyncio.ensure_future(self.consumer(pipeline))
        # Run the producer and wait until everything is enqueued.
        await self.producer(pipeline, data_sources)
        # Block until the consumer has task_done()'d every queued item.
        await pipeline.join()
        # The consumer is still awaiting the next item: cancel it, then await
        # the task so the cancellation is actually processed and no pending
        # task is left behind when the event loop closes.
        consume.cancel()
        await asyncio.gather(consume, return_exceptions=True)
        logging.info("Successfully shutdown the service.")
if __name__ == '__main__':
    # Entry point: build the manager and drive the event loop to completion.
    service = Manager()
    asyncio.run(service.start())
pipeline.py
class Pipeline(asyncio.Queue):
def __init__(self):
super().__init__(maxsize=10)
async def get_message(self, name):
logging.debug("%s:about to get from queue", name)
value = await self.get()
logging.debug("%s:got %s from queue", name, value)
return value
async def set_message(self, value, name):
logging.debug("%s:about to add %s to queue", name, value)
await self.put(value)
print(name, value)
logging.debug("%s:added %s to queue", name, value)
I would appreciate some feedback on cases I missed