
I wrote this code to implement a non-blocking manager with pipeline operations using asyncio. My main concern is to collect the items received from the producer and, once the receive operation is complete, return everything together, merged when keys match. My doubt is where I should join the data at the end: in the producer or in the consumer. The current workflow is the following:

1 - scrape all databases (multiple clients) (simulated)

2 - push to the manager (a proxy-level server), where multiple clients send their data to the manager

3 - merge the multiple data sources into one list as the data arrives, with no DB operations yet, e.g. {"ID-2002-0201": {"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}} > maybe producer

4 - use a get_or_create (check whether the database already has a record with that data; otherwise create it) > consumer

5 - create a bulk of data (maybe chunk the data into smaller chunks so it stays scalable when the data sources grow from 2 to 100+) > consumer
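
For step 5, one way to split the merged records into bounded batches before the bulk insert is a simple chunking helper. This is only a sketch; the name `chunked` and the batch size are illustrative, not part of the code under review:

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items.

    Bounding the batch size keeps each bulk insert small and
    predictable as the number of data sources grows from 2 to 100+.
    """
    it = iter(iterable)
    # islice consumes the iterator in place, so each call picks up
    # where the previous chunk left off (requires Python 3.8+ for :=)
    while chunk := list(islice(it, size)):
        yield chunk
```

Feeding the merged records through `chunked(records, 500)` would then give the consumer fixed-size batches to pass to a bulk `get_or_create`.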

server.py

#!/usr/bin/env python3
import asyncio
import logging
import random
from pipeline_async import Pipeline

class A:
    def run(self):
        return {"ID-2002-0201": {"id": "ID-2002-0201", "updated_at": "2018-05-14T22:25:51Z", "html_url": "xxxxxxxxxxxx"}}

class B:
    def run(self):
        return {"ID-2002-0202": {"id": "ID-2002-0202", "updated_at": "2018-05-14T22:25:51Z", "html_url": "xxxxxxxxxxxx"}}

class Manager:

    async def producer(self, pipeline, data_sources):
        """Pretend we're fetching data from the network."""
        for data_stream in data_sources:
            data = data_stream.run()
            await pipeline.set_message(data, "Producer")
            logging.info("Producer got message: %s", data)

    async def consumer(self, pipeline):
        """Pretend we're saving a record in the database."""
        while True:
            # wait for an item from the producer
            message = await pipeline.get_message("Consumer")
            # process the message
            logging.info("Consumer storing message: %s", message)
            # simulate an I/O operation using sleep
            await asyncio.sleep(random.random())
            pipeline.task_done()

    async def start(self):
        pipeline = Pipeline()
        data_sources = [A(),B()]
        # schedule the consumer
        consume = asyncio.ensure_future(self.consumer(pipeline))
        # run the producer and wait for completion
        await self.producer(pipeline, data_sources)
        # wait until the consumer has processed all items
        await pipeline.join()
        # the consumer is still awaiting an item, so cancel it
        consume.cancel()
        logging.info("Successfully shutdown the service.")

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(Manager().start())
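
One option for step 3 would be to join in the producer: collect every source's output first, merge records that share a key, and push a single combined payload onto the queue. A minimal sketch of that idea, using a bare `asyncio.Queue` so it stands alone (the name `merged_producer` is illustrative):

```python
import asyncio


async def merged_producer(queue: asyncio.Queue, data_sources):
    """Merge every source's output by key, then push one combined message."""
    merged = {}
    for data_stream in data_sources:
        for key, record in data_stream.run().items():
            # records sharing an ID are folded into a single dict,
            # later fields overwriting earlier ones on conflict
            merged.setdefault(key, {}).update(record)
    await queue.put(merged)
```

The trade-off is that the consumer then sees one large message instead of a stream, so backpressure from `maxsize` no longer applies per source.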

pipeline.py

import asyncio
import logging

class Pipeline(asyncio.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    async def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = await self.get()
        logging.debug("%s:got %s from queue", name, value)
        return value

    async def set_message(self, value, name):
        logging.debug("%s:about to add %s to queue", name, value)
        await self.put(value)
        logging.debug("%s:added %s to queue", name, value)
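
The other option is to join in the consumer, which sits at the end of the queue and already touches every message. A self-contained sketch of that variant, using a bare `asyncio.Queue` instead of the `Pipeline` subclass (the names `merging_consumer` and `demo` are illustrative):

```python
import asyncio
import logging


async def merging_consumer(queue: asyncio.Queue, merged: dict):
    """Consumer variant that joins incoming batches into `merged` by key."""
    while True:
        message = await queue.get()
        for key, record in message.items():
            # records sharing an ID are folded into one dict instead
            # of being stored twice
            merged.setdefault(key, {}).update(record)
        logging.info("Consumer merged message: %s", message)
        queue.task_done()


async def demo():
    queue = asyncio.Queue(maxsize=10)
    merged = {}
    consume = asyncio.ensure_future(merging_consumer(queue, merged))
    await queue.put({"ID-1": {"id": "ID-1", "html_url": "a"}})
    await queue.put({"ID-1": {"id": "ID-1", "updated_at": "2018"}})
    await queue.join()   # wait until the consumer has processed all items
    consume.cancel()
    return merged
```

Since a single event loop only switches tasks at `await` points, the in-place dict update needs no extra locking here.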

I would appreciate feedback on any cases I missed.
