最近转战到 Amazon 的云服务 AWS 上,考虑到在使用它的 Lambda 服务时 Python 应用有比较可观的启动速度,与之相比而言,Java 总是慢热型,还是一个内存大户。所以有想法 Lambda 函数用 Python 来写,来增强响应速度,而内部的应用仍然采用 Java, 于是就有了 Java 与 Python 的数据交换格式。使用 Kafka 的时候是用的 Apache Avro, 因此继续考察它。
注意,本文的内容会有很大部份与前一篇 Apache Avro 序列化与反序列化 (Java 实现) 雷同,不过再经一次的了应用,了解更深了。
在不同类型语言间进行数据交换,很容易会想到用 JSON 格式,那我们为什么还要用 Apache Avro 呢?通过接下来的内容,我们可以看到以下几点:
- Apache Avro 序列化的格式也是 JSON 的,Java 的 Avro 库依赖于 Jackson 库
- 序列化数据库本身带有 Schema 定义的,方便于反序列化,特别是对于 Java 代码; 而 JavaScript 会表示多此一举
- 自动支持序列化数据的压缩,在官方提供的库中,Java 可支持
deflate
,snappy
,bzip2
, 和xz
. 其他语言中可能少些,如 Python 只支持deflate
, 和snappy
, 应该可扩充。序列化数据中 Schema 部分不被压缩 - 天然支持序列化对象列表,这样在序列化数据中只需要一份 Schema,类似于数据库表 Schema 加上多记录行的表示方式。只用 Apache Avro 传输小对象的话,数据量比 JSON 事 JDK 序列化的数据要大。
Apache Avro 官方提供有 C, C++, C#, Java, PHP, Python 和 Ruby 的支持库,可在网上找到其他语种的类库,如 NodeJS, Go 的,等等。
正所谓将要在 Java 和 Python 两个语言间用 Apache Avro 进行数据交换,所以后面的 Reader, Writer 代码会交错着进行
首先也是定义 Schema
与前篇略有不同的是,这里使用了两个 Schema 文件,其中的对象是一对多的关系
文件 product.avsc
1 2 3 4 5 6 7 8 9 |
{ "namespace": "cc.unmi.data", "type": "record", "name": "Product", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": ["string", "null"]} ] } |
文件 user.avsc
1 2 3 4 5 6 7 8 9 10 |
{ "namespace": "cc.unmi.data", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "address", "type": ["string", "null"]}, {"name": "products", "type": {"type": "array", "items": "Product"}} ] } |
product.avsc 的内容也可以内联到 user.avsc 中,如果 product.avsc 需要被重用的话就最好用单独的文件定义。
先看 Java 序列化,Python 反序列化
由 Schema 生成 Java 对象
Java 应用中我们一般会由前面的 Schema 定义生成 Java 领域对象,先要下载 avro-tools-x.x.x.jar
工具包 ,然后执行
java -jar avro-tools-1.8.1.jar compile schema product.avsc user.avsc java
User
和 Product
类将生成在 cc/unmi/data
目录中。生成的每一个 Java 对象中都含有当前的 Schema 定义,常量 SCHEMA$
, 或可通过 getSchema()
方法来获得,所以对于 Java 的 Reader 都不需要依赖于序列化数据中的 Schema 定义。
Java writer 的代码
我在项目中用的构建工具是 sbt, 所以引入 Apache Avro 的依赖方式如下
libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"
类 Writer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
package cc.unmi; import cc.unmi.data.Product; import cc.unmi.data.User; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumWriter; import java.io.File; import java.io.IOException; import java.util.Arrays; public class Writer { public static void main(String[] args) throws IOException { User user = User.newBuilder() .setName("Yanbin") .setAddress("Chicago") .setProducts(Arrays.asList( Product.newBuilder().setId(1).setName("Book1").build(), Product.newBuilder().setId(2).setName("Book2").build() )).build(); DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter); dataFileWriter.setCodec(CodecFactory.snappyCodec()); //采用 snappy 进行压缩 dataFileWriter.create(user.getSchema(), new File("../user-by-java.avro")); //序列化到文件中 dataFileWriter.append(user); //添加第一个实例 user.setName("Unmi"); user.getProducts().forEach(product -> { product.setId(product.getId() + 2); product.setName("Book" + (Integer.parseInt(product.getName().toString().substring(4)) + 2)); }); dataFileWriter.append(user); //添加第二个实例 dataFileWriter.close(); } } |
上面的代码序列化了两个 User 实例,并写到了 ../user-by-java.avro
文件中, 数据部份用了 snappy 进行压缩。
Python reader 代码
在 Python 开始应用 Apache Avro 之前,有几条命令要执行,假设是在 Mac 下
brew install snappy # 在安装下一个之前一定要在系统中安装有 snappy 库
pip install python-snappy # 让 Python 的 Avro 支持 snappy 压缩,否则只能用 deflate
pip install avro # 安装了 Python 的 Avro 库,也同时安装了 avro 命令,可以用 avro 命令读写数据
Python 没有官方的由 Schema 文件生成 Python 类的工具,找到一个非官方的 avro-gen, 但用起来有问题。所以暂时忘了它,或者可以自己创建相关领域类。
reader.py
1 2 3 4 5 6 7 |
from avro.datafile import DataFileReader from avro.io import DatumReader reader = DataFileReader(open("../user-by-java.avro", "rb"), DatumReader()) for user in reader: print(user) reader.close() |
执行后控制台下打印出
{u'products': [{u'id': 1, u'name': u'Book1'}, {u'id': 2, u'name': u'Book2'}], u'name': u'Yanbin', u'address': u'Chicago'}
{u'products': [{u'id': 3, u'name': u'Book3'}, {u'id': 4, u'name': u'Book4'}], u'name': u'Unmi', u'address': u'Chicago'}
在 Python 得到的是一个字典的列表,想要获得某个 user 的 name 属性要用 user['name']
, 所以还是要创建出相应的类型用起来方便些。如果只是反序列化出字典的话,序列化数据中的 Schema 定义就毫无意义了。原生态的 JSON 对于 JavaScript 运用起来就得心应手了,怎么都是点操作,如 user.name
。
写到这里本想窥探一下 Apache Avro 的序列化数据格式,但一加就篇幅剧增,也超越了标题限定的范围,所以还是另立新篇。还是接着反着方向来看 Java 读取 Python 序列化的 Apache Avro 数据。
Python 序列化,Java 反序列化
Python writer 代码
由于未能生成相应的 Python 领域对象 Product 和 User, 所以在序列化时还需要获得 Schema 定义来指导工作,在本例中要序列化的数据是字典
writer.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import avro.schema import json from avro.datafile import DataFileWriter from avro.io import DatumWriter # schema = avro.schema.parse(open("../user-with-product.avsc", "rb").read()) # 如果 product.avsc 内联在了 user.avsc 则只需这一句 def load_avsc_files(*avsc_files): all_schemas = avro.schema.Names() schemas = None for avsc_file in avsc_files: file_text = open(avsc_file).read() json_data = json.loads(file_text) schemas = avro.schema.make_avsc_object(json_data, all_schemas) # print json.dumps(user_schema.to_json(avro.schema.Names()), indent=2) return schemas schema = load_avsc_files("../product.avsc", "../user.avsc") # 把多个 Schema 内联在一起 writer = DataFileWriter(open("../user-by-python.avro", "w"), DatumWriter(), schema, codec="snappy") writer.append({'name': 'Yanbin', 'address': 'Chicago', 'products': [{'id': 3, 'name': 'Book3'}, {'id': 4, 'name': 'Book4'}]}) writer.append({'name': 'Unmi', 'address': 'Chicago', 'products': [{'id': 4, 'name': 'Book4'}, {'id': 5, 'name': 'Book5'}]}) writer.close() |
如果只是序列化 Product 的实例只需要用到 product.avsc
文件,最后序列化数据文件写在 ../user-by-python.avro
文件中,并且也采用了 snappy 来压缩数据。
Java reader 代码
写到这里反而有些纳闷了,Apache Avro 与 JSON 的主要区别是它在序列化数据中自带 Schema 定义,而我们读写代码似乎只是把其中的 Schema 定义当摆设。由 Schema 生成的 Java 对象中已包含 Schema 定义,Python 把数据都当字典看待,更不关序列化数据中的 Schema 定义毛事。下图中生成的序列化文件数据
参考 Object Container Files, 简单说它的文件结构是 Magic(Obj+版本01), meta(avro.schema 和 avro.codec), sync(16字节的随机数), 最后加上数据。所以可以看出每次用这种结构来传送小对象是很不经济的,如果是直接用 JSON 或 JDK 序列化的话,只需要最后面数据部份,正如我们前面已看到和后面将要看到的代码一样,序列化文件中的非数据部分好像都是多余的。有效益的办法应该是让数据部分臃肿起来,挤兑掉非 Payload 部分所占的比例。
好啦,现在回过后来看 Java 的 reader 代码
类 Reader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
package cc.unmi; import cc.unmi.data.User; import org.apache.avro.file.DataFileReader; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; import java.io.File; import java.io.IOException; public class Reader { public static void main(String[] args) throws IOException { DatumReader<User> userDatumReader = new SpecificDatumReader<>(); DataFileReader<User> dataFileReader = new DataFileReader<>(new File("../user-by-python.avro"), userDatumReader); dataFileReader.forEach(System.out::println); } } |
读取代码总是比较简单的,可成功序列化为一个个 User 对象,执行后输出为
{"name": "Yanbin", "address": "Chicago", "products": [{"id": 3, "name": "Book3"}, {"id": 4, "name": "Book4"}]}
{"name": "Unmi", "address": "Chicago", "products": [{"id": 4, "name": "Book4"}, {"id": 5, "name": "Book5"}]}