Python(十五):第十四章:深入解析并发编程

第十四章:深入解析并发编程

14. 并发编程基本概念

并发编程是指程序设计中允许多个任务同时执行的编程模式,它的核心目标是 提升执行效率。通过并发编程,原本需要 20 分钟执行的代码可能只需要 1 分钟就能完成。

进程调度机制解析

CPU 在执行程序时会涉及进程调度,主要有两种切换情况:

  1. I/O 操作触发切换:当程序遇到 I/O 操作时,操作系统会剥夺该程序对 CPU 的执行权限
  2. 时间片用尽触发切换:当一个程序长时间占用 CPU 时,操作系统也会剥夺程序对 CPU 的执行权限

所谓 I/O 操作,指的为 阻断 程序的操作,类似于 input() 函数会将程序暂停运行,达到某一个条件后才会接触阻塞状态

时间片即 CPU 分配给各个程序的时间,每个线程被分配一个时间段,称作它的时间片,即该进程允许运行的时间,使各个程序从表面上看是同时进行的。如果在时间片结束时进程还在运行,则 CPU 将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则 CPU 当即进行切换。而不会造成 CPU 资源浪费。

在宏观上:我们可以同时打开多个应用程序,每个程序并行不悖,同时运行。

但在微观上:由于只有一个 CPU,一次只能处理程序要求的一部分,如何处理公平,一种方法就是引入时间片,每个程序轮流执行。

进程的三大状态与生命周期

进程在其生命周期中会经历三种基本状态:

image-20250426212152903

首先一个程序想要被运行,当用户双击图标后,此时程序就会从硬盘加载到内存,所有的程序想要被执行就必须经历就绪态,然后等待 CPU 执行,就绪态之后会进入进程调度,然后运行

运行时会出现以下几种情况:

  • 1.时间片运行完毕,程序也执行完毕,释放资源后退出
  • 2.程序运行过程遇到 I/O 操作(读写、发送网络请求)它是不需要 CPU 工作的,只要运行遇到了 I/O,操作系统就会把 CPU 拿走,执行其他的时间片,程序就会进入阻塞态,当 IO 请求完成后它就会结束阻塞态,回到就绪态里排队

14.1 同步与异步编程模型

同步和异步

同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事情

异步:任务提交之后,不再等待任务的返回结果,而是去做一些其他的事情

这两个概念主要 描述任务的提交方式

📝 实际应用:在 Web 开发中,同步请求会阻塞页面渲染,而异步请求(AJAX)则可以在后台处理数据,不影响用户体验。

阻塞和非阻塞

这两个概念主要 描述进程的运行状态

  • 阻塞:对应进程的阻塞态
  • 非阻塞:对应进程的就绪态、运行态

结合同步/异步和阻塞/非阻塞,可以形成四种组合:

  • 同步阻塞
  • 同步非阻塞
  • 异步阻塞
  • 异步非阻塞(CPU 利用率最高的一种模式)

🔍 在实际开发中,异步非阻塞模式是高并发系统的首选模式,因为它允许程序在等待 I/O 操作时继续执行其他任务。


14.2 多进程编程技术

进程基础

进程 是程序在计算机中的一次执行过程:

  • 程序 是静态的可执行文件,占用磁盘空间
  • 进程 是动态的执行过程,占用计算机运行资源

类比:一个工厂有三个车间,每个车间一个工人(共 3 人),并行处理任务,相当于一个程序创建三个进程,每个进程一个线程(共 3 人),并行处理任务。

进程创建方法

1
2
3
4
5
6
7
8
9
10
from multiprocessing import Process
Process(target,name,args,kwargs)
'''''''''''''''''
功能 : 创建进程对象
参数 :
target 绑定要执行的目标函数
name 进程名,默认是Process-x(整数)
args 元组,用于给target函数位置传参
kwargs 字典,给target函数键值传参
'''''''''''''''''''
方法一:使用 Process 类创建进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from multiprocessing import Process
import time
import os


# 创建进程的标准方法
def worker_function(name, age):
"""子进程执行过程中触发的函数"""
print(f"子进程ID:{os.getpid()},父进程ID{os.getppid()}")
print(f"子进程正在执行,参数name={name},age={age}")
# 模拟耗时操作
time.sleep(5)
print(f"子进程{name}执行完毕")


def cpu_intensive_task(number):
"""CPU密集型任务"""
result = 0
for i in range(number):
result += i * i
print(f"进程{os.getpid()} 计算完成,结果为{result}")


if __name__ == '__main__':
print(f"父进程ID:{os.getpid()}")
start_time = time.time()

# 创建多个进程,体现并行处理能力
processes = []

# 创建四个检测执行不同人物
p1 = Process(target=worker_function, args=("张三",), kwargs={"age": 20})
p2 = Process(target=worker_function, args=("李四",), kwargs={"age": 30})
p3 = Process(target=cpu_intensive_task, args=(10000,))
p4 = Process(target=cpu_intensive_task, args=(20000,))

processes.extend([p1, p2, p3, p4]) # 将进程添加到列表中

for p in processes:
p.start() # 启动进程

# 等待所有进程结束
for p in processes:
p.join()

end_time = time.time()
print(f"所有进程执行完毕,总耗时{end_time - start_time:.2f}秒")
print("如果使用单进程顺序执行,耗时会更长,因为是两个任务在执行,多进程可以充分利用多核CPU并行处理任务")

⚠️ 重要提示:在 Windows 系统中,必须在 if __name__ == '__main__' 条件下创建进程,这是因为 Windows 使用 spawn 方式创建进程,会重新导入模块,可能导致递归创建进程。

方法二:继承 Process 类创建进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from multiprocessing import Process
import time
import os


# 通过继承Process类创建进程
class WorkerProcess(Process):
"""继承Process类的自定义进程"""
def __init__(self, name, age=None):
super().__init__()
self.name = name
self.age = age

def run(self):
"""重写run方法,进程启动后会执行该方法"""
print(f"子进程ID:{os.getpid()},父进程ID{os.getppid()}")
print(f"子进程正在执行,参数name={self.name},age={self.age}")
# 模拟耗时操作
time.sleep(5)
print(f"子进程{self.name}执行完毕")


class CPUIntensiveProcess(Process):
"""CPU密集型任务进程"""
def __init__(self, number):
super().__init__()
self.number = number

def run(self):
"""重写run方法,执行CPU密集型计算"""
result = 0
for i in range(self.number):
result += i * i
print(f"进程{os.getpid()} 计算完成,结果为{result}")


if __name__ == '__main__':
print(f"父进程ID:{os.getpid()}")
start_time = time.time()

# 创建多个进程,体现并行处理能力
processes = []

# 创建四个进程实例
p1 = WorkerProcess("张三", 20)
p2 = WorkerProcess("李四", 30)
p3 = CPUIntensiveProcess(10000)
p4 = CPUIntensiveProcess(20000)

processes.extend([p1, p2, p3, p4]) # 将进程添加到列表中

for p in processes:
p.start() # 启动进程

# 等待所有进程结束
for p in processes:
p.join()

end_time = time.time()
print(f"所有进程执行完毕,总耗时{end_time - start_time:.2f}秒")

多进程常用方法表

方法名说明实际应用场景
Process(target=...)创建进程对象指定新进程要执行的函数
start()启动进程开始执行进程的任务
join()等待进程结束协调多个进程的执行顺序
is_alive()检查进程是否存活监控进程状态
terminate()强制终止进程中断异常或超时的进程
Queue()创建进程安全的队列进程间数据传递
put(item)添加元素到队列向队列中放入数据
get()从队列获取元素从队列获取数据
Pipe()创建管道对象进程间双向通信

进程号与进程信息获取

在多进程编程中,获取进程信息对于调试和管理至关重要。Python 的 multiprocessing 模块提供了 current_process() 方法来获取当前进程的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing
import os

def process_info():
"""打印当前进程的信息"""
# 获取当前进程对象
process = multiprocessing.current_process()

print(f"进程名称: {process.name}")
print(f"进程ID: {process.pid}")
print(f"父进程ID: {os.getppid()}")
print(f"进程授权键: {process.authkey}")
print(f"进程是否活跃: {process.is_alive()}")

if __name__ == '__main__':
p = multiprocessing.Process(target=process_info, name="自定义进程名")
p.start()
p.join()

进程间通信示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process,Queue
import time

## 子进程执行的函数
def worker(q):
"""子进程函数,想往父进程发送消息,就往这个队列里放"""
print("子进程启动")
# 向队列中添加数据
q.put("我是一个队列数据")
time.sleep(2) # 模拟任务执行
print("子进程结束")

if __name__ == '__main__':
q = Queue() # 父进程创建队列
# 创建一个子进程对象
p = Process(target=worker, args=(q,))
# 启动子进程
p.start()
# 主进程从队列中获取数据
print("主进程等待子进程数据....")
message = q.get() # 阻塞等待数据
print(f"主进程收到来自于子进程的消息:{message}")
# 等待子进程结束
p.join()
print("主进程结束")

复杂进程通信示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import os
import time
from multiprocessing import Process, Queue, Lock, Value, Array


## 子进程执行的函数
def worker_with_args(q, lock, value, arr, sleep_num):
"""带有共享资源的工作函数
Args:
q: 队列
lock: 锁
value: 值
arr: 数组
sleep_num: 休眠时间
"""
# 获取进程id
pid = os.getpid()
print(f"进程{pid}开始执行")

# 使用锁来保证数据安全
with lock: # 相当于lock.acquire()和lock.release()的组合
value.value += 1
for i in range(len(arr)):
arr[i] **= 2 # 安全的修改数组元素
print(f"进程{pid}修改数组元素{i}{arr[i]}")
# 向队列中添加数据
q.put(f"这是一条来自于进程{pid}的信息")

# 休眠模拟工作
time.sleep(sleep_num)

print(f"进程{pid}执行完毕")


if __name__ == "__main__":
"""
解释输出逻辑:
1. 创建了4个进程,它们共享同一个数组arr,初始值为[1,2,3,4,5]
2. 每个进程获取锁后,会将数组中的每个元素进行平方操作(arr[i] **= 2)
3. 由于进程是按顺序启动的,但执行顺序不确定,所以:
- 第一个获得锁的进程将[1,2,3,4,5]平方为[1,4,9,16,25]
- 第二个获得锁的进程将[1,4,9,16,25]平方为[1,16,81,256,625]
- 第三个获得锁的进程将[1,16,81,256,625]平方为[1,256,6561,65536,390625]
- 第四个获得锁的进程将[1,256,6561,65536,390625]平方,但由于整数溢出,
导致最后两个元素变成了0和负数
4. 进程完成的顺序取决于sleep_num参数(1,2,3,4),所以最先完成的是第一个进程
"""
# 创建队列
q = Queue()
# 创建锁
lock = Lock()
# 创建一个共享值
value = Value("i", 0) # "i"表示int类型

# 创建一个共享数组
arr = Array("i", [1, 2, 3, 4, 5])

# 创建多个子进程
processes = []
for i in range(4):
p = Process(target=worker_with_args, args=(q, lock, value, arr, i + 1))
p.start()
processes.extend([p])

进程池详解

进程池是一种管理多个进程的方式,可以简化并行计算的编程。Python 的 multiprocessing 模块中的 Pool 类和 concurrent.futures 模块的 ProcessPoolExecutor 类都提供了进程池功能。

进程池的主要方法
方法描述使用场景
Pool(processes=None)创建进程池,进程数默认为 CPU 核数初始化进程池
apply(func, args)阻塞执行任务需要顺序执行且等待结果的场景
apply_async(func, args)非阻塞执行任务需要异步执行的场景
map(func, iterable)并行执行映射任务对列表元素并行处理
close()关闭进程池,不再接受新任务完成任务提交后
terminate()立即终止所有工作进程需要强制停止时
join()等待所有工作进程退出在 close()后使用
ProcessPoolExecutor 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ProcessPoolExecutor
import os

def worker_function(name,age):
"""进程池工作函数"""
pid = os.getpid()
print(f"进程{pid}{name}{age}岁")
return f"我的父进程是{os.getppid()} 我结束进程了"

if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
for i in range(10):
# submit方法提交任务到进程池,返回一个Future对象
# submit参数: function, *args, **kwargs
# 这里的i作为worker_function函数的第二个参数age传入
future = executor.submit(worker_function, f"小明{i}", i)
# 等待future对象返回结果
result = future.result()
print(result)

Pool 对象示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Pool
import os

def worker_function(args):
"""工作进程函数"""
name,age = args
pid = os.getpid()
print(f"进程{pid}{name}{age}岁")
return f"我的父进程是{os.getppid()} 我结束进程了"

if __name__ == '__main__':
with Pool() as pool:
# 准备参数列表
args_list = [(f"张三{i}号", i+18) for i in range(1,10)]

# map方法将函数应用于参数列表,并返回结果列表
# pool.map 的工作原理:
# 1. 它接收两个参数:要执行的函数(worker_function)和可迭代的参数列表(args_list)
# 2. 它会自动将参数列表中的每个元素分配给不同的进程来执行
# 3. 每个进程会调用worker_function并传入args_list中的一个元素作为参数
# 4. 所有进程执行完毕后,map会收集所有进程的返回值,并按原始参数的顺序返回结果列表
# 5. 这样实现了并行处理,提高了计算效率
result_list = pool.map(worker_function, args_list)

# 打印每个进程的返回结果
for result in result_list:
print(result)

进程号与进程信息获取

在多进程编程中,获取进程信息对于调试和管理至关重要。Python 的 multiprocessing 模块提供了 current_process() 方法来获取当前进程的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing
import os

def process_info():
"""打印当前进程的信息"""
# 获取当前进程对象
process = multiprocessing.current_process()

print(f"进程名称: {process.name}")
print(f"进程ID: {process.pid}")
print(f"父进程ID: {os.getppid()}")
print(f"进程授权键: {process.authkey}") # 授权键用于在进程间通信时进行身份验证。
print(f"进程是否活跃: {process.is_alive()}")

if __name__ == '__main__':
p = multiprocessing.Process(target=process_info, name="自定义进程名")
p.start()
p.join()
进程状态特殊情况
僵尸进程
1
2
3
4
5
6
7
8
9
10
'''
子进程死后还会有一些资源占用(进程号,进程的运行状态,运行时间),等待父进程通过系统调用
进行资源回收

相当于子进程死了之后,需要父进程来给他"收尸"
除了init进程之外,所有的进程最后都会步入僵尸进程
在一种情况下是会带来危害的:
子进程退出之后,父进程没有及时处理,僵尸进程就会一直占用资源
如果产生了大量僵尸进程,资源过度使用,系统没有可用的进程号,导致系统不能产生新的进程
'''

注意:在 Windows 中,子进程退出后会立即被系统回收,不会产生真正的僵尸进程,在 Windows 系统中,不需要显式调用 wait 来回收子进程资源

孤儿进程
1
2
3
4
'''
子进程处于存活状态,父进程意外死亡,操作系统就会开设一个孤儿院(init进程),用来管理
孤儿进程,回收孤儿进程相关资源
'''

📝 知识点:操作系统会自动处理孤儿进程,将它们的父进程更改为 init 进程(PID 为 1),所以孤儿进程不会造成资源泄漏问题。

14.3 多线程编程深入解析

线程基础

线程 是轻量级的进程,也是多任务编程的一种方式:

  • 一个进程中可以包含多个线程
  • 线程也是一个运行行为,消耗计算机资源
  • 一个进程中的所有线程共享这个进程的资源
  • 线程的创建和销毁消耗资源远小于进程

一个工厂至少有一个车间,一个车间中至少有一个工人,工人去利用车间的设备工作;

一个程序至少有一个进程,一个进程中至少有一个线程,线程去利用进程的资源工作。

线程创建方法

方法一:使用 Thread 类创建线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from threading import Thread
import time

def worker_function(name,delay):
print(f"线程{name}开始工作")
time.sleep(delay)
print(f"线程{name}结束工作")

if __name__ == '__main__':
t1 = Thread(target=worker_function,args=("线程1",2))
t2 = Thread(target=worker_function,args=("线程2",4))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("主线程结束")
方法二:继承 Thread 类创建线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from threading import Thread
import time
class MyThread(Thread):
def __init__(self,name,message):
super().__init__()
self.name = name
self.message = message

def run(self):
print(f"线程{self.name}开始执行")
time.sleep(2)
print(f"线程{self.name}执行完毕,消息:{self.message}")

if __name__ == '__main__':
t1 = MyThread(name="线程1",message="子进程操作完毕")
t2 = MyThread(name="线程2",message="子进程操作完毕")
t1.start()
t2.start()
t1.join()
t2.join()
print("主进程执行完毕")

线程常用方法表

方法名说明实际应用场景
start()启动线程开始执行线程任务
run()定义线程执行的任务重写该方法自定义线程行为
join()等待线程结束协调线程执行顺序
join(timeout)等待线程结束,有超时时间防止无限等待
is_alive()检查线程是否活动监控线程状态
getName()获取线程名称调试和日志记录
setName(name)设置线程名称便于识别不同线程
setDaemon(T/F)设置为守护线程随主线程结束而结束的后台任务
isDaemon()检查是否为守护线程确认线程类型
getId()获取线程 ID唯一标识线程
current_thread获取当前线程对象在函数中获取当前执行线程

线程使用实例

基本线程示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from threading import Thread
import time
import requests

def download_file(url,session):
"""访问网站下载文件"""
with session.get(url) as response:
print(f"读取{url} 长度为{len(response.content)}")


def download_all_sites(urls):
"""单线程下载所有网站"""
with requests.Session() as session:
for url in urls:
download_file(url,session)
time.sleep(1)

def download_site_thread(url,session):
"""多线程下载网站"""
download_file(url,session)

def download_all_sites_thread(urls):
"""多线程下载所有网站"""
threads = []
with requests.Session() as session:
for url in urls:
thread = Thread(target=download_site_thread, args=(url,session))
threads.append(thread)
thread.start()
# 等待所有线程结束
for thread in threads:
thread.join()
if __name__ == '__main__':
# 准备一些网站用于演示
sites = [
"https://www.baidu.com",
"https://www.sina.com.cn",
"https://www.qq.com",
"https://www.163.com",
"https://www.sohu.com",
]
# 单线程下载
print("=======单线程下载开始========")
start_time = time.time()
download_all_sites(sites)
end_time = time.time()
print(f"单线程下载结束,耗时{end_time-start_time}秒")

print("\n=======多线程下载开始========")
start_time = time.time()
download_all_sites_thread(sites)
end_time = time.time()
print(f"多线程下载结束,耗时{end_time-start_time}秒")
print("\nIO密集型任务(如网络请求)适合使用多线程,可以显著提高性能")
print("这是因为当一个线程等待IO操作完成时,其他线程可以继续执行")

守护线程示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Thread
import time

def worker_function(thread_name):
print(f"线程 {thread_name} 启动")
time.sleep(3)
print(f"线程 {thread_name} 结束")


if __name__ == '__main__':
t1 = Thread(target=worker_function, args=("Thread-1",))
t2 = Thread(target=worker_function, args=("Thread-2",))
# 设置t3为守护线程,主线程结束时,t3线程也会结束
t3 = Thread(target=worker_function, args=("Thread-3",), daemon=True)
# t3.setDaemon(True) # 已经被废弃的API,现在使用daemon=True参数代替
# 启动普通线程
t1.start()
t2.start()
# 启动守护线程
t3.start() # 这里需要启动t3线程,否则t3不会执行

# 等待普通线程完成
print(f"等待线程{t1.name} + {t2.name}完成...")
t1.join()
t2.join()

# 检测线程状态
print(f"线程{t1.name}是否存活:{t1.is_alive()}")
print(f"线程{t2.name}是否存活:{t2.is_alive()}")
print(f"线程{t3.name}是否存活:{t3.is_alive()}") # True

# 主线程休眠一段时间,以便守护线程有机会执行
time.sleep(10)
print("主线程结束")

💡 守护线程特性:守护线程会随着主线程的结束而结束,不管它是否执行完成。适用于需要在后台运行但不要求必须完成的任务,如监控、日志记录等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from threading import Thread
import time
import logging
import psutil
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
)

def system_monitor(interval=1):
"""
守护线程:系统资源监控器
持续监控CPU使用率和内存使用情况,并记录到日志中
"""
logging.info('系统监控守护线程启动')
try:
while True:
cpu_usage = psutil.cpu_percent(interval=interval)
mem_usage = psutil.virtual_memory().percent
logging.info(f'CPU使用率:{cpu_usage}% 内存使用率:{mem_usage}%')
time.sleep(interval)
if cpu_usage > 80 or mem_usage > 80:
logging.warning('系统资源占用过高,请及时处理')
except Exception as e:
logging.error(f'系统监控线程异常:{e}')
finally:
logging.info('系统监控守护线程结束')
if __name__ == '__main__':
# 创建并启动系统监控守护线程
monitor_thread = Thread(target=system_monitor,args=(1,),daemon=True,name="MonitorThread")
monitor_thread.start()
# 主线程继续执行一段时间,守护线程在后台运行
logging.info("主线程运行中,监控守护线程在后台运行...")
time.sleep(30) # 运行30秒后结束
# 主线程结束,守护线程将自动终止
logging.info(f"监控守护线程是否存活: {monitor_thread.is_alive()}")
logging.info("主线程结束,守护线程将自动终止")

线程池详解

线程池是一种管理线程资源的方式,它预先创建一定数量的线程,然后复用这些线程来执行任务,避免了频繁创建和销毁线程的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from concurrent.futures import ThreadPoolExecutor
import time


# 定义一个耗时函数
def time_consuming_task(n):
time.sleep(1)
return n * n


if __name__ == '__main__':
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
# 使用submit方法提交任务给线程池执行,返回Future对象列表
futures = [executor.submit(time_consuming_task, i) for i in range(1, 21)]
print("任务已提交,主线程继续执行.........")
# 等待所有任务完成,并获取结果
results = [future.result() for future in futures]
print(f"任务执行完毕,结果为:{results}")
end_time = time.time()
print(f"总共耗时:{end_time - start_time}秒")
### 线程池在with语句结束时自动关闭
ThreadPoolExecutor 主要方法
方法名简洁解释适用场景
submit(fn, *args)异步执行函数,返回 Future 对象单独提交任务并获取结果
map(func, *iterables)对每个输入并行执行函数批量处理类似任务
shutdown(wait=True)关闭执行器资源释放
result()获取任务执行结果获取异步任务的返回值
add_done_callback(fn)添加任务完成回调函数任务完成后的后续处理
as_completed()返回已完成任务的迭代器先处理先完成的任务
wait()等待任务完成任务同步点

🔍 深入理解:线程池最大的好处是控制并发数量,防止系统资源被耗尽。在实际开发中,建议将线程数设置为 CPU 核心数的 1-5 倍,具体取决于任务是 I/O 密集型还是 CPU 密集型。

完整示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from concurrent.futures import ThreadPoolExecutor,as_completed,wait
import time

def time_consuming_task(n):
"""耗时任务"""
time.sleep(n % 3 +1) # 不同的n值,耗时不同
return n * n

def task_done_callback(future):
"""任务完成后触发的回调函数"""
print(f"任务完成,结果为{future.result()}")

if __name__ == '__main__':
print("===== 1. submit方法示例 =====")
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
future_list = [executor.submit(time_consuming_task, i) for i in range(10)]
print("提交任务完成,等待结果...")
# result():获取任务执行结果,会阻塞直到所有任务完成
# as_completed(): 返回已完成任务的迭代器
for future in as_completed(future_list):
print(f"任务{future.result()}完成")
end_time = time.time()
print(f"总耗时:{end_time - start_time}秒")

print("\n===== 2. map方法示例 =====")
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
# map():对每一个输入并行执行函数,返回结果迭代器
# 与submit不同,map会自动收集结果并按输入顺序返回
# 不需要手动调用future.result()
results = executor.map(time_consuming_task, range(10))
print("任务提交完成,直接获取有序结果...")
# 转换为列表时会按照输入顺序返回结果,如果任务未完成会在这里阻塞等待
results_list = list(results)
print(f"结果:{results_list}")
end_time = time.time()
print(f"总耗时:{end_time - start_time}秒")

print("\n===== 3. add_done_callback示例 =====")
# add_done_callback(fn): 添加任务完成回调函数
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i in range(1, 6):
future = executor.submit(time_consuming_task, i)
# 添加回调函数,任务完成后自动调用
future.add_done_callback(task_done_callback)
futures.append(future)
print("已添加回调函数,主线程继续执行...")
# 等待所有任务完成
for future in futures:
future.result()
print(f"耗时:{time.time() - start_time}秒")
# 适用场景:任务完成后的后续处理,适合需要在任务完成时执行额外操作而不阻塞主线程

print("\n===== 4. wait示例 =====")
# wait(): 等待任务完成
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(time_consuming_task, i) for i in range(1, 11)]
print("任务已提交,等待所有任务完成...")
# 等待所有任务完成
done, not_done = wait(futures)
print(f"完成的任务数: {len(done)}, 未完成的任务数: {len(not_done)}")
results = [future.result() for future in done]
print(f"所有任务执行完毕,结果为:{results}")
print(f"耗时:{time.time() - start_time}秒")
# 适用场景:任务同步点,适合需要等待一组任务全部或部分完成后再继续执行的情况

Event 事件同步机制

Event 是一种线程同步机制,用于协调多个线程的执行顺序。它本质上是一个内部的标志位,线程可以等待这个标志位被设置,也可以设置或清除这个标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from threading import Thread, Event
import time

## 创建一个事件对象
event = Event()

## 模拟公交车到站的函数
def bus_stop():
"""模拟公交车到站过程"""
print('公交车即将到站')
time.sleep(2) # 模拟行驶时间
print('公交车已到站<====>'*10)
# 设置事件,通知等待的乘客
event.set() # 发射信号,让等车的人上车


def passenger(name):
"""模拟乘客等车

Args:
name: 乘客名称
"""
# 等待公交车到站
print(name, '等车中')
event.wait() # 阻塞等待信号
print(name, '出发!!!!!!!!!!!!!!!!!!!!!!!!!!')


if __name__ == '__main__':
# 创建公交车线程
t1 = Thread(target=bus_stop)
t1.start()

# 创建多个乘客线程
for i in range(20):
t = Thread(target=passenger, args=(f'乘客{i}',))
t.start()
time.sleep(0.1) # 模拟乘客陆续到站
Event 主要方法
方法名描述使用场景
set()设置事件标志为 True通知等待的线程继续执行
clear()清除事件标志为 False重置事件状态,使线程再次等待
is_set()检查事件状态判断事件是否已被设置
wait()等待事件被设置阻塞线程直到事件被设置或超时

🌟 应用场景:Event 适合实现一次性通知多个线程的场景,比如多个工作线程等待初始化完成、多个消费者等待数据准备就绪等。在 Web 开发中,可用于协调多个后台任务的启动时机。

定时器(Timer)

定时器是线程的一个特殊应用,用于在指定时间后执行某个操作。Python 的 threading 模块提供了 Timer 类来实现这一功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from threading import Timer

def delayed_greeting(name):
"""延迟执行的问候函数

Args:
name: 要问候的对象名称
"""
print(f"{name}说: 哈哈,我是延迟1秒后才执行的!")

## 创建一个定时器,1秒后执行hello函数,参数为"小明"
timer = Timer(1, delayed_greeting, args=("小明",))
## 启动定时器
timer.start()

print("定时器已启动,但greeting函数还未执行...")
## 主线程继续执行,不会被阻塞

💡 实用技巧:Timer 可用于实现超时处理、延迟重试、定时清理等场景。例如,在网络编程中,可以用 Timer 设置请求超时机制;在数据同步中,可以用 Timer 定期执行同步任务。

14.4 多进程 VS 多线程性能分析

在 Python 中,由于 GIL(全局解释器锁)的存在,多线程并不能真正实现并行计算。因此,根据任务特性选择合适的并发模型十分重要。

不同场景的最优选择

任务类型多进程多线程推荐选择
计算密集型效率高,可利用多核受 GIL 限制,效率相对较低多进程
IO 密集型资源占用大资源占用小,效率与多进程相当多线程

📊 实际应用建议:现代开发中,约 90%以上的程序属于 IO 密集型,适合使用多线程;对于数据分析、图像处理等计算密集型任务,则推荐使用多进程。也可以考虑混合使用:多进程下每个进程内再使用多线程。

计算密集型任务测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process
from threading import Thread
import time

'''
计算密集型任务对比测试
'''
def task():
"""计算密集型任务"""
res = 0
for i in range(10000000): # 执行大量计算
res += i

if __name__ == '__main__':
start_time = time.time()
l = []

for i in range(10):
# 使用多进程或多线程(取消相应的注释来测试)
p = Process(target=task) # 多进程:结果大概是1.65秒
# p = Thread(target=task) # 多线程:结果大概是4.18秒

p.start()
l.append(p)

for p in l:
p.join()

end = time.time()
print("花费时间", end - start_time)

IO 密集型任务测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
from threading import Thread
import time

'''
IO密集型任务对比测试
'''
def task():
"""IO密集型任务,使用sleep模拟IO操作"""
time.sleep(1) # 模拟IO等待

if __name__ == '__main__':
start_time = time.time()
l = []

for i in range(100): # 创建100个任务
# 使用多进程或多线程(取消相应的注释来测试)
# p = Process(target=task) # 多进程:结果约19.34秒
p = Thread(target=task) # 多线程:结果约1.01秒

p.start()
l.append(p)

for p in l:
p.join()

end = time.time()
print("花费时间", end - start_time)

⚠️ 性能陷阱:多线程在 IO 密集型任务中表现出色,但过多的线程可能导致线程切换开销增大,反而降低效率。经验值是控制线程数为 CPU 核心数的 2-4 倍。

14.5 协程技术详解

协程基础概念

协程(Coroutine)也称为微线程,是一种用户态内的上下文切换技术,可以在单线程下实现并发效果。协程通过巧妙的编程技巧实现了程序主动让出和恢复执行的能力,使得单线程内可以 “模拟” 出并发的效果。

1
2
3
4
5
6
7
8
'''
进程:资源单位 - 系统分配资源的基本单位,拥有独立的内存空间
线程:执行单位 - CPU调度和执行的最小单位,共享所属进程的内存空间
协程:根本不存在,它是程序员人为创造出来的(切换+保存状态)
当程序遇到IO的时候,通过我们的代码,让我们的程序自动完成切换
也就是通过代码监听IO,一旦程序遇到IO,就在代码层面自动切换,给CPU的感觉就是我们的程序没有IO
换句话说也就是我们欺骗了CPU
'''

协程的核心原理是 “切换+保存状态”,即在多个任务之间来回切换,每次切换都保存当前任务的执行状态,下次切换回来继续执行。在 Python 中,可以通过 yield 关键字、greenlet 模块或 asyncio 库实现协程。

🔍 深入理解:协程不是提升计算效率,而是提升 IO 效率。在 IO 密集型应用中,协程可以让 CPU 在等待 IO 的同时执行其他任务,从而提高资源利用率。协程的切换不需要操作系统参与,开销远小于线程切换。

概念资源占用切换开销实现方式适用场景
进程高(独立内存空间)高(涉及内存映射)操作系统调度CPU 密集型,需要隔离的任务
线程中(共享内存但有独立栈)中(上下文切换)操作系统调度混合型任务,兼顾计算与 IO
协程低(共享线程内全部资源)低(用户态切换)程序自行控制IO 密集型,高并发网络应用

协程效率对比

对于计算密集型任务时,使用协程反而会降低效率!

串行执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time

def f1():
"""计算密集型函数1"""
n = 0
for i in range(10000000):
n += i # 执行简单累加计算

def f2():
"""计算密集型函数2"""
n = 0
for i in range(10000000):
n += i # 执行简单累加计算

start_time = time.time()
f1() # 顺序执行f1
f2() # 然后执行f2
## 保留两位小数
print("串行执行总共用时:%.2f秒" % (time.time() - start_time)) # 串行执行总共用时:0.84秒
使用 yield 实现协程切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time

def f1():
"""带yield的计算密集型函数1"""
n = 0
for i in range(10000000):
n += i
yield # 主动让出执行权,保存当前执行状态

def f2():
"""使用f1的生成器进行交替执行"""
g = f1() # 创建生成器对象
n = 0
for i in range(10000000):
n += i
next(g) # 切换到f1执行一步,f1会执行到下一个yield后暂停

start_time = time.time()
f2() # 执行f2,内部会与f1交替执行
## 保留两位小数
print("yield协程用时:%.2f秒" % (time.time() - start_time)) # 约1.45秒

⚠️ 注意事项:对于计算密集型任务,协程切换反而会增加开销,降低效率;但对于 IO 密集型任务,协程切换可以显著提高效率。这是因为在 IO 等待期间,协程可以切换到其他任务继续执行,避免了 CPU 空闲。

greenlet 模块(了解)

greenlet 是一个轻量级的协程库,提供了基本的协程实现。它允许在不使用回调函数的情况下,在不同函数间来回切换执行,实现了所谓的 “确定性切换”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from greenlet import greenlet
import time

def func_a():
"""协程函数a"""
while True:
print('函数a正在运行')
time.sleep(1) # 模拟某些操作
b.switch() # 主动切换到函数b执行

def func_b():
"""协程函数b"""
while True:
print('函数b正在运行')
time.sleep(2) # 模拟某些操作
a.switch() # 切换回函数a执行

if __name__ == '__main__':
# 创建两个greenlet对象
a = greenlet(func_a) # 将函数封装为greenlet对象
b = greenlet(func_b) # 将函数封装为greenlet对象
# 从函数a开始执行
a.switch() # 启动协程a
greenlet 核心方法与属性
方法/属性名描述使用场景示例
greenlet.getcurrent()获取当前正在执行的 greenlet 对象在函数内获取当前协程current = greenlet.getcurrent()
greenlet.switch(value=None)将控制权切换到另一个 greenlet协程间的主动切换g.switch('传递参数')
greenlet.parent获取当前 greenlet 的父 greenlet协程层级管理parent = g.parent
throw(type, value=None, tb=None)向 greenlet 对象中抛出异常协程异常处理g.throw(ValueError, '错误信息')
dead判断 greenlet 是否已经执行完毕协程状态检查if g.dead: print('已执行完毕')
gr_frame获取 greenlet 当前的帧对象调试和检查协程状态frame = g.gr_frame
run绑定到 greenlet 的可调用对象查看协程的目标函数func = g.run

💡 使用技巧:greenlet 适合实现简单的协程切换,但不支持自动在 IO 操作时切换,因此常与事件循环结合使用,如 gevent 库。greenlet 的优势在于它的轻量和灵活性,可以构建复杂的协程调度系统。

gevent 模块(了解)

gevent 是基于 greenlet 的协程库,增加了事件循环和自动 IO 切换功能。它通过 “猴子补丁”(monkey patching)将标准库中的阻塞操作替换为非阻塞版本,使普通的同步代码能够以异步方式执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
'''
gevent 是一个基于协程的 Python 网络库,它使用 greenlet 在 libev 或 libuv
等事件循环之上提供高级同步 API。gevent 实现了python 标准库里面大部分的阻塞式系统调用,
包括 socket、ssl、threading 和 select 等模块,
可以使用 "猴子补丁" 将这些阻塞式调用变为协作式运行。

猴子补丁的功能很强大,但是也带来了很多的风险,尤其是像 gevent 这种直接进行 API替换的补丁,
整个 Python 进程所使用的模块都会被替换,可能自己的代码能 hold 住,
但是其它第三方库,有时候问题并不好排查,即使排查出来也是很棘手,所以,
就像松本建议的那样,如果要使用猴子补丁,那么只是做功能追加,
尽量避免大规模的 API 覆盖。 虽然猴子补丁仍然是邪恶的(evil),
但在这种情况下它是 "有用的邪恶(useful evil)"。
'''
gevent 基础操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent
from gevent import monkey

## 应用猴子补丁,将标准库的阻塞操作替换为非阻塞版本
## 必须在导入其他模块前调用,确保所有IO操作都被替换
monkey.patch_all() # 替换所有可能的阻塞调用

def foo():
"""协程函数1"""
print('Running in foo')
gevent.sleep(0) # 模拟IO操作,主动让出控制权
print('Explicit context switch to foo')

def bar():
"""协程函数2"""
print('Running in bar')
gevent.sleep(0) # 模拟IO操作,主动让出控制权
print('Explicit context switch to bar')

def baz():
"""协程函数3"""
print('Running in baz')
gevent.sleep(0) # 模拟IO操作,主动让出控制权
print('Explicit context switch to baz')

## 创建三个协程
g1 = gevent.spawn(foo) # 创建协程但不立即执行
g2 = gevent.spawn(bar) # 创建协程但不立即执行
g3 = gevent.spawn(baz) # 创建协程但不立即执行

## 等待所有协程完成
gevent.joinall([g1, g2, g3]) # 类似于多线程中的join方法
gevent 常用 API 详解
方法/类名描述使用场景实际应用示例
gevent.spawn(function, *args, **kwargs)创建并运行协程启动异步任务启动多个 HTTP 请求并行处理
gevent.joinall(greenlets, timeout=None, raise_error=False)等待多个协程完成同步点,等待所有任务完成批量处理多个数据源
gevent.sleep(seconds=0)协程休眠并让出控制权模拟 IO 操作,主动让出控制权测试协程调度,防止 CPU 密集任务阻塞
gevent.wait(objects=None, timeout=None, count=None)等待对象(协程)完成等待部分任务完成等待最快完成的结果
gevent.kill(greenlet, exception=GreenletExit)终止协程取消不需要的任务实现任务超时取消
gevent.monkey.patch_all(socket=True, dns=True, ...)应用猴子补丁将同步库变为异步兼容使用前替换标准库函数
gevent.queue.Queue协程安全的队列协程间通信和数据传递生产者-消费者模式实现
gevent.event.Event事件通知机制协程间同步和通知完成信号传递
gevent.pool.Pool协程池限制并发数量控制网络请求并发数
gevent.select.select()IO 多路复用监控多个文件描述符自定义事件循环

⚠️ 使用 gevent 注意事项

  1. 所有协程运行在同一线程中,不能跨线程同步数据
  2. gevent.queue.Queue 是协程安全的,可以用于协程间通信
  3. 不能有长时间阻塞的 CPU 密集型操作,会阻塞整个事件循环
  4. 最好使用 gevent 自身的非阻塞库或已打补丁的标准库
  5. 猴子补丁会修改全局状态,可能影响第三方库的行为,应在所有导入前应用
  6. 调试协程比调试线程更困难,错误追踪可能会更复杂
实际应用场景示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import gevent
from gevent import monkey
import requests
import time

## 替换标准库
monkey.patch_all()

def fetch_url(url):
"""获取URL内容的函数

Args:
url: 要获取的网址

Returns:
tuple: (url, 响应状态码, 内容长度)
"""
try:
print(f"开始请求: {url}")
start = time.time()
response = requests.get(url, timeout=5) # 进行HTTP请求,IO操作会自动切换
elapsed = time.time() - start
print(f"完成请求: {url}, 耗时: {elapsed:.2f}秒")
return url, response.status_code, len(response.content)
except Exception as e:
print(f"请求 {url} 出错: {e}")
return url, 0, 0

## 要获取的URL列表
urls = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.wikipedia.org",
"https://www.reddit.com"
]

start_time = time.time()

## 创建协程任务
tasks = [gevent.spawn(fetch_url, url) for url in urls]

## 等待所有任务完成
gevent.joinall(tasks)

## 获取结果
results = [task.value for task in tasks]

## 打印结果
print("\n结果汇总:")
for url, status, length in results:
print(f"URL: {url}, 状态码: {status}, 内容长度: {length} 字节")

print(f"\n总耗时: {time.time() - start_time:.2f}秒")

greenlet 与 gevent 的区别与选择

特性greenletgevent实际应用建议
基本原理轻量级上下文切换基于 greenlet,增加事件循环简单任务用 greenlet,复杂系统用 gevent
IO 处理不提供 IO 操作支持提供自动 IO 切换机制网络应用选择 gevent,自定义调度选择 greenlet
切换方式需要显式调用 switch()在 IO 操作时自动切换手动控制流程用 greenlet,自动化处理用 gevent
复杂度简单,仅提供基本切换复杂,提供完整生态系统小型项目用 greenlet,大型项目用 gevent
适用场景简单协程调度高并发网络应用Web 爬虫、API 服务、代理服务器首选 gevent
性能轻量,开销小比 greenlet 略重,但实用性强极致性能用 greenlet,平衡性能和开发效率用 gevent
学习曲线简单,容易理解较复杂,概念较多入门协程从 greenlet 开始,再过渡到 gevent
社区支持基础库,更新较少活跃,有完整生态长期项目建议使用 gevent

🌟 选择建议:如果只需要轻量级的上下文切换,可以使用 greenlet;如果需要处理 IO 密集型应用,特别是网络编程,建议使用 gevent。大多数实际项目中,gevent 是更好的选择,因为它提供了更完整的功能和自动化的 IO 处理。

asyncio 协程技术

随着 Python 的发展,协程技术已经有了显著进步。从 Python 3.4 引入的 asyncio 库开始,Python 对协程的原生支持不断增强。到 2025 年,Python 已经拥有更成熟、更高效的协程生态系统。

asyncio 与原生协程

Python 3.5 引入的 async/await 语法使得协程编程变得更加直观和强大,这是目前最推荐的协程实现方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import time

async def fetch_data(url,delay):
"""模拟从网络获取数据的异步函数"""
print(f"开始获取数据:{url},延迟{delay}秒")
await asyncio.sleep(delay)
print(f"成功获取数据长度:{len(url)}")
return f"数据{url}"

async def main():
"""异步操作的主函数"""
print(f"程序开始时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

print("\n===== 串行执行示例 =====")
start_time = time.time()
# 串行执行 - 请求两个API数据
result1 = await fetch_data("https://www.baidu.com", 2)
result2 = await fetch_data("https://www.sina.com.cn",3)
print(f"串行执行结果:{result1}, {result2}")
end_time = time.time()
print(f"程序结束时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
print(f"程序耗时:{end_time-start_time}秒")

# 并行执行 - 请求两个API数据
print("\n===== 并行执行示例 =====")
start_time = time.time()
tasks = [
asyncio.create_task(fetch_data("https://www.baidu.com", 2)),
asyncio.create_task(fetch_data("https://www.sina.com.cn",3)),
]
results = await asyncio.gather(*tasks) # 批量等待所有任务完成
print(f"并行执行结果:{results}")
end_time = time.time()
print(f"程序结束时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
print(f"程序耗时:{end_time-start_time}秒")

if __name__ == '__main__':
# 在Python 3.7+中,可以直接使用asyncio.run()运行主协程
asyncio.run(main())

asyncio 常用 API
方法/函数描述使用场景示例
asyncio.run()运行协程程序入口点asyncio.run(main())
asyncio.create_task()创建任务并行执行协程task = asyncio.create_task(coro())
asyncio.gather()并行运行多个协程批量并发任务results = await asyncio.gather(coro1(), coro2())
asyncio.wait_for()带超时的等待实现超时控制await asyncio.wait_for(coro(), timeout=1.0)
asyncio.sleep()非阻塞睡眠模拟 IO 延迟await asyncio.sleep(1.0)
asyncio.Queue协程安全的队列协程间数据传递queue = asyncio.Queue(); await queue.put(item)
asyncio.Future低级异步原语自定义异步操作future = asyncio.Future(); future.set_result(value)
asyncio.shield()防止取消传播保护关键协程await asyncio.shield(critical_coro())
asyncio.as_completed()按完成顺序返回结果处理最先完成的任务for task in asyncio.as_completed([coro1(), coro2()]): result = await task

Task 对象

Taskasyncio 中用于封装协程的对象,可以用于并发执行多个任务。可以通过 Task 对象等待协程完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def nested():
print('进入 nested()')
await asyncio.sleep(1) # 模拟IO操作
print('离开 nested()')
return '42'

async def main():
task = asyncio.create_task(nested()) # 创建任务
result = await task # 等待任务完成
print(f'返回值:{result}')

asyncio.run(main())

Future 对象

FutureTask 的基类,表示一个未完成的结果。在底层异步操作中,Future 常常用来表示某些未决的操作结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

# 定义一个异步函数,用于设置future的结果
async def set_future_result(future):
# 异步等待2秒,模拟耗时操作
await asyncio.sleep(2)
# 设置future的结果为"Hello, world!"
future.set_result("Hello, world!")

# 定义主异步函数
async def main():
# 获取当前正在运行的事件循环
loop = asyncio.get_running_loop()
# 创建Future对象,它代表一个尚未完成的异步操作
future = loop.create_future() # 创建Future对象
# 创建一个任务来执行set_future_result函数,不等待其完成立即返回
asyncio.create_task(set_future_result(future))
# 等待future完成并获取其结果
result = await future # 等待Future完成
# 打印future的结果
print(f"Future的结果: {result}")

# 运行主异步函数
asyncio.run(main())

异步上下文管理器

异步上下文管理器允许在进入和退出时执行异步操作,常用于异步资源管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

class AsyncResource:
async def __aenter__(self):
print("资源获取")
return self

async def __aexit__(self, exc_type, exc_value, traceback):
print("资源释放")

async def main():
async with AsyncResource():
print("执行任务中")

asyncio.run(main())

14.6 GIL 锁与 Python 并发性能

GIL(Global Interpreter Lock,全局解释器锁)是 CPython 解释器的一个特性,它确保同一时刻只有一个线程可以执行 Python 字节码。这个特性对 Python 多线程编程有着深远影响,也是导致 Python 速度慢的两大原因之一,其另外一个原因是因为 Python 是 解释形 语言,但后续可通过 pypy 技术实现 Python 的预编译,但唯独这个原因 Python 没有解决,Python 在早期开发时为解决垃圾回收机制内部问题采用了 GIL 锁,所以 Python 程序无法直接利用多核 CPU 的优势

1
2
3
4
5
6
7
'''
GIL全局解释器锁(Global Interpreter Lock),是CPython特有的一个物件,
作用是让一个进程中同一时刻只能有一个线程可以被CPU调用

如果程序想利用计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)
如果程序不想利用计算机的多核优势,适合用多线程开发
'''

GIL 的本质与工作原理

GIL 本质上是一把互斥锁,用于保护 Python 解释器的内部状态,主要解决了 Python 对象的内存管理问题。

GIL 特性描述
实现方式互斥锁(mutex)
作用对象Python 解释器进程
控制范围Python 字节码执行
释放时机I/O 操作、执行固定字节码数量后
影响范围仅影响 CPython,PyPy、Jython、IronPython 不受影响

🔍 深入理解:GIL 并非 Python 语言本身的特性,而是 CPython 实现的产物。它解决了 CPython 简单引用计数式内存管理的线程安全问题,但也限制了多线程程序利用多核性能的能力。

1
2
3
4
5
6
7
8
9
## GIL工作示意伪代码
def thread_execution():
while True:
acquire_GIL() # 获取GIL锁
execute_bytecodes() # 执行一定数量的字节码
release_GIL() # 释放GIL锁以允许其他线程运行
wait_for_GIL() # 等待再次获取GIL

# 这也就导致了每一个线程都需要在执行获取字节码时都要经历拿锁->解锁的过程

并发与并行的区别

并发(Concurrency)和并行(Parallelism)是两个在计算机科学中经常出现的概念,虽然常被混用,但有着本质区别:

特性并发(Concurrency)并行(Parallelism)
定义多个任务在同一时间间隔内发生多个任务在同一时刻发生
重点任务切换与调度任务的同时执行
资源需求可以在单处理器上通过时间片轮转实现需要多个处理器或核心
执行方式任务交替执行,共享处理器时间每个任务有独立的处理器同时执行
适用场景I/O 密集型任务,如网络请求、文件读写计算密集型任务,如图像处理、科学计算
实现难度相对简单,关注任务调度相对复杂,需考虑数据分割、同步和合并
Python 实现多线程、协程多进程

🌟 关键理解:由于 GIL 的存在,Python 的多线程实际上只能实现并发,而不能实现真正的并行。要实现并行,需要使用多进程或依赖不受 GIL 限制的扩展库(如使用 C 扩展的 NumPy)。

线程安全与并发控制

线程安全指在多线程环境下,程序能够正确地处理共享资源,不会因为多线程同时访问而导致数据不一致。尽管 Python 的 GIL 能减轻一些并发问题,但并不能完全保证线程安全。

线程安全问题示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from threading import Thread
import time
import random

### 共享的全局变量
counter = 0
start_time = time.time()
iterations_completed = 0

def increment_counter():
"""增加计数器,但使用了非原子操作的方式"""
global counter, iterations_completed
for _ in range(1000000):
# 模拟线程安全问题:读取-修改-写入过程中可能被中断
local_counter = counter # 读取当前值

# 模拟线程在读取后被切换的情况
# 随机休眠一个很小的时间,增加线程切换的可能性
if random.random() < 0.00001:
time.sleep(0.00001)

local_counter += 1 # 在本地修改
counter = local_counter # 写回全局变量
iterations_completed += 1


def run_concurrent_threads(num_threads):
"""运行多个线程同时增加计数器"""
global counter, iterations_completed
counter = 0
iterations_completed = 0

threads = []
for _ in range(num_threads):
t = Thread(target=increment_counter)
threads.append(t)
t.start()

for t in threads:
t.join()

# 理论上应该等于 num_threads * 1000000
expected = num_threads * 1000000
print(f"预期结果: {expected}")
print(f"实际结果: {counter}")
print(f"丢失的增量: {expected - counter}")
print(f"完成的迭代次数: {iterations_completed}")


if __name__ == '__main__':
print(f'开始时间为: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')

# 运行4个线程,每个线程增加计数器1000000次
# 理论上最终结果应该是4000000,但由于线程安全问题,实际结果会小于这个值
run_concurrent_threads(4)

print(f"累计用时: {round(time.time() - start_time, 1)}秒")


使用线程锁解决安全问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from threading import Thread, Lock
import time
import random

### 共享的全局变量
counter = 0
start_time = time.time()
iterations_completed = 0
# 创建一个线程锁
counter_lock = Lock()


def increment_counter():
"""增加计数器,使用线程锁确保线程安全"""
global counter, iterations_completed
for _ in range(1000000):
# 使用线程锁保护临界区
with counter_lock:
counter += 1 # 在锁的保护下直接修改全局变量
iterations_completed += 1


def run_concurrent_threads(num_threads):
"""运行多个线程同时增加计数器"""
global counter, iterations_completed
counter = 0
iterations_completed = 0

threads = []
for _ in range(num_threads):
t = Thread(target=increment_counter)
threads.append(t)
t.start()

for t in threads:
t.join()

# 理论上应该等于 num_threads * 1000000
expected = num_threads * 1000000
print(f"预期结果: {expected}")
print(f"实际结果: {counter}")
print(f"丢失的增量: {expected - counter}")
print(f"完成的迭代次数: {iterations_completed}")


if __name__ == '__main__':
print(f'开始时间为: {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')

# 运行4个线程,每个线程增加计数器1000000次
# 使用线程锁后,最终结果应该正确等于4000000
run_concurrent_threads(4)

print(f"累计用时: {round(time.time() - start_time, 1)}秒")

🔒 线程锁作用与注意事项

  • 锁确保同一时刻只有一个线程能访问共享资源
  • 锁会影响性能,特别是在竞争激烈的情况下
  • 锁的粒度需要权衡:粒度太细会增加锁操作开销,太粗会降低并发度
  • 锁可能引发死锁问题,需谨慎设计锁的获取顺序

Python 中的锁机制全面解析

Python 的 threading 模块提供了多种锁和同步原语,用于不同并发控制场景。深入理解这些锁的特性和适用场景,对于开发可靠的并发程序至关重要。

Python 锁类型及其特性
锁类型描述独占性可重入性公平性注意事项
threading.Lock基本互斥锁非公平最简单的锁,同一线程不能重复获取
threading.RLock可重入锁非公平同一线程可多次获取,必须对应释放相同次数
threading.Condition条件变量--非公平基于锁实现,提供 wait/notify 机制
threading.Semaphore信号量-非公平限制资源访问线程数量
threading.BoundedSemaphore有界信号量-非公平限制资源数量,防止过度释放
threading.Event事件对象---用于线程间通知而非资源控制
threading.Barrier栅栏对象---使多个线程同步到达某点再继续
queue.Queue线程安全队列--先进先出内部带锁,用于线程间数据传递
multiprocessing.Lock进程锁非公平用于进程间同步的锁
asyncio.Lock异步锁-用于协程间的同步
互斥锁(Lock)

互斥锁是最基本的锁类型,它确保同一时刻只有一个线程可以访问受保护的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import concurrent
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
shared_data = 0


def shared_resource(thread_id: int):
"""访问共享资源的函数
Args:thread_id: 线程ID,用于标识不同线程
"""
# 尝试获取锁
if lock.acquire(timeout=1): # 添加超时参数,防止无限等待
print(f"线程{thread_id}获取锁")
try:
global shared_data
# 读取-修改-写入操作需要原子性保护
current = shared_data
time.sleep(0.1) # 模拟处理延时,增加竞争概率
shared_data = current + 1
print(f"线程{thread_id}修改共享数据,当前值为{shared_data}")
finally:
# 释放锁
lock.release()
print(f"线程{thread_id}释放锁")
else:
print(f"线程{thread_id}获取锁失败")


def shared_resource2(thread_id: int):
"""访问共享资源的函数
使用with语句,自动释放锁简化代码
"""
with lock:
print(f"线程{thread_id}获取锁")
try:
global shared_data
# 读取-修改-写入操作需要原子性保护
current = shared_data
time.sleep(0.1) # 模拟处理延时,增加竞争概率
shared_data = current + 1
print(f"线程{thread_id}修改共享数据,当前值为{shared_data}")
finally:
print(f"线程{thread_id}释放锁")


if __name__ == '__main__':
# 使用线程池创建多个线程
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(shared_resource, i) for i in range(1, 11)]
# 等待所有线程完成
concurrent.futures.wait(futures)
print(f"最终共享数据值为{shared_data}")

Lock 方法描述参数返回值
acquire(blocking=True, timeout=-1)获取锁blocking: 是否阻塞, timeout: 超时时间(秒)布尔值,表示是否获取成功
release()释放锁无,如果当前线程未持有锁则抛出 RuntimeError
locked()检查锁状态布尔值,表示锁是否被某个线程持有
__enter__()支持 with 语句锁对象自身
__exit__()with 语句退出时调用异常信息无,自动释放锁
可重入锁(RLock)

可重入锁允许同一个线程多次获取该锁,而不会导致自我死锁。这在递归调用或者嵌套加锁场景中特别有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import threading
import time

## 创建可重入锁
rlock = threading.RLock()

# 嵌套列表数据结构
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]


def process_data(item, depth: int = 0):
"""递归处理数据

Args:
item: 要处理的数据项,可以是列表或单个元素
depth: 当前递归深度,用于缩进显示
"""
# 获取锁
rlock.acquire()
try:
# 创建缩进效果,增强可读性
indent = " " * depth * 2 # 增加缩进量使层次更明显

# 打印当前处理的数据项和深度
print(f'{indent}线程 {threading.current_thread().name} 处理: {item} (深度: {depth})')

# 递归处理逻辑
if isinstance(item, list):
# 列表节点处理 - 继续向下递归
print(f'{indent}├── 发现列表,开始遍历子元素...')
for i, sub_item in enumerate(item):
# 显示子项的索引,增强结构可视化
prefix = "└── " if i == len(item) - 1 else "├── "
print(f'{indent}{prefix}处理子项 {i+1}/{len(item)}: {sub_item}')

# 递归调用,这里会再次获取同一个锁
process_data(sub_item, depth + 1)
time.sleep(0.1)
else:
# 叶子节点处理 - 递归终止条件
print(f'{indent}└── 发现元素,进行处理...')
time.sleep(0.5)
print(f'{indent} 处理结果: {item * 2}')
finally:
# 释放锁
rlock.release()


if __name__ == '__main__':
## 创建多个线程访问嵌套数据
threads = []
for i in range(3):
# 每个线程处理完整的数据结构
t = threading.Thread(name=f"Thread-{i}", target=process_data, args=(data,))
threads.append(t)
t.start()
time.sleep(0.5) # 错开线程启动时间

## 等待所有线程结束
for t in threads:
t.join()

print("所有线程都结束了")
RLock 方法描述与 Lock 的区别
acquire(blocking=True, timeout=-1)获取锁记录获取线程 ID 和次数
release()释放锁计数器减 1,只有为 0 时才真正释放
_is_owned()检查当前线程是否持有锁Lock 没有此方法

💡 使用建议:一般推荐使用 RLock 而非 Lock,因为它更安全、更灵活,即使在不需要重入功能的场景下也不会有明显性能损失。

条件变量(Condition) - 根据条件控制锁

条件变量是一种高级的 同步原语(同步原语就是让多个线程能够"和谐相处"的机制),它允许线程等待特定条件满足后再继续执行。条件变量内部包含一个锁,用于控制对共享状态的访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import threading
import time
import random
from typing import List, Any


class Buffer:
"""线程安全的缓冲区,使用条件变量控制生产者消费者模型"""

def __init__(self, max_size: int = 5) -> None:
"""初始化缓冲区"""
self.buffer: List[Any] = [] # 共享数据缓冲区
self.max_size: int = max_size # 最大容量
# 创建条件变量,基于RLock
self.condition: threading.Condition = threading.Condition()

def produce(self, item: Any, producer_id: int) -> None:
"""生产者方法,向缓冲区添加数据"""
# 使用条件变量的with语句自动获取和释放锁
with self.condition:
# 当缓冲区已满时,等待消费者处理
while len(self.buffer) >= self.max_size:
print(f"生产者 {producer_id}: 缓冲区已满,等待消费者...")
# 等待唤醒通知,自动释放锁,让其他线程能访问缓冲区
self.condition.wait()

# 添加数据到缓冲区
self.buffer.append(item)
print(f"生产者 {producer_id}: 添加 {item} 到缓冲区,当前大小: {len(self.buffer)}")

# 通知所有等待的消费者有新数据可用
self.condition.notify_all()

def consume(self, consumer_id: int) -> Any:
"""消费者方法,从缓冲区获取数据"""
with self.condition:
# 当缓冲区为空时,等待生产者添加数据
while len(self.buffer) == 0:
print(f"消费者 {consumer_id}: 缓冲区为空,等待生产者...")
self.condition.wait()

# 从缓冲区取出数据
item = self.buffer.pop(0)
print(f"消费者 {consumer_id}: 从缓冲区取出 {item},当前大小: {len(self.buffer)}")

# 通知所有等待的生产者缓冲区有空间
self.condition.notify_all()

return item


def producer_task(buffer: Buffer, producer_id: int) -> None:
"""生产者任务"""
for i in range(10): # 生产10个产品
item = f"产品-{producer_id}-{i}"
# 模拟生产时间
time.sleep(random.uniform(0.1, 0.5))
buffer.produce(item, producer_id)


def consumer_task(buffer: Buffer, consumer_id: int) -> None:
"""消费者任务"""
for _ in range(7): # 每个消费者消费7个产品
# 模拟消费时间
time.sleep(random.uniform(0.2, 0.7))
item = buffer.consume(consumer_id)
print(f"消费者 {consumer_id} 处理 {item}")


def main() -> None:
"""主函数,创建并启动生产者和消费者线程"""
# 创建共享缓冲区
shared_buffer = Buffer(max_size=3)

# 创建生产者和消费者线程
producer_threads = [
threading.Thread(target=producer_task, args=(shared_buffer, i), name=f"Producer-{i}")
for i in range(3) # 3个生产者
]

consumer_threads = [
threading.Thread(target=consumer_task, args=(shared_buffer, i), name=f"Consumer-{i}")
for i in range(3) # 3个消费者
]

# 启动所有线程
all_threads = producer_threads + consumer_threads
for thread in all_threads:
thread.start()

# 等待所有线程完成
for thread in all_threads:
thread.join()

print("所有生产和消费任务已完成")


if __name__ == "__main__":
main()
Condition 方法描述参数注意事项
__init__(lock=None)初始化条件变量lock: 可选的 Lock 或 RLock不指定则创建 RLock
acquire(*args)获取底层锁同底层锁的 acquire 方法一般通过 with 语句使用
release()释放底层锁一般通过 with 语句自动释放
wait(timeout=None)等待条件timeout: 超时时间(秒)调用前必须已获得锁
wait_for(predicate, timeout=None)等待直到条件为真predicate: 条件函数, timeout: 超时时间简化循环等待模式
notify(n=1)唤醒 n 个等待的线程n: 要唤醒的线程数不会立即释放锁
notify_all()唤醒所有等待的线程适用于广播通知

⚠️ 使用注意

  1. 调用 wait() 会释放锁,允许其他线程修改条件状态
  2. 使用 wait_for() 可以避免虚假唤醒问题
  3. 调用 notify() 后锁不会立即释放,需要当前线程退出 with 块
  4. 使用 notify_all() 而非 notify() 可以避免信号丢失问题
信号量(Semaphore) - 控制并发数量

信号量是一种计数器,用于控制同时访问特定资源的线程数量,常用于限制并发访问数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor

## 创建信号量,限制最多三个线程同时访问资源
pool_semaphore = threading.Semaphore(3)

## 模拟优先的资源池
resource_pool = ['资源A', '资源B', '资源C']
resource_in_use = {} # 跟踪资源使用情况

## 保护资源分配的锁
resource_lock = threading.RLock()


def worker(worker_id: int):
"""工作线程函数,模拟使用受限资源"""
# 尝试获取信号量
print(f"工作线程 {worker_id} 等待获取资源...")
with pool_semaphore: # 等同于acquire()和finally中release()
print(f"工作线程 {worker_id} 获取资源信号量")
with resource_lock:
# 检查resource_pool中的每个资源,如果该资源不在resource_in_use字典的值中,则认为是可用的
available_resources = [r for r in resource_pool if r not in resource_in_use.values()]
if not available_resources:
print(f"工作线程 {worker_id} 没有找到可用资源,理论上不应该发生!")
return

resource_name = available_resources[0]
resource_in_use[worker_id] = resource_name # 记录资源使用情况
print(f"工作线程 {worker_id} 分配到资源: {resource_name}, 当前使用情况: {resource_in_use}")
try:
# 模拟使用资源
work_time = random.uniform(0.5, 2.0)
print(f"工作线程 {worker_id} 使用资源 {resource_name} 时间: {work_time} 秒")
time.sleep(work_time)
finally:
# 释放资源
with resource_lock:
released_resource = resource_in_use.pop(worker_id)
print(f"工作线程 {worker_id} 释放资源: {released_resource}, 当前使用情况: {resource_in_use}")


if __name__ == '__main__':
# 创建线程池,并启动三个线程
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(worker, i) for i in range(3)]
for future in futures:
future.result()


Semaphore 方法描述参数返回值
__init__(value=1)初始化信号量value: 初始计数器值
acquire(blocking=True, timeout=None)获取信号量blocking: 是否阻塞, timeout: 超时时间布尔值,表示是否获取成功
release(n=1)释放信号量n: 释放的数量
__enter__()支持 with 语句信号量对象自身
__exit__()with 语句退出时调用异常信息无,自动释放信号量
有界信号量(BoundedSemaphore)详解

有界信号量是信号量的一个变种,它会检查释放操作是否会导致计数器超过初始值,如果超过则抛出异常。这可以帮助检测程序中的信号量使用错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading
import time

## 创建有界信号量,初始值为3
bounded_semaphore = threading.BoundedSemaphore(3)

def semaphore_demo():
"""演示有界信号量与普通信号量的区别"""
try:
print("获取信号量1次")
bounded_semaphore.acquire()

print("获取信号量2次")
bounded_semaphore.acquire()

print("获取信号量3次")
bounded_semaphore.acquire()

print("信号量已用完,再获取将阻塞")

# 释放全部信号量
print("释放信号量1次")
bounded_semaphore.release()

print("释放信号量2次")
bounded_semaphore.release()

print("释放信号量3次")
bounded_semaphore.release()

try:
# 超出初始值的释放将抛出异常
print("尝试额外释放一次")
bounded_semaphore.release()
print("这一行不会执行")
except ValueError as e:
print(f"捕获预期异常: {e}")
except Exception as e:
print(f"意外错误: {e}")

semaphore_demo()

🔍 Semaphore vs BoundedSemaphore

  • Semaphore 允许无限制地调用 release(),即使计数器超过初始值
  • BoundedSemaphore 在计数器超过初始值时会抛出 ValueError 异常
  • 生产环境推荐使用 BoundedSemaphore,或安全的使用 with 语句,保证程序安全
事件对象(Event)

事件对象是最简单的线程通信机制之一,它允许一个线程发送信号给其他线程,适合简单的 “一次性通知” 场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import threading
import time
import random
from typing import List
from concurrent.futures import ThreadPoolExecutor

## 创建事件对象
start_event = threading.Event()
results: List[str] = []
results_lock = threading.RLock()

def worker(worker_id:int) -> None:
"""工作线程函数,等待开始信号"""
prep_time = random.uniform(0.5, 1.5)
time.sleep(prep_time)
print(f"工作线程{worker_id}准备完毕,等待开始信号")
# 等待开始信号
start_event.wait()

# 收到信号开始工作
print(f"工作线程{worker_id}开始工作")
work_time = random.uniform(1, 2)
time.sleep(work_time)
# 记录结果
with results_lock:
results.append(f"工作线程{worker_id}完成工作")

print(f"工作线程{worker_id}完成工作,用时{work_time:.2f}秒")

if __name__ == '__main__':
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(worker, i) for i in range(3)]
# 等待所有线程准备完毕
time.sleep(2)
# 发送开始信号
print("发送开始信号")
start_event.set()
# 等待所有线程完成工作
for future in futures:
future.result()

print("所有工作线程完成")
print(results)
Event 方法描述参数返回值
set()设置事件,唤醒所有等待的线程
clear()清除事件标志
is_set()判断事件是否已设置布尔值
wait(timeout=None)等待事件被设置timeout: 超时时间如果超时返回 False,否则返回 True

💡 使用场景

  • 启动信号:所有线程等待统一开始
  • 停止信号:通知所有线程停止工作
  • 一次性通知:当某条件满足时通知等待线程
栅栏对象(Barrier)

栅栏是一种同步原语,它要求固定数量的线程都到达栅栏点后,才允许所有线程继续执行。这对于分阶段任务的同步特别有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import threading
import time
import random

## 定义参与方数量
num_parties = 4

## 创建栅栏对象,当4个线程都到达时才继续
barrier = threading.Barrier(num_parties)

def worker(worker_id):
"""工作线程函数,模拟多阶段工作

Args:
worker_id: 工作线程ID
"""
print(f"工作线程 {worker_id} 开始第一阶段工作")

# 模拟第一阶段工作
work_time = random.uniform(0.5, 2.0)
time.sleep(work_time)
print(f"工作线程 {worker_id} 完成第一阶段,用时 {work_time:.2f} 秒,等待其他线程...")

try:
# 等待所有线程完成第一阶段
barrier.wait()
print(f"工作线程 {worker_id} 通过第一个栅栏,开始第二阶段")

# 模拟第二阶段工作
work_time = random.uniform(0.5, 2.0)
time.sleep(work_time)
print(f"工作线程 {worker_id} 完成第二阶段,用时 {work_time:.2f} 秒,等待其他线程...")

# 等待所有线程完成第二阶段
barrier.wait()
print(f"工作线程 {worker_id} 通过第二个栅栏,工作全部完成")

except threading.BrokenBarrierError:
print(f"工作线程 {worker_id} 检测到栅栏被破坏")

## 创建工作线程
threads = []
for i in range(num_parties):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()

## 等待所有线程完成
for t in threads:
t.join()

print("所有工作阶段已完成")
Barrier 方法描述参数返回值
__init__(parties, action=None, timeout=None)初始化栅栏parties: 参与方数量, action: 所有线程到达时执行的回调, timeout: 等待超时
wait(timeout=None)等待所有参与方到达timeout: 覆盖默认超时时间线程的到达序号(0 ~ n-1)
reset()将栅栏重置到初始状态无,正在等待的线程会抛出 BrokenBarrierError
abort()将栅栏置于损坏状态无,所有等待线程会抛出 BrokenBarrierError
parties参与方数量(属性)整数
n_waiting当前等待的线程数(属性)整数
broken栅栏是否处于损坏状态(属性)布尔值

⚠️ 注意事项

  • 如果等待超时,栅栏会进入损坏状态
  • 如果等待时的线程被中断,栅栏也会损坏
  • 可以通过 reset() 方法重新使用已损坏的栅栏
线程安全队列(Queue)

queue 模块提供的 Queue 类是一个线程安全的队列实现,通常用于线程间的数据传递和任务分发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import threading
import queue
import time
import random
from concurrent.futures import ThreadPoolExecutor

## 创建线程安全队列
task_queue = queue.Queue(maxsize=10) # 最多容纳10个任务
result_queue = queue.Queue() # 结果队列,无大小限制
## 用于通知工作线程结束的标志
exit_flag = threading.Event()

def producer():
"""生产者线程,产生任务"""
for i in range(20):
task = f"任务-{i}"
# 将任务放入队列
task_queue.put(task)
print(f"生产者: 添加 {task} 到队列,当前队列大小: {task_queue.qsize()}")
time.sleep(random.uniform(0.1, 0.3)) # 随机延迟

# 添加结束标记
print("生产者: 所有任务已产生,设置退出标志")
exit_flag.set()

def consumer(consumer_id):
"""消费者线程,处理任务"""
# 直到所有任务都处理完毕或有新任务到来
while not exit_flag.is_set() or not task_queue.empty():
try:
# 从队列获取任务,最多等待1秒
task = task_queue.get(timeout=1)
# 模拟处理任务
print(f"消费者 {consumer_id}: 开始处理 {task}")
process_time = random.uniform(0.5, 1.5)
time.sleep(process_time)
# 将处理结果放入结果队列
result = f"结果-{task}-耗时{process_time:.2f}秒"
result_queue.put((consumer_id, result))
# 标记任务完成
task_queue.task_done()
print(f"消费者 {consumer_id}: 完成处理 {task}")
except queue.Empty:
# 队列为空且设置了退出标志时结束循环
if exit_flag.is_set():
break
print(f"消费者 {consumer_id}: 队列暂时为空,等待任务...")
time.sleep(0.5)
print(f"消费者 {consumer_id}: 退出")

if __name__ == '__main__':
# 创建生产者线程和消费者线程
with ThreadPoolExecutor(max_workers=4) as executor:
# 创建生产者线程
producers = [executor.submit(producer) for _ in range(2)]
# 创建消费者线程
consumers = [executor.submit(consumer, i) for i in range(2)]

# 等待所有线程结束
all_futures = producers + consumers
for future in all_futures:
future.result()

# 打印结果队列中的所有结果
print("\n处理结果:")
while not result_queue.empty():
consumer_id, result = result_queue.get()
print(f"消费者 {consumer_id} 的结果: {result}")

Queue 方法/属性描述参数返回值/特性
__init__(maxsize=0)初始化队列maxsize: 队列最大大小,0 表示无限
put(item, block=True, timeout=None)放入元素item: 元素, block: 是否阻塞, timeout: 超时时间无,队列满时可能阻塞或抛出 Full 异常
get(block=True, timeout=None)获取元素block: 是否阻塞, timeout: 超时时间队列元素,队列空时可能阻塞或抛出 Empty 异常
task_done()标记任务完成
join()等待队列中所有任务处理完成
qsize()返回队列大小整数
empty()检查队列是否为空布尔值
full()检查队列是否已满布尔值
put_nowait(item)非阻塞版本的 putitem: 元素无,队列满时抛出 Full 异常
get_nowait()非阻塞版本的 get队列元素,队列空时抛出 Empty 异常

💡 Queue 变种

  • queue.LifoQueue: 后进先出队列(栈)
  • queue.PriorityQueue: 优先级队列,元素为(优先级, 数据)元组
  • queue.SimpleQueue: 简单的无界队列,不支持 task_done 和 join
死锁问题分析与解决

死锁是指两个或多个线程互相等待对方释放资源,导致程序无法继续执行的情况。

死锁示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import threading
import time

## 创建两个锁
lock_1 = threading.Lock()
lock_2 = threading.Lock()

def task1():
"""第一个任务,先获取lock_1,再获取lock_2"""
print("任务1开始尝试获取锁...")
lock_1.acquire() # 获取1号锁
print("任务1获取到lock_1")
time.sleep(0.5) # 等待一会,让任务2有机会获取lock_2
print("任务1尝试获取lock_2")
lock_2.acquire() # 尝试获取2号锁,但可能永远阻塞于此

try:
print("任务1同时获取了两把锁")
# 使用两把锁保护的代码
finally:
# 释放锁
lock_2.release()
print("任务1释放了lock_2")
lock_1.release()
print("任务1释放了lock_1")


def task2():
"""第二个任务,先获取lock_2,再获取lock_1"""
print("任务2开始尝试获取锁...")
lock_2.acquire() # 获取2号锁
print("任务2获取到lock_2")
time.sleep(0.5) # 等待一会
print("任务2尝试获取lock_1")
lock_1.acquire() # 尝试获取1号锁,但可能永远阻塞于此
try:
print("任务2同时获取了两把锁")
# 使用两把锁保护的代码
finally:
# 释放锁
lock_1.release()
print("任务2释放了lock_1")
lock_2.release()
print("任务2释放了lock_2")
if __name__ == '__main__':
## 创建两个线程
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)

## 启动线程
t1.start()
t2.start()

## 等待一段时间后检查是否发生死锁
time.sleep(5)

## 检查线程是否还活着
print(f"线程1状态: {'活跃' if t1.is_alive() else '已结束'}")
print(f"线程2状态: {'活跃' if t2.is_alive() else '已结束'}")

if t1.is_alive() and t2.is_alive():
print("检测到可能的死锁情况!")

⚠️ 死锁的四个必要条件

  1. 互斥条件:资源不能被共享,一次只能被一个线程使用
  2. 请求与保持条件:线程已获得资源,但又提出新的资源请求
  3. 不剥夺条件:线程已获得的资源不能强制被剥夺
  4. 循环等待条件:线程之间形成头尾相接的循环等待资源关系
死锁解决方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import threading
import time

## 创建两个锁
lock_1 = threading.Lock()
lock_2 = threading.Lock()

def acquire_locks_safe(lock_a, lock_b, thread_name):
"""安全地获取两个锁,使用超时机制避免死锁

Args:
lock_a: 第一个锁
lock_b: 第二个锁
thread_name: 线程名称

Returns:
bool: 是否成功获取两个锁
"""
while True:
# 尝试获取第一个锁
got_lock_a = lock_a.acquire(timeout=1)
if got_lock_a:
print(f"{thread_name}: 获取到第一个锁")
try:
# 尝试获取第二个锁
got_lock_b = lock_b.acquire(timeout=1)
if got_lock_b:
print(f"{thread_name}: 获取到第二个锁")
return True # 成功获取两个锁
# 获取第二个锁失败,释放第一个锁,避免死锁
print(f"{thread_name}: 获取第二个锁失败,释放第一个锁并重试")
finally:
if not got_lock_b:
lock_a.release()
# 短暂休眠,减少活锁可能性
time.sleep(0.1)
else:
print(f"{thread_name}: 获取第一个锁失败,重试")
time.sleep(0.1) # 短暂休眠避免CPU忙等

def task1_fixed():
"""修复死锁的任务1 - 使用安全获取锁函数"""
print("任务1开始执行...")
if acquire_locks_safe(lock_1, lock_2, "任务1"):
try:
print("任务1: 同时持有两把锁,执行关键代码")
time.sleep(0.5) # 模拟工作
finally:
# 释放锁
lock_2.release()
print("任务1: 释放lock_2")
lock_1.release()
print("任务1: 释放lock_1")
else:
print("任务1: 无法获取所需的锁,任务取消")

def task2_fixed():
"""修复死锁的任务2 - 使用一致的锁获取顺序"""
print("任务2开始执行...")
# 按与任务1相同的顺序获取锁,避免死锁
if acquire_locks_safe(lock_1, lock_2, "任务2"):
try:
print("任务2: 同时持有两把锁,执行关键代码")
time.sleep(0.5) # 模拟工作
finally:
# 释放锁
lock_2.release()
print("任务2: 释放lock_2")
lock_1.release()
print("任务2: 释放lock_1")
else:
print("任务2: 无法获取所需的锁,任务取消")

## 创建两个线程
t1 = threading.Thread(target=task1_fixed)
t2 = threading.Thread(target=task2_fixed)

## 启动线程
t1.start()
t2.start()

## 等待线程结束
t1.join()
t2.join()

print("所有线程执行完毕,没有死锁")

🛠️ 死锁预防方法

  1. 按顺序获取锁:使所有线程按相同顺序获取锁
  2. 超时机制:使用 acquire(timeout=N) 设置获取锁的超时时间
  3. 一次性获取所有锁:创建更高级别的锁来同时获取多个锁
  4. 使用显式资源分级:为资源分配层级,只允许按层级顺序获取
  5. 避免嵌套锁:设计简化的锁策略,减少同时持有多个锁的情况
  6. 使用 with 语句:确保锁在异常情况下也能被释放

原子操作与锁优化

在并发编程中,原子操作是指不可被中断的操作,它们要么完全执行,要么完全不执行。Python 提供了一些原子操作工具,可以减少对锁的依赖。

threading.local 对象 - 线程本地存储

线程本地存储提供了一种每个线程拥有自己独立数据副本的机制,避免了共享状态带来的并发问题。

我们可以往 threading.local()上挂载对象,这样我们的每一个线程就会有属于自己的独立数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor

## 创建线程本地存储对象
thread_local_data = threading.local()

def process_request(request_id: int) -> None:
"""处理请求的工作函数"""
# 为当前线程设置上下文信息
thread_local_data.user_id = f"user-{random.randint(1000, 9999)}"
thread_local_data.request = request_id
thread_local_data.start_time = time.time()
# 模拟处理请求的各个阶段
print(f"请求 {request_id}: 开始处理 [线程: {threading.current_thread().name}, 用户: {thread_local_data.user_id}]")
process_stage("验证")
process_stage("处理")
process_stage("响应")
# 计算总处理时间
elapsed = time.time() - thread_local_data.start_time
print(f"请求 {request_id}: 完成处理,总耗时 {elapsed:.2f}秒 [线程: {threading.current_thread().name}]")


def process_stage(stage_name: str):
"""处理请求的某个阶段"""
# 访问线程本地变量,无需传递参数
request_id = thread_local_data.request
user_id = thread_local_data.user_id

# 模拟阶段处理
time.sleep(random.uniform(0.1, 0.5))
print(f"请求 {request_id}: {stage_name}阶段完成 [用户: {user_id}]")

if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(process_request, i) for i in range(10)]
for future in futures:
future.result()
print("所有请求处理完成")
functools.lru_cache 带锁的缓存

functools.lru_cache 装饰器提供了一个线程安全的缓存机制,当一个函数的计算逻辑十分复杂,我们就可以采用缓存来优化这一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import functools
import time

# ========== LRU缓存演示 ==========
@functools.lru_cache(maxsize=128)
def fibonacci(n):
"""计算斐波那契数列的第n个数,使用LRU缓存优化性能"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)

def demonstrate_lru_cache():
"""演示LRU缓存的效果"""
# 不使用缓存的计算时间
def fibonacci_no_cache(n):
if n <= 1:
return n
return fibonacci_no_cache(n-1) + fibonacci_no_cache(n-2)

n = 35

# 测试无缓存版本
start = time.time()
result1 = fibonacci_no_cache(n)
end = time.time()
print(f"无缓存计算fibonacci({n}) = {result1},耗时: {end - start:.4f}秒")

# 测试有缓存版本
start = time.time()
result2 = fibonacci(n)
end = time.time()
print(f"首次使用缓存计算fibonacci({n}) = {result2},耗时: {end - start:.4f}秒")

# 再次调用,应该直接从缓存获取结果
start = time.time()
result3 = fibonacci(n)
end = time.time()
print(f"再次使用缓存计算fibonacci({n}) = {result3},耗时: {end - start:.8f}秒")

# 显示缓存信息
print(f"缓存信息: {fibonacci.cache_info()}")

if __name__ == "__main__":
demonstrate_lru_cache()

锁的高级应用模式

读写锁模式

Python 中的读写锁(Read-Write Lock)主要用于在多线程环境中控制对共享资源的访问。它允许多个线程同时读取共享数据,但在写操作时,其他线程不能进行读或写操作。具体的应用场景包括:

  1. 数据共享与并发读取:当多个线程需要读取同一份数据时,使用读锁可以提高并发性,允许多个线程同时访问数据,而不需要每次访问都加锁。
  2. 写操作的独占性:当有线程进行写操作时,需要获取写锁,这样可以确保写操作的独占性,避免数据竞争和不一致性。
  3. 性能优化:在读多写少的场景下,读写锁能提高性能,因为它允许多个线程并行读取数据,而只有在写入时才会阻塞其他线程。

我们先从 Python 原生实现读写锁来作为演示,掌握了原生的方式,我们可以使用 readerwriterlock 第三方库来帮我们快速实现读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import threading
import time
import random


class ReadWriteLock:
"""读写锁实现

允许多个读取者同时访问,或单个写入者独占访问
"""

def __init__(self):
"""初始化读写锁"""
self._read_ready = threading.Condition(threading.RLock())
self._readers = 0 # 当前读取者数量
self._writers = 0 # 当前写入者数量
self._write_waiting = 0 # 等待写入的线程数
self._writer = None # 当前持有写锁的线程ID

def acquire_read(self):
"""获取读锁"""
with self._read_ready:
# 当有写入者或正在等待的写入者时,读取者需要等待
while self._writers > 0 or self._write_waiting > 0:
self._read_ready.wait()
self._readers += 1

def release_read(self):
"""释放读锁"""
with self._read_ready:
self._readers -= 1
if self._readers == 0: # 最后一个读取者通知所有等待的线程
self._read_ready.notify_all()

def acquire_write(self):
"""获取写锁"""
me = threading.get_ident() # 获取当前线程ID
with self._read_ready:
self._write_waiting += 1 # 增加等待写入计数
# 等待没有读取者和写入者
while self._readers > 0 or self._writers > 0:
self._read_ready.wait()
self._write_waiting -= 1 # 减少等待写入计数
self._writers += 1
self._writer = me

def release_write(self):
"""释放写锁"""
with self._read_ready:
if self._writer != threading.get_ident():
raise RuntimeError("释放未持有的写锁")
self._writers -= 1
self._writer = None
self._read_ready.notify_all() # 通知所有等待的线程

# 支持with语句的上下文管理器
class ReadLock:
def __init__(self, rw_lock):
self.rw_lock = rw_lock

def __enter__(self):
self.rw_lock.acquire_read()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.rw_lock.release_read()

class WriteLock:
def __init__(self, rw_lock):
self.rw_lock = rw_lock

def __enter__(self):
self.rw_lock.acquire_write()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.rw_lock.release_write()

# 获取读锁和写锁的方法
def read_lock(self):
"""获取读锁上下文管理器"""
return self.ReadLock(self)

def write_lock(self):
"""获取写锁上下文管理器"""
return self.WriteLock(self)


### 共享数据和读写锁
shared_data = {'count': 0, 'values': []}
rw_lock = ReadWriteLock()


def reader(reader_id):
"""读取者线程

Args:
reader_id: 读取者ID
"""
for _ in range(5):
# 获取读锁
with rw_lock.read_lock():
# 读取共享数据
count = shared_data['count']
values = shared_data['values'].copy()

# 模拟读取操作
time.sleep(random.uniform(0.05, 0.1))

print(f"读取者 {reader_id}: 读取到 count={count}, values={values}")

# 读取者之间的休息
time.sleep(random.uniform(0.1, 0.3))


def writer(writer_id):
"""写入者线程

Args:
writer_id: 写入者ID
"""
for i in range(3):
# 准备新数据
new_value = writer_id * 100 + i

# 获取写锁
with rw_lock.write_lock():
# 修改共享数据
shared_data['count'] += 1
shared_data['values'].append(new_value)

# 模拟写入操作
time.sleep(random.uniform(0.1, 0.2))

print(f"写入者 {writer_id}: 更新为 count={shared_data['count']}, values={shared_data['values']}")

# 写入者之间的休息
time.sleep(random.uniform(0.3, 0.7))


### 创建读取者和写入者线程
readers = [threading.Thread(target=reader, args=(i,)) for i in range(5)]
writers = [threading.Thread(target=writer, args=(i,)) for i in range(3)]

### 启动所有线程
all_threads = readers + writers
for thread in all_threads:
thread.start()

### 等待所有线程完成
for thread in all_threads:
thread.join()

print(f"最终数据: count={shared_data['count']}, values={shared_data['values']}")
使用 readerwriterlock 库实现读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
读写锁实现示例

readerwriterlock库提供了三种读写锁实现:
- RWLockRead:读者优先(第一读者-写者问题)
- RWLockWrite:写者优先(第二读者-写者问题)
- RWLockFair:公平优先(第三读者-写者问题)
每种锁都有对应的可降级版本(带D后缀),允许将锁从写模式降级到读模式
"""

import threading
import time
from readerwriterlock import rwlock

# 创建一个公平优先的读写锁
rw_lock = rwlock.RWLockFairD()
# 共享数据
shared_data = {'count': 0, 'values': []}


def read_demo(reader_id, sleep_time=0.5):
"""用于演示的读取函数"""
read_lock = rw_lock.gen_rlock()

try:
with read_lock:
print(f"读取者 {reader_id}: 获得读锁")
time.sleep(sleep_time) # 模拟读取操作
print(f"读取者 {reader_id}: 完成读取")
finally:
print(f"读取者 {reader_id}: 释放读锁")


def write_demo(writer_id, sleep_time=0.5):
"""用于演示的写入函数"""
write_lock = rw_lock.gen_wlock()

try:
with write_lock:
print(f"写入者 {writer_id}: 获得写锁")
time.sleep(sleep_time) # 模拟写入操作
print(f"写入者 {writer_id}: 完成写入")
finally:
print(f"写入者 {writer_id}: 释放写锁")


def demonstrate_read_read_nonblocking():
"""演示读读不互斥"""
print("\n=== 演示:读读不互斥 ===")

threads = []
for i in range(5):
thread = threading.Thread(target=read_demo, args=(i, 0.5))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()


def demonstrate_read_write_blocking():
"""演示读写互斥"""
print("\n=== 演示:读写互斥 ===")

# 先启动一个长时间的读取线程
read_thread = threading.Thread(target=read_demo, args=(0, 2))
read_thread.start()

# 给读取线程一点时间获取锁
time.sleep(0.1)

# 尝试启动写入线程,应该被阻塞直到读取完成
write_thread = threading.Thread(target=write_demo, args=(0, 0.5))
write_thread.start()

read_thread.join()
write_thread.join()


def demonstrate_write_write_blocking():
"""演示写写互斥"""
print("\n=== 演示:写写互斥 ===")

# 先启动一个长时间的写入线程
write_thread1 = threading.Thread(target=write_demo, args=(0, 2))
write_thread1.start()

# 给第一个写入线程一点时间获取锁
time.sleep(0.1)

# 尝试启动另一个写入线程,应该被阻塞直到第一个写入完成
write_thread2 = threading.Thread(target=write_demo, args=(1, 0.5))
write_thread2.start()

write_thread1.join()
write_thread2.join()


def demonstrate_timeout():
"""演示锁获取超时"""
print("\n=== 演示:锁获取超时 ===")

# 先启动一个长时间的写入线程
write_thread = threading.Thread(target=write_demo, args=(0, 3))
write_thread.start()

# 给写入线程一点时间获取锁
time.sleep(0.1)

# 尝试获取读锁,但设置较短的超时时间
read_lock = rw_lock.gen_rlock()
if read_lock.acquire(blocking=True, timeout=0.5):
try:
print("读取者: 成功获得读锁(不应该发生)")
finally:
read_lock.release()
else:
print("读取者: 获取读锁超时(预期行为)")

write_thread.join()


if __name__ == '__main__':
# 演示读读不互斥
demonstrate_read_read_nonblocking()
# 演示读写互斥
demonstrate_read_write_blocking()
# 演示写写互斥
demonstrate_write_write_blocking()
# 演示锁获取超时
demonstrate_timeout()
读写锁特性描述优势适用场景
读共享/写独占多个读取可并发,写入需独占提高读多写少场景的并发性配置数据、缓存系统、数据集
读写优先级可以设置读优先或写优先根据应用需求调整性能特性根据读写比例调整策略
升级/降级支持锁的升级(读 → 写)或降级(写 → 读)灵活处理复杂访问模式先检查后修改的操作

💡 使用建议

  • 读多写少的场景推荐使用读写锁
  • 注意防止 “写饥饿”,即读取者太多导致写入者长时间等待
锁排序(解决死锁)

为避免死锁,一个常用的技术是确保所有线程按照相同的顺序获取多个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import threading
import time


class Account:
"""模拟银行账户"""

def __init__(self, name: str, balance: int = 0):
"""初始化账户"""
self.name = name
self.balance = balance
self.lock = threading.RLock()
# 用于账户排序的唯一ID
self.id = id(self)

def __str__(self):
return f"账户{self.name}[余额={self.balance}]"


def transfer_money(from_account: Account, to_account: Account, amount: int, thread_name: str) -> None:
"""
在账户间转账,使用账户ID排序策略避免死锁
from_account: 转出账户
to_account: 转入账户
amount: 转账金额
thread_name: 线程名称
"""
# 按照账户ID从小到大的顺序获取锁,确保所有线程获取锁的顺序一致
first, second = sorted([from_account, to_account], key=lambda x: x.id)

print(f"{thread_name}: 尝试锁定账户 {first.name}")
with first.lock:
print(f"{thread_name}: 已锁定账户 {first.name}")
# 模拟网络延迟
time.sleep(0.1)

print(f"{thread_name}: 尝试锁定账户 {second.name}")
with second.lock:
print(f"{thread_name}: 已锁定账户 {second.name}")

# 执行转账操作
from_account.balance -= amount
to_account.balance += amount

print(f"{thread_name}: 已从{from_account.name}转账{amount}元到{to_account.name}")


if __name__ == '__main__':
# 创建两个账户
alice = Account("Alice", 1000)
bob = Account("Bob", 1000)

print(f"初始状态: {alice}, {bob}")

# 创建两个线程,同时进行相反方向的转账
t1 = threading.Thread(
name="Thread-1",
target=transfer_money,
args=(alice, bob, 500, "转账线程1")
)

t2 = threading.Thread(
name="Thread-2",
target=transfer_money,
args=(bob, alice, 300, "转账线程2")
)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终状态: {alice}, {bob}")
两阶段锁定

两阶段锁定是一种事务并发控制协议,分为获取阶段和释放阶段,可以保证事务的可串行化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import threading
import time

class TwoPhaseLockDatabase:
"""演示两阶段锁定协议的简单数据库"""

def __init__(self):
"""初始化数据库"""
self.data = {'A': 100, 'B': 200} # 简单的数据项
self.locks = {'A': threading.RLock(), 'B': threading.RLock()} # 每个数据项的锁

def transaction(self, items_to_read, items_to_write, operation):
"""执行两阶段锁定事务

Args:
items_to_read: 需要读取的数据项列表
items_to_write: 需要写入的数据项列表
operation: 事务操作函数

Returns:
bool: 事务是否成功
"""
# 按字母顺序排序所有需要锁定的项,避免死锁
all_items = sorted(set(items_to_read + items_to_write))
acquired_locks = []

try:
# 阶段1: 获取锁阶段(增长阶段)
print(f"事务 {threading.current_thread().name}: 开始获取锁")
for item in all_items:
# 对于读取项获取共享锁,对于写入项获取排他锁
# 这里简化为都使用排他锁
if self.locks[item].acquire(timeout=1):
acquired_locks.append(item)
print(f"事务 {threading.current_thread().name}: 已锁定 {item}")
else:
raise TimeoutError(f"获取 {item} 的锁超时")

# 执行事务操作
result = operation(self.data)
print(f"事务 {threading.current_thread().name}: 操作完成")
return result

except Exception as e:
print(f"事务 {threading.current_thread().name}: 错误 - {e}")
return False

finally:
# 阶段2: 释放锁阶段(收缩阶段)
for item in acquired_locks:
self.locks[item].release()
print(f"事务 {threading.current_thread().name}: 已释放 {item}")

def transfer_money(db, from_account, to_account, amount):
"""转账事务"""
def operation(data):
if data[from_account] < amount:
print(f"账户 {from_account} 余额不足")
return False

# 模拟操作耗时
time.sleep(0.1)

# 执行转账
data[from_account] -= amount
data[to_account] += amount
print(f"已从 {from_account} 转账 {amount}{to_account}")
return True

return db.transaction([from_account, to_account], [from_account, to_account], operation)

def run_transaction(db, thread_id, from_acc, to_acc, amount):
"""执行事务的线程函数"""
print(f"线程 {thread_id}: 尝试转账 {amount}{from_acc}{to_acc}")

start_time = time.time()
success = transfer_money(db, from_acc, to_acc, amount)
elapsed = time.time() - start_time

status = "成功" if success else "失败"
print(f"线程 {thread_id}: 转账{status},耗时 {elapsed:.2f}秒")

# 创建数据库实例
db = TwoPhaseLockDatabase()
print(f"初始账户状态: {db.data}")

# 创建并启动多个线程
threads = []
transactions = [
("A", "B", 30), # 从A转30到B
("B", "A", 50), # 从B转50到A
("A", "B", 20) # 从A转20到B
]

for i, (from_acc, to_acc, amount) in enumerate(transactions):
t = threading.Thread(
name=f"Transaction-{i}",
target=run_transaction,
args=(db, i, from_acc, to_acc, amount)
)
threads.append(t)
t.start()

# 等待所有线程完成
for t in threads:
t.join()

print(f"最终账户状态: {db.data}")
print(f"总金额: {sum(db.data.values())}") # 总金额应该不变
超时重试模式

在并发环境中,有时获取锁可能会失败。超时重试模式可以增加获取锁的成功概率,同时避免永久阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import threading
import time
import random

### 共享资源
resource_value = 0
resource_lock = threading.Lock()

def update_resource(worker_id, max_retries=3):
"""更新共享资源,使用超时重试模式

Args:
worker_id: 工作线程ID
max_retries: 最大重试次数

Returns:
bool: 是否成功更新
"""
global resource_value

# 随机生成操作标识,用于跟踪
operation_id = random.randint(10000, 99999)
print(f"工作线程 {worker_id} [操作:{operation_id}]: 尝试更新资源")

retry_count = 0
backoff = 0.1 # 初始回退时间

while retry_count < max_retries:
# 尝试获取锁,设置超时时间
if resource_lock.acquire(timeout=0.5):
try:
print(f"工作线程 {worker_id} [操作:{operation_id}]: 获取到锁,当前值: {resource_value}")

# 模拟资源更新操作
local_value = resource_value
# 随机决定操作时间,有时可能很长
work_time = random.uniform(0.1, 1.0)
time.sleep(work_time)

# 更新资源
resource_value = local_value + 1

print(f"工作线程 {worker_id} [操作:{operation_id}]: 更新成功,新值: {resource_value},耗时: {work_time:.2f}秒")
return True

finally:
# 释放锁
resource_lock.release()
print(f"工作线程 {worker_id} [操作:{operation_id}]: 释放锁")
else:
retry_count += 1
print(f"工作线程 {worker_id} [操作:{operation_id}]: 获取锁超时,重试 {retry_count}/{max_retries}")

if retry_count < max_retries:
# 使用指数退避策略,每次重试间隔加长
time.sleep(backoff)
backoff *= 2 # 指数增长

print(f"工作线程 {worker_id} [操作:{operation_id}]: 达到最大重试次数,放弃操作")
return False

def worker_thread(worker_id, operations):
"""工作线程函数

Args:
worker_id: 工作线程ID
operations: 要执行的操作次数
"""
success_count = 0

for i in range(operations):
# 尝试更新资源
if update_resource(worker_id):
success_count += 1

# 线程之间的间隔
time.sleep(random.uniform(0.1, 0.5))

print(f"工作线程 {worker_id}: 完成 {success_count}/{operations} 次成功更新")

### 创建多个工作线程
threads = []
for i in range(5):
t = threading.Thread(target=worker_thread, args=(i, 3))
threads.append(t)
t.start()

### 等待所有线程完成
for t in threads:
t.join()

print(f"最终资源值: {resource_value}")
多进程与多线程结合的混合模型

对于复杂应用,常常需要结合多进程和多线程的优势:多进程跨越 GIL 限制利用多核心,每个进程内使用多线程处理 I/O 任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import time
import multiprocessing
import threading
import concurrent.futures
import requests
from io import StringIO
import csv

def io_task(url):
"""模拟I/O密集型任务:发送HTTP请求并处理响应"""
try:
response = requests.get(url, timeout=5)
print(f"线程 {threading.current_thread().name} 完成请求 {url}, 状态码: {response.status_code}")
return response.status_code
except Exception as e:
print(f"线程 {threading.current_thread().name} 请求 {url} 失败: {str(e)}")
return None

def cpu_task(data):
"""模拟CPU密集型任务:处理CSV数据"""
result = 0
# 模拟CPU密集型计算
for _ in range(1000000):
result += 1

# 解析CSV数据
csv_data = StringIO(data)
reader = csv.reader(csv_data)
rows = list(reader)

print(f"进程 {os.getpid()} 处理了 {len(rows)} 行数据")
return len(rows)

def process_worker(process_id, urls):
"""每个进程的工作函数,使用线程池处理I/O任务"""
print(f"进程 {os.getpid()} (ID: {process_id}) 启动")

# 创建线程池处理I/O任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交所有URL请求任务到线程池
future_to_url = {executor.submit(io_task, url): url for url in urls}

# 收集结果
results = []
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
status_code = future.result()
results.append((url, status_code))
except Exception as e:
print(f"处理 {url} 时出错: {str(e)}")

# 模拟一些CSV数据处理(CPU密集型任务)
sample_csv = "col1,col2,col3\n1,2,3\n4,5,6\n7,8,9"
cpu_result = cpu_task(sample_csv)

print(f"进程 {os.getpid()} (ID: {process_id}) 完成所有任务")
return results, cpu_result

def main():
# 测试URL列表
all_urls = [
f"https://httpbin.org/delay/{i%3}" for i in range(16)
]

# 将URL分成4组,每个进程处理4个URL
url_chunks = [all_urls[i:i+4] for i in range(0, len(all_urls), 4)]

start_time = time.time()

# 创建进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as process_executor:
# 提交任务到进程池
futures = [process_executor.submit(process_worker, i, urls)
for i, urls in enumerate(url_chunks)]

# 收集所有进程的结果
for future in concurrent.futures.as_completed(futures):
try:
io_results, cpu_result = future.result()
print(f"进程返回结果: {len(io_results)} 个URL请求, CPU任务处理了 {cpu_result} 行数据")
except Exception as e:
print(f"进程执行出错: {str(e)}")

elapsed_time = time.time() - start_time
print(f"总执行时间: {elapsed_time:.2f} 秒")

if __name__ == "__main__":
main()

这种混合模型充分利用了 Python 的并发性能:

  1. 多进程并行:跨越 GIL 限制,在多个 CPU 核心上同时执行 Python 代码
  2. 每进程多线程:处理进程内的 I/O 密集型任务,提高 I/O 并发性
  3. 任务队列:有效分配和管理工作负载,平衡资源利用
细粒度锁与粗粒度锁

锁的粒度指锁保护资源的范围大小。细粒度锁保护小范围资源,提高并发度;粗粒度锁保护大范围资源,简化编程但可能降低并发度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import threading
import time
import random


# 共享资源
class BankAccount:
def __init__(self, balance):
self.balance = balance
# 粗粒度锁 - 用于整个账户操作
self.coarse_lock = threading.Lock()
# 细粒度锁 - 分别用于读取和写入操作
self.read_lock = threading.Lock()
self.write_lock = threading.Lock()

# 粗粒度锁示例 - 锁定整个账户操作
def transfer_coarse(account, amount):
with account.coarse_lock:
# 模拟读取余额操作
current_balance = account.balance
# 模拟网络延迟或处理时间
time.sleep(random.uniform(0.001, 0.005))
# 模拟更新余额操作
account.balance = current_balance + amount
print(f"粗粒度锁: 转账 {amount},当前余额 {account.balance}")


# 细粒度锁示例 - 分别锁定读取和写入操作
def transfer_fine(account, amount):
# 锁定读取操作
with account.read_lock:
current_balance = account.balance
# 模拟处理时间
time.sleep(random.uniform(0.001, 0.005))

# 这里可以执行一些不需要锁定的计算
time.sleep(random.uniform(0.001, 0.002))

# 锁定写入操作
with account.write_lock:
account.balance = current_balance + amount
print(f"细粒度锁: 转账 {amount},当前余额 {account.balance}")


# 测试函数
def main():
# 创建共享账户
account = BankAccount(1000)
threads = []

print("=== 测试粗粒度锁 ===")
start_time = time.time()

# 创建10个使用粗粒度锁的线程
for i in range(10):
amount = random.randint(1, 100)
t = threading.Thread(target=transfer_coarse, args=(account, amount))
threads.append(t)
t.start()

# 等待所有线程完成
for t in threads:
t.join()

coarse_time = time.time() - start_time
print(f"粗粒度锁总耗时: {coarse_time:.4f}秒")

# 重置账户和线程列表
account.balance = 1000
threads = []

print("\n=== 测试细粒度锁 ===")
start_time = time.time()

# 创建10个使用细粒度锁的线程
for i in range(10):
amount = random.randint(1, 100)
t = threading.Thread(target=transfer_fine, args=(account, amount))
threads.append(t)
t.start()

# 等待所有线程完成
for t in threads:
t.join()

fine_time = time.time() - start_time
print(f"细粒度锁总耗时: {fine_time:.4f}秒")
print(f"\n性能比较: 细粒度锁比粗粒度锁快 {(coarse_time / fine_time):.2f} 倍")

if __name__ == '__main__':
main()

锁粒度优点缺点适用场景
粗粒度锁简单、易维护、不易死锁并发性能低、可能导致线程等待简单应用、对性能要求不高的场景
细粒度锁并发性能高、资源利用率高实现复杂、可能造成死锁高性能要求、资源访问模式明确的场景

14.7 消息队列与进程通信

在并发编程中,队列是一种常用的数据结构。它遵循 先进先出(FIFO) 的原则,适合用于线程或进程间的通信,而堆栈则遵循 后进先出(LIFO) 的原则。Python 中的 queuemultiprocessing 模块提供了多种类型的队列,每种队列适用于不同的场景。

队列基础知识

Python 的 queue 模块和 multiprocessing 模块提供了多种队列类型,主要包括:

队列类型模块特点适用场景
Queuequeue线程安全的 FIFO 队列线程间通信
LifoQueuequeue线程安全的 LIFO 队列(堆栈)需要后进先出的场景
PriorityQueuequeue优先级队列任务具有优先级的场景
Queuemultiprocessing进程安全的 FIFO 队列进程间通信
JoinableQueuemultiprocessing带有任务完成通知机制的队列生产者-消费者模型

队列使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import random
from multiprocessing import Queue
from concurrent.futures import ThreadPoolExecutor
import time



# 生产者进程函数
def producer(q: Queue) -> None:
"""生产者函数,负责生产数据并放入队列"""
for i in range(10):
item = f"小吃{i}"
print(f"生产者生产了{item}")
q.put(item) # 放入队列
time.sleep(random.uniform(0.1, 0.5)) # 模拟耗时操作
q.put(None) # 生产结束信号
print("生产者结束")


# 消费者进程函数
def consumer(q: Queue):
"""消费者函数,负责从队列中获取数据并消费"""
while True:
item = q.get() # 从队列中获取项目
if item is None: # 若获取到结束信号,则退出循环
break
print(f"消费者消费了{item}")
time.sleep(random.uniform(1, 2))
print("消费者结束")


if __name__ == '__main__':
# 创建一个队列对象
q = Queue()
with ThreadPoolExecutor(max_workers=2) as executor:
# 启动生产者进程
executor.submit(producer, q)
# 启动消费者进程
executor.submit(consumer, q)

优先级队列示例

优先级队列按任务的优先级顺序处理任务。数字越小优先级越高。以下是如何使用 PriorityQueue 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import queue
import threading
import time

# 创建优先级队列
# 优先级队列的元素是元组,第一个元素是优先级,第二个元素是任务
pq = queue.PriorityQueue()

def process_tasks():
"""按照优先级处理任务"""
while True:
try:
priority,task = pq.get(timeout=3)
print(f"处理任务:[优先级{priority}] {task}")
pq.task_done()
except queue.Empty:
print("队列为空,任务处理完毕")
break
# 添加任务到优先级队列
pq.put((3, "普通任务"))
pq.put((1, "紧急任务"))
pq.put((2, "中等优先级任务"))
pq.put((1, "另一个紧急任务"))
pq.put((5, "低优先级任务"))

# 创建任务处理线程
worker = threading.Thread(target=process_tasks)
worker.start()

pq.join() # 等待所有任务处理完毕
worker.join()

print("所有任务处理完毕")