Experimental
This is an experimental, alternate implementation that:
- Streams the HTTP response, so the entire response body is never needed at once
- Streams the parsed HTML elements, so the entire document tree is never built
- Streams the JSON body, so the entire dictionary tree is never built

It is somewhat iterator-heavy, and built more as a proof of concept to demonstrate that this is possible. Advantages: worst-case memory usage should be lower, and BeautifulSoup is no longer needed. Disadvantages: a new dependency, JsonSlicer, is required, and connections that are reset before the complete response has been transmitted might introduce subtle HTTP inefficiencies.
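To make the layering concrete, here is a minimal, self-contained sketch of the same idea: chunks arrive one at a time (a hard-coded list stands in for `resp.iter_content()`), each chunk is fed to an `HTMLParser` subclass, and the text collected from the target `<script>` tag is then decoded as JSON. The HTML snippet, the `id="data"` attribute and the `items` key are invented for illustration, and `json.loads` stands in for JsonSlicer so the sketch has no third-party dependencies; the real implementation below streams the JSON layer as well.

```python
import json
from html.parser import HTMLParser

# Hard-coded chunks standing in for a streamed HTTP response body.
FAKE_CHUNKS = [
    '<html><script id="data">{"items": [',
    '{"id": 1}, {"id": 2}',
    ']}</script></html>',
]


class ScriptText(HTMLParser):
    """Collect the text inside <script id="data"> as chunks are fed in."""

    def __init__(self):
        super().__init__()
        self.in_script = False
        self.pieces = []

    def handle_starttag(self, tag, attrs):
        self.in_script = tag == 'script' and ('id', 'data') in attrs

    def handle_data(self, data):
        if self.in_script:
            self.pieces.append(data)

    def handle_endtag(self, tag):
        self.in_script = False


parser = ScriptText()
for chunk in FAKE_CHUNKS:  # in the real code: resp.iter_content(...)
    parser.feed(chunk)     # HTMLParser tolerates markup split across chunks

print(json.loads(''.join(parser.pieces))['items'])  # [{'id': 1}, {'id': 2}]
```

The full implementation follows.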
import csv
import logging
from functools import partial
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, Optional, Tuple
from urllib.parse import urljoin

from jsonslicer import JsonSlicer
from requests import Session, Response

JSON = Dict[str, Any]


class StreamParser(HTMLParser):
    def __init__(self, resp: Response):
        resp.raise_for_status()  # If the response failed, it can't be parsed
        self.resp = resp         # Keep the response so we can stream from it
        self.in_tag = False      # Parser state: whether we're in the script tag
        self.done = False        # Whether we're done with the script tag
        self.queue = []          # Queue of text element chunks in the script
        super().__init__()       # Initialize the base parser

    def __enter__(self):
        # Start the data chunk iterator
        self.chunks = self.data_chunks()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # When we're done, tell the HTTP response stream to close
        self.resp.close()

    def data_chunks(self) -> Iterable[str]:
        # Stream in arbitrary-sized chunks from the response
        for chunk in self.resp.iter_content(
            chunk_size=None,      # Get whatever chunks are sent our way
            decode_unicode=True,  # Needed for HTMLParser compatibility
        ):
            logging.debug(
                f'{len(chunk)}-character chunk: '
                f'{chunk[:10]}...{chunk[-10:]}'
            )
            # Feed this chunk to the parser, which will in turn call our
            # handle methods and populate the queue
            self.feed(chunk)
            yield from self.queue
            self.queue.clear()

            # We only care about one tag. Once that's parsed, we're done
            # iterating
            if self.done:
                break

    def read(self, n: Optional[int] = -1) -> str:
        # Will be called by JsonSlicer. We only support partial reads for
        # efficiency's sake; we do not build up our own buffer string.
        if n is None or n < 0:
            raise NotImplementedError('Read-to-end not supported')
        try:
            return next(self.chunks)
        except StopIteration:
            return ''  # End of stream

    def handle_starttag(self, tag: str, attrs: Iterable[Tuple[str, str]]) -> None:
        self.in_tag = tag == 'script' and any(
            k == 'id' and v == '__NEXT_DATA__' for k, v in attrs
        )

    def handle_data(self, data: str) -> None:
        if self.in_tag:
            self.queue.append(data)

    def handle_endtag(self, tag: str) -> None:
        if self.in_tag:
            self.in_tag = False
            self.done = True

    def __iter__(self) -> Iterable[JSON]:
        # Iterating over this object will magically produce individual listing
        # dictionaries. We're an iterator; we delegate to the JsonSlicer
        # iterator; and it in turn invokes read(), which uses our data_chunks
        # iterator.
        return JsonSlicer(file=self, path_prefix=(
            'props', 'initialProps', 'pageProps', 'regularListingsFormatted', None,
        ))


class ZooplaScraper:
    ROOT = 'https://zoopla.co.uk'
    from_root = partial(urljoin, ROOT)

    def __init__(self):
        self.session = Session()

    def fetch(
        self, query: str = 'London', radius: int = 0,
        sort: str = 'newest_listings', page: int = 1,
    ) -> StreamParser:
        resp = self.session.get(
            self.from_root(f'for-sale/property/{query.lower()}/'),
            params={
                'page_size': 25,
                'q': query,
                'radius': radius,
                'results_sort': sort,
                'pn': page,
            },
            stream=True,
        )
        return StreamParser(resp)

    @classmethod
    def serialise(cls, listing: JSON) -> JSON:
        # Convert from the site's representation of a listing dict to our own
        return {
            'listing_id': listing['listingId'],
            'name_title': listing['title'],
            'names': listing['branch']['name'],
            'addresses': listing['address'],
            'agent': cls.from_root(listing['branch']['branchDetailsUri']),
            'phone_no': listing['branch']['phone'],
            'picture': listing['image']['src'],
            'prices': listing['price'],
            'listed_on': listing['publishedOn'],
            'listing_detail_link': cls.from_root(listing['listingUris']['detail']),
        }

    def run(
        self,
        query: str = 'London', radius: int = 0, sort: str = 'newest_listings',
        max_pages: int = 4,
    ) -> Iterable[JSON]:
        for page in range(1, max_pages + 1):
            n_listings = 0  # In case a page yields no listings at all
            with self.fetch(query, radius, sort, page) as stream:
                for n_listings, data in enumerate(stream, 1):
                    yield self.serialise(data)
            logging.info(f'Page {page}: {n_listings} listings')

    @staticmethod
    def to_csv(results: Iterable[JSON], filename: str = 'zoopla.csv') -> None:
        results = iter(results)
        # Peek at the first result to learn the field names; bail out if empty
        first = next(results, None)
        if first is None:
            logging.warning('No results to write')
            return
        with open(filename, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=first.keys())
            writer.writeheader()
            writer.writerow(first)
            writer.writerows(results)
        logging.info(f'Write to {filename} complete')


if __name__ == '__main__':
    # Switch to DEBUG for more verbosity, including urllib3's own messages
    logging.basicConfig(level=logging.INFO)
    scraper = ZooplaScraper()
    scraper.to_csv(scraper.run())
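Since run() yields listings lazily, the stream can also be spot-checked without writing a CSV. A minimal usage sketch, assuming a live connection and that Zoopla still serves the `__NEXT_DATA__` payload (the islice count and the printed keys are arbitrary):

```python
from itertools import islice

scraper = ZooplaScraper()
# Pull just the first three listings from the first results page.
for listing in islice(scraper.run(max_pages=1), 3):
    print(listing['listing_id'], listing['prices'], listing['addresses'])
```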