Python的多线程
本文记录自己学习 Python 多线程的过程和思考
Python批量处理数据的不同方式?
- 传统方法: 使用 for 循环顺序遍历
1
2
3data_list = []
for url in url_list:
data_list.append(get_data_from_url(url)) - 多线程: 开启多个线程处理
1
2
3from multiprocessing.pool import ThreadPool
tpool = ThreadPool(20) # 创建一个线程池,20个线程数
data_list = tpool.map(get_data_from_url, url_list) # 将任务交给线程池,与python的map方法类似 - 多进程: 开启多个进程处理
1
2
3from multiprocessing import Pool
pool = Pool(4)
data_list = pool.map(get_data_from_url, url_list) # 与线程池的map方法工作原理一致
什么是Python的全局解释器锁(GIL)?
- GIL(Global Interpreter Lock,全局解释器锁),一个Python线程想要执行一段代码,必须先拿到GIL锁后才被允许执行,也就是说,即使我们使用了多线程,但同一时刻却只有一个线程在执行
- GIL 是存在于CPython解释器中的,属于解释器层级,而并非属于 Python 的语言特性。 也就是说,如果你自己有能力实现一个 Python 解释器,完全可以不使用 GIL
- 在单核CPU上,数百次的间隔检查才会导致一次线程切换,GIL不会影响效率。但是在多核CPU上,存在严重的线程颠簸(thrashing),导致线程进行锁竞争、切换线程,会消耗资源,反而效率更低
Python的全局解释器锁(GIL)的原理?
- Python的线程就是C语言的pthread,它是通过操作系统调度算法调度执行的
- Python2.x的代码执行是基于opcode数量的调度方式,简单来说就是每执行一定数量的字节码,或遇到系统IO时,会强制释放GIL,然后触发一次操作系统的线程调度
- Python3.x进行了优化,基于固定时间的调度方式,就是每执行固定时间的字节码,或遇到系统IO时,强制释放GIL,触发系统的线程调度
Python的全局解释器锁(GIL)的执行步骤?
- 设置GIL
- 切换到一个线程去运行
- 运行直至指定数量的字节码指令,或者线程主动让出控制(可以调用sleep(0))
- 把线程设置为睡眠状态
- 解锁GIL
- 再次重复以上所有步骤
什么是线程颠簸(thrashing)?
- 一般来说多核的多线程比单核多线程效率更低,因为当单核多线程每次释放GIL的时候,下一个线程能直接获取到GIL,能够无缝执行,当多核环境中某个CPU释放GIL后,本该在其它CPU的线程也都会竞争获得此CPU的GIL,但很大可能GIL又被此CPU下的某个线程拿到,导致其它几个CPU上被唤醒的线程醒着等待到切换时间后又进入调度状态,这样会造成线程颠簸(Thrashing),导致效率更低
如何解决Python的全局解释器锁(GIL)的效率低下问题?
- IO密集型任务场景,可以使用多线程可以提高运行效率
- CPU密集型任务场景,不使用多线程,推荐使用多进程方式部署运行
- 更换没有GIL的Python解释器,但需要提前评估运行结果是否与CPython一致
- 编写Python的C扩展模块,把CPU密集型任务交给C模块处理,但缺点是编码较为复杂
全局解释器锁(GIL)会导致多线程在Python上无效吗?
- 如果使用多线程运行一个CPU密集型任务,那么Python多线程是无法提高运行效率的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25# 单线程执行2次CPU密集型任务
import time
import threading
def loop():
count = 0
while count <= 1000000000:
count += 1
# 单线程执行 2 次 CPU 密集型任务
start = time.time()
loop()
loop()
end = time.time()
print("execution time: %s" % (end - start))
# execution time: 89.63111019134521
# 2个线程同时执行CPU密集型任务
start = time.time()
t1 = threading.Thread(target=loop)
t2 = threading.Thread(target=loop)
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print("execution time: %s" % (end - start))
# execution time: 92.29994678497314 - 注意:在IO密集型的任务,大部分时间都花在等待IO上,并没有一直占用CPU的资源,所以并不会像上面的程序那样,进行无效的线程切换
什么是Python的thread包?
- threading包主要运用多线程的开发,但由于全局解释器锁(GIL)的存在,Python中的多线程其实并不是真正的多线程,如果想要充分地使用多核 CPU的资源,大部分情况需要使用多进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import threading
import os
import time
from datetime import datetime as dt
def print_cube(num):
print("Cube: {}".format(num * num * num),dt.now().strftime("%H:%M:%S"),os.getpid())
time.sleep(1)
def print_square(num):
print("Square: {}".format(num * num),dt.now().strftime("%H:%M:%S"),os.getpid())
time.sleep(1)
# 创建线程
t1 = threading.Thread(target=print_square, args=(10,))
t2 = threading.Thread(target=print_cube, args=(10,))
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
# 两个线程均完成
print("Done!",dt.now().strftime("%H:%M:%S"),os.getpid())
# Square: 100 11:58:51 28352
# Cube: 1000 11:58:51 28352
# Done! 11:58:52 28352
Python如何解决thread包的线程同步问题?
- 线程同步被定义为一种机制,它确保两个或多个并发线程不会同时执行某些称为关键段的特定程序段
- Python使用锁threading.Lock()来确保线程同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25import threading
x = 0
def increment():
global x
x += 1
def thread_task(lock):
for _ in range(100000):
lock.acquire()
increment()
lock.release()
def main_task():
global x
x = 0
lock = threading.Lock() # 声明一个锁
t1 = threading.Thread(target=thread_task, args=(lock,))
t2 = threading.Thread(target=thread_task, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
for i in range(2):
main_task()
print("Iteration {0}: x = {1}".format(i,x))
# Iteration 0: x = 200000
# Iteration 1: x = 200000
Python如何使用Theading自定义线程类?
- 使用Threading模块创建线程,直接从threading.Thread继承,然后重写__init__方法和run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
import time
exitFlag = 0
class myThread (threading.Thread): #继承父类threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):#把要执行代码写到run函数里,线程在创建后会直接运行run函数
print "Starting " + self.name
print_time(self.name, self.counter, 5)
print "Exiting " + self.name
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
(threading.Thread).exit()
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启线程
thread1.start()
thread2.start()
print "Exiting Main Thread"
什么是Python的multiprocessing?
- 在 Python 2.6版本的时候引入了 multiprocessing包,它完整的复制了一套 threading所提供的接口方便迁移。 唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的 GIL,因此也不会出现进程之间的 GIL争抢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
from datetime import datetime as dt
def print_cube(num):
print("Cube: {}".format(num * num * num),dt.now().strftime("%H:%M:%S"))
time.sleep(1)
def print_square(num):
print("Square: {}".format(num * num),dt.now().strftime("%H:%M:%S"))
time.sleep(1)
if __name__ == "__main__":
# creating processes
p1 = multiprocessing.Process(target=print_square, args=(10, ))
p2 = multiprocessing.Process(target=print_cube, args=(10, ))
p1.start()
p2.start()
p1.join()
p2.join()
print("Done!",dt.now().strftime("%H:%M:%S"))
# Square: 100 11:57:55
# Cube: 1000 11:57:55
# Done! 11:57:56
multiprocessing的常用组件及功能?
- 创建管理进程模块
- Process(用于创建进程)
- Pool(用于创建管理进程池)
- Queue(用于进程通信,资源共享)
- Value,Array(用于进程通信,资源共享)
- Pipe(用于管道通信)
- Manager(用于资源共享)
- 同步子进程模块
- Condition(条件变量)
- Event(事件)
- Lock(互斥锁)
- RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)
- Semaphore(信号量)
如何验证multiprocessing下不同进程有自己的独立空间?
- 以Python的多进程为例,以下代码无法在进程外获得变量的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import multiprocessing
result = []
def square_list(mylist):
"""
function to square a given list
"""
global result
for num in mylist:
result.append(num * num)
print("Result(in process p1): {}".format(result))
if __name__ == "__main__":
mylist = [1,2,3,4]
p1 = multiprocessing.Process(target=square_list, args=(mylist,))
p1.start()
p1.join()
print("Result(in main program): {}".format(result))
# Result(in process p1): [1, 4, 9, 16]
# Result(in main program): []
multiprocessing的Process类?
- 一个 Process类来代表一个进程对象
- 注意:在windows中Process()必须放到if name―‘main’:下
1
2
3
4
5
6
7
8
9
10
11from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
multiprocessing的Pool类?
- Pool(进程池) 提供指定数量的进程供用户调用,当有新的请求提交到 Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程
- 多进程的Pool对象有4个同步方法,分别是 map() 、 imap() 、 imap_unordered() 和 starmap() 函数,3异步方法map_async() 、starmap_async() 和 starmap_async()。无论同步异步,都用于向通过进程池可迭代的所有项调用函数的任务发出任务
- 同步与异步的差别在于:同步会阻塞后续代码的执行;异步不阻塞,并通过返回AsyncResult对象收集多进程的处理结果
1
2
3
4
5
6
7
8
9from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()
multiprocessing的Pool类的apply方法?
- 单次同步执行、 阻塞的:等待当前子进程执行完毕后,在执行下一个进程。有三个进程 0,1,2。等待 子进程 0 执行完毕后,再执行子进程 1,然后子进程 2,最后回到主进程执行主进程剩余部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool()
for i in range(2):
msg = "hello %d" %(i)
pool.apply(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
msg: hello 0
end
msg: hello 1
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing中Pool类的join()的方法?
- 用来等待进程池中的 worker 进程执行完毕,防止主进程在 worker 进程结束前结束。但 pool.Join ()必须使用在 pool.Close ()或者 pool.Terminate ()之后
- 其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭 pool,而terminate()则是直接关闭
multiprocessing的Pool类的apply_async方法?
- 单次异步执行, 异步非阻塞的:不用等待当前进程执行完毕,随时根据系统调度来进行进程切换
- apply和apply_async 一次执行一个任务,但 apply_async 可以异步执行,因而也可以实现并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(1)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 2)
for i in range(2):
msg = "hello %d" %(i)
#维持执行进程总数为processes,当一个进程执行完毕后会添加新进程进去
pool.apply_async(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
# 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()
print("Sub-process(es) done.")
# 输出
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
end
end
Sub-process(es) done.
multiprocessing的Pool类的map方法?
- 将迭代项分配给进程池中的每个worker。结果按照分配Pool对象时的顺序进行收集,以保留原始顺序
- map和map_async与apply和apply_async的区别是可以并发执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
pool.map(func, range(2))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出(注意Mark~位置):
msg: 0
msg: 1
end
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing的Pool类的map_async方法?
- map_sync(列表、有序) 异步非阻塞的:不用等待当前进程执行完毕,随时根据系统调度来进行进程切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
pool.map_async(func, range(2))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: 0
msg: 1
end
end
Sub-process(es) done.
multiprocessing的Pool类的imap方法?
- imap(迭代、有序) 方法比 map() “懒惰”。默认情况下,它会把每个单独的迭代项发送给下一个可用的 worker。这可能会引入额外的通信开销,因此建议使用大于1的块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg: ", msg)
time.sleep(4-msg)
return msg
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
results = pool.imap(func, range(3))
for res in results:
print("res: ",res)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg: 0
msg: 1
msg: 2
res: 0
res: 1
res: 2
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing的Pool类的imap_unordered方法?
- imap_unordered( 迭代、无序) 方法与imap()方法类似,但它不会保留结果的顺序。允许乱序处理映射意味着一旦进程完成处理便收集结果,否则必须按序收集结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def func(msg):
print("msg: ", msg)
time.sleep(4-msg)
return msg
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
results = pool.imap_unordered(func, range(3))
for res in results:
print("res: ", res)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg: 0
msg: 1
msg: 2
res: 2
res: 1
res: 0
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing的Pool类的starmap方法?
- 与map和map_async的区别是,这两个函数可以传入多个参数
- starmap方法每个迭代项必须是 元组,并且为了让元组中的每个值成为位置参数,使用了 * 修饰符将其传递给函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
def func(msg1, msg2):
print("msg1:", msg1, "msg2:", msg2)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
msgs = [(1,1),(2,2)]
pool.starmap(func, msgs)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出
msg1: 1 msg2: 1
msg1: 2 msg2: 2
end
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
multiprocessing的Pool类的starmap_async方法?
- starmap和starmap_async与map和map_async的区别是,starmap和starmap_async可以传入多个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import multiprocessing
import time
def func(msg1, msg2):
print("msg1:", msg1, "msg2:", msg2)
time.sleep(2)
print("end")
if __name__ == "__main__":
pool = multiprocessing.Pool(2)
msgs = [(1, 1), (2, 2)]
pool.starmap_async(func, msgs)
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
pool.close()
pool.join()
print("Sub-process(es) done.")
# 输出:
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg1: 1 msg2: 1
msg1: 2 msg2: 2
end
end
Sub-process(es) done.
multiprocessing的Pool类中imap和imap_unordered 、map_async区别?
- map_async生成子进程时使用的是list,而imap和 imap_unordered则是Iterable, map_async效率略高,而imap和 imap_unordered内存消耗显著的小
- 在处理结果上,imap 和 imap_unordered 可以尽快返回一个Iterable的结果,而 map_async则需要等待全部Task执行完毕,返回list
- imap 和 imap_unordered 的区别是:imap 和 map_async一样,都按顺序等待Task的 执行结果,而imap_unordered则不必。 imap_unordered返回的Iterable,会优先迭代到 先执行完成的Task
multiprocessing的Queue类?
- Queue是用来创建进程间资源共享的队列的类,使用 Queue可以达到多进程间数据传递的功能 (缺点:只适用 Process类,不能在 Pool进程池中使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import multiprocessing
def square_list(mylist, q):
for num in mylist:
q.put(num * num)
def print_queue(q):
print("Queue elements:")
while not q.empty():
print(q.get())
print("Queue is now empty!")
if __name__ == "__main__":
mylist = [1,2,3,4]
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
p2 = multiprocessing.Process(target=print_queue, args=(q,))
p1.start()
p1.join()
p2.start()
p2.join()
# 1
# 4
# 9
# 16
# Queue is now empty!
multiprocessing的JoinableQueue类?
- JoinableQueue就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消费者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生产者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('产品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
p.start()
producer(seq, q)
print('主线程')
multiprocessing的Value、Array类?
- multiprocessing 中Value和Array的实现原理都是在共享内存中创建 ctypes()对象来达到共享数据的 目的,两者实现方法大同小异,只是选用不同的 ctypes数据类型而已(注意:Value和Array只适用于Process类)
1
2
3
4
5
6
7
8
9
10
11
12import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
multiprocessing的Pipe类?
- 多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并 返回元组( conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生 Process对象之前产生管道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import multiprocessing
def sender(conn, msgs):
for msg in msgs:
conn.send(msg)
print("Sent the message: {}".format(msg))
conn.close()
def receiver(conn):
while 1:
msg = conn.recv()
if msg == "END":
break
print("Received the message: {}".format(msg))
if __name__ == "__main__":
msgs = ["hello", "hey","END"]
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
# Sent the message: hello
# Sent the message: hey
# Sent the message: END
# Received the message: hello
# Received the message: hey
multiprocessing的Manager类?
- Manager()返回的manager对象控制了一个 server进程,此进程包含的 python对象可以被其他的进 程通过proxies来访问。从而达到多进程间数据通信且安全。 Manager模块常与Pool模块一起使 用
- 管理器是独立运行的子进程,其中存在真实的对象,并以服务器的形式运行,其他进程通过使用 代理访问共享对象,这些代理作为客户端运行。 Manager()是BaseManager的子类,返回一个启 动的SyncManager()实例,可用于创建共享对象并返回访问这些共享对象的代理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
d = server.dict()
n = server.Namespace()
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
print(d)
print(n)
multiprocessing的Condition类?
- 可以把Condition理解为一把高级的锁,它提供了比 Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。 Condition在内部维护一个锁对象(默认是 RLock),可以在创建 Condigtion对象的时候把琐对象作为参数传入
- 以下例子是生成一个Condition对象,Condition的wait函数在没有收到notify_all函数调用前,线程循环在wait上等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32def stage_1(cond):
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',target=stage_1,args=(condition,))
s2_clients = [
multiprocessing.Process(name='stage_2[{}]'.format(i),target=stage_2,args=(condition,),)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
# Starting stage_2[1]
# Starting stage_2[2]
# Starting s1
# s1 done and ready for stage 2
# stage_2[1] running
# stage_2[2] running
multiprocessing的Event类?
- Event内部包含了一个标志位,初始的时候为 false。可以使用 set()来将其设置为 true;或者使用 clear()将其从新设置为 false;可以使用 is_set()来检查标志位的状态;另一个最重要的函数就是 wait(timeout=None),用来阻塞当前线程,直到 event的内部标志位被设置为 true或者timeout超 时。如果内部标志位为 true则wait()函数理解返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32def wait_for_event(e):
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
#main: waiting before calling Event.set()
#wait_for_event: starting
#wait_for_event_timeout: starting
#wait_for_event_timeout: e.is_set()-> False
#main: event is set
#wait_for_event: e.is_set()-> True
multiprocessing的Lock类?
- Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修 改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全
1
2
3
4
5
6
7
8
9from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock() # 这个一定要定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
multiprocessing的RLock类?
- RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态
multiprocessing的Semaphore类?
- 信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有 当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 占到一个茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
# user0 占到一个茅坑
# user1 占到一个茅坑
# user0 OK
# user2 占到一个茅坑
# user1 OK
# user3 占到一个茅坑
# user2 OK
# user4 占到一个茅坑
# user4 OK
# user3 OK
什么是concurrent.futures?
- 异步执行代码,线程池使用ThreadPoolExecutor,进程使用ProcessPoolExecutor。两者都实现了相同的接口,该接口由抽象的Executor类定义。实现了对threading和multiprocessing更高级的抽象
1
2
3
4
5
6
7from concurrent.futures import ProcessPoolExecutor
pool_size = 4
pattern = "*.gz"
combined = Counter()
with ProcessPoolExecutor(max_workers=pool_size) as workers:
for result in workers.map(analysis, glob.glob(pattern)):
combined.update(result)
concurrent.futures的Executor类?
- 一个抽象类,它提供异步执行调用的方法。它不应该直接使用,而应该通过其具体的子类使用
- ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用
- ProcessPoolExecutor类是Executor子类,使用进程池执行异步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14import concurrent.futures as cf
#without multiprocessing
for i in range(1,21):
flip_image(i)
#using I/O bottleneck
with cf.ThreadPoolExecutor(max_workers=2) as p:
processing = p.map(flip_image,[x for x in range(1,21)])
for process in processing:
print(process)
#using CPU bottleneck
with cf.ProcessPoolExecutor(max_workers=2) as p:
processing = p.map(flip_image,[x for x in range(1,21)])
for process in processing:
print(process)
concurrent.futures的Future类?
- Future类封装了可调用对象的异步执行。Future的实例由Executor.submit()创建
1
2
3
4
5
6
7
8from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
multiprocessing.dummy与multiprocessing的区别?
- multiprocessing.dummy 模块与 multiprocessing 模块的区别: dummy 模块是多线程,而 multiprocessing 是多进程, api 都是通用的。所有可以很方便将代码在多线程和多进程之间切换。multiprocessing.dummy通常在IO场景可以尝试使用,比如使用如下方式引入线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import time
from multiprocessing.dummy import Pool as tp
def run(num):
print('num is {}'.format(num))
if num == 0:
time.sleep(5)
print('{} is end'.format(num))
if __name__ == '__main__':
print('start')
pool = tp(5)
num_list = [0, 1, 2]
pool.map_async(run, num_list)
print('非阻塞~~~~')
pool.close()
pool.join()
print('end')
结果:
start
非阻塞~~~~
num is 0
num is 1
num is 2
2 is end
1 is end
0 is end
end