像两个人交流一样要找一个互相能理解的语言, 在国内为普通话, 跑国外多用英语相通, 两个进程间通信也需要找一个大家都能理解的数据格式. 简单的如 JSON, XML, 那是自我描述性格式, XML 有 Schema 定义, 但尚无正式的 JSON Schema 规范. 在讲求效率的场合, 纯文本式的数据交换格式无法满足要求, 于是有二进制的 Google Protobuf 和 Apache Avro. 在 Apache 的生态像 Hadoop, Kafka 中自然是选用 Avro.
Avro 支持多种语言, 如 C, C++, C#, Java, PHP, Python 和 Ruby. 它使用 JSON 来定义 Schema, 通过工具可以由 Schema 生成相应语言的数据对象, 比如 Java 的 avro-tools.jar. 这样可以在跨进程跨语言透明的实现为对象交换.
本文体验 Java 环境中 Avro 数据格式的序列化与反序列化.
Avro Schema 文件就是数据生产和消费端的通信协议; 我们可以由 Schema 生成相应的 Java 对象, 然后以具体的 Java 对象交换, 或者不生成 Java 对象而纯粹以 GenericRecord
交互. 为操作数据的简单, 我们通常采用前一种方式, 即生成具体数据传输对象.
首先定义一个 Schema
1 2 3 4 5 6 7 8 9 |
{ "namespace": "cc.unmi.data", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "address", "type": ["string", "null"]} ] } |
对于 Schema 不多加说明, 这里只定义了一个 User 对象, 有两个属性 name 和 address. Schema 的详细解释可打开 http://avro.apache.org/docs/1.8.1/spec.html.
假设文件名为 user.avsc
, avsc
应该是 Avro
Schema 文件, 我至今都未查到 Avro
是什么的缩写.
由 Schema 生成 Java 对象
我们需要用到 avro-tools-1.x.x.jar
工具包, 当前版本是 1.8.1, 命令格式是
java -jar /path/to/avro-tools-1.8.1.jar compile schema user.avsc .
上面命令会在当前目录生成 cc/unmi/data/User.java
文件. 下面的例子会使用 org.apache.avro:avro-maven-plugin
来从 Schema 生成 Java 对象.
可以大致看一下生成的 User.java 的片断
1 2 3 4 5 6 7 |
@org.apache.avro.specific.AvroGenerated public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { private static final long serialVersionUID = 3019453098083125873L; public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\"...."); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } ............ |
生成的对象中包含完整的 Schema 定义内容, 可由静态方法 getClassSchema()
和实例方法 getSchema()
获得相应的 Schema, 所以拥有了这个对象类通信时就不再需要 user.avsc
文件了. 在它的父类 SpecificRecordBase
类中定义了抽象方法 getSchema()
.
并且这个类提供了多种方式来创建一个实例
- User user = new User(); user.setName("Yanbin")..., user.put(2, "Chicago")..., user.put("name", "Qiu")
- User user = new User("Yanbin", "Chicago")
- User user = User.newBuilder().setName("Yanbin").setAddress("Chicago").build()
序列化
下面的代码把一个 User 对象序列化为字节数组, 也可以序列化为外部文件
1 2 3 4 5 6 7 |
private static byte[] serializeUser(User user) throws IOException { DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null); userDatumWriter.write(user, binaryEncoder); return outputStream.toByteArray(); } |
光有序列化代码无法验证序列化后的数据是否正确, 于是要有下面的反序列化代码
反序列化
1 2 3 4 5 |
private static User deserializeUser(byte[] data) throws IOException { DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class); BinaryDecoder binaryEncoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(data), null); return userDatumReader.read(new User(), binaryEncoder); } |
从上面方法输出的字节数组中反序列化出相等的对象来, userDatumReader.read(new User(), binaryEncoder)
执行后的返回值与被更新后的第一个参数是一样的, 所以这个方法要是能写成 reutnr userDatumReader.read(User.class, binaryEncoder);
会好看些.
有了上面的两个方法需要串联起来, 序列化的输出作为反序化的输出就能能证明两个操作是否正确
验证序列化与反序列化
1 2 3 4 5 6 7 |
public static void main(String[] args) throws IOException { User originalUser = new User("Yanbin", "Chicago"); User deserializedUser = deserializeUser(serializeUser(originalUser)); System.out.println("Same object? " + (deserializedUser == originalUser)); System.out.println("Objects equal? " + (deserializedUser.equals(originalUser))); System.out.println("All fields: " + deserializedUser); } |
执行输出结果如下
Same object? false
Objects equal? true
All fields: {"name": "Yanbin", "address": "Chicago"}
准确无误, 大功告成
本例实作是一个 Maven 项目, pom.xml
文件内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cc.unmi</groupId> <artifactId>avrodemo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>Apache Avro Demo</name> <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project> |
由 user.avsc
生成 Java 对象是挂在 generate-sources
阶段执行的, 所以在 mvn compile
时会生成 User.java
文件.
完整的项目文件在 GitHub 上 https://github.com/yabqiu/apache-avro-demo.
相关链接:
本文链接 https://yanbin.blog/apache-avro-serializing-deserializing/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
[…] Apache Avro: 1.8.1. 关于 Avro 序列化的内容可参见 Apache Avro 序列化与反序列化 (Java 实现) […]
本文中的序列化数据不包含 Schema 定义,如果要数据中包含 Schema 定义可以用代码
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
dataFileWriter.setCodec(CodecFactory.snappyCodec()); //加密方式
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
dataFileWriter.create(user.getSchema(), outputStream);
dataFileWriter.append(user);
dataFileWriter.append(user1); // 可加多个实例
dataFileWriter.close();
那么相应的读取代码就是
DatumReader<User> userDatumReader = new SpecificDatumReader<>();
DataFileReader<User> dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(outputStream.toByteArray()), userDatumReader);
dataFileReader.forEach(System.out::println)
序列化数据用文本编辑器打开就像
Objavro.schema�{"type":"record","name":"User","namespace":"cc.unmi.data","fields":[{"name":"name","type":"string"},{"name":"address","type":["string","null"]}]}avro.codecsnappykuĤOQ1k��%�Ǝ0DYXXanbinChicagoځ�IkuĤOQ1k��%�Ǝ
能够看到序列化数据里带有当前的 Schema 定义。