前面尝试过用 Java 转换 Apache Avro 数据为 Parquet 格式,本文用 Python 来做同样的事情,并且加入 logicalType: date 类型的支持。本测试中的 Avro 数据也是由 Python 代码生成的。
重复一句 Avro 与 Parquet 的最粗略的区别:Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询。
第一步,生成 Avro 数据文件 user.avro, 须先安装 fastavro
pip install fastavro
生成 user.avro 的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
from datetime import date from fastavro import parse_schema, writer schema = { "namespace": "data", "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "birthday", "type": {"type": "int", "logicalType": "date"}} ] } parsed_schema = parse_schema(schema) records = [ {'id': 100, 'name': 'Tom', 'birthday': date.today()}, {'id': 101, 'name': 'Jerry', 'birthday': date(2019, 4, 24)}, ] with open('user.avro', 'wb') as out: writer(out, parsed_schema, records) |
产生了一个 user.avro, 用 avro-tools 查看其中的数据与 schema
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
$ avro-tools getschema user.avro { "type" : "record", "name" : "User", "namespace" : "data", "fields" : [ { "name" : "id", "type" : "int" }, { "name" : "name", "type" : "string" }, { "name" : "birthday", "type" : { "type" : "int", "logicalType" : "date" } } ] } $ avro-tools tojson user.avro {"id":100,"name":"Tom","birthday":18747} {"id":101,"name":"Jerry","birthday":18010} |
user.avro 数据文件中的 birthday 能识别为 logicalType: date 类型
转换 Avro 为 Parquet
先安装 pandas
pip install pyarrow pandas
转换代码
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import pandas as pd from fastavro import reader def process(records): df = pd.DataFrame.from_records(records) df.to_parquet('user.parquet') with open('user.avro', 'rb') as fo: avro_reader = reader(fo) rs = [r for r in avro_reader] process(rs) |
用 parque-tools 查看生成的 user.parquet 文件的 schema 与数据
1 2 3 4 5 6 7 8 9 10 |
$ parquet-tools schema user.parquet message schema { optional int64 id; optional binary name (STRING); optional int32 birthday (DATE); } $ parquet-tools cat --json user.parquet {"id":100,"name":"Tom","birthday":18747} {"id":101,"name":"Jerry","birthday":18010} |
数据完全正确,并且 birthday 在 Parquet 的 schema 中仍然是 int32(DATE) 类型。
如果喜欢在 Python 中用 Apache 的 avro 库也没问题, 安装 avro
pip install avro
读取记录的代码如下
1 2 3 4 5 6 |
from avro.datafile import DataFileReader from avro.io import DatumReader with open('user.avro', 'rb') as fo: reader = DataFileReader(fo, DatumReader()) rs = [r for r in reader] process(rs) |
更简单的使用 pandavro 包读取 avro 文件的方式
安装 pyarrow 和 pandavro
pip install pyarrow pandavro
会连带安装多个包,用 pip freeze 看下
$ pip freeze
fastavro==1.4.0
numpy==1.20.2
pandas==1.2.4
pandavro==1.6.0
pyarrow==4.0.0
python-dateutil==2.8.1
pytz==2021.1
six==1.15.0
读取 avro 的代码
1 2 3 4 |
import pandavro as pdx rs = pdx.from_avro('user.avro') process(rs) |
最后汇总一下如何最简单的转换 Avro 文件为 Parquet 格式
须安装的插件
pip install pyarrow pandas
完整转换 avro 文件为 parquet 文件代码如下
1 2 3 4 5 6 |
import pandas as pd import pandavro as pdx rs = pdx.from_avro('user.avro') df = pd.DataFrame.from_records(rs) df.to_parquet('user.parquet') |
执行代码只须 3 行。代码中虽未显式导入 pyarrow, 但内部会用到,所以必须安装 pyarrow
链接:
本文链接 https://yanbin.blog/python-convert-avro-to-parquet/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。