爬虫进阶|Python多线程与线程池实战:让爬虫速度起飞
在爬虫开发中,网络IO等待是最大的效率瓶颈。单线程爬虫需要逐个请求网页,等待服务器响应时CPU完全空闲,而多线程能让爬虫同时发送多个请求,大幅提升爬取效率。本文用通俗易懂的方式讲解Python多线程、线程安全、线程池,配套可直接运行的爬虫示例代码。
一、为什么爬虫需要多线程?
1. 单线程爬虫的痛点
想象单线程爬虫就像一个快递员送100个快递:
必须送完一个再送下一个
等待收件人开门时,什么都做不了
总耗时 = 所有任务耗时之和
对应到爬虫:
逐个请求网页,一个请求卡住,全部卡住
网络等待期间CPU闲置,资源浪费
爬取大量数据时,速度极慢
2. 多线程爬虫的优势
多线程就像多个快递员同时送快递:
多个请求并发执行,无需排队等待
一个请求在等待响应时,其他请求正常运行
总耗时 ≈ 单个任务耗时(大幅提速)
核心适用场景:网络IO密集型任务(爬虫、接口请求、文件下载),是多线程的最佳舞台。
3. 多线程原理
单个CPU同一时刻只能执行一个线程,但操作系统会毫秒级快速切换线程,让我们感觉多个任务在同时运行。
遇到网络等待时,系统自动切换到其他就绪线程,彻底避免CPU空闲。
二、Python多线程基础(threading模块)
Python内置threading模块,无需安装,直接实现多线程。
1. 基础用法:创建并启动线程
threading模块中的Thread类可以用来创建线程对象。
构造函数中常用的两个参数是:
target:线程启动后要执行的任务(一个可调用对象,例如函数)args:传递给任务函数的参数,必须是元组类型
线程对象创建好后,并不会自动运行,需要调用start()方法启动线程。
我们用模拟爬取网页替代送快递,更贴合爬虫场景:
import time
import threading
# 模拟爬虫任务:爬取一个网页
def crawl_page(url):
print(f"开始爬取:{url}")
time.sleep(2) # 模拟网络请求等待
print(f"爬取完成:{url}")
if __name__ == '__main__':
# 创建3个线程,分别爬取3个网页
t1 = threading.Thread(target=crawl_page, args=("https://blog.csdn.net",))
t2 = threading.Thread(target=crawl_page, args=("https://www.jianshu.com",))
t3 = threading.Thread(target=crawl_page, args=("https://www.zhihu.com",))
# 启动线程
t1.start()
t2.start()
t3.start()2. 主线程等待:join()方法
直接运行会发现主线程提前结束,使用join()让主线程等待子线程完成:
import time
import threading
def crawl_page(url):
print(f"开始爬取:{url}")
time.sleep(2)
print(f"爬取完成:{url}")
if __name__ == '__main__':
start_time = time.time()
t1 = threading.Thread(target=crawl_page, args=("博客园",))
t2 = threading.Thread(target=crawl_page, args=("掘金",))
t3 = threading.Thread(target=crawl_page, args=("思否",))
t1.start()
t2.start()
t3.start()
# 等待所有线程执行完毕
t1.join()
t2.join()
t3.join()
print(f"总耗时:{time.time() - start_time:.2f}秒")运行结果:总耗时≈2秒,3个任务并发完成!
三、线程竞争与非线程安全:多线程的“坑”与解决方案
为什么会出现线程竞争?
我们用爬虫场景举个最直观的例子:
假设你有一个全局变量 total_count,用来统计爬取成功的页面数,10个线程同时执行 total_count += 1:
线程A读取
total_count = 100,还没来得及写回,线程B也读取了total_count = 100两个线程都执行
100 + 1 = 101,然后分别写回变量最终
total_count变成了101,而不是预期的102
这就是典型的线程竞争:多个线程同时读写共享资源,互相覆盖了对方的修改,导致数据错误。
而像 print 这种看似简单的操作,本质上也是一个非原子操作:它需要先输出字符串、再输出换行符,多个线程同时调用时,就会出现内容交错、换行丢失的混乱情况。
常见的非线程安全场景
(1)打印输出混乱(最直观)
问题代码:
import time
import threading
def crawl_page(url):
print(f"开始爬取:{url}")
time.sleep(2) # 模拟网络等待
print(f"爬取完成:{url}")
if __name__ == '__main__':
urls = ["url1", "url2", "url3"]
threads = [threading.Thread(target=crawl_page, args=(url,)) for url in urls]
for t in threads:
t.start()问题现象:输出内容交错、换行丢失,日志完全无法阅读。
根本原因:print 不是原子操作,多个线程同时抢占控制台输出,导致内容混乱。
(2)共享变量修改错误(最致命)
问题代码:
import time
import threading
# 共享变量:爬取成功计数
success_count = 0
def crawl_page(url):
global success_count
time.sleep(0.1) # 模拟网络请求
# 非线程安全的自增操作
success_count = success_count + 1
print(f"{url} 爬取完成,当前计数:{success_count}")
if __name__ == '__main__':
threads = []
for i in range(10):
t = threading.Thread(target=crawl_page, args=(f"url{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数:{success_count}") # 预期10,实际可能只有8、9问题现象:最终计数远小于预期,数据完全错误。
根本原因:success_count += 1 拆解为3步:读取变量 → 计算+1 → 写回变量,多线程同时执行时,会互相覆盖修改。
解决方案:用锁(Lock)实现线程安全
Python的 threading.Lock() 是解决线程竞争的标准方案,它的核心逻辑是:给共享资源加一把锁,同一时间只允许一个线程进入锁内的代码块,其他线程必须等待锁释放后才能执行,从根源上避免竞争。
(1)解决打印混乱:给print加锁
修复后代码:
import time
import threading
# 1. 创建一把打印专用的锁
print_lock = threading.Lock()
def crawl_page(url):
# 2. 用with语句自动加锁/释放锁(推荐写法,无需手动lock/unlock)
with print_lock:
print(f"开始爬取:{url}")
time.sleep(2)
with print_lock:
print(f"爬取完成:{url}")
if __name__ == '__main__':
urls = ["https://blog.csdn.net", "https://www.jianshu.com", "https://www.zhihu.com"]
threads = [threading.Thread(target=crawl_page, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()效果:输出完全有序,再也不会出现内容交错。
补充:with print_lock 等价于 print_lock.acquire() 加锁 + print_lock.release() 释放锁,用with写法更简洁,还能避免忘记释放锁导致的死锁。
(2)解决共享变量错误:给变量操作加锁
修复后代码:
import time
import threading
success_count = 0
# 1. 创建计数专用的锁
count_lock = threading.Lock()
def crawl_page(url):
global success_count
time.sleep(0.1)
# 2. 给自增操作加锁,确保原子性
with count_lock:
success_count = success_count + 1
print(f"{url} 爬取完成,当前计数:{success_count}")
if __name__ == '__main__':
threads = []
for i in range(10):
t = threading.Thread(target=crawl_page, args=(f"url{i}",))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数:{success_count}") # 稳定输出10,完全正确效果:计数100%准确,彻底解决数据竞争问题。
关键注意事项
锁的粒度要尽可能小:不要把整个函数都加锁,否则会退化成单线程,失去多线程的意义。只给共享资源的操作加锁即可(比如只给
print和变量自增加锁)。避免死锁:不要在一个锁内嵌套另一个锁,尤其是多个锁交叉使用时,很容易出现线程互相等待、程序卡死的情况。
爬虫场景的最佳实践:打印日志必须加锁,否则日志混乱无法排查问题;全局计数、结果汇总等共享操作必须加锁;网络请求、数据解析等线程私有操作,完全不需要加锁,不影响并发效率。
补充:什么是原子操作?
原子操作是指不会被线程调度打断的操作,执行过程中不会出现线程竞争,天然线程安全。
比如Python中的
a = 1(赋值操作)是原子操作;而
a += 1(读取-计算-写回)是非原子操作,必须加锁。网络请求、文件读写等IO操作,本身是线程安全的(不会修改共享资源),无需加锁。
三、爬虫进阶:线程池(ThreadPoolExecutor)
手动创建线程存在明显缺点:
任务多时,创建大量线程会占用大量系统资源
线程频繁创建 / 销毁,性能开销大
无法控制并发数量,容易被网站封 IP
代码冗余,维护麻烦
线程池 = 预先创建一批线程,重复使用,自动管理,是生产环境爬虫首选方案。
1.线程池的意义
避免频繁创建和销毁线程带来的资源浪费
自动管理线程,避免手动创建大量 Thread 对象
可以高效地调度和复用已有线程
就像物流公司老板不会为每个快递专门招聘一个临时快递员,而是让固定的快递员反复去派送。
2.创建线程池
线程池由 ThreadPoolExecutor 类提供,它位于 concurrent.futures 模块中。这个模块属于标准库,无需额外安装。
from concurrent.futures import ThreadPoolExecutor3.基本用法
创建线程池对象:
pool = ThreadPoolExecutor(max_workers=3)更常见的写法是结合 with 块,这样线程池在使用结束后会自动释放资源:
with ThreadPoolExecutor(max_workers=3) as pool:
# 任务提交写在这里
...4.线程池的工作机制
线程池最多能创建的线程数由
max_workers参数决定并不会一开始就创建所有线程,而是根据提交的任务数量逐步增加线程
当线程数达到
max_workers上限时,新任务会进入等待队列,直到有空闲线程再去执行
5.提交任务
在线程池中,不需要显式创建 Thread 对象,而是通过 submit 方法提交任务。
.submit(函数名, 参数1, 参数2...)
with ThreadPoolExecutor(max_workers=3) as pool:
for i in range(10):
pool.submit(crawl_page, f"url{i+1}")线程池会自动分配线程去执行任务,任务完成后,线程会继续等待下一个任务。
6.获取任务结果
submit 方法会返回一个 Future 对象,里面保存了任务执行的状态和结果。
调用
result()方法可以获得函数返回值result()会阻塞,直到任务执行完成
future_list = []
with ThreadPoolExecutor(max_workers=3) as pool:
for i in range(10):
future = pool.submit(crawl_page, f"url{i+1}")
future_list.append(future)
for future in future_list:
print(f"任务返回值:{future.result()}")7.回调函数
如果希望任务完成后自动触发某个函数,可以使用 add_done_callback。
future = pool.submit(crawl_page, "url1")
future.add_done_callback(task_finish_callback)回调函数会接收 Future 对象作为参数,可以通过以下方法判断任务状态:
cancelled():任务是否被取消exception():任务是否抛出异常result():任务返回值(仅在正常完成时可用)
示例:
def task_finish_callback(future_obj):
if future_obj.cancelled():
print("任务被取消")
elif future_obj.exception():
print(f"任务出现异常:{future_obj.exception()}")
else:
print(f"任务执行成功:{future_obj.result()}")这样可以让结果处理、日志记录在爬取任务完成后自动触发。
8.批量提交任务
除了 submit,线程池还提供了 map 方法,可以一次性批量提交任务。
with ThreadPoolExecutor(max_workers=3) as pool:
results = pool.map(crawl_page, ["url1","url2","url3","url4","url5"])
for result in results:
print(f"任务返回值:{result}")与 submit 不同的是,map 返回的是一个迭代器,包含所有任务的返回结果,并且结果顺序与任务提交顺序一致。
9.计算合适的线程数
线程并不是越多越好,线程之间切换也有开销。 一般建议根据 CPU 核心数来决定:
对于 I/O 密集型任务(如网络请求、文件读写),可取 CPU 核心数的 2~5 倍
可以用 os.cpu_count() 获取核心数
import os
max_workers = os.cpu_count() * 2爬虫属于典型的 IO 密集型任务,使用这个公式设置线程数,既能保证速度,又不会给服务器和目标网站造成过大压力。
补充:线程池异常捕获(生产必备)
submit + result 必须捕获异常,否则一个任务失败会导致整个程序崩溃:
try:
res = future.result()
except Exception as e:
print("任务失败:", e)线程池爬虫示例
版本一:入门简洁版(适合学习理解)
特点:代码精简、逻辑清晰,自动适配 CPU 核心数,适合新手学习线程池基础用法,无额外复杂配置。
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import requests
# 全局配置
# 线程锁:解决打印混乱
print_lock = threading.Lock()
# 请求头:模拟浏览器
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
# 爬虫核心函数
def crawl_task(url):
"""单个爬取任务"""
try:
with print_lock:
print(f"[线程{threading.current_thread().name}] 正在爬取:{url}")
# 发送网络请求
response = requests.get(url, headers=HEADERS, timeout=5)
response.raise_for_status() # 抛出请求异常
with print_lock:
print(f"[线程{threading.current_thread().name}] 爬取成功!状态码:{response.status_code} | {url}")
# 返回结果
return {
"url": url,
"status": "success",
"length": len(response.text)
}
except Exception as e:
with print_lock:
print(f"[线程{threading.current_thread().name}] 爬取失败:{url},原因:{str(e)}")
return {"url": url, "status": "failed", "error": str(e)}
# 线程池爬虫主函数
def thread_pool_crawl(url_list, max_workers=5):
"""
多线程爬虫入口
:param url_list: 待爬取URL列表
:param max_workers: 最大并发线程数
"""
start_time = time.time()
result_list = []
print(f"===== 开始多线程爬取,总任务数:{len(url_list)},最大并发:{max_workers} =====")
# 创建线程池(with语法自动关闭线程池)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 批量提交任务
future_list = [executor.submit(crawl_task, url) for url in url_list]
# 收集所有结果
for future in future_list:
result_list.append(future.result())
# 统计结果
success_num = len([r for r in result_list if r["status"] == "success"])
fail_num = len(result_list) - success_num
print("\n===== 爬取完成 =====")
print(f"总耗时:{time.time() - start_time:.2f} 秒")
print(f"成功:{success_num} 个 | 失败:{fail_num} 个")
return result_list
# 测试运行
if __name__ == '__main__':
# 待爬取URL列表
urls = [
"https://www.baidu.com",
"https://www.qq.com",
"https://www.163.com",
"https://www.sina.com.cn",
"https://www.sohu.com",
"https://www.zhihu.com",
"https://www.jianshu.com",
"https://www.csdn.net",
"https://www.oschina.net",
"https://www.cnblogs.com"
]
# 启动多线程爬虫
crawl_result = thread_pool_crawl(url_list=urls, max_workers=5)版本二:企业增强版(适合实际项目实战)
在简洁版基础上增加爬虫必备功能:固定可控并发、随机延迟防封、线程安全实时计数、全局结果存储,适合正式项目爬取。
import time
import threading
import random
from concurrent.futures import ThreadPoolExecutor
import requests
# ===================== 全局配置 =====================
MAX_WORKERS = 5 # 固定最大并发数,避免请求过猛
TIME_OUT = 5 # 请求超时时间
DELAY_MIN = 0.5 # 最小随机延迟
DELAY_MAX = 1.5 # 最大随机延迟
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
# ===================== 线程安全工具 =====================
print_lock = threading.Lock()
count_lock = threading.Lock()
success_count = 0
fail_count = 0
all_results = [] # 全局统一存储爬取结果
# ===================== 爬虫任务 =====================
def crawl_task(url):
global success_count, fail_count
# 随机延迟,模拟人为访问,防IP封禁
time.sleep(random.uniform(DELAY_MIN, DELAY_MAX))
try:
with print_lock:
print(f"[爬取中] {url}")
response = requests.get(url, headers=HEADERS, timeout=TIME_OUT)
response.raise_for_status()
# 线程安全实时计数
with count_lock:
success_count += 1
with print_lock:
print(f"[成功] {url} | 页面长度:{len(response.text)}")
return {"url": url, "status": "success", "length": len(response.text)}
except Exception as e:
with count_lock:
fail_count += 1
with print_lock:
print(f"[失败] {url} | 错误原因:{str(e)}")
return {"url": url, "status": "fail", "error": str(e)}
# ===================== 线程池入口 =====================
def run_crawl(url_list):
start = time.time()
global all_results
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [executor.submit(crawl_task, url) for url in url_list]
for f in futures:
all_results.append(f.result())
# 输出统计信息
print("\n===== 爬取结束 =====")
print(f"总耗时:{time.time()-start:.2f}s")
print(f"成功:{success_count} | 失败:{fail_count}")
return all_results
# ===================== 测试运行 =====================
if __name__ == "__main__":
urls = [
"https://www.baidu.com",
"https://www.zhihu.com",
"https://www.jianshu.com",
"https://www.csdn.net",
"https://www.qq.com",
]
run_crawl(urls)四、关键知识点总结
多线程适合爬虫:爬虫是IO密集型任务,多线程能充分利用等待时间,效率翻倍
threading模块:基础多线程,适合少量任务;必须用
join()等待线程结束线程安全:共享资源(print、变量)必须加
Lock,避免数据错乱线程池(首选):
ThreadPoolExecutor自动管理线程,代码简洁、生产环境必备最佳实践:IO密集型任务线程数设为 CPU核心数×2~5,兼顾速度与稳定性
五、使用建议
学习阶段:先掌握
threading基础用法实际爬虫:直接使用线程池模板,稳定高效
合规提醒:多线程爬虫请勿高频请求目标网站,遵守
robots协议