去年记录过一篇如何使用 Python 的线程,线程池的日志 Python 多线程编程, 需用到 threading.Thread, concurrent.futures.ThreadPoolExecutor。本文可以当作是上一文 Python 多线程编程的姊妹篇。
Python 的多线程受到 GIL(Global Interpreter Lock) 的限制,GIL 是一把加到了 Python 的解释器的锁,使得在任意时刻只允许一个 Python 进程使用 Python 解释器,也就是任意时刻,Python 只有一个线程在运行。
GIL 严重影响了计算密集型(CPU-bound) 的多线程程序,此时的多线程与单线程性能没什么差异,也发挥不了多核的威力。但对 I/O 密集型(I/O-bound) 影响不大,因为 CPU 多数时候是在等待。
为了突破 GIL 的 CPU 密集型程序的限制,可以使用非 CPython 解释器,如 Jython, IronPython 或 PyPy, 更为现实的做法就是使用子进程来替代线程去承担较为繁重的计算任务,因为 GIL 是加在进程上的,所以新的进程有独立的 GIL.
新建子进程来处理任务
需用到 multiprocessing.Process 类,这个类在 Python 2.6 就开始存在了。一般的编程语言创建一个进程是要去执行一个外部任命,然后获得它的输出,而 Python 可以像创建线程一样创建子进程,且欲执行的任务直接由 Python 在其中编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
from datetime import datetime import time from multiprocessing import Process import os def job(name): for _ in range(3): print('[Child {}][{}]'.format(os.getpid(), datetime.now())) time.sleep(1) print(f'sub process {os.getpid()} {name} done') if __name__ == '__main__': p = Process(target=job, args=('bob',)) # 留意如何向子进程传递参数 p.start() print(f'main process {os.getpid()} done') # p.pid 可以得到子进程的 ID p.join() # 如果不 join 的话,主进程一退出,子进程也随即结束 |
执行后输出
main process 67474 done
[Child][2021-09-02 19:32:14.121689]
[Child][2021-09-02 19:32:15.126048]
[Child][2021-09-02 19:32:16.129741]
sub process 67476 bob done
可以看到主进程与子进程分别有自己不同的进程 ID。
还有就是 if __name__ == '__main__'
变得是必要的语句了,不能把想要立即执行的代码丢在函数外了事,否则就是出现下面的错误信息
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
进程之间传递参数除了用 args=('bob',)
的任务参数外,还可用 multiprocessing.Pipe
在进程之间双向通信。也可以通过内存来共享数据,用到 multiprocessing.Value
和 multiprocessing.Array
。这一部分的详细内容请参考官方文档,在实际编程中可能会经常用到。
执行进程任务也可以加锁,用 multiprocessing.Lock
, lock.acquire()...try: ... finally: lock.release()
标准模式,因为进程之间需要保护对系统中唯一资源的竞争
在 Jupyter 中使用 Process 的问题
如果直接把上面的代码放到 JpyterLab 中去执行,将会看到这样的错误
1234567 Traceback (most recent call last):File "<string>", line 1, in <module>File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_mainexitcode = _main(fd, parent_sentinel)File "/Users/yanbin/jupyterlab-venv/lib/python3.9/multiprocessing/spawn.py", line 126, in _mainself = reduction.pickle.load(from_parent)AttributeError: Can't get attribute 'job' on <module '__main__' (built-in)>
这不是 Python 版本的问题,放别的 Python 3.8 下也是这样的错误,原因就是 Jupyter 无法与 multiprocessing 一同工作,pickle 模块在序列化数据向进程发送时出异常。解决办法是要用 multiprocess
替换掉 multiprocessing
模块
pip install multiprocess
from multiprocess import Process
然后在 JupyterLab 中执行替换成 multiprocess 的版本,输出略有不同
[Child][2021-09-02 19:41:55.326549]
main process 62917 done
[Child][2021-09-02 19:41:56.335774]
[Child][2021-09-02 19:41:57.342169]
sub process 68144 bob done
与在 Python 终端执行的一个区别是,子进程总有一行在 main process ...
之前输出,这没什么要紧的。
使用进程池
有线程池,相应的也有进程池,参照一个官方文档中的简单例子
1 2 3 4 5 6 7 8 9 10 |
from multiprocessing import Pool import os def f(x): print(f'subprocess id: {os.getpid()}') return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3])) |
输出为
subprocess id: 69348
subprocess id: 69350
subprocess id: 69347
[1, 4, 9]
这是用到了 Context 来管理进程池,如果逐步操作就是
1 2 3 4 |
pool = Pool(5) [pool.apply_async(f, args=(i, )) for i in (1, 2, 3)] pool.close() pool.join() |
进程池执行器
这个翻译有点别扭,直接叫 ProcessPoolExecutor 习惯些。ProcessPoolExecutor 在 concurrent.futures 模块中,它是 Python 3.2 加入进来的。至于用法呢,只要想像它是 ThreadPoolExecutor
的进程版本就差不多了。它提供的方法用
- submit(fn, /, *args, **kwargs): 向进程池提交任务
- map(func, *iterables, timeout=None, chunksize=1): 批量提交任务的快捷写法
- shutdown(wait=True, *, cancel_futures=False): 关闭进程池
首先仍然用一个使用了 with
关键字的写法
1 2 3 4 5 6 7 8 9 10 11 |
from concurrent.futures import ProcessPoolExecutor import os def f(x): return f'{os.getpid()}: {x*x}' if __name__ == '__main__': with ProcessPoolExecutor(max_workers=5) as executor: results = executor.map(f, [1, 3, 4]) for i in results: print(i) |
输出如下:
72088: 1
72090: 9
72089: 16
也可以用 submit() 函数来提交任务,得到的是一个 Future。关于 Future, 以及类似的 submit(), executor.map() 函数在 Python 多线程编程 有所覆盖。
另外,在构建 ProcessPoolExecutor
时如果不指 max_workers
参数将会取系统 CPU 的内核数(multiprocessing.cpu_count())。
如果不对 ProcessPoolExecutor 使用 with 语句,则需要去用 submit() 提交的任务进行 wait,参照
1 2 3 4 |
from concurrent.futures import wait tasks = [executor.submit(fn, i] for i in range(5)] wait(tasks) |
链接:
- 一文详解 Python GIL 设计
- multiprocessing -- Process-based parallelism
- Python多进程解决方案multiprocessing ProcessPoolExecutor
- cocurrent.futures -- Launching parallel tasks
本文链接 https://yanbin.blog/python-subprocess-process-pool/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。