您的当前位置:首页>全部文章>文章详情

【Python】第六章 异步爬虫

CrazyPanda发表于:2023-12-03 21:54:16浏览:678次TAG:

目录


1. 协程的基本原理

1.1 案例引入

  • https://www.httpbin.org/delay/5

  • 服务器强制等待5秒才能返回响应

import logging
import time
import requests

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

URL = "https://www.httpbin.org/delay/5"
TOTAL_NUMBER = 10


start_time = time.time()

for _ in range(1, TOTAL_NUMBER):
    logging.info(f"scraping {URL}")
    response = requests.get(URL)

end_time = time.time()
logging.info(f"total time {end_time - start_time}")

1.2 基础知识

阻塞

  • 阻塞状态:程序未得到所需计算资源时被挂起的状态

  • 程序在操作上是阻塞的:程序在等待某个操作完成期间,自身无法继续干别的事情

非阻塞

  • 程序在操作上非是阻塞的:程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情

  • 仅当程序封装的级别可以囊括独立的子程序单元时,程序才可能存在非阻塞状态

  • 因阻塞的存在而存在

同步

  • 程序单元同步执行:不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致

异步

  • 为了完成某种任务,有时不同程序单元之间无需通信协调也能完成任务

多进程

  • 利用CPU的多核优势,在同i一时间并行执行多个任务,可以大大提高执行效率

协程

  • 一种运行在用户态的轻量级线程

  • 拥有自己的寄存器上下文和栈

  • 调度切换

    • 将寄存器上下文和栈保存到其他地方,等切回来时,再恢复先前保存的寄存器上下文和栈

  • 能保留上一次调用时的状态,所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态

  • 本质上是个单线程

  • 实现异步操作

1.3 协程的用法

  • asyncio库

    • 挂起阻塞方法的执行

    • 定义一个方法(协程)

    • 这个方法在调用时不会立即被执行,而是会返回一个协程对象

    • 将来执行或者没有执行的任务的结果

    • 和task没有本质区别

    • 任务

    • 对协程对象的进一步封装

    • 包含协程对象的各个状态

    • 协程

    • 代指协程对象类型

    • 可以将协程对象注册到时间循环中,它会被事件循环调用

    • 事件循环

    • 把函数注册到这个事件循环上,当满足发生条件时,就调用对应的处理方法

    • event_loop:

    • coroutine:

    • task:

    • future:

    • async关键字:

    • await关键字:

1.4 定义协程

import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print("After calling loop")
  • 把协程对象coroutine传递给run_until_complete方法的时候,实际上进行了一个操作

    • 将coroutine封装成task对象

  • 显示声明task

import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(f"Task: {task}")

loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")
  • 使用ensure_future定义task对象

import asyncio


async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")

1.5 绑定回调

  • 为task对象绑定一个回调方法

import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status


def callback(task):
    print(f"Status: {task.result()}")


coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
  • 等效于

import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status


coroutine = request()
task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print(f"Status: {task.result()}")

1.6 多任务协程

  • 定义一个task列表,然后使用wait方法执行

import asyncio
import requests


async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status


tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print(f"Tasks: {tasks}")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print(f"Task result: {task.result()}")

1.7 协程实现

  • 单纯使用上述方法

import asyncio
import time
import requests


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
  • 没有任何变化

  • 要实现异步处理,需要先执行挂起操作

    • 将耗时等待的操作挂起,让出控制权

import asyncio
import time
import requests


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
  • 报错

    • 一个原生协程对象

    • 一个由types.coroutine修饰的生成器

    • 由一个包含__ await __方法的对象返回的一个迭代器

    • 这个生成器可以返回协程对象

    • requests返回的Response对象不能和await一起使用

    • await后面的对象必须是如下格式之一:

  • 将请求页面的方法独立出来

import asyncio
import time
import requests


async def get(url):
    return requests.get(url)


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
  • 依然没有任何变化

1.8 使用aiohttp

安装

pip3 install aiohttp1

使用

import asyncio
import time
import aiohttp


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response


async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")

2. aiohttp的使用

2.1 基本介绍

  • asyncio:实现对TCP、UDP、SSL协议的异步操作

  • aiohttp:实现对HTTP请求的异步操作

  • aiohttp是基于asyncio的异步HTTP网络模块

    • 服务端

    • 客户端

    • 用于处理请求并返回响应

    • 可以搭建一个支持异步处理的服务器

    • 类似于Django、Flask、Tornado等一些Web服务器

    • request发起的是同步网络请求

    • aiohttp发起的是异步网络请求

    • 发起请求

    • 类似于使用request发起一个HTTP请求然后获得响应

    • 既提供了服务端,又提供了客户端

2.2 基本实例

import aiohttp
import asyncio


async def fetch(session, url):
    # 上下文管理器,自动分配和释放资源
    async with session.get(url) as response:
        return await response.json(), response.status


async def main():
    # 上下文管理器,自动分配和释放资源
    async with aiohttp.ClientSession() as session:
        url = "https://www.httpbin.org/delay/5"
        html, status = await fetch(session, url)
        print(f"html: {html}")
        print(f"status: {status}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # 较高版本的python可以不显示声明事件循环
    asyncio.run(main())

2.3 URL参数设置

import aiohttp
import asyncio


async def main():
    params = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.get("https://www.httpbin.org/get", params=params) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

2.4 其他请求类型

session.post("https://www.httpbin.org/post", data=b"data")
session.put("https://www.httpbin.org/put", data=b"data")
session.delete("https://www.httpbin.org/delete")
session.head("https://www.httpbin.org/get")
session.options("https://www.httpbin.org/get")
session.patch("https://www.httpbin.org/patch", data=b"data")

2.5 POST请求

表单提交

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

JSON数据提交

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", json=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())

2.6 响应

import aiohttp
import asyncio


async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(f"status: {response.status}")
            print(f"headers: {response.headers}")
            print(f"body: {await response.text()}")
            print(f"bytes: {await response.read()}")
            print(f"json: {await response.json()}")


if __name__ == "__main__":
    asyncio.run(main())

2.7 超时设置

  • 使用ClientTimeout对象设置超时

import aiohttp
import asyncio


async def main():
    # 设置2秒的超时时间
    timeout = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://www.httpbin.org/get") as response:
            print(f"status: {response.status}")


if __name__ == "__main__":
    asyncio.run(main())

2.8 并发限制

  • 使用Semeaphore来控制并发量

    • 防止网站在短时间内无法响应

    • 避免将目标网站爬挂掉的风险

import aiohttp
import asyncio

# 爬取的最大并发量
CONCURRENCY = 5
URL = "https://www.baidu.com/"

# 创建一个信号量对象
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api():
    # 信号量可以控制进入爬取的最大协程数量
    async with semaphore:
        print(f"Scraping {URL}")
        async with session.get(URL) as response:
            await asyncio.sleep(1)
            return await response.text()


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_api()) for _ in range(1000)]
    await asyncio.gather(*scrape_index_tasks)


if __name__ == "__main__":
    asyncio.run(main())

3. aiohttp异步爬取实战

3.1 案例介绍

  • 目标网站:Scrape | Book

    • 网站数据由JavaScript渲染获得

    • 数据可以通过Ajax接口获取

    • 接口没有设置任何反爬措施和加密参数

  • 目标:

    • 使用aiohttp爬取全站的数据

    • 将数据通过异步的方式保存到 MongoDB 中

3.2 准备工作

  • MongoDB数据库

3.3 页面分析

  • 列表页的Ajax请求接口的格式:https://spa5.scrape.center/api/book/?limit=18&offset={offset}

    • limit:每一页包含多少书

    • offest:每一页的偏移量(o f f s e t = l i m i t ∗ ( p a g e − 1 ) offset=limit*(page-1)offset=limit∗(page−1))

    • Ajax接口返回的数据中有每本书的id,利用id可以进入该书的详情页

    • 详情页的Ajax请求接口的格式:https://spa5.scrape.center/detail/{id}

    • id:书本身的ID

3.4 实现思路

  • 爬取的两个阶段:

    • 将所有图书的id信息组合为所有详情页的爬取任务集合,并将其声明为task组成的列表,进行异步爬取,同时爬取结果也以异步方式存储到MongoDB中

    • 将所有列表页的爬取任务集合在一起,并将其声明为由task组成的列表,进行异步爬取

    • 异步爬取所有列表页

    • 拿到上一步列表页的所有内容并解析

3.5 基本配置

import logging


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/detail/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

3.6 爬取列表页

实现

通用的爬取方法
import asyncio
import aiohttp


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            # verify_ssl: 是否开启SSL认证
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)
爬取列表页
async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)
串联并用
import json


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())

合并

import json
import asyncio
import logging
import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/detail/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)


async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")


if __name__ == "__main__":
    asyncio.run(main())

3.7 爬取详情页

实现

在main方法中将详情页的ID获取出来
ids = []
for index_data in results:
    if not index_data:
        continue
    for item in index_data.get("results"):
        ids.append(item.get("id"))
爬取详情页
async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving: {data}")
main方法增加对scrape_detail方法的调用
scrape_detail_tasks = [
        asyncio.ensure_future(
            scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()

合并

import json
import asyncio
import logging
import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')


INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None


async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)


async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)


async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving {url}: {data}")


async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [
        asyncio.ensure_future(
            scrape_index(page)) for page in range(
            1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(
        f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

    ids = []
    for index_data in results:
        if not index_data:
            continue
        for item in index_data.get("results"):
            ids.append(item.get("id"))

    scrape_detail_tasks = [
        asyncio.ensure_future(
            scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()


if __name__ == "__main__":
    asyncio.run(main())

猜你喜欢

【Python】利用示例说明Python的len函数的多种应用方法
通过例子详解Python中len函数的灵活运用Python是一种简单易学的编程语言,凭借其丰富的库和强大的功能,越来越受到开发者的青睐。其中一项重要的函数是len()函数,它可以用于返回给定数据对象的元素数量。在本文中,我们将详细讨论len()函数的使用,并通过一些示例来演示其灵活运用。首先,我们来看一些基本的使用方式。len()函数可以用于多种数据类型,包括字符串、列表、字典、元组等。下面是一个简单的例子,用于计算一个字符串的长度:string = "Hello,
发表于:2024-01-15 浏览:302 TAG:
【Python】如何使用Python中的pickle和JSON进行对象序列化和反序列化
如何使用Python中的pickle和JSON进行对象序列化和反序列化Python是一种简单而强大的编程语言,其内置了许多有用的库和模块,使开发人员能够快速进行各种任务。其中,pickle和JSON是两个常用的模块,用于对象序列化和反序列化。本文将介绍如何使用这两个模块进行对象的序列化和反序列化,并提供详细的代码示例。使用pickle进行对象序列化和反序列化pickle是Python中的一个模块,通过它可以将对象转化为二进制数据以便于存储或传输,同时也可以将二进制数据还原为原始对象。首先,我们需
发表于:2024-01-22 浏览:332 TAG:
【Python】如何在Python中进行图形界面设计和开发
如何在Python中进行图形界面设计和开发引言:Python是一种功能强大且易于学习的编程语言,广泛应用于各种领域,包括图形界面设计和开发。Python提供了不少图形库和工具,使得开发者能够轻松地创建具有吸引力的用户界面。本文将介绍如何在Python中进行图形界面设计和开发,并提供一些实际的代码示例。一、图形库的选择Python提供了多个图形库,每个库都有自己的特点和用途。以下是其中几个常用的图形库:Tkinter:Tkinter是Python的标准图形库,它是Python内置的Tk界面工具集。
发表于:2024-01-23 浏览:341 TAG:
【Python】Python中的字节编码和解码技巧的最佳实践是什么?
Python中的字节编码和解码技巧的最佳实践在Python中,字节编码和解码是处理文本和数据的关键操作。正确的字节编码和解码技巧可以保证程序的正确性和运行效率。本文将介绍一些Python中的字节编码和解码的最佳实践,并提供具体的代码示例。使用正确的编码:在Python中,字符串可以是unicode形式的,也可以是字节形式的。在进行字符串的编码和解码操作时,需要注意使用正确的编码方式。常用的编码方式有UTF-8、GBK、ASCII等。如果没有指定编码方式,默认情况下Python会使用UTF-8编码
发表于:2024-01-22 浏览:304 TAG:
【Python】在Python中如何安装pandas库的方法
Python中如何安装pandas库?Pandas是一个强大且灵活的数据分析工具,它提供了丰富的数据结构和数据分析功能,使得数据处理更加快速和方便。本文将介绍如何在Python中安装pandas库,并提供具体的代码示例。在开始安装之前,确保你已经安装了Python环境。你可以在Python官网(https://www.python.org)下载最新版本的Python安装程序,并按照提示进行安装。在Python中安装pandas库有多种方法,例如使用pip或conda等软件包管理工具。下面我们将分
发表于:2024-01-09 浏览:286 TAG:
【Python】在Mac上逐步安装和配置pip
一步步教你在Mac上安装pip,需要具体代码示例尽管Mac系统自带了Python解释器,但没有自带pip包管理工具,这让我们在安装Python包时遇到了一些困难。因此,我们需要手动安装pip,以便在Mac上更方便地管理和安装Python包。下面是一步步教你在Mac上安装pip的具体方法,附带代码示例:第一步:打开终端在Mac上,我们可以通过“Finder” -> “应用程序” -> “实用工具” -> “终端”打开终端。第二步:安装homebrewHomebrew是Mac上最受
发表于:2024-01-17 浏览:378 TAG:
【Python】如何使用Python实现二分查找算法
如何使用Python实现二分查找算法?二分查找算法,也称为折半查找算法,是一种高效的查找算法。它适用于有序的数组或列表,通过将目标值与数组中间位置的元素进行比较,从而缩小查找范围。下面将介绍如何在Python中实现二分查找算法,并提供具体的代码示例。算法思路:将目标值与数组中间位置的元素进行比较;如果相等,则返回元素位置;如果目标值大于中间位置的元素,则在右半部分继续查找;如果目标值小于中间位置的元素,则在左半部分继续查找;不断将查找范围缩小一半,直到找到目标值或者查找范围为空。代码实现:下面是
发表于:2024-01-16 浏览:294 TAG:
【Python】新手Python环境配置以及pip安装教程
介于我在安装pip的时候,查资料仍然解决不了自己问题的情况下,统一整理了一下pip安装流程(只针对windows用户):目录1.介绍2.检查python和pip的环境3.下载pip3.1方法一3.2方法二4.pip扩展1.介绍pip 是 Python 包管理工具,提供了对 Python 包的查找、下载、安装、卸载的功能,目前Python 3.4 和 2.7 及以上版本都有配套安装,一般pip的位置在...\py
发表于:2023-11-29 浏览:720 TAG:
【Python】如何使用Python中的字符串操作函数处理大规模文本数据
如何使用Python中的字符串操作函数处理大规模文本数据,需要具体代码示例随着互联网的快速发展和数据的不断增加,大规模文本数据处理成了现代科技中的一个重要课题。Python作为一门简单易学且功能强大的编程语言,提供了丰富的字符串操作函数,能够很好地处理大规模文本数据。本文将介绍一些常用的字符串操作函数,并给出具体的代码示例,以帮助读者更好地掌握如何处理大规模文本数据。切割字符串在处理大规模文本数据时,常常需要将长字符串切割成小段文字进行操作。Python提供了split()函数,可以通过指定分隔
发表于:2024-01-23 浏览:321 TAG:
【Python】高效技巧:使用Pandas删除DataFrame的特定列数据
实用技巧:利用Pandas删除DataFrame中的某一列数据,需要具体代码示例在数据处理和分析中,Pandas 是一款非常强大的工具。它提供了各种功能,以便处理和操作数据。在实际的数据处理中,经常需要删除DataFrame中的某一列数据,以满足分析的需要。本文将介绍如何使用Pandas删除DataFrame中的某一列数据,并给出具体的代码示例。在开始之前,让我们先来创建一个示例DataFrame,以便进行后续的操作。import pandas as pd #&n
发表于:2024-01-10 浏览:305 TAG: