
I have some HTML pages that I am trying to extract the text from using asynchronous web requests through aiohttp and asyncio; after extracting the text, I save the files locally. I am using BeautifulSoup (under extract_text()) to process the text from the response and extract the relevant text within the HTML page (excluding the code, etc.), but I am facing an issue where my synchronous version of the script is faster than my asynchronous + multiprocessing version.

As I understand it, using the BeautifulSoup function causes the main event loop to block within parse(), so based on these two StackOverflow questions [0, 1], I figured the best thing to do was to run extract_text() within its own process (as it's a CPU-bound task), which should prevent the event loop from blocking.

This results in the script taking 1.5x longer than the synchronous version (with no multiprocessing).

To confirm that this was not an issue with my implementation of the asynchronous code, I removed the call to extract_text() and instead saved the raw text from the response object. Doing this resulted in my asynchronous code being much faster, showing that the issue is purely from extract_text() being run in a separate process.

Am I missing some important detail here?

import asyncio
from asyncio import Semaphore
import json
import logging
from pathlib import Path
from typing import List, Optional

import aiofiles
from aiohttp import ClientSession
import aiohttp
from bs4 import BeautifulSoup
import concurrent.futures
import functools


def extract_text(raw_text: str) -> str:
    return " ".join(BeautifulSoup(raw_text, "html.parser").stripped_strings)


async def fetch_text(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> str:
    async with semaphore:
        response = await session.request(method="GET", url=url, **kwargs)
        response.raise_for_status()
        logging.info("Got response [%s] for URL: %s", response.status, url)
        text = await response.text(encoding="utf-8")
        return text


async def parse(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs,
) -> Optional[str]:
    try:
        text = await fetch_text(
            url=url,
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logging.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
    except Exception as e:
        logging.exception(
            "Non-aiohttp exception occured:  %s",
            getattr(e, "__dict__", None),
        )
    else:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            extract_text_ = functools.partial(extract_text, text)
            text = await loop.run_in_executor(pool, extract_text_)
            logging.info("Found text for %s", url)
            return text


async def process_file(
    url: dict,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> None:
    category = url.get("category")
    link = url.get("link")
    if category and link:
        text = await parse(
            url=f"{URL}/{link}",
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
        if text:
            save_path = await get_save_path(
                link=link,
                category=category,
            )
            await write_file(html_text=text, path=save_path)
        else:
            logging.warning("Text for %s not found, skipping it...", link)


async def process_files(
    html_files: List[dict],
    semaphore: Semaphore,
) -> None:
    async with ClientSession() as session:
        tasks = [
            process_file(
                url=file,
                session=session,
                semaphore=semaphore,
            )
            for file in html_files
        ]
        await asyncio.gather(*tasks)


async def write_file(
    html_text: str,
    path: Path,
) -> None:
    # Write to file using aiofiles
    ...

async def get_save_path(link: str, category: str) -> Path:
    # return path to save
    ...

async def main_async(
    num_files: Optional[int],
    semaphore_count: int,
) -> None:
    html_files = ...  # get all the files to process
    semaphore = Semaphore(semaphore_count)
    await process_files(
        html_files=html_files,
        semaphore=semaphore,
    )


if __name__ == "__main__":
    NUM_FILES = ...  # passed through CLI args
    SEMAPHORE_COUNT = ...  # passed through CLI args
    asyncio.run(
        main_async(
            num_files=NUM_FILES,
            semaphore_count=SEMAPHORE_COUNT,
        )
    )

SnakeViz charts across 1000 samples:

  1. Async version with extract_text and multiprocessing
  2. Async version without extract_text
  3. Sync version with extract_text (notice how the html_parser from BeautifulSoup takes up the majority of the time here)
  4. Sync version without extract_text

  • Before trying to parallelize code, you should first benchmark your code to identify bottlenecks and optimization opportunities. Here, you are trying to parallelize extract_text but I suspect that it is already quite fast. Thus, you are just paying the additional cost of managing parallelism (starting and stopping the pool at each iteration for instance) which is counterproductive. Commented Jul 5, 2022 at 14:06
  • @LouisLac I did benchmark it, although naively, just by timing the scripts. Using extract_text() causes an estimated 10x slowdown compared to not using it, across both the synchronous and asynchronous scripts (more than 10x, actually). Commented Jul 5, 2022 at 14:10
  • Can you compute exactly the average duration of the extract_text function call? With a performance timer if possible (see the timing sketch after this comment thread). Commented Jul 5, 2022 at 14:16
  • Moreover, it looks like you are trying to run extract_text in a multiprocessing pool and then trying to make process_file calls concurrent with asyncio. Instead, you could try to keep all functions synchronous (parse, process_file, write_file) and only run process_file in parallel using a multiprocessing pool. Commented Jul 5, 2022 at 14:20
  • @LouisLac I have added the cProfiles of all the scripts I mentioned to the question, using SnakeViz. As you can see, the _htmlparser from BeautifulSoup takes up the majority of the time within the synchronous version (image 3) compared to the synchronous version without it (image 4). Commented Jul 6, 2022 at 1:59
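
A hypothetical way to do that measurement on its own (extract_text is copied from the question; sample_html is just an illustrative stand-in for one of the fetched pages):

from time import perf_counter

from bs4 import BeautifulSoup


def extract_text(raw_text: str) -> str:
    # Same implementation as in the question
    return " ".join(BeautifulSoup(raw_text, "html.parser").stripped_strings)


# sample_html is a hypothetical stand-in for one fetched page
sample_html = "<html><body><p>Some text to extract.</p></body></html>" * 100

reps = 100
t1 = perf_counter()
for _ in range(reps):
    extract_text(sample_html)
t2 = perf_counter()
print(f"{(t2 - t1) / reps * 1_000:.3f} ms per call")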

1 Answer


Here is roughly what your asynchronous program does:

  1. Launch num_files parse() tasks concurrently
  2. Each parse() task creates its own ProcessPoolExecutor and asynchronously awaits extract_text (which is executed in the previously created process pool).

This is suboptimal for several reasons:

  1. It creates num_files process pools, which are expensive to create and take up memory
  2. Each pool is only used for one single operation, which is counterproductive: as many concurrent operations as possible should be submitted to a given pool

You are creating a new ProcessPoolExecutor each time the parse() function is called. You could try to instantiate it once (as a global, for instance, or passed through a function argument):

from concurrent.futures import ProcessPoolExecutor

async def parse(loop, executor, ...):
  ...
  text = await loop.run_in_executor(executor, extract_text, text)

# and then in `process_file` (or `process_files`):

async def process_file(...):
  ...
  loop = asyncio.get_running_loop()
  with ProcessPoolExecutor() as executor:
    ...
    await parse(loop, executor, ...)

I benchmarked the overhead of creating a ProcessPoolExecutor on my old MacBook Air 2015, and it shows that it is quite slow (almost 100 ms for pool creation, opening, submission and shutdown):

from time import perf_counter
from concurrent.futures import ProcessPoolExecutor

def noop() -> None:
    # Module-level function: lambdas cannot be pickled and sent to worker processes
    return None

def main_1():
    """Pool created once"""
    reps = 100
    t1 = perf_counter()
    with ProcessPoolExecutor() as executor:
        for _ in range(reps):
            executor.submit(noop)
    t2 = perf_counter()
    print(f"{(t2 - t1) / reps * 1_000} ms")  # 2 ms/it

def main_2():
    """Pool created at each iteration"""
    reps = 100
    t1 = perf_counter()
    for _ in range(reps):
        with ProcessPoolExecutor() as executor:
            executor.submit(noop)
    t2 = perf_counter()
    print(f"{(t2 - t1) / reps * 1_000} ms")  # 100 ms/it

if __name__ == "__main__":
    main_1()
    main_2()

You may again hoist it up into the process_files function, which avoids recreating the pool for each file.
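
For illustration, here is a minimal sketch of that version, reusing the question's function names and assuming process_file and parse are updated to accept and forward the extra loop and executor arguments:

import asyncio
from asyncio import Semaphore
from concurrent.futures import ProcessPoolExecutor
from typing import List

from aiohttp import ClientSession


async def process_files(
    html_files: List[dict],
    semaphore: Semaphore,
) -> None:
    loop = asyncio.get_running_loop()
    # The pool is created once here and shared by every task below
    with ProcessPoolExecutor() as executor:
        async with ClientSession() as session:
            tasks = [
                process_file(
                    url=file,
                    session=session,
                    semaphore=semaphore,
                    loop=loop,
                    executor=executor,  # forwarded down to parse()
                )
                for file in html_files
            ]
            await asyncio.gather(*tasks)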

Also, try to inspect your first SnakeViz chart more closely in order to find out what exactly in process.py:submit is taking that much time.


One last thing: be careful about the semantics of using a context manager on an executor:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  for i in range(100):
    executor.submit(some_work, i)

Not only does this create an executor and submit work to it, it also waits for all submitted work to finish before exiting the with statement.
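
To make that waiting visible, here is a small, hypothetical timing snippet (slow_task is just an illustrative stand-in for real work):

from concurrent.futures import ProcessPoolExecutor
from time import perf_counter, sleep


def slow_task(seconds: float) -> None:
    # Module-level so it can be pickled and sent to a worker process
    sleep(seconds)


if __name__ == "__main__":
    t1 = perf_counter()
    with ProcessPoolExecutor() as executor:
        for _ in range(4):
            executor.submit(slow_task, 1.0)
    t2 = perf_counter()
    # submit() returns immediately, but leaving the with block calls
    # executor.shutdown(wait=True), so roughly a second elapses here
    # (assuming at least four workers), not ~0.
    print(f"exited the with block after {t2 - t1:.2f} s")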


3 Comments

It's also worth remembering there's a non-negligible overhead in serializing/deserializing data with child processes.
Thank you, that worked perfectly! I actually moved the global executor to process_files and then passed it down to process_file, as process_file is run for each file. This version is now much faster. In order to further improve it, do you think implementing a Queue to process the jobs would help?
I don't think so; the ProcessPoolExecutor already uses a queue internally to store pending jobs.
