[Python] Chapter 2: Using the Basic Libraries
1. Using urllib
Sends HTTP requests without making you deal with the HTTP protocol itself, let alone its lower-level implementation; you only specify the request URL, headers, body, and so on
Converts the server's response into a Python object
Response status code
Response headers
Response body
...
This object gives convenient access to all of the response details
The 4 modules that make up the urllib library
request: the most basic HTTP request module; it can simulate sending a request
error: the exception-handling module
parse: a utility module for handling URLs
robotparser (rarely used): parses a site's robots.txt to decide whether the site may be crawled
1.1 Sending Requests (request)
1.1.1 urlopen
Fetches the source code of a web page
Syntax:
urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
import urllib.request

response = urllib.request.urlopen("https://www.python.org/")
print(response.read().decode("utf-8"))  # page source
print(type(response))                   # <class 'http.client.HTTPResponse'>
print(response.status)                  # status code
print(response.getheaders())            # all response headers
print(response.getheader("Server"))     # one specific response header
The data parameter
The argument must be converted to the bytes type
Once data is passed, the request method switches from GET to POST
import urllib.request
import urllib.parse

data = bytes(urllib.parse.urlencode({"name": "abc"}), encoding="utf-8")
response = urllib.request.urlopen("https://www.httpbin.org/post", data=data)
print(response.read().decode("utf-8"))  # simulates a form submission, sending the data via POST
The timeout parameter
Sets the timeout
In seconds
import socket
import urllib.request
import urllib.error

try:
    response = urllib.request.urlopen("https://www.httpbin.org/get", timeout=0.1)
except urllib.error.URLError as e:
    if isinstance(e.reason, socket.timeout):
        print("TIME OUT")
Other parameters
The context parameter:
Must be an object of the ssl.SSLContext class
Specifies the SSL settings (a minimal sketch follows after this list)
The cafile parameter:
Specifies a CA certificate file
The capath parameter:
Specifies a directory of CA certificates
The cadefault parameter (deprecated)
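A minimal sketch of the context parameter, assuming verification against the system's default CA bundle is what you want:
import ssl
import urllib.request

# Build an SSLContext that loads the system's trusted CA certificates
context = ssl.create_default_context()
response = urllib.request.urlopen("https://www.python.org/", context=context)
print(response.status)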
1.1.2 Request
Wraps the request in a standalone object
Allows richer and more flexible configuration of its parameters
import urllib.request

url = "https://www.python.org/"
request = urllib.request.Request(url)
response = urllib.request.urlopen(request)
print(response.read().decode("utf-8"))
Syntax:
urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)
url: the URL to request (required)
data: must be of the bytes type; a dict must first be encoded with urllib.parse.urlencode()
headers: the request headers, a dict; set them via the headers argument when constructing the request, or add them later with the request instance's add_header method; a common use is changing User-Agent to masquerade as a browser
origin_req_host: the requester's host name or IP address
unverifiable: whether the request is unverifiable, i.e. the user lacks sufficient permission to receive its result
method: the request method to use, e.g. GET, POST, PUT
from urllib import request, parse

url = "https://www.httpbin.org/post"
headers = {
    "User-Agent": "Edg/105.0.1343.42",
    "Host": "www.httpbin.org"
}
user = {"name": "abc"}
data = bytes(parse.urlencode(user), encoding="utf-8")
req = request.Request(url=url, data=data, headers=headers, method="POST")
# req = request.Request(url=url, data=data, method="POST")
# req.add_header("User-Agent", "Edg/105.0.1343.42")
response = request.urlopen(req)
print(response.read().decode("utf-8"))
Adding request headers with add_header()
from urllib import request, parse

url = "https://www.httpbin.org/post"
user = {"name": "abc"}
data = bytes(parse.urlencode(user), encoding="utf-8")
req = request.Request(url=url, data=data, method="POST")
req.add_header("User-Agent", "Edg/105.0.1343.42")
req.add_header("Host", "www.httpbin.org")
response = request.urlopen(req)
print(response.read().decode("utf-8"))
1.1.3 Advanced Usage
Handler: the various processors
Handling login authentication
Handling Cookies
Handling proxy settings
...
The BaseHandler class in urllib.request is the parent of all other Handler classes
It provides the most basic methods
Examples of subclasses
HTTPDefaultErrorHandler: handles HTTP response errors; every error is raised as an HTTPError exception
HTTPRedirectHandler: handles redirects
HTTPCookieProcessor: handles Cookies
ProxyHandler: sets a proxy, empty by default
HTTPPasswordMgr: manages passwords
HTTPBasicAuthHandler: manages authentication
OpenerDirector (Opener): the urlopen method is essentially an Opener that urllib provides for us
Use Handler classes to build an Opener (a minimal sketch follows)
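A minimal sketch, using only the standard library, of building an Opener from a Handler and optionally installing it globally:
from urllib.request import HTTPCookieProcessor, build_opener, install_opener

# build_opener accepts any number of Handler instances
opener = build_opener(HTTPCookieProcessor())
response = opener.open("https://www.python.org/")
print(response.status)

# After install_opener, plain urllib.request.urlopen also goes through this Opener
install_opener(opener)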
Authentication
Use the HTTPBasicAuthHandler class to complete basic authentication
from urllib.request import HTTPBasicAuthHandler, HTTPPasswordMgrWithDefaultRealm, build_opener
from urllib.error import URLError
import ssl

# If the site's SSL certificate cannot be verified, this disables certificate verification globally
ssl._create_default_https_context = ssl._create_unverified_context

username = "admin"
password = "admin"
url = "https://ssr3.scrape.center/"

p = HTTPPasswordMgrWithDefaultRealm()
p.add_password(None, url, username, password)  # register the username and password
handler = HTTPBasicAuthHandler(p)
opener = build_opener(handler)

try:
    result = opener.open(url)
    html = result.read().decode("utf-8")
    print(html)
except URLError as e:
    print(e.reason)
Proxies
Adding a proxy
from urllib.request import ProxyHandler, build_opener
from urllib.error import URLError

handler = ProxyHandler({
    "http": "http://127.0.0.1:8080",
    "https": "https://127.0.0.1:8080"
})
opener = build_opener(handler)
try:
    response = opener.open("https://www.baidu.com/")
    print(response.read().decode("utf-8"))
except URLError as e:
    print(e.reason)
Cookies
Getting Cookies
from http.cookiejar import CookieJar
from urllib.request import HTTPCookieProcessor, build_opener

cookie = CookieJar()
handler = HTTPCookieProcessor(cookie)
opener = build_opener(handler)
response = opener.open("https://www.baidu.com/")
for item in cookie:
    print(f"{item.name} = {item.value}")
Storing Cookies
Reading Cookies
Saving Cookies
...
MozillaCookieJar: handles Cookie- and file-related operations, such as reading and saving Cookies
from http.cookiejar import MozillaCookieJar
from urllib.request import HTTPCookieProcessor, build_opener

filename = "Cookie.txt"
cookie = MozillaCookieJar(filename)
handler = HTTPCookieProcessor(cookie)
opener = build_opener(handler)
response = opener.open("https://www.baidu.com/")
cookie.save(ignore_discard=True, ignore_expires=True)
Reading and using Cookies
from http.cookiejar import MozillaCookieJar
from urllib.request import HTTPCookieProcessor, build_opener

filename = "Cookie.txt"
cookie = MozillaCookieJar()
cookie.load(filename, ignore_discard=True, ignore_expires=True)
handler = HTTPCookieProcessor(cookie)
opener = build_opener(handler)
response = opener.open("https://www.baidu.com/")
print(response.read().decode("utf-8"))
1.2 Handling Exceptions (error)
1.2.1 URLError
The base class of the exceptions in the error module
Its reason attribute returns the cause of the error
from urllib import request, error

try:
    response = request.urlopen("https://www.bilibili.com/")
except error.URLError as e:
    print(e.reason)
1.2.2 HTTPError
Dedicated to handling HTTP request errors
e.g. a failed authentication request
...
Its attributes
code: the response status code
reason: the cause of the error
headers: the response headers
from urllib import request, error

try:
    response = request.urlopen("https://www.bilibili.com/")
except error.HTTPError as e:
    print(f"{e.code}\n{e.reason}\n{e.headers}")
reason does not always return a string; it may be an object
from urllib.request import urlopen
from urllib.error import URLError
from socket import timeout

try:
    response = urlopen("https://www.bilibili.com/", timeout=0.01)
except URLError as e:
    if isinstance(e.reason, timeout):
        print("Time out")
1.3 Parsing Links (parse)
1.3.1 urlparse
Identifies a URL and splits it into sections
Return type: ParseResult (a named tuple)
scheme: the protocol
netloc: the domain name
path: the access path
params: the parameters
query: the query conditions
fragment: the anchor
Attributes:
Each part can be read by attribute name
or by index (see the follow-up example after the code below)
from urllib.parse import urlparse

result = urlparse("https://www.baidu.com/index.html;user?id=5#comment")
print(type(result))
print(result)
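A small follow-up sketch of the two access styles mentioned above, by attribute name and by index:
from urllib.parse import urlparse

result = urlparse("https://www.baidu.com/index.html;user?id=5#comment")
# Attribute access and index access return the same values
print(result.scheme, result[0])  # https https
print(result.netloc, result[1])  # www.baidu.com www.baidu.com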
Syntax
urlparse(url, scheme='', allow_fragments=True)
url: the URL to parse
scheme: the protocol to assume when the URL does not specify one
allow_fragments: whether to recognize the fragment; if False, the anchor is folded into the last preceding component of the URL (a short example follows)
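A minimal sketch of the scheme and allow_fragments parameters, following the earlier example URL:
from urllib.parse import urlparse

# scheme only takes effect when the URL itself carries no protocol
print(urlparse("www.baidu.com/index.html;user?id=5#comment", scheme="https"))

# With allow_fragments=False the anchor is merged into the preceding part (here, the path)
print(urlparse("https://www.baidu.com/index.html#comment", allow_fragments=False))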
1.3.2 urlunparse
Constructs a URL
The argument must have a length of exactly 6
a list
a tuple
...
Any sequence works, as long as its length is 6
from urllib.parse import urlunparse

data = ["https", "www.baidu.com", "index.html", "user", "a=6", "comment"]
print(urlunparse(data))
1.3.3 urlsplit
Used much like urlparse
Does not parse params separately
params is merged into path
from urllib.parse import urlsplit

result = urlsplit("https://www.baidu.com/index.html;user?id=5#comment")
print(result)
1.3.4 urlunsplit
Used much like urlunparse
The argument must have a length of exactly 5
from urllib.parse import urlunsplit

data = ["https", "www.baidu.com", "index.html;user", "a=6", "comment"]
print(urlunsplit(data))
1.3.5 urljoin
Generates a link
Syntax
urljoin(base, url, allow_fragments=True)
base: the base link
url: the new link
The urljoin method analyzes the scheme, netloc, and path of base and uses them to fill in whatever url is missing
Normal cases
from urllib.parse import urljoin

print(urljoin("https://www.baidu.com?wd=abc", "https://www.bilibili.com/index.html"))
# https://www.bilibili.com/index.html
print(urljoin("https://www.baidu.com?wd=abc", "https:///"))
# https://www.baidu.com/
print(urljoin("https://www.baidu.com?wd=abc", "https:///index.html"))
# https://www.baidu.com/index.html
Unusual cases
from urllib.parse import urljoin

print(urljoin("https://www.baidu.com?wd=abc", "https"))
# https://www.baidu.com/https
print(urljoin("https://www.baidu.com?wd=abc", "https:"))
# https://www.baidu.com?wd=abc
print(urljoin("https://www.baidu.com?wd=abc", "https://"))
# https://www.baidu.com?wd=abc
print(urljoin("https://www.baidu.com?wd=abc", "www.bilibili.com/index.html"))
# https://www.baidu.com/www.bilibili.com/index.html
print(urljoin("https://www.baidu.com?wd=abc", "//www.bilibili.com/index.html"))
# https://www.bilibili.com/index.html
print(urljoin("https://www.baidu.com?wd=abc", "?ab=123"))
# https://www.baidu.com?ab=123
print(urljoin("https://www.baidu.com?wd=abc", "#123"))
# https://www.baidu.com?wd=abc#123
1.3.6 urlencode
Serializes parameters for a GET request
from urllib.parse import urlencode

params = {
    "name": "abc",
    "password": "123"
}
base_url = "https://www.baidu.com?"
print(base_url + urlencode(params))
1.3.7 parse_qs
Converts GET request parameters back into a dict
from urllib.parse import parse_qs

query = "name=abc&password=123"
print(parse_qs(query))
1.3.8 parse_qsl
Converts the parameters into a list of tuples
from urllib.parse import parse_qsl

query = "name=abc&password=123"
print(parse_qsl(query))
1.3.9 quote
Converts content into the URL-encoded format
from urllib.parse import quote

word = "辰默"
url = "https://www.baidu.com?word=" + quote(word)
print(url)
1.3.10 unquote
Decodes a URL
from urllib.parse import unquote

url = "https://www.baidu.com?word=%E8%BE%B0%E9%BB%98"
print(unquote(url))
1.4 Analyzing the Robots Protocol (robotparser)
1.4.1 The Robots Protocol
Also called the Robots Exclusion Standard
Usually a text file named robots.txt placed in the root directory of the website
When a search crawler visits a site, it first checks whether this file exists
If it exists: the crawler crawls within the scope defined there
If it does not exist: the crawler visits every page it can reach directly
Example
User-agent: *
Disallow: /
Allow: /public/
User-agent: the name of the crawler the rules apply to; * means every crawler
Disallow: directories that must not be crawled; / means everything
Allow: directories that may be crawled (usually used together with Disallow); see the sketch below for a per-crawler variant
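A hypothetical robots.txt (the crawler name WebCrawler is made up for illustration) that admits only one crawler and excludes everything else:
User-agent: WebCrawler
Disallow:

User-agent: *
Disallow: /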
1.4.2 robotparser
Uses robots.txt to decide whether a crawler is allowed to fetch a given page
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url("https://www.baidu.com/robots.txt")
rp.read()
print(rp.can_fetch("Baiduspider", "https://www.baidu.com"))
print(rp.can_fetch("Baiduspider", "https://www.baidu.com/homepage/"))
print(rp.can_fetch("Googlebot", "https://www.baidu.com/homepage/"))
The parse method can also be used to parse robots.txt
from urllib.request import urlopen
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.parse(urlopen("https://www.baidu.com/robots.txt").read().decode("utf-8").split("\n"))
print(rp.can_fetch("Baiduspider", "https://www.baidu.com"))
print(rp.can_fetch("Baiduspider", "https://www.baidu.com/homepage/"))
print(rp.can_fetch("Googlebot", "https://www.baidu.com/homepage/"))
2. Using requests
2.1 An Introductory Example
Requesting a page with the GET method
The urlopen method essentially issues a GET request
import requests

r = requests.get("https://www.baidu.com")
print(type(r))
print(r.status_code)
print(type(r.text))
print(r.text[:100])
print(r.cookies)
Other request methods
import requests

r = requests.get("https://www.httpbin.org/get")
r = requests.post("https://www.httpbin.org/post")
r = requests.put("https://www.httpbin.org/put")
r = requests.delete("https://www.httpbin.org/delete")
r = requests.patch("https://www.httpbin.org/patch")
2.2 GET Requests
2.2.1 A Basic Example
import requests

r = requests.get("https://www.httpbin.org/get")
print(r.text)
Adding parameters
import requests

data = {
    "name": "abc",
    "password": "123"
}
r = requests.get("https://www.httpbin.org/get", params=data)
print(r.text)
Although r.text is a str, its content is in JSON format
The json method parses it into JSON data
The result is a dict
import requests

r = requests.get("https://www.httpbin.org/get")
print(type(r.text))
print(r.json())
print(type(r.json()))
2.2.2 Scraping a Page
import requests
import re

r = requests.get("https://ssr1.scrape.center/")
pattern = re.compile("<h2.*?>(.*?)</h2>", re.S)
titles = re.findall(pattern, r.text)
print(titles)
2.2.3 Scraping Binary Data
Files such as images, audio, and video consist of binary data
import requests

r = requests.get("https://ssr1.scrape.center/static/img/logo.png")
with open("logo.png", "wb") as f:  # write in binary mode
    f.write(r.content)
2.2.4 Adding Request Headers
import requests

headers = {
    "User-Agent": "Edg/105.0.1343.42",
}
r = requests.get("https://ssr1.scrape.center/", headers=headers)
print(r.text)
2.3 POST Requests
import requests

data = {
    "name": "abc",
    "password": "123"
}
r = requests.post("https://www.httpbin.org/post", data=data)
print(r.text)
2.4 Responses
import requests

r = requests.get("https://ssr1.scrape.center/")
print(r.status_code)
print(r.headers)
print(r.cookies)
print(r.url)
print(r.history)
status_code: the response status code
headers: the response headers
url: the URL
history: the request history, i.e. any redirects that occurred (see the sketch below)
cookies: the Cookies
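A small sketch of r.history, assuming httpbin's redirect endpoint is reachable; it issues two redirects before the final response:
import requests

r = requests.get("https://www.httpbin.org/redirect/2")
print(r.status_code)                              # 200 for the final response
print([resp.status_code for resp in r.history])   # the intermediate redirect responses
print(r.url)                                      # the URL actually landed on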
2.5 Advanced Usage
2.5.1 File Upload
Pass the file through the files parameter
import requests

r = requests.get("https://ssr1.scrape.center/static/img/logo.png")
with open("logo.png", "wb") as f:
    f.write(r.content)

files = {
    "file": open("logo.png", "rb")
}
r = requests.post("https://www.httpbin.org/post", files=files)
print(r.text)
2.5.2 Cookie Settings
Getting Cookies
import requests

r = requests.get("https://www.baidu.com")
cookies = r.cookies
print(cookies)
for key, value in cookies.items():
    print(f"{key}={value}")
Setting Cookies
Via the headers parameter
from requests import get

headers = {
    "User-agent": "Edg/105.0.1343.42",
    "Cookie": "BDORZ=27315"
}
r = get("https://www.baidu.com", headers=headers)
Via the cookies parameter
from requests import get
from requests.cookies import RequestsCookieJar

cookies = "BDORZ=27315"
jar = RequestsCookieJar()
headers = {
    "User-agent": "Edg/105.0.1343.42"
}
for cookie in cookies.split(";"):
    key, value = cookie.split("=", 1)
    jar.set(key, value)
r = get("https://www.baidu.com", cookies=jar, headers=headers)
print(r.text)
2.5.3 Session Maintenance
Both get and post can fetch pages, but each call actually uses a different session
Equivalent to opening the pages in two different browsers
Use a Session object to maintain the session and avoid setting Cookies repeatedly
Without a Session
from requests import get

r1 = get("https://www.httpbin.org/cookies/set/abc/123")
print(r1.text)
r2 = get("https://www.httpbin.org/cookies")
print(r2.text)
With a Session
from requests import Session

s = Session()
r1 = s.get("https://www.httpbin.org/cookies/set/abc/123")
print(r1.text)
r2 = s.get("https://www.httpbin.org/cookies")
print(r2.text)
2.5.4 SSL Certificate Verification
When the requested URL's certificate is invalid
from requests import get

response = get("https://ssr2.scrape.center/")
print(response.status_code)
The verify parameter controls whether the certificate is verified
Skipping verification produces a warning
from requests import get

response = get("https://ssr2.scrape.center/", verify=False)
print(response.status_code)
Suppressing the warning (differs from the book)
The book: urllib3 lives under requests.packages (the Python 2 era usage)
In practice: in Python 3, urllib3 is a standalone package
from requests import get
from urllib3 import disable_warnings

disable_warnings()
response = get("https://ssr2.scrape.center/", verify=False)
print(response.status_code)
Ignoring the warning by capturing it into the logging system
from requests import get
from logging import captureWarnings

captureWarnings(True)
response = get("https://ssr2.scrape.center/", verify=False)
print(response.status_code)
2.5.5 Timeout Settings
The timeout parameter: sets the response time limit
Response time = connect time + read time
from requests import get

r1 = get("https://www.httpbin.org/get", timeout=(0.1, 0.2))  # (connect timeout, read timeout)
print(r1.status_code)
r2 = get("https://www.httpbin.org/get", timeout=0.1)         # a single value covers both
print(r2.status_code)
2.5.6 Authentication
The auth parameter: sets the credentials
Its type is HTTPBasicAuth
from requests import get
from requests.auth import HTTPBasicAuth

r = get("https://ssr3.scrape.center/", auth=HTTPBasicAuth("admin", "admin"))
print(r.status_code)
A tuple can also be passed directly
from requests import get

r = get("https://ssr3.scrape.center/", auth=("admin", "admin"))
print(r.status_code)
2.5.7 Proxy Settings
Keeps a site's protection mechanisms from blocking access during large-scale crawling
The proxies parameter: sets the proxies
from requests import get

proxies = {
    "http": "http://10.10.10.10:1080",
    "https": "http://user:password@10.10.10.10:1080"  # a proxy that requires authentication
}
r = get("https://ssr3.scrape.center/", proxies=proxies)
print(r.status_code)
2.5.8 Prepared Request
How the requests library actually sends a request
url
headers
data
...
requests sends the request
It builds a Request object internally
It assigns the parameters to the Request object
It sends the Request object
Once the request succeeds, a Response object is returned
from requests import Request, Session

url = "https://www.httpbin.org/post"
data = {
    "name": "abc"
}
headers = {
    "User-Agent": "Edg/105.0.1343.42"
}
s = Session()
req = Request("POST", url, data=data, headers=headers)  # build a Request object
prepped = s.prepare_request(req)                        # convert it into a PreparedRequest object
r = s.send(prepped)
print(r.text)
3. Regular Expressions
3.1 An Introductory Example
An online regex tester (e.g. the one at oschina.net) can quickly produce a regular expression of the desired shape
A regular expression for URLs
[a-zA-Z]+://[^\s]*
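A minimal sketch applying this pattern to pull URLs out of ordinary text:
import re

text = "See https://www.python.org/ and https://docs.python.org/3/ for details"
pattern = r"[a-zA-Z]+://[^\s]*"
print(re.findall(pattern, text))
# ['https://www.python.org/', 'https://docs.python.org/3/']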
3.2 match
Matches from the beginning of the string; if the beginning does not match, the whole match fails
3.2.1 Match Targets
from re import match

content = "hello 12345 world 12345"
result = match("^hello\\s(\\d*)\\sworld", content)
print(result)
print(result.group())
print(result.group(1))
print(result.span())
3.2.2 Generic Matching
.* matches any run of characters except newlines
from re import match

content = "hello 12345 world 12345"
result = match("^hello.*world", content)
print(result)
print(result.group())
print(result.span())
3.2.3 Greedy and Non-Greedy Matching
Greedy
from re import match

content = "hello 12345 world 12345"
result = match("^hello.*(\\d+).*world", content)
print(result)
print(result.group(1))
Non-greedy
from re import match

content = "hello 12345 world 12345"
result = match("^hello.*?(\\d+).*world", content)
print(result)
print(result.group(1))
Note
If the match sits at the end of the string, a non-greedy group may capture nothing
from re import match

content = "hello 12345 world 12345"
result1 = match("^hello.*?world(.*)", content)
print(result1.group(1))
result2 = match("^hello.*?world(.*?)", content)
print(result2.group(1))
3.2.4 Modifiers
Modifier | Description |
---|---|
re.I | Case-insensitive matching |
re.L | Locale-aware matching |
re.M | Multi-line matching (affects ^ and $) |
re.S | Makes . match every character, including newlines |
re.U | Parses characters according to the Unicode character set (affects \w, \W, \b and \B) |
re.X | Verbose mode, letting the regular expression be written more readably |
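A small sketch of the two most common modifiers, re.S and re.I, on a string that spans two lines:
import re

content = "Hello 12345\nWorld 12345"
# Without re.S the dot cannot cross the newline, so the match fails
print(re.match(r"^Hello.*World", content))                    # None
# re.S lets . match the newline; re.I ignores case
print(re.match(r"^hello.*world", content, re.S | re.I).group())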
3.2.5 Escape Matching
from re import match

content = "hello (12345) world 12345"
result = match("^hello.*?\\(\\d*\\).*?world", content)
print(result)
3.3 search (note: the fetched HTML is problematic for this pattern)
import re
from re import search
from requests import get

html = get("https://www.bilibili.com/").text
result = search("<h3.*?bili-video-card.*?title=(.*?)>", html, re.S)
print(result)
3.4 findall (note: the fetched HTML is problematic for this pattern)
import re
from re import findall
from requests import get

html = get("https://www.bilibili.com/").text
results = findall("<h3.*?bili-video-card.*?title=(.*?)>", html, re.S)
for result in results:
    print(result)
3.5 sub
Modifies text in bulk
from re import sub

content = sub("\\d+", "", "1q2w3e")
print(content)
3.6 compile
Compiles a regex string into a pattern object so it can be reused in later matches
import re

content1 = "2000-9-5 12:00"
content2 = "2001-9-17 13:00"
pattern = re.compile("\\d{2}:\\d{2}")
result1 = re.sub(pattern, "", content1)
result2 = re.sub(pattern, "", content2)
print(result1)
print(result2)
4. Using httpx
urllib and requests can only speak HTTP/1.1
httpx can also access HTTP/2.0 sites
4.1 Example
https://spa16.scrape.center/ is a site that can only be accessed over HTTP/2.0
import requests

url = "https://spa16.scrape.center/"
response = requests.get(url)
print(response.text)
Accessing it this way raises a RemoteDisconnected error
4.2 Installation
pip3 install 'httpx[http2]'
4.3 Basic Usage
httpx exposes many APIs similar to those of requests
The get method
import httpx

url = "https://www.httpbin.org/get"
response = httpx.get(url)
print(response.status_code)
print(response.headers)
print(response.text)
Setting the User-Agent
import httpx

url = "https://www.httpbin.org/get"
headers = {
    "User-Agent": "Edg/106.0.1370.37"
}
response = httpx.get(url, headers=headers)
print(response.status_code)
print(response.headers)
print(response.text)
Solving the HTTP/2.0 problem
httpx uses HTTP/1.1 by default; HTTP/2.0 has to be enabled explicitly
import httpx

client = httpx.Client(http2=True)
url = "https://spa16.scrape.center/"
response = client.get(url)
print(response.status_code)
print(response.headers)
print(response.text)
The post, put, delete, and patch methods
Used the same way as above, or as in requests (a quick sketch follows)
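A quick sketch of the other verbs, assuming the httpbin endpoints are reachable; the call signatures mirror requests:
import httpx

r = httpx.post("https://www.httpbin.org/post", data={"name": "abc"})
print(r.json()["form"])      # the submitted form data echoed back
r = httpx.delete("https://www.httpbin.org/delete")
print(r.status_code)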
4.4 The Client Object
Using with ... as is recommended
import httpx

url = "https://www.httpbin.org/get"
with httpx.Client() as client:
    response = client.get(url)
    print(response.status_code)
Equivalent to
import httpx

url = "https://www.httpbin.org/get"
client = httpx.Client()
try:
    response = client.get(url)
finally:
    client.close()
Adding headers
import httpx

url = "https://www.httpbin.org/headers"
headers = {
    "User-Agent": "Edg/106.0.1370.37"
}
with httpx.Client(headers=headers) as client:
    response = client.get(url)
    print(response.json()["headers"]["User-Agent"])
4.5 HTTP/2.0 Support
import httpx

url = "https://www.httpbin.org/get"
client = httpx.Client(http2=True)
response = client.get(url)
print(response.text)
print(response.http_version)
Note
Enabling HTTP/2.0 support on the httpx client does not guarantee that requests and responses travel over HTTP/2.0
Both the client and the server must support HTTP/2.0 for it to be used
If the client connects to a server that only speaks HTTP/1.1, it falls back to HTTP/1.1 (see the sketch below)
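A minimal sketch of the negotiation: what http_version reports depends entirely on what each server supports, so the two printed values may differ:
import httpx

with httpx.Client(http2=True) as client:
    r1 = client.get("https://spa16.scrape.center/")   # an HTTP/2.0-only site
    r2 = client.get("https://www.httpbin.org/get")    # protocol depends on the server
    print(r1.http_version, r2.http_version)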
5. A Basic Crawler Project
Crawl Scrape | Movie (https://ssr1.scrape.center/, comparable to a Douban Top 250 list)
5.1 Goals
Use requests to crawl the movie list on every page of the site, then follow the list to crawl each movie's detail page
Use regular expressions to extract each movie's name, cover, categories, release date, score, and synopsis
Save the extracted content as JSON text files
Speed up the crawl with multiprocessing
5.2 Crawling the List Pages
Analysis
The block for each movie
<div data-v-7f856186 class="el-row"> ··· </div>
Getting from the list to a detail page
The href attribute decides which detail page to jump to
<a data-v-7f856186 href="/detail/{rank}" class="name"> ··· </a>
The pagination logic
https://ssr1.scrape.center/page/{page number}
Implementation
Iterate over all the page numbers and build the URLs of the 10 list pages
From each list page, extract the URL of every movie's detail page
Basic configuration
requests: fetches the pages
logging: outputs progress information
re: regex-based parsing
urljoin: URL joining
import requests
import logging
import re
from urllib.parse import urljoin

# Log level and output format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

# Root URL of the site
BASE_URL = "https://ssr1.scrape.center"
# Total number of pages to crawl
TOTAL_PAGE = 10
Fetching a page's HTML
def get_html(url):
    logging.info(f"scrape {url}")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f"status_code: {response.status_code} url: {url}")
    except requests.RequestException:
        # exc_info: whether to log the traceback as well
        logging.error(f"{url} error", exc_info=True)
Fetching a list page's HTML (builds the page URL)
def get_page_html(num_of_page):
    page_url = f"{BASE_URL}/page/{num_of_page}"
    return get_html(page_url)
Parsing a list page (extracting the detail-page URLs)
def parse_page(html):
    pattern = re.compile("<a.*?href=\"(.*?)\".*?class=\"name\">")
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f"detail_url: {detail_url}")
        yield detail_url
Putting it together
def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        logging.info(f"detail_urls: {list(detail_urls)}")


if __name__ == "__main__":
    main()
Combined
import requests
import logging
import re
from urllib.parse import urljoin

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

BASE_URL = "https://ssr1.scrape.center"
TOTAL_PAGE = 10


def get_html(url):
    logging.info(f"scrape {url}")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f"status_code: {response.status_code} url: {url}")
    except requests.RequestException:
        # exc_info: whether to log the traceback as well
        logging.error(f"{url} error", exc_info=True)


def get_page_html(num_of_page):
    page_url = f"{BASE_URL}/page/{num_of_page}"
    return get_html(page_url)


def parse_page(html):
    pattern = re.compile("<a.*?href=\"(.*?)\".*?class=\"name\">")
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f"detail_url: {detail_url}")
        yield detail_url


def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        logging.info(f"detail_urls: {list(detail_urls)}")


if __name__ == "__main__":
    main()
5.3 Crawling the Detail Pages
Analysis
The content to extract and the nodes that carry it
Cover
<img.*?src="{cover URL}".*?class="cover">
Name
<h2.*?>{name}</h2>
Categories
<div class="categories">
    <button type="button".*?> <span>{category 1}</span> </button>
    <button type="button".*?> <span>{category 2}</span> </button>
</div>
Release date
<div class="info"> <span>{release date} 上映</span> </div>
Score
<p class="score"> {score}</p>
Synopsis
<div class="drama"> <h3>剧情简介</h3> <p>{synopsis}</p> </div>
(上映 "released" and 剧情简介 "synopsis" are literal labels in the page's HTML that the patterns anchor on)
Implementation
Fetching a detail page's HTML
Identical to the get_html method above
But wrapped in its own function, rather than called directly, to keep the code extensible
def get_detail_html(detail_url):
    return get_html(detail_url)
Extracting the content
def parse_detail(html):
    cover_pattern = re.compile("<img.*?src=\"(.*?)\".*?class=\"cover\">", re.S)
    name_pattern = re.compile("<h2.*?>(.*?)</h2>")
    categories_pattern = re.compile("<button.*?category.*?<span>(.*?)</span>.*?</button>", re.S)
    published_pattern = re.compile("<span.*?>(\\d{4}-\\d{2}-\\d{2}).*?上映</span>", re.S)
    score_pattern = re.compile("<p.*?score.*?>(.*?)</p>", re.S)
    drama_pattern = re.compile("<div.*?drama.*?<p.*?>(.*?)</p></div>", re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() if re.search(score_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    return {
        "cover": cover,
        "name": name,
        "categories": categories,
        "published": published,
        "score": score,
        "drama": drama
    }
Putting it together
def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        for detail_url in detail_urls:
            detail_html = get_detail_html(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"data: {data}")


if __name__ == "__main__":
    main()
Combined
import requests
import logging
import re
from urllib.parse import urljoin

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

BASE_URL = "https://ssr1.scrape.center"
TOTAL_PAGE = 10


def get_html(url):
    logging.info(f"scrape {url}")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f"status_code: {response.status_code} url: {url}")
    except requests.RequestException:
        # exc_info: whether to log the traceback as well
        logging.error(f"{url} error", exc_info=True)


def get_page_html(num_of_page):
    page_url = f"{BASE_URL}/page/{num_of_page}"
    return get_html(page_url)


def parse_page(html):
    pattern = re.compile("<a.*?href=\"(.*?)\".*?class=\"name\">")
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f"detail_url: {detail_url}")
        yield detail_url


def get_detail_html(detail_url):
    return get_html(detail_url)


def parse_detail(html):
    cover_pattern = re.compile("<img.*?src=\"(.*?)\".*?class=\"cover\">", re.S)
    name_pattern = re.compile("<h2.*?>(.*?)</h2>")
    categories_pattern = re.compile("<button.*?category.*?<span>(.*?)</span>.*?</button>", re.S)
    published_pattern = re.compile("<span.*?>(\\d{4}-\\d{2}-\\d{2}).*?上映</span>", re.S)
    score_pattern = re.compile("<p.*?score.*?>(.*?)</p>", re.S)
    drama_pattern = re.compile("<div.*?drama.*?<p.*?>(.*?)</p></div>", re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() if re.search(score_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    return {
        "cover": cover,
        "name": name,
        "categories": categories,
        "published": published,
        "score": score,
        "drama": drama
    }


def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        for detail_url in detail_urls:
            detail_html = get_detail_html(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"data: {data}")


if __name__ == "__main__":
    main()
5.4 Saving the Data
Save it in JSON format
Implementation
Basic configuration
import json
from pathlib import Path

RESULTS_DIR = "results"
Path("./" + RESULTS_DIR).mkdir(parents=True, exist_ok=True)
Saving the data
def save_data(data):
    name = data.get("name")
    data_path = f"{RESULTS_DIR}/{name}.json"
    # ensure_ascii=False: do not force ASCII-only output
    # indent: indentation level of the JSON output
    json.dump(data, open(data_path, "w", encoding="utf8"),
              ensure_ascii=False, indent=2)
Putting it together
def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        for detail_url in detail_urls:
            detail_html = get_detail_html(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"data: {data}")
            logging.info("saving data")
            save_data(data)
            logging.info("saving successfully")


if __name__ == "__main__":
    main()
Combined
import json
import requests
import logging
import re
from urllib.parse import urljoin
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

BASE_URL = "https://ssr1.scrape.center"
TOTAL_PAGE = 10
RESULTS_DIR = "results"
Path("./" + RESULTS_DIR).mkdir(parents=True, exist_ok=True)


def get_html(url):
    logging.info(f"scrape {url}")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f"status_code: {response.status_code} url: {url}")
    except requests.RequestException:
        # exc_info: whether to log the traceback as well
        logging.error(f"{url} error", exc_info=True)


def get_page_html(num_of_page):
    page_url = f"{BASE_URL}/page/{num_of_page}"
    return get_html(page_url)


def parse_page(html):
    pattern = re.compile("<a.*?href=\"(.*?)\".*?class=\"name\">")
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f"detail_url: {detail_url}")
        yield detail_url


def get_detail_html(detail_url):
    return get_html(detail_url)


def parse_detail(html):
    cover_pattern = re.compile("<img.*?src=\"(.*?)\".*?class=\"cover\">", re.S)
    name_pattern = re.compile("<h2.*?>(.*?)</h2>")
    categories_pattern = re.compile("<button.*?category.*?<span>(.*?)</span>.*?</button>", re.S)
    published_pattern = re.compile("<span.*?>(\\d{4}-\\d{2}-\\d{2}).*?上映</span>", re.S)
    score_pattern = re.compile("<p.*?score.*?>(.*?)</p>", re.S)
    drama_pattern = re.compile("<div.*?drama.*?<p.*?>(.*?)</p></div>", re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() if re.search(score_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    return {
        "cover": cover,
        "name": name,
        "categories": categories,
        "published": published,
        "score": score,
        "drama": drama
    }


def save_data(data):
    name = data.get("name")
    data_path = f"{RESULTS_DIR}/{name}.json"
    json.dump(data, open(data_path, "w", encoding="utf8"),
              ensure_ascii=False, indent=2)


def main():
    for num_of_page in range(1, TOTAL_PAGE + 1):
        page_html = get_page_html(num_of_page)
        detail_urls = parse_page(page_html)
        for detail_url in detail_urls:
            detail_html = get_detail_html(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"data: {data}")
            logging.info("saving data")
            save_data(data)
            logging.info("saving successfully")


if __name__ == "__main__":
    main()
5.5 Speeding Up with Multiprocessing
Rewriting the main method
import multiprocessing


def main(page):
    page_html = get_page_html(page)
    detail_urls = parse_page(page_html)
    for detail_url in detail_urls:
        detail_html = get_detail_html(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"data: {data}")
        logging.info("saving data")
        save_data(data)
        logging.info("saving successfully")


if __name__ == "__main__":
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()
Combined
import multiprocessing
import json
import requests
import logging
import re
from urllib.parse import urljoin
from pathlib import Path

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s')

BASE_URL = "https://ssr1.scrape.center"
TOTAL_PAGE = 10
RESULTS_DIR = "results"
Path("./" + RESULTS_DIR).mkdir(parents=True, exist_ok=True)


def get_html(url):
    logging.info(f"scrape {url}")
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f"status_code: {response.status_code} url: {url}")
    except requests.RequestException:
        # exc_info: whether to log the traceback as well
        logging.error(f"{url} error", exc_info=True)


def get_page_html(num_of_page):
    page_url = f"{BASE_URL}/page/{num_of_page}"
    return get_html(page_url)


def parse_page(html):
    pattern = re.compile("<a.*?href=\"(.*?)\".*?class=\"name\">")
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f"detail_url: {detail_url}")
        yield detail_url


def get_detail_html(detail_url):
    return get_html(detail_url)


def parse_detail(html):
    cover_pattern = re.compile("<img.*?src=\"(.*?)\".*?class=\"cover\">", re.S)
    name_pattern = re.compile("<h2.*?>(.*?)</h2>")
    categories_pattern = re.compile("<button.*?category.*?<span>(.*?)</span>.*?</button>", re.S)
    published_pattern = re.compile("<span.*?>(\\d{4}-\\d{2}-\\d{2}).*?上映</span>", re.S)
    score_pattern = re.compile("<p.*?score.*?>(.*?)</p>", re.S)
    drama_pattern = re.compile("<div.*?drama.*?<p.*?>(.*?)</p></div>", re.S)
    cover = re.search(cover_pattern, html).group(1).strip() if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published = re.search(published_pattern, html).group(1).strip() if re.search(published_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() if re.search(score_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    return {
        "cover": cover,
        "name": name,
        "categories": categories,
        "published": published,
        "score": score,
        "drama": drama
    }


def save_data(data):
    name = data.get("name")
    data_path = f"{RESULTS_DIR}/{name}.json"
    json.dump(data, open(data_path, "w", encoding="utf8"),
              ensure_ascii=False, indent=2)


def main(page):
    page_html = get_page_html(page)
    detail_urls = parse_page(page_html)
    for detail_url in detail_urls:
        detail_html = get_detail_html(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"data: {data}")
        logging.info("saving data")
        save_data(data)
        logging.info("saving successfully")


if __name__ == "__main__":
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()