
You will need to follow this guide.

This is a YouTube crawler that collects information about YouTube playlists. It uses the YouTube Data API v3 and crawls each playlist's title, URL, description, item count and videos.

For each video it crawls the title, URL, publish time, description, tags, category, duration, statistics (view count, like count, favorite count and comment count) and owner.

For the owner it crawls the channel's title, URL, description, custom URL, creation date, country and statistics (view count, subscriber count, hidden subscriber count and video count).

The main portion of the code is a class that you construct with your API key; it has separate methods to crawl information about a channel, a video, a playlist, and all playlists created by a channel.

An item is only crawled if its id is not already present in the respective cache, so nothing is fetched twice.

The main usage is to crawl everything I wanted about all playlists created by my channel; here is the output (it only contains public information anyway).
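
For reference, each playlist entry in the output has roughly this shape (the field names come from the code below; the values here are invented):

# Shape of one playlist entry in the output; invented example values,
# field names taken from crawl_playlist / crawl_video / crawl_channel below.
{
    'title': 'My playlist',
    'url': 'https://www.youtube.com/playlist?list=PL...',
    'count': 1,
    'description': '...',
    'items': [{
        'title': 'Some video',
        'url': 'https://www.youtube.com/watch?v=...',
        'publishedAt': '2022-04-29T09:19:00Z',
        'description': '...',
        'tags': ['tag1', 'tag2'],
        'category': 10,
        'duration': '00:03:25',
        'statistics': {'viewCount': 123, 'likeCount': 45,
                       'favoriteCount': 0, 'commentCount': 6},
        'owner': {
            'title': 'Some channel',
            'url': 'https://www.youtube.com/channel/UC...',
            'description': '...',
            'customUrl': 'https://www.youtube.com/c/somechannel',
            'publishedAt': '2010-01-01T00:00:00Z',
            'country': 'US',
            'statistics': {'viewCount': 1000, 'subscriberCount': 100,
                           'hiddenSubscriberCount': 0, 'videoCount': 10}
        }
    }]
}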


Code

import googleapiclient.discovery
import json
import os
import re
import requests
from datetime import datetime, timedelta
from lxml import html
from pathlib import Path

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
PLAYLIST_BASE_URL = 'https://www.youtube.com/playlist?list='
VIDEO_BASE_URL = 'https://www.youtube.com/watch?v='
CHANNEL_BASE_URL = 'https://www.youtube.com/channel/'
YOUTUBE_BASE_URL = 'https://www.youtube.com/'
YOUTUBE_TIMESTAMP = r'^P((?P<Day>\d+)D)?T((?P<Hour>([0-9]|[0-1][0-9]|2[0-3]))H)?((?P<Minute>([0-9]|[0-5][0-9]))M)?((?P<Second>([0-9]|[0-5][0-9])(\.\d+)?)S)?$'

def parse_youtube_time(s):
    # The API reports durations in ISO 8601, e.g. 'PT1H2M3S' or 'P2DT3H4M5S';
    # an ongoing live stream reports 'P0D' since it has no finite duration yet.
    if s == 'P0D':
        return 'LIVE'
    groupdict = re.match(YOUTUBE_TIMESTAMP, s).groupdict()
    assert any(groupdict.values())
    d, h, m, s = groupdict.values()
    timestamp = ['00']*3
    if h:
        timestamp[0] = h.zfill(2)
    if m:
        timestamp[1] = m.zfill(2)
    if s:
        if float(s) < 10:
            s = '0'+s
        timestamp[2] = s
    timestamp = ':'.join(timestamp)
    if d:
        timestamp = d + 'D ' + timestamp
    return timestamp

def get_all_items(request):
    # Collects every page of a list request into one response; request.uri
    # already embeds the API key, so a plain requests.get fetches later pages.
    response = request.execute()
    url = request.uri
    while (token := response.pop('nextPageToken', None)):
        nextpage = json.loads(requests.get(url + '&pageToken=' + token).text)
        response['items'] += nextpage.pop('items')
        response.update(nextpage)
    return response

class Crawler:
    def __init__(self, api_key):
        self.youtube = googleapiclient.discovery.build('youtube', 'v3', developerKey=api_key)
        self.channels = dict()
        self.videos = dict()
        self.playlists = dict()
    
    def crawl_channel(self, channel_id):
        if channel_id in self.channels:
            return self.channels[channel_id]
        
        url = CHANNEL_BASE_URL + channel_id
        response = self.youtube.channels().list(id=channel_id, part='snippet,statistics').execute()
        item = response['items'][0]
        d = item['snippet']
        title = d.pop('title')
        if (customUrl := d.get('customUrl')):
            d['customUrl'] = YOUTUBE_BASE_URL + customUrl
        for key in ('thumbnails', 'localized'):
            d.pop(key)
        statistics = {k: int(v) for k, v in item['statistics'].items()}
        d['statistics'] = statistics
        owner = {'title': title, 'url': url}
        owner.update(d)
        self.channels[channel_id] = owner
        return owner
    
    def crawl_video(self, video_id):
        if video_id in self.videos:
            return self.videos[video_id]
        
        url = VIDEO_BASE_URL + video_id
        response = self.youtube.videos().list(id=video_id, part='snippet,contentDetails,statistics').execute()
        if not response['items']:
            self.videos[video_id] = None
            return None
        
        item = response['items'][0]
        snippet = item['snippet']
        title = snippet['title']
        description = snippet['description']
        tags = snippet.get('tags')
        categoryId = int(snippet.get('categoryId', 0))
        publishedAt = snippet['publishedAt']
        duration = parse_youtube_time(item['contentDetails']['duration'])
        statistics = {k: int(v) for k, v in item['statistics'].items()}
        owner_id = snippet['channelId']
        owner = self.crawl_channel(owner_id)
        
        video = {
            'title': title,
            'url': url,
            'publishedAt': publishedAt,
            'description': description,
            'tags': tags,
            'category': categoryId,
            'duration': duration,
            'statistics': statistics,
            'owner': owner
        }
        self.videos[video_id] = video
        return video
    
    def crawl_playlist(self, playlist_id):
        if playlist_id in self.playlists:
            return self.playlists[playlist_id]
        
        url = PLAYLIST_BASE_URL + playlist_id
        response = self.youtube.playlists().list(part='snippet,contentDetails', id=playlist_id).execute()
        item = response['items'][0]
        title = item['snippet']['title']
        description = item['snippet']['description']
        count = item['contentDetails']['itemCount']
        playlist = {
            'title': title,
            'url': url,
            'count': count,
            'description': description,
            'items': []
        }
        request = self.youtube.playlistItems().list(part='snippet,contentDetails', playlistId=playlist_id, maxResults='50')
        response = get_all_items(request)
        for item in response['items']:
            snippet = item['snippet']
            if snippet['resourceId']['kind'] == 'youtube#video':
                video_id = snippet['resourceId']['videoId']
                video = self.crawl_video(video_id)
                if video:
                    playlist['items'].append(video)
        
        self.playlists[playlist_id] = playlist
        return playlist
    
    def crawl_playlists(self, channel):
        playlists = []
        request = self.youtube.playlists().list(part='snippet,contentDetails', channelId=channel, maxResults='50')
        res_playlists = get_all_items(request)
        for item in res_playlists['items']:
            playlist_id = item['id']
            playlist = self.crawl_playlist(playlist_id)
            playlists.append(playlist)
        
        return playlists


def parse_history_file(filepath):
    working_list = []
    tree = html.fromstring(Path(filepath).read_text(encoding='utf8'))
    nodes = tree.xpath(
        '//div[contains(@class, "outer-cell")]/div[@class="mdl-grid"]')
    for node in nodes:
        content = node.xpath('./div[contains(@class, "content-cell")][1]')[0]
        junk_text = node.xpath(
            './div[contains(@class, "content-cell")][2]//text()')
        if ' From Google Ads' in junk_text:
            continue
        children = content.xpath('./a')
        if len(children) != 2:
            continue
        video_title = children[0].text
        video_id = children[0].attrib['href'].replace(VIDEO_BASE_URL, '')
        channel_title = children[1].text
        channel_id = children[1].attrib['href'].replace(CHANNEL_BASE_URL, '')
        last_line = content.xpath('(.//text())[last()]')[0]
        timestamp = datetime.strptime(last_line[:-4], '%b %d, %Y, %I:%M:%S %p')
        # Takeout timestamps are local time (CST, UTC+08:00 for me); subtract
        # eight hours to get UTC, matching YouTube's UTC timestamps.
        timestamp -= timedelta(hours=8)
        timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
        entry = {
            'video_title': video_title,
            'video_id': video_id,
            'channel_title': channel_title,
            'channel_id': channel_id,
            'timestamp': timestamp
        }
        working_list.append(entry)
    return working_list

def analyze_history_file(filepath, full=False):
    history = parse_history_file(filepath)
    channels = dict()
    for entry in history:
        video_title, video_id, channel_title, channel_id, timestamp = entry.values()
        if channel_id not in channels:
            if full:
                channel = crawler.crawl_channel(channel_id)
                video = crawler.crawl_video(video_id)
                if not video:
                    continue
                video = video.copy()
                video.pop('owner')
            else:
                channel = {'title': channel_title}
                video = {'title': video_title}
            video.update({
                'timestamps': [timestamp],
                'watched': 1
            })
            channel.update({
                'videos': {video_id: video},
                'watched': 1
            })
            channels[channel_id] = channel
        else:
            channel = channels[channel_id]
            videos = channel['videos']
            if video_id not in videos:
                if full:
                    video = crawler.crawl_video(video_id)
                    if not video:
                        continue
                    video = video.copy()
                    video.pop('owner')
                else:
                    video = {'title': video_title}
                video.update({
                    'timestamps': [timestamp],
                    'watched': 1
                })
                videos[video_id] = video
            else:
                videos[video_id]['timestamps'].append(timestamp)
                videos[video_id]['watched'] += 1
            
            channel['watched'] += 1
    
    for channel_id, channel in channels.items():
        channel['videos'] = {k: v for k, v in sorted(
            channel['videos'].items(), key=lambda x: -x[1]['watched'])}
    
    channels = {k: v for k, v in sorted(
        channels.items(), key=lambda x: -x[1]['watched'])}
    return channels

if __name__ == "__main__":
    API_KEY = 'your_api_key'  # replace with your own Data API key
    CHANNEL = 'UC--2uyGZ5xJWulPXe2p7YDw'
    crawler = Crawler(API_KEY)
    playlists = crawler.crawl_playlists(CHANNEL)
    Path('D:/Youtube_playlists.json').write_text(json.dumps(playlists, indent=4, ensure_ascii=False), encoding='utf8')
    Path('D:/Youtube_watch_history.json').write_text(json.dumps(parse_history_file('D:/watch-history.html'), indent=4, ensure_ascii=False), encoding='utf8')
    Path('D:/Youtube_watch_history_analyzed.json').write_text(json.dumps(analyze_history_file('D:/watch-history.html'), indent=4, ensure_ascii=False), encoding='utf8')

How can it be improved?


Edit

Fixed a small bug: for live shows the YouTube API returns a duration of 'P0D', which means 00:00:00; the show is ongoing, so it does not yet have a finite duration.

I didn't find out about this until now.


Another small bug: the hour field only goes up to 23. For durations of 24 hours or more YouTube uses the day unit, and I have once seen live videos with hundreds of days of duration, so the biggest unit is the day and the full format is 'P{d}DT{h}H{m}M{s}S'.

Videos longer than 24 hours are relatively rare, so I didn't know about this.
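
A quick sanity check of parse_youtube_time against these formats (expected outputs in the comments):

# Sanity check of parse_youtube_time against the formats described above.
print(parse_youtube_time('PT3M25S'))     # 00:03:25
print(parse_youtube_time('PT1H2M3S'))    # 01:02:03
print(parse_youtube_time('P2DT3H4M5S'))  # 2D 03:04:05
print(parse_youtube_time('P0D'))         # LIVE (ongoing live stream)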


I have modified the script to parse your watch-history.html as well; you will need to use Google Takeout to get that file if you want to test the last function.

Because my watch history is private, I won't share it here.
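
For reference, here is a made-up minimal fragment of the markup parse_history_file expects, reconstructed from its XPath queries (the real Takeout export wraps entries like this in a full HTML page with more attributes):

# A made-up minimal fragment matching the XPath queries in parse_history_file;
# the real Takeout export has a full HTML page around entries like this one.
sample = (
    '<div class="outer-cell"><div class="mdl-grid">'
    '<div class="content-cell">Watched '
    '<a href="https://www.youtube.com/watch?v=XXXXXXXXXXX">Some video</a><br>'
    '<a href="https://www.youtube.com/channel/UCYYYYYYYYYYYYYYYYYYYYYY">'
    'Some channel</a><br>'
    'Apr 29, 2022, 9:19:00 PM CST</div>'
    '<div class="content-cell">Products: YouTube</div>'
    '</div></div>'
)
Path('sample.html').write_text(sample, encoding='utf8')
print(parse_history_file('sample.html'))
# [{'video_title': 'Some video', 'video_id': 'XXXXXXXXXXX',
#   'channel_title': 'Some channel', 'channel_id': 'UCYYYYYYYYYYYYYYYYYYYYYY',
#   'timestamp': '2022-04-29T13:19:00Z'}]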

  • Currently I am writing a PyQt6 GUI program to represent all this data... – Commented Apr 29, 2022 at 9:19
  • My timezone is CST (China Standard Time), which is UTC+08:00, so I need to subtract eight hours to get the UTC time, because YouTube timestamps are in UTC. I don't know how to parse time with timezone info yet, so you will need to modify the second-to-last function according to your timezone to get UTC time (see the sketch after these comments). – Commented Apr 30, 2022 at 9:30
  • You might like to take out the references to "D:/" as not all people running the script will be on Windows or have a D drive. – Commented Nov 29, 2022 at 4:58
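Regarding the timezone comment above, here is a possible timezone-aware replacement for the hard-coded eight-hour offset, assuming Python 3.9+ for zoneinfo ('Asia/Shanghai' is CST, UTC+08:00):

# Sketch of a timezone-aware conversion replacing the manual -8h offset;
# assumes Python 3.9+ (zoneinfo). 'Asia/Shanghai' is CST, UTC+08:00.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

local = datetime.strptime('Apr 30, 2022, 9:30:00 PM', '%b %d, %Y, %I:%M:%S %p')
local = local.replace(tzinfo=ZoneInfo('Asia/Shanghai'))
print(local.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'))
# 2022-04-30T13:30:00Z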
