源于自己折腾的一个小 Flask 项目中,后台需访问多个 HTTP 服务,目前采用 ThreadPoolExecutor 多线程的方式处理的。但因访问 HTTP 服务有前后关联关系,如得到请求 A 的结果后再访问 B,这似乎用 Promise.then().then() 编程方式更合适些。于是巡着这一路子,翻出 Python 的各种相关部件来,比如 Python 对 coroutine(协程) 的支持,asyncio, 及后面的 async/await 关键子,aiohttp 组件,requests 的 async 替代品有 aiohttp, grequests, 和 httpx,aiohttp 可替代 Flask, 最后竟然找到了一个更彻底的 Flask 的 Async 版本 Quart。
Python 3.4 引入了 asyncio 模块,基于生成器(yield 和 yield from) 和 @asyncio.coroutine 的方式来支持 coroutine(协程), 到 Python 3.5 后有了 async/await(@asyncio.corouting 替换为 async, yield from 替换为 await) 关键字,协程的实现变得更为简单。Python 3.4 使用 coroutine 的方式我们跳过,直接看
async/await 方式的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import asyncio import time import threading async def compute(cost): compute_start = time.time() await asyncio.sleep(cost) print(f'{cost}: {time.time() - compute_start} - {threading.current_thread().name}') return cost + 1 tasks = [asyncio.ensure_future(compute(cc)) for cc in [2, 4]] start = time.time() loop = asyncio.get_event_loop() try: results = loop.run_until_complete(asyncio.gather(*tasks)) # loop.run_until_complete(asyncio.wait(tasks)) # 或者这种方式 print('results: ', results) finally: loop.close() print(f'Total elapsed time {time.time() - start}') |
执行后输出如下:
2: 2.000903844833374 - MainThread
4: 4.000842094421387 - MainThread
results: [3, 5]
Total elapsed time 4.001974821090698
compute() 函数暂停输入参数的秒数,分别两个任务,暂停时间各自为 2 和 4 秒,但总的执行时间为最大的那个数字,相当于那两个任务是并发执行的。注意,我们这里并没有使用到线程,都是用的 MainThread
,却收到同样的效果
自 Python 3.7 及之后可以用 asyncio.run()
来简单调用,以上的代码从 tasks = ...
行开始可替换为如下代码
1 2 3 4 5 6 7 8 9 |
tasks = [compute(cc) for cc in [2, 4]] async def main(): return await asyncio.gather(*tasks) start = time.time() results = asyncio.run(main()) # main() 就是一个 coroutine 对象 print('results:', results) print(f'Total elapsed time {time.time() - start}') |
执行后获得相同的结果。
我们查看一下 asyncio.run()
函数的源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def run(main, *, debug=False): if events._get_running_loop() is not None: raise RuntimeError( "asyncio.run() cannot be called from a running event loop") if not coroutines.iscoroutine(main): raise ValueError("a coroutine was expected, got {!r}".format(main)) loop = events.new_event_loop() try: events.set_event_loop(loop) loop.set_debug(debug) return loop.run_until_complete(main) finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally: events.set_event_loop(None) loop.close() |
Python 3.4 开始,在主线程上可以用 asyncio.get_event_loop()
直接获得 EventLoop
,主线程上存在 EventLoop
直接返回,无则创建新的。 而 asyncio.main()
方法总是创建一个新的 EventLoop
。
Flask 中实现一个异步 API
先用 asyncio.run()
的方式,用协程来异步调用三个 URL,分别获得它们的响应文本的长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import asyncio import requests from flask import Flask app = Flask(__name__) async def fetch(url): return requests.get(url).text async def main(): tasks = [fetch(url) for url in ["https://google.com", "https://bing.com", "https://yanbin.blog"]] return await asyncio.gather(*tasks) @app.route("/") def index(): responses = asyncio.run(main()) return f'response sizes: {[len(res) for res in responses]}\n' if __name__ == "__main__": app.run(debug=False, use_reloader=False) |
用 python app.py
启动后,访问 /
API
curl http://localhost:5000/
response sizes: [12019, 60030, 96362]
由前所知 asyncio.run()
总是会在当前线程上创建并注册一个 EventLoop
,所以它总是可行的。那么能不能直接用 asyncio.get_event_loop()
获得一个 EventLoop
呢?
在 index()
方法中用 asyncio.get_event_loop()
,报错
RuntimeError: There is no current event loop in thread 'Thread-6'.
就是说在 Flask 启动的处理 HTTP 的线程上没有 EventLoop
,而每次都需要自己注册一个。从每次的线程来看,Flask 应该是每次请求都创建一个线程来处理任务,用 get_event_loop()
方式的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
import asyncio import requests from flask import Flask app = Flask(__name__) async def fetch(url): return requests.get(url).text @app.route("/") def index(): try: loop = asyncio.get_event_loop() except RuntimeError as er: print(er.args[0], 'create a new EventLoop') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) responses = loop.run_until_complete(asyncio.gather( fetch("https://google.com"), fetch("https://bing.com"), fetch("https://yanbin.blog") )) return f'response sizes: {[len(res) for res in responses]}' if __name__ == "__main__": app.run(debug=False, use_reloader=False) |
每次请求都会打印 create a new EventLoop
虽然用 asyncio.run()
避免了每次创建并注册新 EventLoop
的过程,但有时候我们确实需要 EventLoop
的方法处理协程,这样使用 Flask 的异步方式就稍显麻烦。
异步调用使用主线程的 EventLoop
我们知道,Python 的主线程上注册有一个 EventLoop
,所以我们可以让所有异步调用用主线程上那个 EventLoop
, 以下代码来自于 Python3 Asyncio call from Flask route
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import asyncio from flask import Flask async def abar(a): print(a) loop = asyncio.get_event_loop() # 获得主线程上的 EventLoop app = Flask(__name__) @app.route("/") def notify(): loop.run_until_complete(abar("abar")) # 异步调用使用主线程的 EventLoop return "OK" if __name__ == "__main__": app.run(debug=False, use_reloader=False) |
异步调用全部用主线程上的 EventLoop
Quart 作者提供的一个 Flask async 方案
在 Making Flask async and Quart sync, Quart 的作者 PG Jones 给出了一个 Flask 异步化的代码,route 方法可加上 async 关键字和 @run_async 装饰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
from flask import Flask, jsonify, has_request_context, copy_current_request_context from functools import wraps from concurrent.futures import Future, ThreadPoolExecutor import asyncio def run_async(func): @wraps(func) def _wrapper(*args, **kwargs): call_result = Future() def _run(): loop = asyncio.new_event_loop() try: result = loop.run_until_complete(func(*args, **kwargs)) except Exception as error: call_result.set_exception(error) else: call_result.set_result(result) finally: loop.close() loop_executor = ThreadPoolExecutor(max_workers=1) if has_request_context(): _run = copy_current_request_context(_run) loop_future = loop_executor.submit(_run) loop_future.result() return call_result.result() return _wrapper app = Flask(__name__) @app.route('/') @run_async async def index(): return jsonify('hello') app.run() |
启动后,测试下
$ curl localhost:5000
"hello"
两个不在维护的 Flask 扩展
另外还有两个试图扩展 Flask 异步功能的已不再维护的组伯
Flask-aiohttp,已经找不到怎么安装它。GitHub 的代码 Flask-aiohttp 已不再维护,实现上加了一个 @async
装饰器
1 2 3 4 5 6 7 8 |
@app.route('/use-external-api') @async def use_external_api(): response = yield from aiohttp.request( 'GET', 'https://api.example.com/data/1') data = yield from response.read() return data |
Flask-Async, 在 PyPi 上有,也是一个比 Flask-aiohttp 更久远的项目,最近修改 6 年前,它是一个 Flask 的修改版本,加入了异步特性。实现上与 Flask-aiohttp 类似,只不过它的装饰器是 @coroutine
1 2 3 4 5 6 7 8 9 10 11 12 |
from asyncio import coroutine, sleep from flask import Flask, request app = Flask(__name__) @app.route("/hello/<string:name>") @coroutine def say_hi(name): yield from sleep(2) return "it worked %s" % request.args.get("name", name) app.run() |
看得出来它还没用上 Python 3.5 的 async/await 关键字来实现协程。
使用 aiohttp 进行异步 HTTP 调用
requests
是一个同步 HTTP 请求库,为了应用到协程当中去,必须把请求包装到 async def
定义的方法中去。aiohttp
提供了异步的方法,aiohttp
库同时提供了服务端和客户端,服务端可以用来替代 Flask 功能,可启动 HTTP 服务并用路由来定义不同的 API。我们这里只使用它的客户端组件,
安装 aiohttp
$ pip install aiohttp
下面用 aiohttp 代替前面的 fetch(url) 方法
1 2 3 4 5 6 7 |
import asyncio import async_timeout async def fetch(url): async with aiohttp.ClientSession() as session, async_timeout.timeout(10): async with session.get(url) as response: return await response.text() |
想要实现 Promise 那样的 then/then 功能,没找到 aoihttp
现成的方法,准确说是 Python 的 coroutine 没提供像 Java 的 CompletableFuture 那样完备的 thenRun(), thenApply() 等等方法,所以在 Python 中还得自己用 async/await 关键字串起来,比如基于第一个异步请求的响应数据,发现第二个异步请求
1 2 3 4 5 6 |
async def fetch(url): async with aiohttp.ClientSession() as session, async_timeout.timeout(10): async with session.get(url) as response: response = await response.text() async with session.get(f'http://localhost:5000/ping/{len(response)}') as res: return await res.text() |
再继续 Google 找啊找啊,可以直接用 aiohttp 来实现异步的 Web 服务器,再进一步 Quart 是一个比 Flask 更完美的替代器。原本写在一篇博客之中,最后还是决定另立新篇来介绍 aiohttp 和 Quart 实现 异步 Web 服务器。
总结:
综合前面,我们认识到
- Python 要支持协程必须要与
EventLoop
交互 - 自 Python 3.4 之后,主线程上可用
asyncio.get_event_loop()
直接获得EventLoop
asyncio.run()
总是创建一个新的EventLoop
,然后协程在其中执行地- Flask 处理请求时每次都创建一个新的线程,该线程上没有注册
EventLoop
- 因为上一条,在 Flask(当前版本 1.1.2), 要使用协程,必须每次注册一个
EventLoop
, 用asyncio.run()
或用如下两行代码获得EventLoop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) - 如果要用一个异步版的
requests
, 可以选aiohttp
,grequests
, 或httpx
- 可替代的
aiohttp
服务组件和Quart
可直接支持协程,因为它们的路由方法可用async
修饰,首选Quart
链接:
本文链接 https://yanbin.blog/how-flask-work-with-asyncio/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
[…] Python 中 async, await 关键字的一些知识在去年的一篇 探索 Flask 对 asyncio 的支持 有讲到,一直没在实际上用过它们,所以基本上也就忘干净了。随着 Flask 2 […]
[…] Python 中 async, await 关键字的一些知识在去年的一篇 探索 Flask 对 asyncio 的支持 有讲到,一直没在实际上用过它们,所以基本上也就忘干净了。随着 Flask 2 […]
[…] Python 中 async, await 关键字的一些知识在去年的一篇 探索 Flask 对 asyncio 的支持 有讲到,一直没在实际上用过它们,所以基本上也就忘干净了。随着 Flask 2 […]