Python 的多线程
本文记录自己学习 Python 多线程的过程和思考
Python 批量处理数据的不同方式?
- 传统方法: 使用 for 循环顺序遍历
1
2
3data_list = []
for url in url_list:
data_list.append(get_data_from_url(url)) - 多线程: 开启多个线程处理
1
2
3from multiprocessing.pool import ThreadPool
tpool = ThreadPool(20) # 创建一个线程池,20个线程数
data_list = tpool.map(get_data_from_url, url_list) # 将任务交给线程池,与python的map方法类似 - 多进程: 开启多个进程处理
1
2
3from multiprocessing import Pool
pool = Pool(4)
data_list = pool.map(get_data_from_url, url_list) # 与线程池的map方法工作原理一致
什么是 Python 的全局解释器锁 (GIL)?
- GIL (Global Interpreter Lock,全局解释器锁),一个 Python 线程想要执行一段代码,必须先拿到 GIL 锁后才被允许执行,也就是说,即使我们使用了多线程,但同一时刻却只有一个线程在执行
- GIL 是存在于 CPython 解释器中的,属于解释器层级,而并非属于 Python 的语言特性。 也就是说,如果你自己有能力实现一个 Python 解释器,完全可以不使用 GIL
- 在单核 CPU 上,数百次的间隔检查才会导致一次线程切换,GIL 不会影响效率。但是在多核 CPU 上,存在严重的线程颠簸 (thrashing),导致线程进行锁竞争、切换线程,会消耗资源,反而效率更低
Python 的全局解释器锁 (GIL) 的原理?
- Python 的线程就是 C 语言的 pthread,它是通过操作系统调度算法调度执行的
- Python2.x 的代码执行是基于 opcode 数量的调度方式,简单来说就是每执行一定数量的字节码,或遇到系统 IO 时,会强制释放 GIL,然后触发一次操作系统的线程调度
- Python3.x 进行了优化,基于固定时间的调度方式,就是每执行固定时间的字节码,或遇到系统 IO 时,强制释放 GIL,触发系统的线程调度
Python 的全局解释器锁 (GIL) 的执行步骤?
- 设置 GIL
- 切换到一个线程去运行
- 运行直至指定数量的字节码指令,或者线程主动让出控制(可以调用 sleep (0))
- 把线程设置为睡眠状态
- 解锁 GIL
- 再次重复以上所有步骤
什么是线程颠簸 (thrashing)?
- 一般来说多核的多线程比单核多线程效率更低,因为当单核多线程每次释放 GIL 的时候,下一个线程能直接获取到 GIL,能够无缝执行,当多核环境中某个 CPU 释放 GIL 后,本该在其它 CPU 的线程也都会竞争获得此 CPU 的 GIL,但很大可能 GIL 又被此 CPU 下的某个线程拿到,导致其它几个 CPU 上被唤醒的线程醒着等待到切换时间后又进入调度状态,这样会造成线程颠簸(Thrashing),导致效率更低
如何解决 Python 的全局解释器锁 (GIL) 的效率低下问题?
- IO 密集型任务场景,可以使用多线程可以提高运行效率
- CPU 密集型任务场景,不使用多线程,推荐使用多进程方式部署运行
- 更换没有 GIL 的 Python 解释器,但需要提前评估运行结果是否与 CPython 一致
- 编写 Python 的 C 扩展模块,把 CPU 密集型任务交给 C 模块处理,但缺点是编码较为复杂
全局解释器锁 (GIL) 会导致多线程在 Python 上无效吗?
- 如果使用多线程运行一个 CPU 密集型任务,那么 Python 多线程是无法提高运行效率的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25# 单线程执行2次CPU密集型任务
import time
import threading
def loop():
count = 0
while count <= 1000000000:
count += 1
# 单线程执行 2 次 CPU 密集型任务
start = time.time()
loop()
loop()
end = time.time()
print("execution time: %s" % (end - start))
# execution time: 89.63111019134521
# 2个线程同时执行CPU密集型任务
start = time.time()
t1 = threading.Thread(target=loop)
t2 = threading.Thread(target=loop)
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print("execution time: %s" % (end - start))
# execution time: 92.29994678497314 - 注意:在 IO 密集型的任务,大部分时间都花在等待 IO 上,并没有一直占用 CPU 的资源,所以并不会像上面的程序那样,进行无效的线程切换
什么是 Python 的 thread 包?
- threading 包主要运用多线程的开发,但由于全局解释器锁 (GIL) 的存在,Python 中的多线程其实并不是真正的多线程,如果想要充分地使用多核 CPU 的资源,大部分情况需要使用多进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import threading
import os
import time
from datetime import datetime as dt
def print_cube(num):
print("Cube: {}".format(num * num * num),dt.now().strftime("%H:%M:%S"),os.getpid())
time.sleep(1)
def print_square(num):
print("Square: {}".format(num * num),dt.now().strftime("%H:%M:%S"),os.getpid())
time.sleep(1)
# 创建线程
t1 = threading.Thread(target=print_square, args=(10,))
t2 = threading.Thread(target=print_cube, args=(10,))
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
# 两个线程均完成
print("Done!",dt.now().strftime("%H:%M:%S"),os.getpid())
# Square: 100 11:58:51 28352
# Cube: 1000 11:58:51 28352
# Done! 11:58:52 28352
Python 如何解决 thread 包的线程同步问题?
- 线程同步被定义为一种机制,它确保两个或多个并发线程不会同时执行某些称为关键段的特定程序段
- Python 使用锁 threading.Lock () 来确保线程同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25import threading
x = 0
def increment():
global x
x += 1
def thread_task(lock):
for _ in range(100000):
lock.acquire()
increment()
lock.release()
def main_task():
global x
x = 0
lock = threading.Lock() # 声明一个锁
t1 = threading.Thread(target=thread_task, args=(lock,))
t2 = threading.Thread(target=thread_task, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
for i in range(2):
main_task()
print("Iteration {0}: x = {1}".format(i,x))
# Iteration 0: x = 200000
# Iteration 1: x = 200000
Python 如何使用 Theading 自定义线程类?
- 使用 Threading 模块创建线程,直接从 threading.Thread 继承,然后重写__init__方法和 run 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
import time
exitFlag = 0
class myThread (threading.Thread): #继承父类threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):#把要执行代码写到run函数里,线程在创建后会直接运行run函数
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
(threading.Thread).exit()
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启线程
thread1.start()
thread2.start()
print "Exiting Main Thread"
什么是 Python 的 multiprocessing?
- 在 Python 2.6 版本的时候引入了 multiprocessing 包,它完整的复制了一套 threading 所提供的接口方便迁移。 唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的 GIL,因此也不会出现进程之间的 GIL 争抢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
from datetime import datetime as dt
def print_cube(num):
print("Cube: {}".format(num * num * num),dt.now().strftime("%H:%M:%S"))
time.sleep(1)
def print_square(num):
print("Square: {}".format(num * num),dt.now().strftime("%H:%M:%S"))
time.sleep(1)
if __name__ == "__main__":
# creating processes
p1 = multiprocessing.Process(target=print_square, args=(10, ))
p2 = multiprocessing.Process(target=print_cube, args=(10, ))
p1.start()
p2.start()
p1.join()
p2.join()
print("Done!",dt.now().strftime("%H:%M:%S"))
# Square: 100 11:57:55
# Cube: 1000 11:57:55
# Done! 11:57:56
multiprocessing 的常用组件及功能?
- 创建管理进程模块
- Process(用于创建进程)
- Pool(用于创建管理进程池)
- Queue(用于进程通信,资源共享)
- Value,Array(用于进程通信,资源共享)
- Pipe(用于管道通信)
- Manager(用于资源共享)
- 同步子进程模块
- Condition(条件变量)
- Event(事件)
- Lock(互斥锁)
- RLock(可重入的互斥锁 (同一个进程可以多次获得它,同时不会造成阻塞)
- Semaphore(信号量)
如何验证 multiprocessing 下不同进程有自己的独立空间?
- 以 Python 的多进程为例,以下代码无法在进程外获得变量的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import multiprocessing
result = []
def square_list(mylist):
"""
function to square a given list
"""
global result
for num in mylist:
result.append(num * num)
print("Result(in process p1): {}".format(result))
if __name__ == "__main__":
mylist = [1,2,3,4]
p1 = multiprocessing.Process(target=square_list, args=(mylist,))
p1.start()
p1.join()
print("Result(in main program): {}".format(result))
# Result(in process p1): [1, 4, 9, 16]
# Result(in main program): []
multiprocessing 的 Process 类?
- 一个 Process 类来代表一个进程对象
- 注意:在 windows 中 Process () 必须放到 if name―‘main’: 下
1
2
3
4
5
6
7
8
9
10
11from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
multiprocessing 的 Pool 类?
- Pool (进程池) 提供指定数量的进程供用户调用,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程
- 多进程的 Pool 对象有 4 个同步方法,分别是 map () 、 imap () 、 imap_unordered () 和 starmap () 函数,3 异步方法 map_async () 、starmap_async () 和 starmap_async ()。无论同步异步,都用于向通过进程池可迭代的所有项调用函数的任务发出任务
- 同步与异步的差别在于:同步会阻塞后续代码的执行;异步不阻塞,并通过返回 AsyncResult 对象收集多进程的处理结果
1
2
3
4
5
6
7
8
9from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()
multiprocessing 的 Pool 类的 apply 方法?
- 单次同步执行、 阻塞的:等待当前子进程执行完毕后,在执行下一个进程。有三个进程 0,1,2。等待 子进程 0 执行完毕后,再执行子进程 1,然后子进程 2,最后回到主进程执行主进程剩余部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool()
for i in range(2):
msg = "hello %d" %(i)
pool.apply(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
msg: hello 0
end
msg: hello 1
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing 中 Pool 类的 join () 的方法?
- 用来等待进程池中的 worker 进程执行完毕,防止主进程在 worker 进程结束前结束。但 pool.Join () 必须使用在 pool.Close () 或者 pool.Terminate () 之后
- 其中 close () 跟 terminate () 的区别在于 close () 会等待池中的 worker 进程执行结束再关闭 pool, 而 terminate () 则是直接关闭
multiprocessing 的 Pool 类的 apply_async 方法?
- 单次异步执行, 异步非阻塞的:不用等待当前进程执行完毕,随时根据系统调度来进行进程切换
- apply 和 apply_async 一次执行一个任务,但 apply_async 可以异步执行,因而也可以实现并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(1)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 2)
for i in range(2):
msg = "hello %d" %(i)
#维持执行进程总数为processes,当一个进程执行完毕后会添加新进程进去
pool.apply_async(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
# 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()
print("Sub-process(es) done.")
# 输出
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
end
end
Sub-process(es) done.
multiprocessing 的 Pool 类的 map 方法?
- 将迭代项分配给进程池中的每个 worker。结果按照分配 Pool 对象时的顺序进行收集,以保留原始顺序
- map 和 map_async 与 apply 和 apply_async 的区别是可以并发执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
pool.map(func, range(2))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出(注意Mark~位置):
msg: 0
msg: 1
end
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing 的 Pool 类的 map_async 方法?
- map_sync (列表、有序) 异步非阻塞的:不用等待当前进程执行完毕,随时根据系统调度来进行进程切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
pool.map_async(func, range(2))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: 0
msg: 1
end
end
Sub-process(es) done.
multiprocessing 的 Pool 类的 imap 方法?
- imap (迭代、有序) 方法比 map () “懒惰”。默认情况下,它会把每个单独的迭代项发送给下一个可用的 worker。这可能会引入额外的通信开销,因此建议使用大于 1 的块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg: ", msg)
time.sleep(4-msg)
return msg
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
results = pool.imap(func, range(3))
for res in results:
print("res: ",res)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg: 0
msg: 1
msg: 2
res: 0
res: 1
res: 2
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing 的 Pool 类的 imap_unordered 方法?
- imap_unordered (迭代、无序) 方法与 imap () 方法类似,但它不会保留结果的顺序。允许乱序处理映射意味着一旦进程完成处理便收集结果,否则必须按序收集结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg: ", msg)
time.sleep(4-msg)
return msg
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
results = pool.imap_unordered(func, range(3))
for res in results:
print("res: ", res)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg: 0
msg: 1
msg: 2
res: 2
res: 1
res: 0
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing 的 Pool 类的 starmap 方法?
- 与 map 和 map_async 的区别是,这两个函数可以传入多个参数
- starmap 方法每个迭代项必须是 元组,并且为了让元组中的每个值成为位置参数,使用了 * 修饰符将其传递给函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
def func(msg1, msg2):
print("msg1:", msg1, "msg2:", msg2)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
msgs = [(1,1),(2,2)]
pool.starmap(func, msgs)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg1: 1 msg2: 1
msg1: 2 msg2: 2
end
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing 的 Pool 类的 starmap_async 方法?
- starmap 和 starmap_async 与 map 和 map_async 的区别是,starmap 和 starmap_async 可以传入多个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
def func(msg1, msg2):
print("msg1:", msg1, "msg2:", msg2)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
msgs = [(1, 1), (2, 2)]
pool.starmap_async(func, msgs)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg1: 1 msg2: 1
msg1: 2 msg2: 2
end
end
Sub-process(es) done.
multiprocessing 的 Pool 类中 imap 和 imap_unordered 、map_async 区别?
- map_async 生成子进程时使用的是 list,而 imap 和 imap_unordered 则是 Iterable, map_async 效率略高,而 imap 和 imap_unordered 内存消耗显著的小
- 在处理结果上,imap 和 imap_unordered 可以尽快返回一个 Iterable 的结果,而 map_async 则需要等待全部 Task 执行完毕,返回 list
- imap 和 imap_unordered 的区别是:imap 和 map_async 一样,都按顺序等待 Task 的 执行结果,而 imap_unordered 则不必。 imap_unordered 返回的 Iterable,会优先迭代到 先执行完成的 Task
multiprocessing 的 Queue 类?
- Queue 是用来创建进程间资源共享的队列的类,使用 Queue 可以达到多进程间数据传递的功能 (缺点:只适用 Process 类,不能在 Pool 进程池中使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import multiprocessing
def square_list(mylist, q):
for num in mylist:
q.put(num * num)
def print_queue(q):
print("Queue elements:")
while not q.empty():
print(q.get())
print("Queue is now empty!")
if __name__ == "__main__":
mylist = [1,2,3,4]
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
p2 = multiprocessing.Process(target=print_queue, args=(q,))
p1.start()
p1.join()
p2.start()
p2.join()
# 1
# 4
# 9
# 16
# Queue is now empty!
multiprocessing 的 JoinableQueue 类?
- JoinableQueue 就像是一个 Queue 对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消费者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生产者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('产品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
p.start()
producer(seq, q)
print('主线程')
multiprocessing 的 Value、Array 类?
- multiprocessing 中 Value 和 Array 的实现原理都是在共享内存中创建 ctypes () 对象来达到共享数据的 目的,两者实现方法大同小异,只是选用不同的 ctypes 数据类型而已(注意:Value 和 Array 只适用于 Process 类)
1
2
3
4
5
6
7
8
9
10
11
12import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
multiprocessing 的 Pipe 类?
- 多进程还有一种数据传递方式叫管道原理和 Queue 相同。Pipe 可以在进程之间创建一条管道,并 返回元组( conn1,conn2), 其中 conn1,conn2 表示管道两端的连接对象,强调一点:必须在产生 Process 对象之前产生管道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import multiprocessing
def sender(conn, msgs):
for msg in msgs:
conn.send(msg)
print("Sent the message: {}".format(msg))
conn.close()
def receiver(conn):
while 1:
msg = conn.recv()
if msg == "END":
break
print("Received the message: {}".format(msg))
if __name__ == "__main__":
msgs = ["hello", "hey","END"]
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
# Sent the message: hello
# Sent the message: hey
# Sent the message: END
# Received the message: hello
# Received the message: hey
multiprocessing 的 Manager 类?
- Manager () 返回的 manager 对象控制了一个 server 进程,此进程包含的 python 对象可以被其他的进 程通过 proxies 来访问。从而达到多进程间数据通信且安全。 Manager 模块常与 Pool 模块一起使 用
- 管理器是独立运行的子进程,其中存在真实的对象,并以服务器的形式运行,其他进程通过使用 代理访问共享对象,这些代理作为客户端运行。 Manager () 是 BaseManager 的子类,返回一个启 动的 SyncManager () 实例,可用于创建共享对象并返回访问这些共享对象的代理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
d = server.dict()
n = server.Namespace()
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
print(d)
print(n)
multiprocessing 的 Condition 类?
- 可以把 Condition 理解为一把高级的锁,它提供了比 Lock, RLock 更高级的功能,允许我们能够控制复杂的线程同步问题。 Condition 在内部维护一个锁对象(默认是 RLock),可以在创建 Condigtion 对象的时候把琐对象作为参数传入
- 以下例子是生成一个 Condition 对象,Condition 的 wait 函数在没有收到 notify_all 函数调用前,线程循环在 wait 上等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32def stage_1(cond):
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',target=stage_1,args=(condition,))
s2_clients = [
multiprocessing.Process(name='stage_2[{}]'.format(i),target=stage_2,args=(condition,),)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
# Starting stage_2[1]
# Starting stage_2[2]
# Starting s1
# s1 done and ready for stage 2
# stage_2[1] running
# stage_2[2] running
multiprocessing 的 Event 类?
- Event 内部包含了一个标志位,初始的时候为 false。可以使用 set () 来将其设置为 true;或者使用 clear () 将其从新设置为 false;可以使用 is_set () 来检查标志位的状态;另一个最重要的函数就是 wait (timeout=None),用来阻塞当前线程,直到 event 的内部标志位被设置为 true 或者 timeout 超 时。如果内部标志位为 true 则 wait () 函数理解返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32def wait_for_event(e):
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
#main: waiting before calling Event.set()
#wait_for_event: starting
#wait_for_event_timeout: starting
#wait_for_event_timeout: e.is_set()-> False
#main: event is set
#wait_for_event: e.is_set()-> True
multiprocessing 的 Lock 类?
- Lock 锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修 改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全
1
2
3
4
5
6
7
8
9from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock() # 这个一定要定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
multiprocessing 的 RLock 类?
- RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock 使用了 “拥有的线程” 和 “递归等级” 的概念,处于锁定状态时,RLock 被某个线程拥有。拥有 RLock 的线程可以再次调用 acquire (),释放锁时需要调用 release () 相同次数。可以认为 RLock 包含一个锁定池和一个初始值为 0 的计数器,每次成功调用 acquire ()/release (),计数器将 + 1/-1,为 0 时锁处于未锁定状态
multiprocessing 的 Semaphore 类?
- 信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有 当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 占到一个茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
# user0 占到一个茅坑
# user1 占到一个茅坑
# user0 OK
# user2 占到一个茅坑
# user1 OK
# user3 占到一个茅坑
# user2 OK
# user4 占到一个茅坑
# user4 OK
# user3 OK
什么是 concurrent.futures?
- 异步执行代码,线程池使用 ThreadPoolExecutor,进程使用 ProcessPoolExecutor。两者都实现了相同的接口,该接口由抽象的 Executor 类定义。实现了对 threading 和 multiprocessing 更高级的抽象
1
2
3
4
5
6
7from concurrent.futures import ProcessPoolExecutor
pool_size = 4
pattern = "*.gz"
combined = Counter()
with ProcessPoolExecutor(max_workers=pool_size) as workers:
for result in workers.map(analysis, glob.glob(pattern)):
combined.update(result)
concurrent.futures 的 Executor 类?
- 一个抽象类,它提供异步执行调用的方法。它不应该直接使用,而应该通过其具体的子类使用
- ThreadPoolExecutor 类是 Executor 子类,使用线程池执行异步调用
- ProcessPoolExecutor 类是 Executor 子类,使用进程池执行异步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14import concurrent.futures as cf
#without multiprocessing
for i in range(1,21):
flip_image(i)
#using I/O bottleneck
with cf.ThreadPoolExecutor(max_workers=2) as p:
processing = p.map(flip_image,[x for x in range(1,21)])
for process in processing:
print(process)
#using CPU bottleneck
with cf.ProcessPoolExecutor(max_workers=2) as p:
processing = p.map(flip_image,[x for x in range(1,21)])
for process in processing:
print(process)
concurrent.futures 的 Future 类?
- Future 类封装了可调用对象的异步执行。Future 的实例由 Executor.submit () 创建
1
2
3
4
5
6
7
8from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
multiprocessing.dummy 与 multiprocessing 的区别?
- multiprocessing.dummy 模块与 multiprocessing 模块的区别: dummy 模块是多线程,而 multiprocessing 是多进程, api 都是通用的。所有可以很方便将代码在多线程和多进程之间切换。multiprocessing.dummy 通常在 IO 场景可以尝试使用,比如使用如下方式引入线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import time
from multiprocessing.dummy import Pool as tp
def run(num):
print('num is {}'.format(num))
if num == 0:
time.sleep(5)
print('{} is end'.format(num))
if __name__ == '__main__':
print('start')
pool = tp(5)
num_list = [0, 1, 2]
pool.map_async(run, num_list)
print('非阻塞~~~~')
pool.close()
pool.join()
print('end')
结果:
start
非阻塞~~~~
num is 0
num is 1
num is 2
2 is end
1 is end
0 is end
end