爬虫进阶|Python多线程与线程池实战:让爬虫速度起飞

在爬虫开发中,网络IO等待是最大的效率瓶颈。单线程爬虫需要逐个请求网页,等待服务器响应时CPU完全空闲,而多线程能让爬虫同时发送多个请求,大幅提升爬取效率。本文用通俗易懂的方式讲解Python多线程、线程安全、线程池,配套可直接运行的爬虫示例代码。

一、为什么爬虫需要多线程?

1. 单线程爬虫的痛点

想象单线程爬虫就像一个快递员送100个快递

  • 必须送完一个再送下一个

  • 等待收件人开门时,什么都做不了

  • 总耗时 = 所有任务耗时之和

对应到爬虫:

  • 逐个请求网页,一个请求卡住,全部卡住

  • 网络等待期间CPU闲置,资源浪费

  • 爬取大量数据时,速度极慢

2. 多线程爬虫的优势

多线程就像多个快递员同时送快递

  • 多个请求并发执行,无需排队等待

  • 一个请求在等待响应时,其他请求正常运行

  • 总耗时 ≈ 单个任务耗时(大幅提速)

核心适用场景:网络IO密集型任务(爬虫、接口请求、文件下载),是多线程的最佳舞台。

3. 多线程原理

单个CPU同一时刻只能执行一个线程,但操作系统会毫秒级快速切换线程,让我们感觉多个任务在同时运行。

遇到网络等待时,系统自动切换到其他就绪线程,彻底避免CPU空闲。

二、Python多线程基础(threading模块)

Python内置threading模块,无需安装,直接实现多线程。

1. 基础用法:创建并启动线程

threading模块中的Thread类可以用来创建线程对象。
构造函数中常用的两个参数是:

  • target:线程启动后要执行的任务(一个可调用对象,例如函数)

  • args:传递给任务函数的参数,必须是元组类型

线程对象创建好后,并不会自动运行,需要调用start()方法启动线程。

我们用模拟爬取网页替代送快递,更贴合爬虫场景:

import time
import threading

# 模拟爬虫任务:爬取一个网页
def crawl_page(url):
    print(f"开始爬取:{url}")
    time.sleep(2)  # 模拟网络请求等待
    print(f"爬取完成:{url}")

if __name__ == '__main__':
    # 创建3个线程,分别爬取3个网页
    t1 = threading.Thread(target=crawl_page, args=("https://blog.csdn.net",))
    t2 = threading.Thread(target=crawl_page, args=("https://www.jianshu.com",))
    t3 = threading.Thread(target=crawl_page, args=("https://www.zhihu.com",))

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

2. 主线程等待:join()方法

直接运行会发现主线程提前结束,使用join()让主线程等待子线程完成:

import time
import threading

def crawl_page(url):
    print(f"开始爬取:{url}")
    time.sleep(2)
    print(f"爬取完成:{url}")

if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=crawl_page, args=("博客园",))
    t2 = threading.Thread(target=crawl_page, args=("掘金",))
    t3 = threading.Thread(target=crawl_page, args=("思否",))

    t1.start()
    t2.start()
    t3.start()

    # 等待所有线程执行完毕
    t1.join()
    t2.join()
    t3.join()

    print(f"总耗时:{time.time() - start_time:.2f}秒")

运行结果:总耗时≈2秒,3个任务并发完成!

三、线程竞争与非线程安全:多线程的“坑”与解决方案

为什么会出现线程竞争?

我们用爬虫场景举个最直观的例子:

假设你有一个全局变量 total_count,用来统计爬取成功的页面数,10个线程同时执行 total_count += 1

  • 线程A读取 total_count = 100,还没来得及写回,线程B也读取了 total_count = 100

  • 两个线程都执行 100 + 1 = 101,然后分别写回变量

  • 最终 total_count 变成了 101,而不是预期的 102

这就是典型的线程竞争:多个线程同时读写共享资源,互相覆盖了对方的修改,导致数据错误

而像 print 这种看似简单的操作,本质上也是一个非原子操作:它需要先输出字符串、再输出换行符,多个线程同时调用时,就会出现内容交错、换行丢失的混乱情况。

常见的非线程安全场景

(1)打印输出混乱(最直观)

问题代码

import time
import threading

def crawl_page(url):
    print(f"开始爬取:{url}")
    time.sleep(2)  # 模拟网络等待
    print(f"爬取完成:{url}")

if __name__ == '__main__':
    urls = ["url1", "url2", "url3"]
    threads = [threading.Thread(target=crawl_page, args=(url,)) for url in urls]
    for t in threads:
        t.start()

问题现象:输出内容交错、换行丢失,日志完全无法阅读。

根本原因print 不是原子操作,多个线程同时抢占控制台输出,导致内容混乱。

(2)共享变量修改错误(最致命)

问题代码

import time
import threading

# 共享变量:爬取成功计数
success_count = 0

def crawl_page(url):
    global success_count
    time.sleep(0.1)  # 模拟网络请求
    # 非线程安全的自增操作
    success_count = success_count + 1
    print(f"{url} 爬取完成,当前计数:{success_count}")

if __name__ == '__main__':
    threads = []
    for i in range(10):
        t = threading.Thread(target=crawl_page, args=(f"url{i}",))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    print(f"最终计数:{success_count}")  # 预期10,实际可能只有8、9

问题现象:最终计数远小于预期,数据完全错误。

根本原因success_count += 1 拆解为3步:读取变量 → 计算+1 → 写回变量,多线程同时执行时,会互相覆盖修改。

解决方案:用锁(Lock)实现线程安全

Python的 threading.Lock() 是解决线程竞争的标准方案,它的核心逻辑是:给共享资源加一把锁,同一时间只允许一个线程进入锁内的代码块,其他线程必须等待锁释放后才能执行,从根源上避免竞争。

(1)解决打印混乱:给print加锁

修复后代码

import time
import threading

# 1. 创建一把打印专用的锁
print_lock = threading.Lock()

def crawl_page(url):
    # 2. 用with语句自动加锁/释放锁(推荐写法,无需手动lock/unlock)
    with print_lock:
        print(f"开始爬取:{url}")
    time.sleep(2)
    with print_lock:
        print(f"爬取完成:{url}")

if __name__ == '__main__':
    urls = ["https://blog.csdn.net", "https://www.jianshu.com", "https://www.zhihu.com"]
    threads = [threading.Thread(target=crawl_page, args=(url,)) for url in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

效果:输出完全有序,再也不会出现内容交错。

补充with print_lock 等价于 print_lock.acquire() 加锁 + print_lock.release() 释放锁,用with写法更简洁,还能避免忘记释放锁导致的死锁。

(2)解决共享变量错误:给变量操作加锁

修复后代码

import time
import threading

success_count = 0
# 1. 创建计数专用的锁
count_lock = threading.Lock()

def crawl_page(url):
    global success_count
    time.sleep(0.1)
    # 2. 给自增操作加锁,确保原子性
    with count_lock:
        success_count = success_count + 1
        print(f"{url} 爬取完成,当前计数:{success_count}")

if __name__ == '__main__':
    threads = []
    for i in range(10):
        t = threading.Thread(target=crawl_page, args=(f"url{i}",))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    print(f"最终计数:{success_count}")  # 稳定输出10,完全正确

效果:计数100%准确,彻底解决数据竞争问题。

关键注意事项

  1. 锁的粒度要尽可能小:不要把整个函数都加锁,否则会退化成单线程,失去多线程的意义。只给共享资源的操作加锁即可(比如只给print和变量自增加锁)。

  2. 避免死锁:不要在一个锁内嵌套另一个锁,尤其是多个锁交叉使用时,很容易出现线程互相等待、程序卡死的情况。

  3. 爬虫场景的最佳实践打印日志必须加锁,否则日志混乱无法排查问题;全局计数、结果汇总等共享操作必须加锁;网络请求、数据解析等线程私有操作,完全不需要加锁,不影响并发效率。

补充:什么是原子操作

原子操作是指不会被线程调度打断的操作,执行过程中不会出现线程竞争,天然线程安全。

  • 比如Python中的 a = 1(赋值操作)是原子操作;

  • a += 1(读取-计算-写回)是非原子操作,必须加锁。

  • 网络请求、文件读写等IO操作,本身是线程安全的(不会修改共享资源),无需加锁。

三、爬虫进阶:线程池(ThreadPoolExecutor)

手动创建线程存在明显缺点:

  • 任务多时,创建大量线程会占用大量系统资源

  • 线程频繁创建 / 销毁,性能开销大

  • 无法控制并发数量,容易被网站封 IP

  • 代码冗余,维护麻烦

线程池 = 预先创建一批线程,重复使用,自动管理,是生产环境爬虫首选方案

1.线程池的意义

  • 避免频繁创建和销毁线程带来的资源浪费

  • 自动管理线程,避免手动创建大量 Thread 对象

  • 可以高效地调度和复用已有线程

就像物流公司老板不会为每个快递专门招聘一个临时快递员,而是让固定的快递员反复去派送。

2.创建线程池

线程池由 ThreadPoolExecutor 类提供,它位于 concurrent.futures 模块中。这个模块属于标准库,无需额外安装。

from concurrent.futures import ThreadPoolExecutor

3.基本用法

创建线程池对象:

pool = ThreadPoolExecutor(max_workers=3)

更常见的写法是结合 with 块,这样线程池在使用结束后会自动释放资源:

with ThreadPoolExecutor(max_workers=3) as pool:
    # 任务提交写在这里
    ...

4.线程池的工作机制

  • 线程池最多能创建的线程数由 max_workers 参数决定

  • 并不会一开始就创建所有线程,而是根据提交的任务数量逐步增加线程

  • 当线程数达到 max_workers 上限时,新任务会进入等待队列,直到有空闲线程再去执行

5.提交任务

在线程池中,不需要显式创建 Thread 对象,而是通过 submit 方法提交任务。

.submit(函数名, 参数1, 参数2...)

with ThreadPoolExecutor(max_workers=3) as pool:
    for i in range(10):
        pool.submit(crawl_page, f"url{i+1}")

线程池会自动分配线程去执行任务,任务完成后,线程会继续等待下一个任务。

6.获取任务结果

submit 方法会返回一个 Future 对象,里面保存了任务执行的状态和结果。

  • 调用 result() 方法可以获得函数返回值

  • result() 会阻塞,直到任务执行完成

future_list = []

with ThreadPoolExecutor(max_workers=3) as pool:
    for i in range(10):
        future = pool.submit(crawl_page, f"url{i+1}")
        future_list.append(future)

for future in future_list:
    print(f"任务返回值:{future.result()}")

7.回调函数

如果希望任务完成后自动触发某个函数,可以使用 add_done_callback

future = pool.submit(crawl_page, "url1")
future.add_done_callback(task_finish_callback)

回调函数会接收 Future 对象作为参数,可以通过以下方法判断任务状态:

  • cancelled():任务是否被取消

  • exception():任务是否抛出异常

  • result():任务返回值(仅在正常完成时可用)

示例:

def task_finish_callback(future_obj):
    if future_obj.cancelled():
        print("任务被取消")
    elif future_obj.exception():
        print(f"任务出现异常:{future_obj.exception()}")
    else:
        print(f"任务执行成功:{future_obj.result()}")

这样可以让结果处理、日志记录在爬取任务完成后自动触发。

8.批量提交任务

除了 submit,线程池还提供了 map 方法,可以一次性批量提交任务。

with ThreadPoolExecutor(max_workers=3) as pool:
    results = pool.map(crawl_page, ["url1","url2","url3","url4","url5"])

for result in results:
    print(f"任务返回值:{result}")

submit 不同的是,map 返回的是一个迭代器,包含所有任务的返回结果,并且结果顺序与任务提交顺序一致

9.计算合适的线程数

线程并不是越多越好,线程之间切换也有开销。 一般建议根据 CPU 核心数来决定:

对于 I/O 密集型任务(如网络请求、文件读写),可取 CPU 核心数的 2~5 倍

可以用 os.cpu_count() 获取核心数

import os
max_workers = os.cpu_count() * 2

爬虫属于典型的 IO 密集型任务,使用这个公式设置线程数,既能保证速度,又不会给服务器和目标网站造成过大压力。

补充:线程池异常捕获(生产必备)

submit + result 必须捕获异常,否则一个任务失败会导致整个程序崩溃:

try:
    res = future.result()
except Exception as e:
    print("任务失败:", e)

线程池爬虫示例

版本一:入门简洁版(适合学习理解)

特点:代码精简、逻辑清晰,自动适配 CPU 核心数,适合新手学习线程池基础用法,无额外复杂配置。

import time
import threading
from concurrent.futures import ThreadPoolExecutor
import requests

# 全局配置
# 线程锁:解决打印混乱
print_lock = threading.Lock()
# 请求头:模拟浏览器
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

# 爬虫核心函数
def crawl_task(url):
    """单个爬取任务"""
    try:
        with print_lock:
            print(f"[线程{threading.current_thread().name}] 正在爬取:{url}")
        
        # 发送网络请求
        response = requests.get(url, headers=HEADERS, timeout=5)
        response.raise_for_status()  # 抛出请求异常
        
        with print_lock:
            print(f"[线程{threading.current_thread().name}] 爬取成功!状态码:{response.status_code} | {url}")
        
        # 返回结果
        return {
            "url": url,
            "status": "success",
            "length": len(response.text)
        }
    
    except Exception as e:
        with print_lock:
            print(f"[线程{threading.current_thread().name}] 爬取失败:{url},原因:{str(e)}")
        return {"url": url, "status": "failed", "error": str(e)}

# 线程池爬虫主函数
def thread_pool_crawl(url_list, max_workers=5):
    """
    多线程爬虫入口
    :param url_list: 待爬取URL列表
    :param max_workers: 最大并发线程数
    """
    start_time = time.time()
    result_list = []

    print(f"===== 开始多线程爬取,总任务数:{len(url_list)},最大并发:{max_workers} =====")

    # 创建线程池(with语法自动关闭线程池)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 批量提交任务
        future_list = [executor.submit(crawl_task, url) for url in url_list]

        # 收集所有结果
        for future in future_list:
            result_list.append(future.result())

    # 统计结果
    success_num = len([r for r in result_list if r["status"] == "success"])
    fail_num = len(result_list) - success_num

    print("\n===== 爬取完成 =====")
    print(f"总耗时:{time.time() - start_time:.2f} 秒")
    print(f"成功:{success_num} 个 | 失败:{fail_num} 个")

    return result_list

# 测试运行
if __name__ == '__main__':
    # 待爬取URL列表
    urls = [
        "https://www.baidu.com",
        "https://www.qq.com",
        "https://www.163.com",
        "https://www.sina.com.cn",
        "https://www.sohu.com",
        "https://www.zhihu.com",
        "https://www.jianshu.com",
        "https://www.csdn.net",
        "https://www.oschina.net",
        "https://www.cnblogs.com"
    ]

    # 启动多线程爬虫
    crawl_result = thread_pool_crawl(url_list=urls, max_workers=5)

版本二:企业增强版(适合实际项目实战)

在简洁版基础上增加爬虫必备功能:固定可控并发、随机延迟防封、线程安全实时计数、全局结果存储,适合正式项目爬取。

import time
import threading
import random
from concurrent.futures import ThreadPoolExecutor
import requests

# ===================== 全局配置 =====================
MAX_WORKERS = 5          # 固定最大并发数,避免请求过猛
TIME_OUT = 5             # 请求超时时间
DELAY_MIN = 0.5          # 最小随机延迟
DELAY_MAX = 1.5          # 最大随机延迟
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

# ===================== 线程安全工具 =====================
print_lock = threading.Lock()
count_lock = threading.Lock()
success_count = 0
fail_count = 0
all_results = []  # 全局统一存储爬取结果

# ===================== 爬虫任务 =====================
def crawl_task(url):
    global success_count, fail_count
    # 随机延迟,模拟人为访问,防IP封禁
    time.sleep(random.uniform(DELAY_MIN, DELAY_MAX))  

    try:
        with print_lock:
            print(f"[爬取中] {url}")

        response = requests.get(url, headers=HEADERS, timeout=TIME_OUT)
        response.raise_for_status()

        # 线程安全实时计数
        with count_lock:
            success_count += 1

        with print_lock:
            print(f"[成功] {url} | 页面长度:{len(response.text)}")

        return {"url": url, "status": "success", "length": len(response.text)}

    except Exception as e:
        with count_lock:
            fail_count += 1
        with print_lock:
            print(f"[失败] {url} | 错误原因:{str(e)}")
        return {"url": url, "status": "fail", "error": str(e)}

# ===================== 线程池入口 =====================
def run_crawl(url_list):
    start = time.time()
    global all_results

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(crawl_task, url) for url in url_list]
        for f in futures:
            all_results.append(f.result())

    # 输出统计信息
    print("\n===== 爬取结束 =====")
    print(f"总耗时:{time.time()-start:.2f}s")
    print(f"成功:{success_count} | 失败:{fail_count}")
    return all_results

# ===================== 测试运行 =====================
if __name__ == "__main__":
    urls = [
        "https://www.baidu.com",
        "https://www.zhihu.com",
        "https://www.jianshu.com",
        "https://www.csdn.net",
        "https://www.qq.com",
    ]
    run_crawl(urls)

四、关键知识点总结

  1. 多线程适合爬虫:爬虫是IO密集型任务,多线程能充分利用等待时间,效率翻倍

  2. threading模块:基础多线程,适合少量任务;必须用join()等待线程结束

  3. 线程安全:共享资源(print、变量)必须加Lock,避免数据错乱

  4. 线程池(首选)ThreadPoolExecutor自动管理线程,代码简洁、生产环境必备

  5. 最佳实践:IO密集型任务线程数设为 CPU核心数×2~5,兼顾速度与稳定性

五、使用建议

  1. 学习阶段:先掌握threading基础用法

  2. 实际爬虫:直接使用线程池模板,稳定高效

  3. 合规提醒:多线程爬虫请勿高频请求目标网站,遵守robots协议