Apache Airflow 任务中使用模板或上下文

本文大概记录一下在 Apache Airflow 的 Task 或 Operator(这两个基本是同一概念) 中如何使用 模板(Template) 和上下文(Context). Airflow 的模板引擎用的  Jinja Template, 它也被 FastAPI 和 Flask 所采纳。首先只有构造 Operator 时的参数或参数指定的文件内容中,或者调用 Operator render_template() 方法才能用模板语法,像 {{ ds }}.  Apache Airflow 有哪些模板变量可用请参考: Templates references / Variables, 本文将会打印出一个 Task 的 context 变量列出所有可用的上下文变量, 不断的深入,最后在源代码找到相关的定义。

通过使用模板或上下文,我们能能够在任务中使用到 Apache Airflow 一些内置的变量值,如 DAG 或任务当前运行时的状态等 。

当我们手动触发一个 DAG 时, 在 Configuration JSON 中输入的参数也能在 context 中找到. 所有的 Operator  继承链可追溯到 AbstractOperator -> Templater, 因此所有的 Operator(Task) 都能通过调用 Templater.render_template() 方法对模板进行渲染,该方法的原型是 阅读全文 >>

Docker Compose 简单配置 Apache Airflow 3.0(PostgreSQL)

Apache Airflow 重新唤起我的注意力是因为 Airflow 3.0 在近日 April 22, 2025 发布了,其二则是我们一直都有计划任务的需求,以下几种方案都太简陋

  1. 用 Windows 的计划任务或 Linux 的 Cron 都不易管理,且有单点故障问题
  2. 在 Java Spring 项目中使用集群模式的 Quartz 有些麻烦,且对于 AutoScaling 也不怎么友好
  3. AWS 上用 CloudWatch Rule + AWS Lambda 的方案可靠性没有问题,但不适于监控 

因此还有必要再次尝试 Apache Airflow, 它有集中管理的界面,各个部件都是可伸缩的,如 WebServer, Workers 等。特别是刚出的 Apache Airflow 3.0 带来以下主要新特性

  1. 新的服务化架构,各个部件间耦合度降低
  2. 多语言支持,借助了 Task SDK, 可望用 Java, JavaScript, TypeScript 等语言写 DAG
  3. DAG 支持版本控制,可回溯历史
  4. 支持事件驱动,即  DAG 可响应外部事件,如文件到达,消息队列等
  5. 引入了资产驱动调度功能,可根据数据资产的变化 进行触发,可以说是事件驱动的一类
  6. 全新的 React UI 界面

阅读全文 >>

为 FastAPI 的 SwaggerUI 定制 CSS 样式

FastAPI 比起 Flask 而言一个十分便利的功能是它内置对 Swagger UI 文档的支持,然而默认生成的 Swagger UI 也总不尽如人意,于是就有了如何通过引入自己的样式(或样式文件)对默认 Swagger UI 进行定制化的需求。在 ChatGPT 之前,Google 和阅读源代码是齐头并进的选择,自己有了 ChatGPT 之类的 AI, 人们一下就把身段放低了许多,再也不像使用 Google 那样的心态去使用 AI 了。所以呢,第一次支持付了 $8 问问当前号称最厉害的 Grok 3(也算是对 DOGE 的支持吧), 得到答案如下

在网站的 /static 目录下也创建了 custom_swagger.css 文件,然而根本就没有效果,Inspect 浏览器后发现 FastAPI 的 /docs 根本就有加载 /static/custom_swagger.css 文件。 阅读全文 >>

SciPy 最优化之最小化

 SciPy 是一个开源的算法库和数学工具包,可以处理最优化、线性代数、积分、插值、拟合、特殊函数、快速傅里叶变换、信号处理、图像处理、常微分方程求解器等。 它依赖于 NumPy, Pandas 也依赖了 NumPy。本文重点是体验它怎么处理最优化的问题。很多情形下通过 SciPy 的  optimize.minimize 方法寻求目标函数最小值的过程得到最优化的输入与输出。比如寻找二次元函数的根,求解线性/动态规则,金融行业的计算出最优投资组合的资产分配等。为什么 SciPy 没有 maximize 方法呢,因为没有必要,想要找到最大化的值,只要把目标函数的值取反,或者是模或绝对值的最小值。看到 minimize 方法名更让人觉得目标函数会有一个收敛值。

虽然 SciPy 对特定的问题有更直白的函数,如求根有 optimize.root, 线性规则 optimize.linprog(现不建议使用),但各种优化基本都可以回归到 minimize 方法调用。minimize 方法的原型是

除了必须的目标函数和初始值,还有更多参数,像常用的约束(contraints) - 满足某些特定条件的最优化, 线程或非线性约束等; 求解方法(method) - Powell, Newton-CG 等

下面用 optimize.minimize 来求解一些问题 阅读全文 >>

Python logging 使用笔记

使用 Python 的话用不着像 Java 那样是考虑用 Logback  还是 Log4J 的问题,因为它内置提供了完备功能的 logging 库。虽然 JDK  也有 java.util.logging(JUL), 它的特性其实也不差,如日志级别,输出格式,不同的输出目的地的选择,但在 Logback 和 Log4J 的光环之下几乎无人问津。相比而言 Python 的 logging 却极为受宠,非必要时基本不会去考虑引入第三方的日志库,如 Loguru, LogBook, Structlog, Picologging, 尽管它们也很出色,毕竟是庶出。

logging 的最基本用法

在基本前面加是 字,是因为这一节仅仅是如何让 logging 作为 print() 的替代品,暂不涉及到参数的传递,异常的输出,以及格式定制,日志往哪里输出的问题。

运行,什么也看不到,因为 Python logging 的默认级别是 warning, 这不符合人的基本认知,一般 logging.info() 起码是用来替代 print() 的,居然直接用无法输出,不知该库的设计者是怎么个想法。 阅读全文 >>

Java, Python 两种形式的 base64encode

在用 Python 写 Web 服务端代码时,用 base64.encodebytes() 函数对字符串进行编码,然后在 Java 端用 Base64.getDecoder().decode() 时无法解码,难道 base64 编码在两种语言间还有这等差异。Google 一下,得到的答案是在 Java 端要用 Base64.getMimeDecoder().decode() 函数解码。这一问题算是解决了, 不过后来又在 Python 写的 AWS Lambda 中输出

return {
    "statusCode": 200,
    "body": base64.encodebytes(b"short message"),
    "isBase64Encoded": True
}

以 AWS Lambda functionURL 的方式来访问,对于 body 中的小字符串是没问题,一旦 body 够大时在 Postman 或 curl 命令中无法直接展示出来,用 curl --output a.out 存成本地文件,打开后看到的是带换行的格式

H4sIAAZi7GYC/+19WXfcOLLmX+HxwxzXOS6b2AhiprvnyFtZt7yoJbdr6r74UEpKyq5UpjoXL/fX
D8AlkysIkCFmpo2H7pJJEBkAAsCHQMQXf/t6HX/YrO83a++P168+xcvVdDH/+yP6lOGnzEciCNHn
gD7yTufT9evpLD6L1rd/f3Q6n03nsXexXk7nN4+8F4v59fRms4zW8uNtHdjH9KkfPEXoM6JPsf8U
......

阅读全文 >>

AWS Lambda 中使用 Python 并发编程

无论在何处,有多重任务要处理时,并发编程总是要得到考虑的。比如有 IO 等待时的并发或 CPU 密集型时的并行计算,并发通常是指在同一个 CPU 上按时间片轮换执行,并行是任务在不同的 CPU 上执行。能有效使用 CPU 多核的语言可以让线程运行在不同的核上实现并行,如果是启动的子进程能由操作系统运行在其他 CPU 核上。

回到 AWS Lambda 中的 Python 代码,如果是处理 IO 等待,使用多线程并发就行,大致的代码如下:

with ThreadPoolExecutor(10) as executor:
    result = executor.map(task_function, task_inputs)

以上代码在 AWS Lambda 中是可以运行的。

如果是 CPU 密集型的任务,用 Python 的多线程就要歇菜了,因为存在著名的 Python's GIL 的约束。这时候就必须要考虑多进程并行的方式,同时应知晓当前选择的 Lambda 运行环境有多少个 CPU 内核,因为如果是单核的话再多进程也无济于事,没必要启动多于核心数的进程。底下是本人上篇博客测试收集的不同 AWS Lambda 内存选择对应的 CPU 核心数,以及实际可用内存大小的关系表 阅读全文 >>

配置 FastAPI/Uvicorn/Hypercorn 的访问日志

有了 FastAPI 之后,用 Python 实现 API 或 Web 都不再考虑 Flask 了。Flask 最早在 13 年前的 2010 年 4 月 1 日发布,实现的是 WSGI; FastAPI 较为年轻,于 4 年前的 2018 年 12 月 5 日发布,支持 ASGI。性能方面普遍是 FastAPI 比 Flask 高,编程方面就各取所好吧,使用 Flask 的时候还是 1.x 的版本,最近用 FastAPI 较多,所以无法对比。

FastAPI 本身没提供启动 Web 服务的代码,不像 Flask 还能通过 Flask 对象 或 flask 命令启动一个开发用途的 Web 服务,而 FastAPI 必须用其他的组件(ASGI server)来启动,比如各种 *corn 或 Daphne

  1. Uvicorn: FastAPI 官方的出品,默认启用访问日志,相关的参数有 --log-config <path>, --access-log/ --no-access-log 启用或关闭访问日志,默认是开启的。
  2. Hypercorn: 相关的配置选项有 --access-logformat, --access-logfile
  3.  Hypercorn: 它只是实现了 WSGI 规格的服务,所以不兼容 FastAPI, 只能作为 Uvicorn 进程的管理器。Gunicorn 作为 WSGI 服务器有丰富的访问日志配置,但访问日志仍然是由 Uvicorn 输出

阅读全文 >>

体验 Python FastAPI 的并发能力及线, 进程模型

本文进行实际测试 FastAPI 的并发能力,即同时能处理多少个请求,另外还能接收多少请求放在等待队列当中; 并找到如何改变默认并发数; 以及它是如何运用线程或进程来处理请求。我们可以此与 Flask 进行对比,参考 Python Flask 框架的并发能力及线,进程模型,是否真如传说中所说的 FastAPI 性能比 Flask 强, FastAPI 是否对得起它那道闪电的 Logo。

本文使用 JMeter 进行测试,测试机器为 MacBook Pro, CPU 6 核超线程,内存 16 Gb。

对于每一种类型 Web 服务基本的测试是每秒发送 2 个请求,连续发送 1000 个,500 秒发送完所有请求,程序中 API 方法接受到请求后 sleep 800 秒,保证在全部 1000 个请求送出之前一直占着连接,并有充足的时间对连接进行分析。在测试极端并发数时,由于在 Mac OS X 尽管设置了 ulimit 最多也只能创建 4000 多一点线程,所以在模拟更多用户数时,JMeter 在远程 Linux(Docker 或虚拟机) 上运行测试用例。

请求的 URL 是 http://localhost:8080/?id=${count}, 带一个自增序列用以识别不同的请求, JMeter 的 Thread Group 配置为 Number of Threads (users): 1000, Ramp-up period (seconds): 500 阅读全文 >>

Python __slots__ 的用法笔记

Python 是一个动态语言,可以动态的给实例或类增减属性或方法,给类添加的属性会影响到前后所有创建的实例。但是使用 __slots__ 属性可以限定类或实例属性和方法,如果没有 __slots__ 的话实例的属性和方法包含在实例的 __dict__ 字典中,类的属性和方法包含在类的 __dict__ 字典中。

在使用 __slots__ 按常规写法可能会出现的问题大概有

  1. AttributeError: 'Xxx' object has no attribute 'yyy'
  2. AttributeError: 'Xxx' object attribute 'yyy' is read-only
  3. ValueError: 'yyy' in __slots__ conflicts with class variable

我们来看下面的例子 阅读全文 >>