探索 Flask 对 asyncio 的支持
源于自己折腾的一个小 Flask 项目中,后台需访问多个 HTTP 服务,目前采用 ThreadPoolExecutor 多线程的方式处理的。但因访问 HTTP 服务有前后关联关系,如得到请求 A 的结果后再访问 B,这似乎用 Promise.then().then() 编程方式更合适些。于是巡着这一路子,翻出 Python 的各种相关部件来,比如 Python 对 coroutine(协程) 的支持,asyncio, 及后面的 async/await 关键子,aiohttp 组件,requests 的 async 替代品有 aiohttp, grequests, 和 httpx,aiohttp 可替代 Flask, 最后竟然找到了一个更彻底的 Flask 的 Async 版本 Quart。
Python 3.4 引入了 asyncio 模块,基于生成器(yield 和 yield from) 和 @asyncio.coroutine 的方式来支持 coroutine(协程), 到 Python 3.5 后有了 async/await(@asyncio.corouting 替换为 async, yield from 替换为 await) 关键字,协程的实现变得更为简单。Python 3.4 使用 coroutine 的方式我们跳过,直接看
执行后输出如下:
自 Python 3.7 及之后可以用
执行后获得相同的结果。
我们查看一下
Python 3.4 开始,在主线程上可以用
用
在
每次请求都会打印
虽然用
异步调用全部用主线程上的 EventLoop
启动后,测试下
Flask-aiohttp,已经找不到怎么安装它。GitHub 的代码 Flask-aiohttp 已不再维护,实现上加了一个
Flask-Async, 在 PyPi 上有,也是一个比 Flask-aiohttp 更久远的项目,最近修改 6 年前,它是一个 Flask 的修改版本,加入了异步特性。实现上与 Flask-aiohttp 类似,只不过它的装饰器是
看得出来它还没用上 Python 3.5 的 async/await 关键字来实现协程。
安装
想要实现 Promise 那样的 then/then 功能,没找到
再继续 Google 找啊找啊,可以直接用 aiohttp 来实现异步的 Web 服务器,再进一步 Quart 是一个比 Flask 更完美的替代器。原本写在一篇博客之中,最后还是决定另立新篇来介绍 aiohttp 和 Quart 实现 异步 Web 服务器。
链接:
永久链接 https://yanbin.blog/how-flask-work-with-asyncio/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
Python 3.4 引入了 asyncio 模块,基于生成器(yield 和 yield from) 和 @asyncio.coroutine 的方式来支持 coroutine(协程), 到 Python 3.5 后有了 async/await(@asyncio.corouting 替换为 async, yield from 替换为 await) 关键字,协程的实现变得更为简单。Python 3.4 使用 coroutine 的方式我们跳过,直接看
async/await 方式的实现
1import asyncio
2import time
3import threading
4
5
6async def compute(cost):
7 compute_start = time.time()
8 await asyncio.sleep(cost)
9 print(f'{cost}: {time.time() - compute_start} - {threading.current_thread().name}')
10 return cost + 1
11
12
13tasks = [asyncio.ensure_future(compute(cc)) for cc in [2, 4]]
14
15start = time.time()
16loop = asyncio.get_event_loop()
17try:
18 results = loop.run_until_complete(asyncio.gather(*tasks))
19 # loop.run_until_complete(asyncio.wait(tasks)) # 或者这种方式
20 print('results: ', results)
21finally:
22 loop.close()
23print(f'Total elapsed time {time.time() - start}')执行后输出如下:
2: 2.000903844833374 - MainThreadcompute() 函数暂停输入参数的秒数,分别两个任务,暂停时间各自为 2 和 4 秒,但总的执行时间为最大的那个数字,相当于那两个任务是并发执行的。注意,我们这里并没有使用到线程,都是用的
4: 4.000842094421387 - MainThread
results: [3, 5]
Total elapsed time 4.001974821090698
MainThread,却收到同样的效果自 Python 3.7 及之后可以用
asyncio.run() 来简单调用,以上的代码从 tasks = ... 行开始可替换为如下代码1tasks = [compute(cc) for cc in [2, 4]]
2
3async def main():
4 return await asyncio.gather(*tasks)
5
6start = time.time()
7results = asyncio.run(main()) # main() 就是一个 coroutine 对象
8print('results:', results)
9print(f'Total elapsed time {time.time() - start}')执行后获得相同的结果。
我们查看一下
asyncio.run() 函数的源代码 1def run(main, *, debug=False):
2 if events._get_running_loop() is not None:
3 raise RuntimeError(
4 "asyncio.run() cannot be called from a running event loop")
5
6 if not coroutines.iscoroutine(main):
7 raise ValueError("a coroutine was expected, got {!r}".format(main))
8
9 loop = events.new_event_loop()
10 try:
11 events.set_event_loop(loop)
12 loop.set_debug(debug)
13 return loop.run_until_complete(main)
14 finally:
15 try:
16 _cancel_all_tasks(loop)
17 loop.run_until_complete(loop.shutdown_asyncgens())
18 finally:
19 events.set_event_loop(None)
20 loop.close()Python 3.4 开始,在主线程上可以用
asyncio.get_event_loop() 直接获得 EventLoop,主线程上存在 EventLoop 直接返回,无则创建新的。 而 asyncio.main() 方法总是创建一个新的 EventLoop。Flask 中实现一个异步 API
先用asyncio.run() 的方式,用协程来异步调用三个 URL,分别获得它们的响应文本的长度 1import asyncio
2import requests
3from flask import Flask
4
5app = Flask(__name__)
6
7
8async def fetch(url):
9 return requests.get(url).text
10
11async def main():
12 tasks = [fetch(url) for url in ["https://google.com", "https://bing.com", "https://yanbin.blog"]]
13 return await asyncio.gather(*tasks)
14
15@app.route("/")
16def index():
17 responses = asyncio.run(main())
18 return f'response sizes: {[len(res) for res in responses]}\n'
19
20if __name__ == "__main__":
21 app.run(debug=False, use_reloader=False)用
python app.py 启动后,访问 / APIcurl http://localhost:5000/由前所知
response sizes: [12019, 60030, 96362]
asyncio.run() 总是会在当前线程上创建并注册一个 EventLoop,所以它总是可行的。那么能不能直接用 asyncio.get_event_loop() 获得一个 EventLoop 呢?在
index() 方法中用 asyncio.get_event_loop(),报错RuntimeError: There is no current event loop in thread 'Thread-6'.就是说在 Flask 启动的处理 HTTP 的线程上没有
EventLoop,而每次都需要自己注册一个。从每次的线程来看,Flask 应该是每次请求都创建一个线程来处理任务,用 get_event_loop() 方式的代码如下: 1import asyncio
2import requests
3from flask import Flask
4
5app = Flask(__name__)
6
7async def fetch(url):
8 return requests.get(url).text
9
10@app.route("/")
11def index():
12
13 try:
14 loop = asyncio.get_event_loop()
15 except RuntimeError as er:
16 print(er.args[0], 'create a new EventLoop')
17 loop = asyncio.new_event_loop()
18 asyncio.set_event_loop(loop)
19
20 responses = loop.run_until_complete(asyncio.gather(
21 fetch("https://google.com"),
22 fetch("https://bing.com"),
23 fetch("https://yanbin.blog")
24 ))
25 return f'response sizes: {[len(res) for res in responses]}'
26
27if __name__ == "__main__":
28 app.run(debug=False, use_reloader=False)每次请求都会打印
create a new EventLoop虽然用
asyncio.run() 避免了每次创建并注册新 EventLoop 的过程,但有时候我们确实需要 EventLoop 的方法处理协程,这样使用 Flask 的异步方式就稍显麻烦。异步调用使用主线程的 EventLoop
我们知道,Python 的主线程上注册有一个EventLoop,所以我们可以让所有异步调用用主线程上那个 EventLoop, 以下代码来自于 Python3 Asyncio call from Flask route 1import asyncio
2from flask import Flask
3
4async def abar(a):
5 print(a)
6
7loop = asyncio.get_event_loop() # 获得主线程上的 EventLoop
8app = Flask(__name__)
9
10@app.route("/")
11def notify():
12 loop.run_until_complete(abar("abar")) # 异步调用使用主线程的 EventLoop
13 return "OK"
14
15if __name__ == "__main__":
16 app.run(debug=False, use_reloader=False)异步调用全部用主线程上的 EventLoop
Quart 作者提供的一个 Flask async 方案
在 Making Flask async and Quart sync, Quart 的作者 PG Jones 给出了一个 Flask 异步化的代码,route 方法可加上 async 关键字和 @run_async 装饰 1from flask import Flask, jsonify, has_request_context, copy_current_request_context
2from functools import wraps
3from concurrent.futures import Future, ThreadPoolExecutor
4import asyncio
5
6def run_async(func):
7 @wraps(func)
8 def _wrapper(*args, **kwargs):
9 call_result = Future()
10 def _run():
11 loop = asyncio.new_event_loop()
12 try:
13 result = loop.run_until_complete(func(*args, **kwargs))
14 except Exception as error:
15 call_result.set_exception(error)
16 else:
17 call_result.set_result(result)
18 finally:
19 loop.close()
20
21 loop_executor = ThreadPoolExecutor(max_workers=1)
22 if has_request_context():
23 _run = copy_current_request_context(_run)
24 loop_future = loop_executor.submit(_run)
25 loop_future.result()
26 return call_result.result()
27
28 return _wrapper
29
30
31app = Flask(__name__)
32
33
34@app.route('/')
35@run_async
36async def index():
37 return jsonify('hello')
38
39app.run()启动后,测试下
$ curl localhost:5000
"hello"
两个不在维护的 Flask 扩展
另外还有两个试图扩展 Flask 异步功能的已不再维护的组伯Flask-aiohttp,已经找不到怎么安装它。GitHub 的代码 Flask-aiohttp 已不再维护,实现上加了一个
@async 装饰器1@app.route('/use-external-api')
2@async
3def use_external_api():
4 response = yield from aiohttp.request(
5 'GET', 'https://api.example.com/data/1')
6 data = yield from response.read()
7
8 return dataFlask-Async, 在 PyPi 上有,也是一个比 Flask-aiohttp 更久远的项目,最近修改 6 年前,它是一个 Flask 的修改版本,加入了异步特性。实现上与 Flask-aiohttp 类似,只不过它的装饰器是
@coroutine 1from asyncio import coroutine, sleep
2from flask import Flask, request
3
4app = Flask(__name__)
5
6@app.route("/hello/<string:name>")
7@coroutine
8def say_hi(name):
9 yield from sleep(2)
10 return "it worked %s" % request.args.get("name", name)
11
12app.run()看得出来它还没用上 Python 3.5 的 async/await 关键字来实现协程。
使用 aiohttp 进行异步 HTTP 调用
requests 是一个同步 HTTP 请求库,为了应用到协程当中去,必须把请求包装到 async def 定义的方法中去。aiohttp 提供了异步的方法,aiohttp 库同时提供了服务端和客户端,服务端可以用来替代 Flask 功能,可启动 HTTP 服务并用路由来定义不同的 API。我们这里只使用它的客户端组件,安装
aiohttp$ pip install aiohttp下面用 aiohttp 代替前面的 fetch(url) 方法
1import asyncio
2import async_timeout
3
4async def fetch(url):
5 async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
6 async with session.get(url) as response:
7 return await response.text()想要实现 Promise 那样的 then/then 功能,没找到
aoihttp 现成的方法,准确说是 Python 的 coroutine 没提供像 Java 的 CompletableFuture 那样完备的 thenRun(), thenApply() 等等方法,所以在 Python 中还得自己用 async/await 关键字串起来,比如基于第一个异步请求的响应数据,发现第二个异步请求1async def fetch(url):
2 async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
3 async with session.get(url) as response:
4 response = await response.text()
5 async with session.get(f'http://localhost:5000/ping/{len(response)}') as res:
6 return await res.text()再继续 Google 找啊找啊,可以直接用 aiohttp 来实现异步的 Web 服务器,再进一步 Quart 是一个比 Flask 更完美的替代器。原本写在一篇博客之中,最后还是决定另立新篇来介绍 aiohttp 和 Quart 实现 异步 Web 服务器。
总结:
综合前面,我们认识到- Python 要支持协程必须要与
EventLoop交互 - 自 Python 3.4 之后,主线程上可用
asyncio.get_event_loop()直接获得EventLoop asyncio.run()总是创建一个新的EventLoop,然后协程在其中执行地- Flask 处理请求时每次都创建一个新的线程,该线程上没有注册
EventLoop - 因为上一条,在 Flask(当前版本 1.1.2), 要使用协程,必须每次注册一个
EventLoop, 用asyncio.run()或用如下两行代码获得EventLoop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) - 如果要用一个异步版的
requests, 可以选aiohttp,grequests, 或httpx - 可替代的
aiohttp服务组件和Quart可直接支持协程,因为它们的路由方法可用async修饰,首选Quart
链接:
永久链接 https://yanbin.blog/how-flask-work-with-asyncio/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。