【Python】第六章 异步爬虫
目录
1. 协程的基本原理
1.1 案例引入
https://www.httpbin.org/delay/5
服务器强制等待5秒才能返回响应
import logging import time import requests logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') URL = "https://www.httpbin.org/delay/5" TOTAL_NUMBER = 10 start_time = time.time() for _ in range(1, TOTAL_NUMBER): logging.info(f"scraping {URL}") response = requests.get(URL) end_time = time.time() logging.info(f"total time {end_time - start_time}")
1.2 基础知识
阻塞
阻塞状态:程序未得到所需计算资源时被挂起的状态
程序在操作上是阻塞的:程序在等待某个操作完成期间,自身无法继续干别的事情
非阻塞
程序在操作上非是阻塞的:程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情
仅当程序封装的级别可以囊括独立的子程序单元时,程序才可能存在非阻塞状态
因阻塞的存在而存在
同步
程序单元同步执行:不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致
异步
为了完成某种任务,有时不同程序单元之间无需通信协调也能完成任务
多进程
利用CPU的多核优势,在同i一时间并行执行多个任务,可以大大提高执行效率
协程
一种运行在用户态的轻量级线程
拥有自己的寄存器上下文和栈
调度切换
将寄存器上下文和栈保存到其他地方,等切回来时,再恢复先前保存的寄存器上下文和栈
能保留上一次调用时的状态,所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态
本质上是个单线程
实现异步操作
1.3 协程的用法
asyncio库
挂起阻塞方法的执行
定义一个方法(协程)
这个方法在调用时不会立即被执行,而是会返回一个协程对象
将来执行或者没有执行的任务的结果
和task没有本质区别
任务
对协程对象的进一步封装
包含协程对象的各个状态
协程
代指协程对象类型
可以将协程对象注册到时间循环中,它会被事件循环调用
事件循环
把函数注册到这个事件循环上,当满足发生条件时,就调用对应的处理方法
event_loop:
coroutine:
task:
future:
async关键字:
await关键字:
1.4 定义协程
import asyncio async def execute(x): print(f"Number: {x}") coroutine = execute(1) print(f"Coroutine: {coroutine}") print("After calling execute") loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print("After calling loop")
把协程对象coroutine传递给run_until_complete方法的时候,实际上进行了一个操作
将coroutine封装成task对象
显示声明task
import asyncio async def execute(x): print(f"Number: {x}") coroutine = execute(1) print(f"Coroutine: {coroutine}") print("After calling execute") loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print(f"Task: {task}") loop.run_until_complete(task) print(f"Task: {task}") print("After calling loop")
使用ensure_future定义task对象
import asyncio async def execute(x): print(f"Number: {x}") coroutine = execute(1) print(f"Coroutine: {coroutine}") print("After calling execute") task = asyncio.ensure_future(coroutine) print(f"Task: {task}") loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f"Task: {task}") print("After calling loop")
1.5 绑定回调
为task对象绑定一个回调方法
import asyncio import requests async def request(): url = "https://www.baidu.com/" status = requests.get(url) return status def callback(task): print(f"Status: {task.result()}") coroutine = request() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) print(f"Task: {task}") loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f"Task: {task}")
等效于
import asyncio import requests async def request(): url = "https://www.baidu.com/" status = requests.get(url) return status coroutine = request() task = asyncio.ensure_future(coroutine) print(f"Task: {task}") loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f"Task: {task}") print(f"Status: {task.result()}")
1.6 多任务协程
定义一个task列表,然后使用wait方法执行
import asyncio import requests async def request(): url = "https://www.baidu.com/" status = requests.get(url) return status tasks = [asyncio.ensure_future(request()) for _ in range(5)] print(f"Tasks: {tasks}") loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print(f"Task result: {task.result()}")
1.7 协程实现
单纯使用上述方法
import asyncio import time import requests async def request(): url = "https://www.httpbin.org/delay/5" print(f"Waiting for {url}") response = requests.get(url) print(f"Response: {response} from {url}") start = time.time() tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"Cost time: {end - start}")
没有任何变化
要实现异步处理,需要先执行挂起操作
将耗时等待的操作挂起,让出控制权
import asyncio import time import requests async def request(): url = "https://www.httpbin.org/delay/5" print(f"Waiting for {url}") response = await requests.get(url) print(f"Response: {response} from {url}") start = time.time() tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"Cost time: {end - start}")
报错
一个原生协程对象
一个由types.coroutine修饰的生成器
由一个包含__ await __方法的对象返回的一个迭代器
这个生成器可以返回协程对象
requests返回的Response对象不能和await一起使用
await后面的对象必须是如下格式之一:
将请求页面的方法独立出来
import asyncio import time import requests async def get(url): return requests.get(url) async def request(): url = "https://www.httpbin.org/delay/5" print(f"Waiting for {url}") response = await get(url) print(f"Response: {response} from {url}") start = time.time() tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"Cost time: {end - start}")
依然没有任何变化
1.8 使用aiohttp
安装
pip3 install aiohttp1
使用
import asyncio import time import aiohttp async def get(url): session = aiohttp.ClientSession() response = await session.get(url) await response.text() await session.close() return response async def request(): url = "https://www.httpbin.org/delay/5" print(f"Waiting for {url}") response = await get(url) print(f"Response: {response} from {url}") start = time.time() tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print(f"Cost time: {end - start}")
2. aiohttp的使用
2.1 基本介绍
asyncio:实现对TCP、UDP、SSL协议的异步操作
aiohttp:实现对HTTP请求的异步操作
aiohttp是基于asyncio的异步HTTP网络模块
服务端
客户端
用于处理请求并返回响应
可以搭建一个支持异步处理的服务器
类似于Django、Flask、Tornado等一些Web服务器
request发起的是同步网络请求
aiohttp发起的是异步网络请求
发起请求
类似于使用request发起一个HTTP请求然后获得响应
既提供了服务端,又提供了客户端
2.2 基本实例
import aiohttp import asyncio async def fetch(session, url): # 上下文管理器,自动分配和释放资源 async with session.get(url) as response: return await response.json(), response.status async def main(): # 上下文管理器,自动分配和释放资源 async with aiohttp.ClientSession() as session: url = "https://www.httpbin.org/delay/5" html, status = await fetch(session, url) print(f"html: {html}") print(f"status: {status}") if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 较高版本的python可以不显示声明事件循环 asyncio.run(main())
2.3 URL参数设置
import aiohttp import asyncio async def main(): params = {"name": "abc", "age": 10} async with aiohttp.ClientSession() as session: async with session.get("https://www.httpbin.org/get", params=params) as response: print(await response.text()) if __name__ == "__main__": asyncio.run(main())
2.4 其他请求类型
session.post("https://www.httpbin.org/post", data=b"data") session.put("https://www.httpbin.org/put", data=b"data") session.delete("https://www.httpbin.org/delete") session.head("https://www.httpbin.org/get") session.options("https://www.httpbin.org/get") session.patch("https://www.httpbin.org/patch", data=b"data")
2.5 POST请求
表单提交
import aiohttp import asyncio async def main(): data = {"name": "abc", "age": 10} async with aiohttp.ClientSession() as session: async with session.post("https://www.httpbin.org/post", data=data) as response: print(await response.text()) if __name__ == "__main__": asyncio.run(main())
JSON数据提交
import aiohttp import asyncio async def main(): data = {"name": "abc", "age": 10} async with aiohttp.ClientSession() as session: async with session.post("https://www.httpbin.org/post", json=data) as response: print(await response.text()) if __name__ == "__main__": asyncio.run(main())
2.6 响应
import aiohttp import asyncio async def main(): data = {"name": "abc", "age": 10} async with aiohttp.ClientSession() as session: async with session.post("https://www.httpbin.org/post", data=data) as response: print(f"status: {response.status}") print(f"headers: {response.headers}") print(f"body: {await response.text()}") print(f"bytes: {await response.read()}") print(f"json: {await response.json()}") if __name__ == "__main__": asyncio.run(main())
2.7 超时设置
使用ClientTimeout对象设置超时
import aiohttp import asyncio async def main(): # 设置2秒的超时时间 timeout = aiohttp.ClientTimeout(total=2) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get("https://www.httpbin.org/get") as response: print(f"status: {response.status}") if __name__ == "__main__": asyncio.run(main())
2.8 并发限制
使用Semeaphore来控制并发量
防止网站在短时间内无法响应
避免将目标网站爬挂掉的风险
import aiohttp import asyncio # 爬取的最大并发量 CONCURRENCY = 5 URL = "https://www.baidu.com/" # 创建一个信号量对象 semaphore = asyncio.Semaphore(CONCURRENCY) session = None async def scrape_api(): # 信号量可以控制进入爬取的最大协程数量 async with semaphore: print(f"Scraping {URL}") async with session.get(URL) as response: await asyncio.sleep(1) return await response.text() async def main(): global session session = aiohttp.ClientSession() scrape_index_tasks = [ asyncio.ensure_future( scrape_api()) for _ in range(1000)] await asyncio.gather(*scrape_index_tasks) if __name__ == "__main__": asyncio.run(main())
3. aiohttp异步爬取实战
3.1 案例介绍
目标网站:Scrape | Book
网站数据由JavaScript渲染获得
数据可以通过Ajax接口获取
接口没有设置任何反爬措施和加密参数
目标:
使用aiohttp爬取全站的数据
将数据通过异步的方式保存到 MongoDB 中
3.2 准备工作
MongoDB数据库
3.3 页面分析
列表页的Ajax请求接口的格式:https://spa5.scrape.center/api/book/?limit=18&offset={offset}
limit:每一页包含多少书
offest:每一页的偏移量(o f f s e t = l i m i t ∗ ( p a g e − 1 ) offset=limit*(page-1)offset=limit∗(page−1))
Ajax接口返回的数据中有每本书的id,利用id可以进入该书的详情页
详情页的Ajax请求接口的格式:https://spa5.scrape.center/detail/{id}
id:书本身的ID
3.4 实现思路
爬取的两个阶段:
将所有图书的id信息组合为所有详情页的爬取任务集合,并将其声明为task组成的列表,进行异步爬取,同时爬取结果也以异步方式存储到MongoDB中
将所有列表页的爬取任务集合在一起,并将其声明为由task组成的列表,进行异步爬取
异步爬取所有列表页
拿到上一步列表页的所有内容并解析
3.5 基本配置
import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}" DETAIL_URL = "https://spa5.scrape.center/detail/{id}" PAGE_SIZE = 18 PAGE_NUMBER = 100 CONCURRENCY = 5
3.6 爬取列表页
实现
通用的爬取方法
import asyncio import aiohttp async def scrape_api(url): async with semaphore: try: logging.info(f"Scraping: {url}") # verify_ssl: 是否开启SSL认证 async with session.get(url, verify_ssl=False) as response: return await response.json() except aiohttp.ClientError: logging.info(f"Error: {url}", exc_info=True)
爬取列表页
async def scrape_index(page): url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1)) return await scrape_api(url)
串联并用
import json async def main(): global session session = aiohttp.ClientSession() scrape_index_tasks = [ asyncio.ensure_future( scrape_index(page)) for page in range( 1, PAGE_NUMBER + 1)] results = await asyncio.gather(*scrape_index_tasks) logging.info( f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}") if __name__ == "__main__": asyncio.run(main())
合并
import json import asyncio import logging import aiohttp logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}" DETAIL_URL = "https://spa5.scrape.center/detail/{id}" PAGE_SIZE = 18 PAGE_NUMBER = 100 CONCURRENCY = 5 semaphore = asyncio.Semaphore(CONCURRENCY) session = None async def scrape_api(url): async with semaphore: try: logging.info(f"Scraping: {url}") async with session.get(url, verify_ssl=False) as response: return await response.json() except aiohttp.ClientError: logging.info(f"Error: {url}", exc_info=True) async def scrape_index(page): url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1)) return await scrape_api(url) async def main(): global session session = aiohttp.ClientSession() scrape_index_tasks = [ asyncio.ensure_future( scrape_index(page)) for page in range( 1, PAGE_NUMBER + 1)] results = await asyncio.gather(*scrape_index_tasks) logging.info( f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}") if __name__ == "__main__": asyncio.run(main())
3.7 爬取详情页
实现
在main方法中将详情页的ID获取出来
ids = [] for index_data in results: if not index_data: continue for item in index_data.get("results"): ids.append(item.get("id"))
爬取详情页
async def scrape_detail(id): url = DETAIL_URL.format(id=id) data = await scrape_api(url) logging.info(f"Saving: {data}")
main方法增加对scrape_detail方法的调用
scrape_detail_tasks = [ asyncio.ensure_future( scrape_detail(id)) for id in ids] await asyncio.wait(scrape_detail_tasks) await session.close()
合并
import json import asyncio import logging import aiohttp logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s') INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}" DETAIL_URL = "https://spa5.scrape.center/api/book/{id}" PAGE_SIZE = 18 PAGE_NUMBER = 100 CONCURRENCY = 5 semaphore = asyncio.Semaphore(CONCURRENCY) session = None async def scrape_api(url): async with semaphore: try: logging.info(f"Scraping: {url}") async with session.get(url, verify_ssl=False) as response: return await response.json() except aiohttp.ClientError: logging.info(f"Error: {url}", exc_info=True) async def scrape_index(page): url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1)) return await scrape_api(url) async def scrape_detail(id): url = DETAIL_URL.format(id=id) data = await scrape_api(url) logging.info(f"Saving {url}: {data}") async def main(): global session session = aiohttp.ClientSession() scrape_index_tasks = [ asyncio.ensure_future( scrape_index(page)) for page in range( 1, PAGE_NUMBER + 1)] results = await asyncio.gather(*scrape_index_tasks) logging.info( f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}") ids = [] for index_data in results: if not index_data: continue for item in index_data.get("results"): ids.append(item.get("id")) scrape_detail_tasks = [ asyncio.ensure_future( scrape_detail(id)) for id in ids] await asyncio.wait(scrape_detail_tasks) await session.close() if __name__ == "__main__": asyncio.run(main())
猜你喜欢
- 【Python】如何使用Python中的正则表达式进行字符串匹配
- 如何使用Python中的正则表达式进行字符串匹配正则表达式是一种强大的字符串模式匹配工具,它能够在文本中查找特定的模式,使程序能够更快速、更灵活地处理字符串。在Python中,我们可以使用re模块来操作正则表达式。本文将介绍如何使用Python中的正则表达式进行字符串匹配,并提供具体的代码示例。导入re模块在使用正则表达式之前,我们需要先导入re模块。可以使用以下代码来导入re模块:import re登录后复制字符串匹配正则表达式可以用来匹配字符串中的特定模式。例如,我们可以使用正则表
- 【Python】pycharm环境如何配置
- 配置教程:1、下载并安装PyCharm;2、选择Python解释器;3、配置虚拟环境;4、配置代码风格;5、配置调试器;6、配置版本控制工具;7、配置插件;8、配置Python路径和环境变量;9、配置其他选项。详细介绍:1、从PyCharm官网下载适合电脑操作系统的安装包,然后按照提示完成安装;2、在PyCharm中,可以选择已有的Python解释器或者添加新的解释器等等。本教程操作系统:windows10系统、Python3.11.4版本、Dell G3电脑。PyCharm环境配置教程如下:下
- 【Python】如何升级Python的pip工具
- span style="text-wrap: wrap;">解决常见问题:Python升级pip的实用指南导言:Python是一种流行的高级编程语言,拥有强大的生态系统和广泛的第三方库。而pip是Python的默认包管理工具,用于安装和管理Python包。然而,随着时间的推移,pip的版本可能会变得过时,不支持某些新功能或存在安全漏洞。为了确保我们能够得到最新的功能和修复的漏洞,我们需要升级pip。本文将为您提供一些实用的指南和具体的代码示例。一、使用命令行升级pip打开命令行工具(Windows用户可以使用cmd或PowerShell,macOS或Li</span
- 【Python】如何使用Python中的pickle和JSON进行对象序列化和反序列化
- 如何使用Python中的pickle和JSON进行对象序列化和反序列化Python是一种简单而强大的编程语言,其内置了许多有用的库和模块,使开发人员能够快速进行各种任务。其中,pickle和JSON是两个常用的模块,用于对象序列化和反序列化。本文将介绍如何使用这两个模块进行对象的序列化和反序列化,并提供详细的代码示例。使用pickle进行对象序列化和反序列化pickle是Python中的一个模块,通过它可以将对象转化为二进制数据以便于存储或传输,同时也可以将二进制数据还原为原始对象。首先,我们需
- 【Python】第二章 基本数据库的使用
- 目录1. urllib 的使用1.4.1 Robots 协议1.4.2 robotparser1.3.1 urlparse1.3.2 urlunparse1.3.3 urlsplit1.3.4 urlunsplit1.3.5 urljoin1.3.6 urlencode1.3.7 parse_qs1.3.8 parse_qsl1.3.9 quote1.3.10 unquote1.2.1 URLError1.2.2 HTTPError1.1.1 urlopen1.1.2 Request1.1.3
- 【Python】用matplotlib实现数据集散点图的实际应用
- 实战演练:利用Matplotlib绘制数据集的散点图Matplotlib是Python中常用的绘图库之一,它提供了丰富的功能,可以绘制各种类型的图表。其中,散点图是一种常用的数据可视化方式,用于展示两个变量之间的关系。本文将介绍如何利用Matplotlib绘制数据集的散点图,并附上具体的代码示例。首先,我们需要安装Matplotlib库。可以使用pip命令执行以下语句安装:pip install matplotlib安装完成后,我们可以导入Matplotlib库并开始绘制散点
- 【Python】如何使用Python中的多线程和协程实现一个高性能的爬虫
- 如何使用Python中的多线程和协程实现一个高性能的爬虫导语:随着互联网的快速发展,爬虫技术在数据采集和分析中扮演着重要的角色。而Python作为一门强大的脚本语言,具备多线程和协程的功能,可以帮助我们实现高性能的爬虫。本文将介绍如何使用Python中的多线程和协程来实现一个高性能的爬虫,并提供具体的代码示例。多线程实现爬虫多线程是利用计算机的多核特性,将任务分解成多个子任务,并同时执行,从而提高程序的执行效率。下面是一个使用多线程实现爬虫的示例代码:import threading
- 【Python】如何利用Python编写RSA加密算法
- 如何利用Python编写RSA加密算法?引言:RSA是一种非对称加密算法,被广泛应用于信息安全领域。在现代通信中,RSA加密算法常用于加密和解密敏感数据。本文将介绍如何使用Python编写RSA加密算法,并提供具体的代码示例。1. 安装Python库在开始编写RSA加密算法之前,需要安装Python的加密库。可以使用以下命令安装:pip install rsa2. 生成RSA密钥对在RSA加密算法中,存在公钥和私钥两个密钥。公钥用于加密数据,私钥用于解密数据。首先,我们需要生