探索 Flask 对 asyncio 的支持

源于自己折腾的一个小 Flask 项目中,后台需访问多个 HTTP 服务,目前采用 ThreadPoolExecutor 多线程的方式处理的。但因访问 HTTP 服务有前后关联关系,如得到请求 A 的结果后再访问 B,这似乎用 Promise.then().then() 编程方式更合适些。于是巡着这一路子,翻出 Python 的各种相关部件来,比如 Python 对 coroutine(协程) 的支持,asyncio, 及后面的 async/await 关键子,aiohttp 组件,requests 的 async 替代品有 aiohttp, grequests, 和 httpx,aiohttp  可替代 Flask, 最后竟然找到了一个更彻底的 Flask 的 Async 版本 Quart。 


Python 3.4 引入了 asyncio 模块,基于生成器(yield 和 yield from) 和 @asyncio.coroutine 的方式来支持 coroutine(协程), 到 Python 3.5 后有了 async/await(@asyncio.corouting 替换为 async, yield from 替换为 await) 关键字,协程的实现变得更为简单。Python 3.4  使用 coroutine 的方式我们跳过,直接看

async/await 方式的实现

 1import asyncio
 2import time
 3import threading
 4
 5
 6async def compute(cost):
 7    compute_start = time.time()
 8    await asyncio.sleep(cost)
 9    print(f'{cost}: {time.time() - compute_start} - {threading.current_thread().name}')
10    return cost + 1
11
12
13tasks = [asyncio.ensure_future(compute(cc)) for cc in [2, 4]]
14
15start = time.time()
16loop = asyncio.get_event_loop()
17try:
18    results = loop.run_until_complete(asyncio.gather(*tasks))
19    # loop.run_until_complete(asyncio.wait(tasks))  # 或者这种方式
20    print('results: ', results)
21finally:
22    loop.close()
23print(f'Total elapsed time {time.time() - start}')

执行后输出如下:
2: 2.000903844833374 - MainThread
4: 4.000842094421387 - MainThread
results: [3, 5]
Total elapsed time 4.001974821090698
compute() 函数暂停输入参数的秒数,分别两个任务,暂停时间各自为 2 和  4  秒,但总的执行时间为最大的那个数字,相当于那两个任务是并发执行的。注意,我们这里并没有使用到线程,都是用的 MainThread,却收到同样的效果

自 Python 3.7 及之后可以用  asyncio.run() 来简单调用,以上的代码从 tasks = ... 行开始可替换为如下代码
1tasks = [compute(cc) for cc in [2, 4]]
2
3async def main():
4    return await asyncio.gather(*tasks)
5
6start = time.time()
7results = asyncio.run(main())   # main() 就是一个 coroutine 对象
8print('results:', results)
9print(f'Total elapsed time {time.time() - start}')

执行后获得相同的结果。

我们查看一下  asyncio.run()  函数的源代码
 1def run(main, *, debug=False):
 2    if events._get_running_loop() is not None:
 3        raise RuntimeError(
 4            "asyncio.run() cannot be called from a running event loop")
 5
 6    if not coroutines.iscoroutine(main):
 7        raise ValueError("a coroutine was expected, got {!r}".format(main))
 8
 9    loop = events.new_event_loop()
10    try:
11        events.set_event_loop(loop)
12        loop.set_debug(debug)
13        return loop.run_until_complete(main)
14    finally:
15        try:
16            _cancel_all_tasks(loop)
17            loop.run_until_complete(loop.shutdown_asyncgens())
18        finally:
19            events.set_event_loop(None)
20            loop.close()

Python 3.4 开始,在主线程上可以用 asyncio.get_event_loop() 直接获得 EventLoop,主线程上存在 EventLoop 直接返回,无则创建新的。 而 asyncio.main() 方法总是创建一个新的 EventLoop

Flask 中实现一个异步 API

先用 asyncio.run() 的方式,用协程来异步调用三个 URL,分别获得它们的响应文本的长度
 1import asyncio
 2import requests
 3from flask import Flask
 4
 5app = Flask(__name__)
 6
 7
 8async def fetch(url):
 9    return requests.get(url).text
10
11async def main():
12   tasks = [fetch(url) for url in ["https://google.com", "https://bing.com", "https://yanbin.blog"]]
13   return await asyncio.gather(*tasks)
14
15@app.route("/")
16def index():
17    responses = asyncio.run(main())
18    return f'response sizes: {[len(res) for res in responses]}\n'
19
20if __name__ == "__main__":
21    app.run(debug=False, use_reloader=False)

python app.py 启动后,访问 / API
curl http://localhost:5000/
response sizes: [12019, 60030, 96362]
由前所知 asyncio.run() 总是会在当前线程上创建并注册一个 EventLoop,所以它总是可行的。那么能不能直接用 asyncio.get_event_loop() 获得一个 EventLoop 呢?

index() 方法中用 asyncio.get_event_loop(),报错
RuntimeError: There is no current event loop in thread 'Thread-6'.
就是说在 Flask 启动的处理 HTTP  的线程上没有 EventLoop,而每次都需要自己注册一个。从每次的线程来看,Flask 应该是每次请求都创建一个线程来处理任务,用 get_event_loop() 方式的代码如下:
 1import asyncio
 2import requests
 3from flask import Flask
 4
 5app = Flask(__name__)
 6
 7async def fetch(url):
 8    return requests.get(url).text
 9
10@app.route("/")
11def index():
12
13    try:
14        loop = asyncio.get_event_loop()
15    except RuntimeError as er:
16        print(er.args[0], 'create a new EventLoop')
17        loop = asyncio.new_event_loop()
18        asyncio.set_event_loop(loop)
19
20    responses = loop.run_until_complete(asyncio.gather(
21        fetch("https://google.com"),
22        fetch("https://bing.com"),
23        fetch("https://yanbin.blog")
24    ))
25    return f'response sizes: {[len(res) for res in responses]}'
26
27if __name__ == "__main__":
28    app.run(debug=False, use_reloader=False)

每次请求都会打印 create a new EventLoop

虽然用 asyncio.run() 避免了每次创建并注册新 EventLoop 的过程,但有时候我们确实需要 EventLoop 的方法处理协程,这样使用 Flask 的异步方式就稍显麻烦。

异步调用使用主线程的 EventLoop

我们知道,Python 的主线程上注册有一个  EventLoop,所以我们可以让所有异步调用用主线程上那个 EventLoop, 以下代码来自于 Python3 Asyncio call from Flask route
 1import asyncio
 2from flask import Flask
 3
 4async def abar(a):
 5    print(a)
 6
 7loop = asyncio.get_event_loop()  # 获得主线程上的 EventLoop
 8app = Flask(__name__)
 9
10@app.route("/")
11def notify():
12    loop.run_until_complete(abar("abar"))  # 异步调用使用主线程的 EventLoop
13    return "OK"
14
15if __name__ == "__main__":
16    app.run(debug=False, use_reloader=False)

异步调用全部用主线程上的 EventLoop

Quart 作者提供的一个 Flask async 方案

Making Flask async and Quart sync, Quart 的作者 PG Jones 给出了一个 Flask 异步化的代码,route 方法可加上 async  关键字和 @run_async 装饰
 1from flask import Flask, jsonify, has_request_context, copy_current_request_context
 2from functools import wraps
 3from concurrent.futures import Future, ThreadPoolExecutor
 4import asyncio
 5
 6def run_async(func):
 7    @wraps(func)
 8    def _wrapper(*args, **kwargs):
 9        call_result = Future()
10        def _run():
11            loop = asyncio.new_event_loop()
12            try:
13                result = loop.run_until_complete(func(*args, **kwargs))
14            except Exception as error:
15                call_result.set_exception(error)
16            else:
17                call_result.set_result(result)
18            finally:
19                loop.close()
20
21        loop_executor = ThreadPoolExecutor(max_workers=1)
22        if has_request_context():
23            _run = copy_current_request_context(_run)
24        loop_future = loop_executor.submit(_run)
25        loop_future.result()
26        return call_result.result()
27
28    return _wrapper
29
30
31app = Flask(__name__)
32
33
34@app.route('/')
35@run_async
36async def index():
37    return jsonify('hello')
38
39app.run()

启动后,测试下
$ curl localhost:5000
"hello"

两个不在维护的 Flask 扩展

另外还有两个试图扩展 Flask 异步功能的已不再维护的组伯

Flask-aiohttp,已经找不到怎么安装它。GitHub 的代码 Flask-aiohttp 已不再维护,实现上加了一个 @async 装饰器
1@app.route('/use-external-api')
2@async
3def use_external_api():
4    response = yield from aiohttp.request(
5        'GET', 'https://api.example.com/data/1')
6    data = yield from response.read()
7
8    return data

Flask-Async, 在 PyPi 上有,也是一个比 Flask-aiohttp 更久远的项目,最近修改 6 年前,它是一个 Flask 的修改版本,加入了异步特性。实现上与 Flask-aiohttp 类似,只不过它的装饰器是 @coroutine
 1from asyncio import coroutine, sleep
 2from flask import Flask, request
 3
 4app = Flask(__name__)
 5
 6@app.route("/hello/<string:name>")
 7@coroutine
 8def say_hi(name):
 9    yield from sleep(2)
10    return "it worked %s" % request.args.get("name", name)
11
12app.run()

看得出来它还没用上 Python 3.5 的 async/await 关键字来实现协程。

使用 aiohttp 进行异步 HTTP 调用

requests 是一个同步 HTTP 请求库,为了应用到协程当中去,必须把请求包装到 async def 定义的方法中去。aiohttp 提供了异步的方法,aiohttp 库同时提供了服务端和客户端,服务端可以用来替代 Flask  功能,可启动 HTTP 服务并用路由来定义不同的 API。我们这里只使用它的客户端组件,

安装 aiohttp
$ pip install aiohttp
下面用 aiohttp 代替前面的 fetch(url) 方法
1import asyncio
2import async_timeout
3
4async def fetch(url):
5    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
6        async with session.get(url) as response:
7            return await response.text()

想要实现 Promise 那样的 then/then 功能,没找到  aoihttp 现成的方法,准确说是 Python 的  coroutine 没提供像 Java 的 CompletableFuture 那样完备的 thenRun(), thenApply() 等等方法,所以在 Python  中还得自己用 async/await 关键字串起来,比如基于第一个异步请求的响应数据,发现第二个异步请求
1async def fetch(url):
2    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
3        async with session.get(url) as response:
4            response = await response.text()
5            async with session.get(f'http://localhost:5000/ping/{len(response)}') as res:
6                return await res.text()

再继续 Google 找啊找啊,可以直接用 aiohttp 来实现异步的 Web 服务器,再进一步 Quart 是一个比  Flask 更完美的替代器。原本写在一篇博客之中,最后还是决定另立新篇来介绍 aiohttp 和 Quart 实现 异步 Web 服务器。

总结:

综合前面,我们认识到

  1. Python 要支持协程必须要与 EventLoop 交互
  2. 自 Python 3.4 之后,主线程上可用 asyncio.get_event_loop() 直接获得 EventLoop
  3. asyncio.run() 总是创建一个新的 EventLoop,然后协程在其中执行地
  4. Flask 处理请求时每次都创建一个新的线程,该线程上没有注册 EventLoop
  5. 因为上一条,在 Flask(当前版本 1.1.2), 要使用协程,必须每次注册一个 EventLoop, 用 asyncio.run() 或用如下两行代码获得 EventLoop
           loop = asyncio.new_event_loop()
           asyncio.set_event_loop(loop)
  6. 如果要用一个异步版的 requests, 可以选 aiohttp, grequests, 或 httpx
  7. 可替代的 aiohttp 服务组件和 Quart 可直接支持协程,因为它们的路由方法可用 async 修饰,首选  Quart

链接:

  1. Async IO in Python: A Complete Walkthrough
  2. Python3 Asyncio call from Flask route

  永久链接 https://yanbin.blog/how-flask-work-with-asyncio/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。