这几天一直浸淫在对 Python 的学习当中,对于一个更习惯 Java 语言的人来说,在接接触 Python 各种概念时会不停的与 Java 进行碰撞。譬如这里要说到的线程,Python 能如何像 Java 一样创建并执行单个线程,以及是否也能使用线程池来进行多作务的执行呢?
整个读完了《THE Quick Python Book》一书也只字未提多线程,然而对于有长时间的 IO 等待的程序,对于当今普及的多核以及核内超线程的 CPU 来说,不使用多线程来并行或并发处理任务是万万不能的,否则效率的差别是数量级的。
基于与 Java 多线程编程进行的比较,主要着力于两个问题:1)创建并执行新的线程,2)线程池中执行任务
创建并执行新的线程
默认的,代码是在主线程中执行,主线程名称为 MainThread
。如果要创建一个子线程并执行需要用到模块 threading
。下面的是基本的代码
1 2 3 4 5 6 7 8 9 10 |
import threading def play_music(): print(threading.current_thread().name, " - playing music") print(threading.current_thread().name, " - main") thread = threading.Thread(target=play_music) thread.start() |
程序执行输出
MainThread - main
Thread-1 - playing music
是不是感觉 Python 创建并启动新线程的方式与 Java 基本是一样的,Java 用的是 new Thread(runnable).start()
。只是因为 Python 里函数是第一类对象,所以可以为子线程直接指定要执行的函数,而无需像 Java 那样由一个 Runnable 的 run 方法来指定操作。
Python 中也是分两步走:
- threading.Thread(target=play_music), 创建子线程并用
target
指定子线程中要操作的函数 - thread.start(), 调用线程的
start()
方法来启动一个线程,这里的thread
也有run()
方法,与 Java Runnable 的run()
是一样的,调用它只会在当前线程中执行play_music()
, 而不会启动子线程
如果深入看 Thread 类的 run()
方法,它实际上就是执行的 target
参数所指示的函数。
通过 threading.current_thread().name
打印的当前线程名称可以看出 play_music
函数是在子线程中执行,但是从控制台的输出我们无法感受到主线程与子线程是否并发执行。我们可以对上面的程序稍稍修改一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import time import threading def play_music(): for i in range(5): print(threading.current_thread().name, i) time.sleep(1) thread = threading.Thread(target=play_music) thread.start() for j in range(5): print(threading.current_thread().name, j) time.sleep(1) |
这时候程序执行后的输出
Thread-1 0
MainThread 0
Thread-1 1
MainThread 1
Thread-1 2
MainThread 2
Thread-1 3
MainThread 3
Thread-1 4
MainThread 4
循环中加上延时能够看到主线程与子线程同时执行的效果。当然,启动的多个子线程也是一样的同时执行。
关于线程的其他一些说明
继续用 Python 的线程与 Java 的相类比,它们的线程的行为基本是一致的。
完整的 Thread
类初始化方法是
1 2 |
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): |
参数传递
也可以向 target 函数传递参数,因为运行方法的参数是不定的,所以传递时可以通过 tuple 或 list 向运行方法传递一个或多个参数,运行方法是从 tuple 或 list 中拆解出参数的。
1 2 3 4 5 |
def play_music(a): pass threading.Thread(target=play_music, args=(8,)) threading.Thread(target=play_music, args=[8]) |
不能用 threading.Thread(target=play_music, args=8), 因为 play_music 没有名为 args 的形参。当 args 是只有一个元素的 tuple 时,必须附加一个逗号,否则 args=(8)
和 args=8
是一个效果
多个参数时,用 tuple 或 list 传递即可
1 2 3 4 5 |
def play_music(a, b): pass threading.Thread(target=play_music, args=(8,9)) threading.Thread(target=play_music, args=[8,9]) |
有关于函数参数拆解的内容可参考 https://yanbin.blog/python-function-argument-unpacking/
关于守护线程
创建线程的时候可以指定线程组,线程名(默认为 Thread-N),是否守护线程(daemon)。在线程初始化也可以修线程名和 daemon 属性
这样创建的线程的 daemon 默认为 False,这与 Java new Thread() 创建的线程是一样的,也就是说子线程没退出的话程序运行也不会结束。这与 Java 线程的行为是一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import time import atexit import threading def play_music(): for i in range(3): print(threading.current_thread().name, i) time.sleep(1) atexit.register(lambda: print("program exit")) print("main start") thread = threading.Thread(target=play_music) thread.start() print("subthread started") |
这段程序执行后输出:
main start
Thread-1 0
subthread started
Thread-1 1
Thread-1 2
program exit
而如果创建线程时指定 daemon=True, 修改创建线程的代码行为
1 |
thread = threading.Thread(target=play_music, daemon=True) |
再执行后输出为:
main start
Thread-1 0
subthread started
program exit
子线程无法阻止主线程的退出,主线程退出后,正在执行的子线程也立即中断。
封装自己的线程类
看到有基于 Python 的 threading.Thread
封装自己的线程类,大致代码如下
1 2 3 4 5 6 7 8 9 |
class MyThread(threading.Thread): def run(self): for i in range(5): print(self.name, i) time.sleep(1) MyThread().start() |
仿照的像个 Java 的 Thread 一样,其实个人觉得没有太大的必要性,因为用 threading.Thread(target=func)
的方式比创建的 MyThread 类还更方便灵活。
使用 Python 线程池
如果多任务处理每次都要创建线程,启动,运行,结束,这会带来不少问题。创建线程需要时间与空间的消耗,如果任务耗时,可能会创建过多的线程占用系统资源;线程数量少了,效率又是个问题。
就是比较难以一资源与效率之间平衡,这时候就需要线程池的实现了,根据系统资源与效率初始化一定数量的线程放在池子里,线程可以得到重用,避免了频繁的创建新的线程,同时线程数量也是可控的。
Java 的实现方式是一般是创建 ExecutorService
线程池(ThreadPoolExecutor),而后只管往线程池提交任务,剩下的事情交给线程池去处理。Python 也借用了同样的实现方式,要用到 concurrent.futures
模块的 ThreadPoolExecutor
, 连名称都一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import time import threading import atexit from concurrent.futures import ThreadPoolExecutor def play_music(num): print(threading.current_thread().name, " daemon: ", threading.current_thread().isDaemon()) for k in range(2): print(threading.current_thread().name, num) time.sleep(1) atexit.register(lambda: print("program exit")) thread_pool = ThreadPoolExecutor(2) for i in range(3): thread_pool.submit(play_music, i) # 同时演示了如何向任务函数传递参数 # thread_pool.shutdown() |
上面代码执行后输出大致如下:
ThreadPoolExecutor-0_0 daemon: True
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_1 daemon: True
ThreadPoolExecutor-0_1 1
program exit
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_1 daemon: True
ThreadPoolExecutor-0_1 2
ThreadPoolExecutor-0_1 2
ThreadPoolExecutor
池中的线程 daemon 是 True,我们看到注册的 ShutdownHook
已经输出了 program exit
,本来是宣告程序结束了,可实际上提交到线程池中的任务还必须执行完。这个行为与 Java 的 daemon 全为 true 的 ForkJoinPool
线程池的行为是不一样的,Java 的主线程一旦结束,已提交到 ForkJoinPool
中的任务也会终止掉。
这让人有些迷惑,是什么原因呢?偶然间查看 thread
模块的代码,发现它也注册了一个 ShutdownHook
1 2 3 4 5 6 7 8 9 10 |
def _python_exit(): global _shutdown _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() atexit.register(_python_exit) |
这使得主线程在退出前触发了这个 _python_exit
函数,其中确保了已提交到线程池中的任务仍然要完成。
如果前面的 thread_pool.shutdown()
行启用,执行的结果像下面那样
ThreadPoolExecutor-0_0 daemon: True
ThreadPoolExecutor-0_1 daemon: True
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 daemon: True
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 2
program exit
必须是所有的任务执行结束才会触发 ShutdownHook
。这是正常的线程池行为,因为执行 ThreadPoolExecutor
的 shutdown()
后,必须等待已提交的任务执行完。
另外,submit()
也是有返回值的,即任务的返回值,能够由以下方式获得结果
1 2 |
future = thread_pool.submit(play_music, 20) result =future.result() |
线程池与 with
关键字
ThreadPoolExecutor
的 shutdown()
是可以工作上下文管理器中的。ThreadPoolExecutor
继承自 _base.Executor
,该类中有这样的定义
1 2 3 4 5 6 |
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown(wait=True) return False |
改写上面的试验代码使用 with
关键字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import time import threading import atexit from concurrent.futures import ThreadPoolExecutor def play_music(num): print(threading.current_thread().name, " daemon: ", threading.current_thread().isDaemon()) for k in range(2): print(threading.current_thread().name, num) time.sleep(1) atexit.register(lambda: print("program exit")) with ThreadPoolExecutor(2) as thread_pool: for i in range(3): thread_pool.submit(play_music, i) |
这时候的执行行为与调用了 thread_pool.shutdown()
方法的是一致的,输出类似下面
ThreadPoolExecutor-0_0 daemon: True
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_1 daemon: True
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 daemon: True
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 2
program exit
也就是在所有提交的任务完成后当前程序才会结束。
其他线程相关话题
涉及到线程的话就会有不同线程之间的协作问题,比如线程安全,通知,等待等问题。这些也还是留待以后有需求时再作研究。但一些通常问题还是有必要现在就解决掉,如提提交了多个任务到线程,必须在全部任务完成后才能进行下一步行动
wait 所有任务
可以用 concurrent.futures
的 wait
方法来等待多个任务,条件可以是 FIRST_COMPLETED
, FIRST_EXCEPTION
或 ALL_COMPLETED
,默认为 ALL_COMPLETED
。
1 2 3 4 5 6 7 8 9 10 11 12 |
from concurrent.futures import ThreadPoolExecutor, wait ...... futures = [] executor = ThreadPoolExecutor(2) for i in range(3): futures.append(executor.submit(play_music, i)) wait(futures) # 所有任务完成后遍历任务结果 # wait(futures, return_when='FIRST_COMPLETED') # 比如从多处查询结果,只要有一个结果就行 for future in futures: print(future.result()) |
没有 wait(futures)
的话在提交完任务后会立即执行 for future in futures:
这一行,当然用 with ThreadPoolExecutor(2) as executor:
也能保证在退出这个 with
上下文时所有的任务是完成了的。
使用 map 得到结果
其他函数式编程我们可以习惯了用 map
函数,Python 的线程结果也能用 map
函数,看下面的例子
1 2 3 4 5 6 7 |
results = [] with ThreadPoolExecutor(2) as executor: results.extend(executor.map(play_music, [1, 2, 3])) for result in results: print(result) |
executor.map(play_music, [1, 2, 3])
实现了提交多个任务到线程池,分别应用列表中的元素为任务参数,并把最终结合任务结果为一个列表。
Future 相关操作
往线程池中提交一个任务后会返回一下个 concurrent.futures.Future
对象。前面我们调用过 future.result()
获得执行结果,还有
- .done() 是否已执行完成
- .add_done_call() 任务执行完的回调函数,可利用它在某个任务执行完后触发下一个操作
- .running(), cancelled() 查看状态
- .cancel() 取消任务
还能用 futures.as_complete(futures)
来等待多个任务
1 2 3 4 5 6 7 8 9 10 |
from concurrent.futures import ThreadPoolExecutor, as_completed .... executor = ThreadPoolExecutor(2) futures = [] for i in range(5): to_do.append(executor.submit(play_music)) for future in as_completed(futures): print(future.result()) |
以上的内容将来用以指导多线程编程应该是具很好的引子。
链接:
本文链接 https://yanbin.blog/python-programming-with-threads/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
[…] Python 的线程,线程池的日志 Python 多线程编程, 需用到 threading.Thread, […]
[…] Python 多线程编程 中学习了 Python 中如何使用多线程来调度任务,自己不妨再来温习一遍 Python […]