AWS Lambda 中使用 Python 并发编程

无论在何处,有多重任务要处理时,并发编程总是要得到考虑的。比如有 IO 等待时的并发或 CPU 密集型时的并行计算,并发通常是指在同一个 CPU 上按时间片轮换执行,并行是任务在不同的 CPU 上执行。能有效使用 CPU 多核的语言可以让线程运行在不同的核上实现并行,如果是启动的子进程能由操作系统运行在其他 CPU 核上。

回到 AWS Lambda 中的 Python 代码,如果是处理 IO 等待,使用多线程并发就行,大致的代码如下:

with ThreadPoolExecutor(10) as executor:
    result = executor.map(task_function, task_inputs)

以上代码在 AWS Lambda 中是可以运行的。

如果是 CPU 密集型的任务,用 Python 的多线程就要歇菜了,因为存在著名的 Python's GIL 的约束。这时候就必须要考虑多进程并行的方式,同时应知晓当前选择的 Lambda 运行环境有多少个 CPU 内核,因为如果是单核的话再多进程也无济于事,没必要启动多于核心数的进程。底下是本人上篇博客测试收集的不同 AWS Lambda 内存选择对应的 CPU 核心数,以及实际可用内存大小的关系表 阅读全文 >>

实测 AWS Lambda 不同内存配置下的 CPU 核心数

目前(2023-05-25) AWS Lambda 的内存选择区间是 128MB ~ 10240MB, 最长运行时间为 15 分钟,但没有 vCPU 个数的选择。vCPU 的数量是基于所选内存大小而有不同的,如果我们在 Lambda 中需使用多进程充分发挥 CPU 性能的话,有必要了解当前 Lambda 所在运行环境的 CPU 内核数,甚至是单核的频率。

CPU 个数可用如下 Python 内置的其中一个方法取得

multiprocessing.cpu_count()
os.cpu_count()

要获得 CPU 频率或内存的话,将要用到 psutil  组件的方法,可把 psutil 做成 Lambda  层以引用,或与 Lambda 函数代码一同打成  zip 包。

安装方法 psutil

pip install --target . psutil

psutil 会安装到当前目录,然后在当前目录下再创建 lambda_function.py 文件,再打包 阅读全文 >>

Spring 5 响应式编程研究

Spring 5.0 发布之时(2017-09-28) WebFlux 是它的一大亮点,即响应式 Web 编程。因为同一时代的 RxJava 2 和 Akka Actor 具备一定的流行度,Spring 5 也来赶这一趟时髦。于是多线程编程大致两种模式

  1. CompletableFuture, runAsync, supplyAsync, whenComplete...
  2. Obervable, observeOn, subscribe, subscribeOn...

以及 PlayFramework 的 Action 方法无论返回 Result 还是 CompletableStage<Result>, 内部都是异步的模式。

Akka Actor 比 CompletableFuture, RxJava,以及本文将要讨论的 Reactor 更高级的是 Akka System 可以分布式部署,Actor 分布在不同的进程,主机上。 

那时候业界已行成了一个 Reactive Stream 规范 org.reactivestreams(Publisher, Subscriber, Subscription, Processor), JDK 9 也奈不住寂寞,无法对 Reactive Stream 置若罔闻,在 2017-09-21 发布时加入了 java.util.concurrent.flow 包(Publisher, Subscriber, Processor, Subscription) 作为自己的 Reactive Stream 规范。

然而随着云计算的普及,基于消息系统解耦合的任务分解让代码变得更清晰,编码中甚至不用考虑多线程的行为,部署方式能解决任务执行的效率。

阅读全文 >>

配置 FastAPI/Uvicorn/Hypercorn 的访问日志

有了 FastAPI 之后,用 Python 实现 API 或 Web 都不再考虑 Flask 了。Flask 最早在 13 年前的 2010 年 4 月 1 日发布,实现的是 WSGI; FastAPI 较为年轻,于 4 年前的 2018 年 12 月 5 日发布,支持 ASGI。性能方面普遍是 FastAPI 比 Flask 高,编程方面就各取所好吧,使用 Flask 的时候还是 1.x 的版本,最近用 FastAPI 较多,所以无法对比。

FastAPI 本身没提供启动 Web 服务的代码,不像 Flask 还能通过 Flask 对象 或 flask 命令启动一个开发用途的 Web 服务,而 FastAPI 必须用其他的组件(ASGI server)来启动,比如各种 *corn 或 Daphne

  1. Uvicorn: FastAPI 官方的出品,默认启用访问日志,相关的参数有 --log-config <path>, --access-log/ --no-access-log 启用或关闭访问日志,默认是开启的。
  2. Hypercorn: 相关的配置选项有 --access-logformat, --access-logfile
  3.  Hypercorn: 它只是实现了 WSGI 规格的服务,所以不兼容 FastAPI, 只能作为 Uvicorn 进程的管理器。Gunicorn 作为 WSGI 服务器有丰富的访问日志配置,但访问日志仍然是由 Uvicorn 输出

阅读全文 >>

应用 AWS Lambda 部署 FastAPI

前两年用 AWS Lambda 搭配 API Gateway 使用是为了省钱,因为没有请求时不花钱。又由于是 Rest API, 所以实现部分用了 FastAPI 的装饰器,但不实际启动 FastAPI 的 Web 服务,Lambda 的 handler 方法根据 routeKey 手动映射到 FastAPI 的装饰方法。大概实现是

def lambda_handler(event: dict, context):
    fastapi_function = locate_fastapi_function(event['routeKey'])
    return fastapi_function(<extract parameters from event>)

当时也思考着能不能把 Lambda 的请求与 FastAPI 的 Web 服务桥接起来,却又不能真正启动一个  Web 服务,否则 Lambda 调用不能结束。比如说 AWS Lambda 收到请求时快速启动 FastAPI 服务,该服务绑定到  TCP 端口或 Socket 文件都行,然后 Lambda 请求代理到 FastAPI 服务,最后关闭 FastAPI 服务,但是想来都不那么容易实现。 阅读全文 >>

理解 Java 线程池 ThreadPoolExecutor

使用 JDK 5 的线程池实现有近 20 年的时间了,快速创建一个线程池经常是调用 Executors 中的工厂方法。但是涉及过更精细的线程池管理控制时不得不用 ThreadPoolExecutor 的构造方法,这也就是为什么有些公司不建议用 Executors 的工厂方法创建线程池,而应该直接创建 ThreadPoolExecutor 或 ForkJoinPool 实例。 

例如代码

ExecutorService threadPool = Executors.newFixedThreadPool(10);

实际上调用的是

new ThreadPoolExecutor(nThreads, nThreads,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>());

前两个参数 corePoolSize 和 maximumPoolSize 是一样的; OL, TimeUnit.MILLISECONDS 表示线程创建后只要线程池还在就是永生的; workQueue 是一个大小为 Integer.MAX_VALUE 的队列, 几乎可以无限提交任务,耗尽内存

不建议用 Executors 的工厂方法的原因大致有二: 阅读全文 >>

学习使用 AWS Cognito 并 OAuth2 验证

OAuth 是 Open Authorization 的缩写,是一种开放的可为 Web 或桌面应用进行用户验证和授权的协议。例如,在互联网上的许多应用,可不用额外注册帐户而采用第三方的帐户(Gmail, Apple Id 等)登陆并完成授权,这就有 OAuth 身影。

当我们提到 OAuth 的时候,常常会碰到 OAuth 1.0, OAuth 2.0, OpenID, 和  Auth0.

  1. OAuth 1.0 于 2007 年 4 月 发布(OAuthCore 1.0),存在严重的安全漏洞,2009 年 6 月发布修正版(OAuthCore 1.0 Revision A). 较少使用了, 每个 token 加密,但不要求 HTTPS/TLS 协议
  2. OAuth 2.0 于 2012 年 10 月发布,它与  OAuth 1.0 互不兼容,目前多数平台都支持此版本,它强制使用 HTTPS/TLS 协议,更安全,相关的概念有 Access Token, Refresh Token, Bearer Token
  3. OpenID 侧重于 Authentication, 它是在 OAuth 上层用于鉴定用户是否可以登陆,OAuth 专注在 Authorization。与 OpenID 相对应的有 SAML(Security Assertion Markup Language)
  4. Auth0 是一个软件产品 -- 身份管理平台(Auth0 Authentication Platform - Identity Access Management),或者说是一套解决方案,这个缺德的命名纯粹是来搅浑水的。前面的 OAuth 1.0, OAuth 2.0 和 OpenID 都是协议规范,Okta 旗下的 Auth0 使用该名字抢了 OAuth 的光芒。

那 Amazon Cognito 是什么呢?它和 Auth0 类似,也是一个身份访问管理平台(Implement secure, frictionless customer identity and access management that scales),提供了用户的登陆验证,权限管理。背后的实现也是 OAuth 2.0, OIDC(OpenID Connection), 和 SAML。因此通过对 Cognito 的学习的另一个目的是由此了解 OAuth 2.0 协议的相关内容。 阅读全文 >>

体验 Python FastAPI 的并发能力及线, 进程模型

本文进行实际测试 FastAPI 的并发能力,即同时能处理多少个请求,另外还能接收多少请求放在等待队列当中; 并找到如何改变默认并发数; 以及它是如何运用线程或进程来处理请求。我们可以此与 Flask 进行对比,参考 Python Flask 框架的并发能力及线,进程模型,是否真如传说中所说的 FastAPI 性能比 Flask 强, FastAPI 是否对得起它那道闪电的 Logo。

本文使用 JMeter 进行测试,测试机器为 MacBook Pro, CPU 6 核超线程,内存 16 Gb。

对于每一种类型 Web 服务基本的测试是每秒发送 2 个请求,连续发送 1000 个,500 秒发送完所有请求,程序中 API 方法接受到请求后 sleep 800 秒,保证在全部 1000 个请求送出之前一直占着连接,并有充足的时间对连接进行分析。在测试极端并发数时,由于在 Mac OS X 尽管设置了 ulimit 最多也只能创建 4000 多一点线程,所以在模拟更多用户数时,JMeter 在远程 Linux(Docker 或虚拟机) 上运行测试用例。

请求的 URL 是 http://localhost:8080/?id=${count}, 带一个自增序列用以识别不同的请求, JMeter 的 Thread Group 配置为 Number of Threads (users): 1000, Ramp-up period (seconds): 500 阅读全文 >>

Python __slots__ 的用法笔记

Python 是一个动态语言,可以动态的给实例或类增减属性或方法,给类添加的属性会影响到前后所有创建的实例。但是使用 __slots__ 属性可以限定类或实例属性和方法,如果没有 __slots__ 的话实例的属性和方法包含在实例的 __dict__ 字典中,类的属性和方法包含在类的 __dict__ 字典中。

在使用 __slots__ 按常规写法可能会出现的问题大概有

  1. AttributeError: 'Xxx' object has no attribute 'yyy'
  2. AttributeError: 'Xxx' object attribute 'yyy' is read-only
  3. ValueError: 'yyy' in __slots__ conflicts with class variable

我们来看下面的例子 阅读全文 >>

AWS SNS 订阅到 HTTP 的过程及消息报文

AWS SNS(Simple Notification Service) 以消息订阅,推送的方式对组件进行解藕。当有新消息发送到 SNS 主题中,SNS 会向当前所有的订阅者发送一个消息(广播),它本身不像 SQS 那样会存储消息,而只是一个纯粹的消息路由。SNS 消息可以订阅到 Amazon Kinesis Data Firehose, SQS, Lambda, Email, Email-JSON, HTTP, HTTPs, Platform application endpoint, 和 SMS。同邮件列表一样,订阅 SNS 消息也是需要确认的,不然 SNS 消息就可能恶意满天飞。

本文试验如何用 HTTP 端点订阅 SNS 消息,订阅确认,以及发送消息到 SNS 主题后消息推送到 HTTP 端点的细节,重点是了解订阅及被推送过来消息时的 HTTP 报文内容。SNS 的 HTTP 端点订阅需要一个公网上的 HTTP URL, 对 SNS 可见,所以我在本地测试时在家中路由器上加一个端口映射,对 Modem 获得的公有 IP 的 8080 端口访问转发到写此文用所用机器的 8080 端口上。

在本机需要在 8080 端口上启动一个 HTTP 服务以迎接 AWS 消息的到来,比如用 python 3 的话,简单运行命令 python -m http.server 8080。如果不想在 API 代码中分析 HTTP 报文数据,只需打开 Wireshark(过滤条件 tcp.port=8080 && http) 抓取 8080 上的 HTTP 数据通信即可。在 API 代码中如何处理 HTTP 请求数据不是本文的重点。 阅读全文 >>