DynamoDB Stream 的实质就是一个依附于表的流,对表的增删改像关系型数据的触发器一样,以日志形式按顺记录到该流中。我们可以用 API 去读取其中的记录,或用来触发一个 Lambda。DynamoDB Stream 非常类似于 Kinesis 的 Stream, 它们都是有 Shard 的概念,但是 DynamoDB Stream 的 Shard 数目是不太确定的。而且还能用 KCL(Kinesis Client Library) 来操作 DynamoDB Stream。
DynamoDB Stream 和 Kinesis Stream 有几个通用的 API 操作,像 list_streams(), describe_stream(), get_shard_iterator() 和 get_records() 函数。同时呢,在设置 Lambda 的触发器时,选择 DynamoDB Stream 与 Kinesis Stream 时可配置的参数几乎是一样的,有 Batch size, Batch window, Starting position 以及重试策略。而且也是一个 Shard 只能同时启动一个 Lambda 实例,由于 DynamoDB Stream 的 Shard 数目不太确定,所以它能同时启动几个 Lambda 实例也不确定的。
另外, 一个 DynamoDB Stream 只能最多被两个 Consumer 消费,而可用来消费 Kinesis Stream 的 Consumer 数目是不受限的。DynamoDB Stream 中的记录保存时间为 24 小时, Kinesis Stream 中记录保存时间也是可配置的。我们创建一个 DynamoDB 表时还能启用 Amazon Kinesis data stream details
, 即把对 DynamoDB 的操作记录直接发送到 Kinesis Stream 中去,这样就能操作熟悉的 Kinesis Stream,Shard 数目可设定,坏处就是 Kinesis Stream 较费钱。
借用几张图来描述各种关系
图2:Table-partition-stream shard-Lambda instance
创建 DynamoDB 表并启用它的 Stream 功能
创建一个 DynamoDB 表默认时不会开启 DynamoDB Stream,在 AWS 控制台下需选择 Table, 然后右上角选择 Exports and streams
, 在 DynamoDB stream details
区域中点击 Enable
按钮来启用 Stream, View type 中有 4 个选项
- Key attributes only: 这是默认项,对表的操作只写入记录的 Key
- New image: 写入完整的新记录信息
- Old image: 写入完整的老记录信息
- New and old images: 写入完整的新,老记录信息
如果一个 DynamoDB 未开启 DynamoDB Stream, 在配置 Lambda 的触发器时选择一个 DynamoDB 会自动开启该 Dynamo 表的 Stream,并且 View type 是 New and old images
。
DynamoDB 开启了 Stream 后会生成一个 Latest stream ARN, 类似 arn:aws:dynamodb:us-east-1:1234567890:table/yanbin-test/stream/2021-09-09T02:22:59.992
我们可以用一个 aws
命令来创建 DynamoDB 表,并开启 Stream 功能
$ aws dynamodb create-table --table-name yanbin-test \
--attribute-definitions AttributeName=id,AttributeType=S \
--key-schema AttributeName=id,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=2 \
--stream-specification StreamEnabled=TRUE,StreamViewType=NEW_AND_OLD_IMAGES
执行后获得一个 JSON 输出,并且能看到它的 LatestStreamArn
在 --key-schema
中可指定多个 AttributeName=X,KeyType=Y 组合,其他的 Key 会作为 Sort key。
而后我们可用其他的 AWS 命令来查看 DynamoDB Table 和 Stream 的信息
$ aws dynamodb describe-table --table-name yanbin-test
$ aws dynamodbstreams list-streams
$ aws dynamodbstreams describe-stream --stream-arn arn:aws:dynamodb:us-east-1:1234567890:table/yanbin-test/stream/2021-09-09T02:43:29.390
describe-stream
命令可以看到 Shard 信息,显示的部分内容如
注意:
当我们为某个表开启了 Stream,然后把该表删除后,用 list-streams
命令看到该 Stream 仍然存在,这是为什么呢?因为 Stream 中的数据需保留 24 小时,所以只有等到 24 小时修该 Stream 才会被删除,其中我们还能从中消费数据。
这儿看到的是一个 Shard, 也就是用它来触发 Lambda 只会同时启动一个 Lambda 实例,但恐怕 Shard 数目还不确定。
测试 DynamoDB Stream 中的消息
创建好了 DynamoDB Stream 后,我们就可以使用它了,当有对 DynamoDB 表中的记录有增删改操作,就会有相应的事件记录写入到对应的 Stream 中去。
我们先启动一个 DynamoDB Stream 的监听器
上面的 next_iter 总是有值,所以一直在试图从 DynamoDB Stream 中取数据。我们 get_shard_iterator
指定的 ShardIteratorType
是 LATEST
,所以此时一直没有记录输出。
接下我们用代码往该流中添加记录
添加记录时我们将会看到前面的监听器输出接收到的记录,类似
我们展开一条记录显示如下
eventName
是 INSERT
时是没有 OldImage 的,再试下更新一些记录
更新记录的 Python 代码如下
在 DynamoDB Stream 的一条记录的信息如下
会有 OldImage 和 NewImage。
注:DynamoDB 并不是只要调用了 update_item
函数就会放 Stream 中放一条记录,而是会对比是否对实际的内容进行的修改,如果新旧值是一样的则不会往 Stream 中写入任何事件。
当我们删除记录时,eventName 会是 REMOVE
, 自然的,此时消息中只会有 OldImage
。
使用 DynamoDB Stream 来触发 Lambda
用 DynamoDB Stream 来触发 Lambda 可以完全想像成和使用 Kinesis Stream 触发器是一样的。首先我们选用蓝本 dynamodb-process-stream-python
来创建一个 Lambda, 它的 Python 代码如下:
接着为该 Lambda 选择一个 DynamoDB table 作为触发器,我们选上 yanbin-test
这个表,其他参数 Batch size 默认为 100, 可选的 Batch window, Starting position 默认为 Latest, 还可选 Trim horizon(即从第一条可用消息开始消费),以及其他控制重试策略的设置。
因为一个 DynamoDB 的表只关联一个有效的 Stream,所以选择了一个表便能选择上与之相应的 StreamArn,如果表尚未启用 Stream 则会自动创建一个。
此时,如果该 Lambda 的 IAM role 没有对上面 DynamoDB Stream 的 GetRecords, GetShardIterator, DescribeStream, 和 ListStreams 需通通加上。现在只要对 yanbin-test
表进行增删改操作就会往表相应的 DynamoDB Stream 中写入操作日志,同时也会触发该 Lambda 了。至于在 Lambda 中收到事件的格式就无需再重复了,参考前面的内容就是。
总结一下
- 创建一个 DynamoDB 表后,需开启 Stream 功能,DynamoDB Stream 像 Kinesis Stream 类似,也有 Shard, 消息的 SequenceNumber
- DynamoDB Stream 的 Shard 数目不确定,而 Kinesis Stream 可指定 Shard 数目, 最大为帐号的限制,比如 500
- DynamoDB Stream 中每条消息保持的时间为 24 小时,Kinesis Stream 可自定义该时长,可保存一天至一年
- 一个 DynamoDB Stream Shard 最多可有两个 Consumer, 而 Kinesis Stream 则没有这个限制
- 对 DynamoDB 表的增删改操作像关系型数据库的触发器一样记录到它相应的 Stream 中去。可选择的记录 Key, 新值,旧值,或者新旧值。
- 更新时,即例调用了 update_item() 操作,如果值保持与原来一样(A 修改为 A),也不会向 DynamoDB Stream 中写入数据
- 如果用 DynamoDB Stream 来驱动 Lambda 的话,和 Kinesis Stream 一样,一个 Shard 只能启动一个 Lambda 实例。重试策略也是一样的
- DynamoDB Stream 中的
everntName
有INSERT
,MODIFY
,和REMOVE
三种类型 - 一个 DynamoDB 表只能有一个有效的 Stream 与之关联。被删除的 DynamoDB Stream, 即使是相应表被删后的孤儿 Stream 也会被保留 24 小时
链接:
- DynamoDB 的流
- Process Large DynamoDB sTreams Using Multiple Amazon Kinesis Client Library (KCL) Workers
- Tutorial: Using AWS Lambda with Amazon DynamoDB streams
- AWS DynamoDB Streams -- Change Data Capture for DynamoDB Tables
本文链接 https://yanbin.blog/dynamodb-stream-and-trigger-lambda/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。