Python 子进程与子进程池的应用

去年记录过一篇如何使用 Python 的线程,线程池的日志 Python 多线程编程, 需用到 threading.Thread, concurrent.futures.ThreadPoolExecutor。本文可以当作是上一文 Python 多线程编程的姊妹篇。

Python 的多线程受到 GIL(Global Interpreter Lock) 的限制,GIL 是一把加到了 Python 的解释器的锁,使得在任意时刻只允许一个 Python  进程使用 Python 解释器,也就是任意时刻,Python 只有一个线程在运行。

GIL 严重影响了计算密集型(CPU-bound) 的多线程程序,此时的多线程与单线程性能没什么差异,也发挥不了多核的威力。但对 I/O 密集型(I/O-bound) 影响不大,因为 CPU 多数时候是在等待。

为了突破 GIL 的 CPU 密集型程序的限制,可以使用非 CPython 解释器,如 Jython, IronPython 或 PyPy, 更为现实的做法就是使用子进程来替代线程去承担较为繁重的计算任务,因为 GIL 是加在进程上的,所以新的进程有独立的 GIL.

新建子进程来处理任务

需用到 multiprocessing.Process 类,这个类在 Python 2.6 就开始存在了。一般的编程语言创建一个进程是要去执行一个外部任命,然后获得它的输出,而 Python 可以像创建线程一样创建子进程,且欲执行的任务直接由 Python 在其中编写

执行后输出

main process 67474 done
[Child][2021-09-02 19:32:14.121689]
[Child][2021-09-02 19:32:15.126048]
[Child][2021-09-02 19:32:16.129741]
sub process 67476 bob done

可以看到主进程与子进程分别有自己不同的进程 ID。

还有就是 if __name__ == '__main__' 变得是必要的语句了,不能把想要立即执行的代码丢在函数外了事,否则就是出现下面的错误信息

RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.
            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:
                  if __name__ == '__main__':
                      freeze_support()
                      ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce an executable.

进程之间传递参数除了用 args=('bob',) 的任务参数外,还可用 multiprocessing.Pipe 在进程之间双向通信。也可以通过内存来共享数据,用到 multiprocessing.Valuemultiprocessing.Array。这一部分的详细内容请参考官方文档,在实际编程中可能会经常用到。

执行进程任务也可以加锁,用 multiprocessing.Lock, lock.acquire()...try: ... finally: lock.release() 标准模式,因为进程之间需要保护对系统中唯一资源的竞争

在 Jupyter 中使用 Process 的问题

如果直接把上面的代码放到 JpyterLab 中去执行,将会看到这样的错误

这不是 Python 版本的问题,放别的 Python 3.8 下也是这样的错误,原因就是 Jupyter 无法与 multiprocessing 一同工作,pickle 模块在序列化数据向进程发送时出异常。解决办法是要用 multiprocess 替换掉 multiprocessing 模块

pip install multiprocess
from multiprocess import Process

然后在 JupyterLab 中执行替换成 multiprocess 的版本,输出略有不同

[Child][2021-09-02 19:41:55.326549]
main process 62917 done
[Child][2021-09-02 19:41:56.335774]
[Child][2021-09-02 19:41:57.342169]
sub process 68144 bob done

与在 Python 终端执行的一个区别是,子进程总有一行在 main process ... 之前输出,这没什么要紧的。

使用进程池

有线程池,相应的也有进程池,参照一个官方文档中的简单例子

输出为

subprocess id: 69348
subprocess id: 69350
subprocess id: 69347
[1, 4, 9]

这是用到了 Context 来管理进程池,如果逐步操作就是

进程池执行器

这个翻译有点别扭,直接叫  ProcessPoolExecutor 习惯些。ProcessPoolExecutor 在 concurrent.futures 模块中,它是 Python 3.2 加入进来的。至于用法呢,只要想像它是 ThreadPoolExecutor 的进程版本就差不多了。它提供的方法用

  1. submit(fn, /, *args, **kwargs): 向进程池提交任务
  2. map(func, *iterables, timeout=None, chunksize=1): 批量提交任务的快捷写法
  3. shutdown(wait=True, *, cancel_futures=False): 关闭进程池

首先仍然用一个使用了 with 关键字的写法

输出如下:

72088: 1
72090: 9
72089: 16

也可以用 submit() 函数来提交任务,得到的是一个   Future。关于 Future, 以及类似的  submit(), executor.map() 函数在 Python 多线程编程 有所覆盖。

另外,在构建 ProcessPoolExecutor 时如果不指  max_workers 参数将会取系统 CPU 的内核数(multiprocessing.cpu_count())。

如果不对 ProcessPoolExecutor 使用 with 语句,则需要去用 submit() 提交的任务进行 wait,参照

 

链接:

  1. 一文详解 Python GIL 设计
  2. multiprocessing -- Process-based parallelism
  3. Python多进程解决方案multiprocessing ProcessPoolExecutor
  4. cocurrent.futures -- Launching parallel tasks

本文链接 https://yanbin.blog/python-subprocess-process-pool/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments