分享

【實戰篇】 解析 Python 之父寫的 web crawler 異步爬蟲 | Max行銷誌

 老庄走狗 2022-01-17

【实战篇】解析 Python之父写的网络爬虫 异步爬虫

异步全站爬虫_Max行销志

下面的程序码程序是来自于Python 之父 Guido van RossumA. Jesse Jiryu Davis所写的虫子一起写的网页爬虫主要是展示如何使用使用 aiohttp 来网页写异步爬取。

作者:A. Jesse Jiryu Davis 和 Guido van Rossum
项目:网络爬虫

is 是一个网络爬虫。您给它一个 URL,它会通过 HTML 页面中的 href 链接来抓取该网站。

该示例的重点是展示如何使用 asyncio 模块编写相当复杂的 HTTP 客户端应用程序。这个模块最初被称为 Tulip,是 Python 3.4 标准库中的新模块,基于 PEP 3156。该示例使用了一个名为“aiohttp”的异步 HTTP 客户端实现,由 Andrew Svetlov、Nikolay Kim 和其他人编写。

https://github.com/aosabook/500lines/tree/master/crawler

ㄧ. 将部分程序更新

原始版本 GitHub ·刚刚发布的位置:500lines/crawling.py at master · aosabook/500lines GitHub

因为 GitHub 更新时间是四年前,下载下来,使用一些方法更新:

▍Python 3.8+ 移除 urllib.parse.splitport()

urllib.parse.splitport() 将在 Python 3.8 被移除,所以改使用 urlparse 和 hostname 的方法取得 hostname

parts = urllib.parse.urlparse(root)
host = parts.hostname

参考文件:问题 27485: urllib.splitport — 是不是官方的?- Python追踪器

▍Python3.10+ 即将移除 @asyncio.coroutine

Generator-based coroutine 的方式将在 Python 3.10 中被移除,所以这样的语法将改用 Native coroutine 的方式,使用 Python 3.5+ library 中的 async / await 来选择@asyncio.coroutine

参考文件:Coroutines and Tasks — Python 3.8.2 文档

▍asyncio.get_event_loop

Python 3.7 推出了更多的方法,将使用 event_loop 封装,asyncio.run() 一行程序就结束,不用在创建 event_loop 结束时也不需要 loop.close。

参考文件:cpython/runners.py at 3.8 · python/cpython · GitHub

二. 解析开始

▍Python环境配置:

  • Python 3.7+

▍pip install 安装套件:

pip install aiohttp

▍开始解析

可以看到 Crawler 里面,我用注解来拆成三个部分

#解析爬取到的url是否符合需求规范
#爬取到的url将列队
#主要运行的联络函式

▍完整程序如下:

我将程序阅读步骤注解写在旁边,建议由下往上开始阅读

import asyncio
import aiohttp  # Install with "pip install aiohttp".
from asyncio import Queue

import cgi
from collections import namedtuple
import logging
import re
import time
import urllib.parse
from urllib.parse import urljoin

FetchStatistic = namedtuple('FetchStatistic', [
    'url', 'next_url', 'status', 'exception', 'size', 'content_type',
    'encoding', 'num_urls', 'num_new_urls'
])


def lenient_host(host):
    parts = host.split('.')[-2:]
    return ''.join(parts)


def is_redirect(response):
    return response.status in (300, 301, 302, 303, 307)


class Crawler:
    def __init__(
            self,
            roots,
            exclude=None,
            strict=True,  # What to crawl.
            max_redirect=10,
            max_tries=4,  # Per-url limits.
            max_tasks=10):
        self.roots = roots  # 使用者指定抓取的網站地址,是一個 list
        self.exclude = exclude
        self.strict = strict
        self.max_redirect = max_redirect
        self.max_tries = max_tries
        self.max_tasks = max_tasks
        self.seen_urls = set()  # 會保證不重複 url 與和已經抓取過的 url
        self.done = []
        self.root_domains = set()

    # 解析爬取到的 url 是否符合需求規範
    def host_okay(self, host):
        """Check if a host should be crawled.

        A literal match (after lowercasing) is always good.  For hosts
        that don't look like IP addresses, some approximate matches
        are okay depending on the strict flag.
        """
        host = host.lower()
        if host in self.root_domains:
            return True
        if re.match(r'\A[\d\.]*\Z', host):
            return False
        if self.strict:
            return self._host_okay_strictish(host)
        else:
            return self._host_okay_lenient(host)

    def _host_okay_strictish(self, host):
        """Check if a host should be crawled, strict-ish version.

        This checks for equality modulo an initial 'www.' component.
        """
        host = host[4:] if host.startswith('www.') else 'www.' + host
        return host in self.root_domains

    def _host_okay_lenient(self, host):
        """Check if a host should be crawled, lenient version.

        This compares the last two components of the host.
        """
        return lenient_host(host) in self.root_domains

    def url_allowed(self, url):
        if self.exclude and re.search(self.exclude, url):
            print('--------------------exclude', url)
            return False
        parts = urllib.parse.urlparse(url)
        if parts.scheme not in ('http', 'https'):
            return False
        host = parts.hostname
        if not self.host_okay(host):
            return False
        return True

    # 將爬取到的 url 放入列隊
    def add_url(self, url, max_redirect=None):
        # print(url)
        if max_redirect is None:
            max_redirect = self.max_redirect
        self.seen_urls.add(url)
        self.q.put_nowait((url, max_redirect))

    def record_statistic(self, fetch_statistic):
        """Record the FetchStatistic for completed / failed URL."""
        self.done.append(fetch_statistic)

    # 以下為主要運行的異步函式
    # Step 5
    async def parse_links(self, response):
        links = set()
        content_type = None
        encoding = None
        body = await response.read()
        if response.status == 200:
            content_type = response.headers.get('content-type')
            pdict = {}

            if content_type:
                content_type, pdict = cgi.parse_header(content_type)
            encoding = pdict.get('charset', 'utf-8')
            if content_type in ('text/html', 'application/xml'):
                text = await response.text()

                urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text))
                for url in urls:
                    url_join = urllib.parse.urljoin(str(response.url), url)
                    defragmented, frag = urllib.parse.urldefrag(url_join)
                    if self.url_allowed(defragmented):
                        links.add(defragmented)
                        print(defragmented)

        stat = FetchStatistic(url=response.url,
                              next_url=None,
                              status=response.status,
                              exception=None,
                              size=len(body),
                              content_type=content_type,
                              encoding=encoding,
                              num_urls=len(links),
                              num_new_urls=len(links - self.seen_urls))

        return stat, links

    # Step 4
    async def fetch(self, url, max_redirect):
        tries = 0
        exception = None
        while tries < self.max_tries:
            # 取得 url 的 response,失敗則在 max_tries 內持續嘗試
            try:
                response = await self.session.get(url, allow_redirects=False)
                break
            except Exception as e:
                exception = e
            tries += 1
        else:
            self.record_statistic(
                FetchStatistic(url=url,
                               next_url=None,
                               status=None,
                               exception=exception,
                               size=0,
                               content_type=None,
                               encoding=None,
                               num_urls=0,
                               num_new_urls=0))
            return
        try:
            # 判斷是否跳轉頁面
            if is_redirect(response):
                location = response.headers['location']
                next_url = urllib.parse.urljoin(url, location)
                self.record_statistic(
                    FetchStatistic(url=url,
                                   next_url=next_url,
                                   status=response.status,
                                   exception=None,
                                   size=0,
                                   content_type=None,
                                   encoding=None,
                                   num_urls=0,
                                   num_new_urls=0))

                if next_url in self.seen_urls:
                    return
                if max_redirect > 0:
                    self.add_url(next_url, max_redirect - 1)
                else:
                    print('redirect limit reached for %r from %r', next_url,
                          url)
            else:
                stat, links = await self.parse_links(response)

                self.record_statistic(stat)
                for link in links.difference(self.seen_urls):
                    self.q.put_nowait((link, self.max_redirect))
                self.seen_urls.update(links)
        finally:
            await response.release()

    # Step 3
    async def work(self):
        try:
            while True:
                url, max_redirect = await self.q.get()
                await self.fetch(url, max_redirect)
                self.q.task_done()

        except asyncio.CancelledError:
            pass
    
    # Step 2
    async def crawl(self):
        self.q = asyncio.Queue()  # 存放所有等待抓取的 url
        self.t0 = time.time()
        self.session = aiohttp.ClientSession()

        for root in self.roots:
            parts = urllib.parse.urlparse(root)
            host = parts.hostname

            # 判斷解析 url 後有無 host
            if not host:
                continue
            # 判斷 host 是否為數字
            if re.match(r'\A[\d\.]*\Z', host):
                self.root_domains.add(host)
            else:
                host = host.lower()
                if self.strict:
                    self.root_domains.add(host)
                else:
                    self.root_domains.add(lenient_host(host))

        for root in self.roots:
            self.add_url(root)

        workers = [
            asyncio.create_task(self.work()) for _ in range(self.max_tasks)
        ]

        await self.q.join()  # 等待列隊 url 清空,將結束任務

        for w in workers:
            w.cancel()

        await self.session.close()

        self.t1 = time.time()

# Step 1
time_start = time.time()
crawler = Crawler(['https://'], max_tasks=30, exclude='.css')
asyncio.run(crawler.crawl())

print(len(crawler.done))
print(time.time() - time_start)

三. 讨论命名元组

过去写爬虫用字典dict来处理爬下来的数据,都可以看到Python之父是使用namedtuple,所以我们来看看tuple vs namedtuple vs dict之间有什么用途?

1. tuple、namedtuple 与 dict 之间的区别?

▍命名元组与元组

使用 tuple 的时候,在使用 tuple 的其中一个值的时候,索引的索引,而索引通常可以访问的值具有可维护性和维护性
的特性。其中还解决了索引的问题,可以使用名字来访问的值,待下面会解释会再讲解。

▍namedtuple vs dict

namedtuple 是一个不可变的对象,因此他需要的空间比字典字典来的少,但对于键值的搜索速度比命名元组快,理想上 python 的字典字典在搜索键值的时间复杂度是 O(1 ),而tuple基本上还是tuple的结构,所以它的O(n)时间复杂度不同,但根据空间大小不同,但速度效率的会偏好使用tuple命名空间是非常差的。

如果是像这次爬虫类中的,单纯只需要写入储存资料的话,使用namedtuple确实会比dict和tuple都还适合。

2. namedtuple 使用方法

from collections import namedtuple

▍宣布命名元组

Product_detail = namedtuple('Product', ['name', 'price', 'sales', 'ship_fees'])

p0 = Product_detail('Max0', 6666.6, 10, True)
p1 = Product_detail('Max1', 6666.5, 12, False)
p2 = Product_detail('Max2', 6666.4, 11, True)
print(p0, p1, p2)

# 輸出內容:
>>> Product(name='Max0', price=6666.6, sales=10, ship_fees=True) 
>>> Product(name='Max1', price=6666.5, sales=12, ship_fees=False) 
>>> Product(name='Max2', price=6666.4, sales=11, ship_fees=True)

▍将列表转换成namedtuple

product_list = ['Max3', 6666.3, 10, True]
p3 = Product_detail._make(product_list)
print(p3)

# 輸出內容:
>>> Product(name='Max3', price=6666.3, sales=10, ship_fees=True)

▍将dict转换成namedtuple

produt_dict = {'name': 'Max4', 'price': 6666.2, 'sales': 9, 'ship_fees': True}
p4 = Product_detail(**produt_dict)
print(p4)

# 輸出內容:
>>> Product(name='Max4', price=6666.2, sales=9, ship_fees=True)

▍将namedtuple转换成dict

produt_nametuple_to_dict = p4._asdict()
print(produt_nametuple_to_dict)

# 輸出內容:
>>> OrderedDict([('name', 'Max4'), ('price', 6666.2), ('sales', 9), ('ship_fees', True)])

四. 讨论 urllib.parse

可以看到Python之父使用很多urllib的套件来解析爬取到的url,所以我们来了解urllib解析url的方式:

1. urllib.parse 使用方法

import urllib.parse

▍ urllib.parse.urlparse()

解析url的主机名、端口、方案、查询等各式参数,本次urlparse()主要使用主机名判断是否为内部地址。

url = 'http://www.:80/author?user=Max&pass=123#123'
parsed = urllib.parse.urlparse(url)
print(parsed)
print(parsed.hostname)
print(parsed.port)

# 輸出內容:
>>> ParseResult(scheme='http', netloc='www.:80', path='/author', params='', query='user=Max&pass=123', fragment='123')
>>> www.
>>> 80

▍urllib.parse.urljoin()

有时爬取到的网址后不会有 netloc 的部分,使用 urljoin 来合并 url

crawler_url = '/newsletter/'
response_url = 'https://'
url_join = urllib.parse.urljoin(response_url, crawler_url)
print(url_join)

# 輸出內容:
>>> https:///newsletter/

▍urllib.parse.urldefrag()

不同的截图还是同页,所以在提取时进行截图处理

url = 'http://www.:80/author?user=Max&pass=123#remove'
print(urllib.parse.urldefrag(url))

# 輸出內容:
>>> DefragResult(url='http://www.:80/author?user=Max&pass=123', fragment='remove')

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多