使用 Java 转换 Apache Avro 为 Parquet 数据格式
Avro 和 Parquet 是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。
Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。
既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有
本文例子中所选择 Avro 版本是当前最新的 1.10.1
编译成 Java 代码
本人在 Mac OS X 平台下用
如此,整个项目的依赖就比较干净,用
Java 代码如下
执行后在当前目录下生成了文件
类似于安装
查看
再次用
执行后提示错误
执行 TestParquet 类产生新的
必要是想必可以实现自己的 Conversion 类
其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。
Path 是一个
继续追踪 OutputFile 的 create 方法的返回值为
具体实现需要创建一个
应用
执行
爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。 永久链接 https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有
- 转换 Avro 数据为 Parquet 文件
- 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
- 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)
本文例子中所选择 Avro 版本是当前最新的 1.10.1
创建 Avro Schema 并编译成 Java 对象
先来创建一个 Avro schemauser.avsc, 并编译成 Java 代码。Schema 定义如下 1{
2 "namespace": "yanbin.blog.data",
3 "type": "record",
4 "name": "User",
5 "fields": [
6 {"name": "id", "type": "int"},
7 {"name": "name", "type": "string"},
8 {"name": "age", "type": ["null", "double"], "default": null}
9 ]
10}编译成 Java 代码
本人在 Mac OS X 平台下用
brew install avro-tools 安装的命令来编译$ avro-tools compile -string schema user.avsc ./或者用下载的 avro-tools jar 包来编译
java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./或者是用配置的 Maven 插件来编译生成
yanbin.blog.data.User 类。创建 Avro 对象并转换成 Parquet 格式
把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖 1<dependencies>
2 <dependency>
3 <groupId>org.apache.avro</groupId>
4 <artifactId>avro</artifactId>
5 <version>1.10.1</version>
6 </dependency>
7 <dependency>
8 <groupId>org.apache.parquet</groupId>
9 <artifactId>parquet-avro</artifactId>
10 <version>1.11.1</version>
11 </dependency>
12 <dependency>
13 <groupId>org.apache.hadoop</groupId>
14 <artifactId>hadoop-core</artifactId>
15 <version>1.2.1</version>
16 <exclusions> <!-- hadoop-core 可说是引入了一堆的垃圾,排除所有 -->
17 <exclusion>
18 <groupId>*</groupId>
19 <artifactId>*</artifactId>
20 </exclusion>
21 </exclusions>
22 </dependency>
23 <!-- 补充 hadoop-core 排除的但需要用到的两个包 -->
24 <dependency>
25 <groupId>commons-logging</groupId>
26 <artifactId>commons-logging</artifactId>
27 <version>1.2</version>
28 </dependency>
29 <dependency>
30 <groupId>commons-configuration</groupId>
31 <artifactId>commons-configuration</artifactId>
32 <version>1.6</version>
33 </dependency>
34</dependencies>如此,整个项目的依赖就比较干净,用
mvn dependency:tree 命令显示如下: 1[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ test-parquet ---
2[INFO] blog.yanbin:test-parquet:jar:1.0-SNAPSHOT
3[INFO] +- org.apache.avro:avro:jar:1.10.1:compile
4[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.11.3:compile
5[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.3:compile
6[INFO] | | \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.3:compile
7[INFO] | +- org.apache.commons:commons-compress:jar:1.20:compile
8[INFO] | \- org.slf4j:slf4j-api:jar:1.7.30:compile
9[INFO] +- org.apache.parquet:parquet-avro:jar:1.11.1:compile
10[INFO] | +- org.apache.parquet:parquet-column:jar:1.11.1:compile
11[INFO] | | +- org.apache.parquet:parquet-common:jar:1.11.1:compile
12[INFO] | | | \- org.apache.yetus:audience-annotations:jar:0.11.0:compile
13[INFO] | | \- org.apache.parquet:parquet-encoding:jar:1.11.1:compile
14[INFO] | +- org.apache.parquet:parquet-hadoop:jar:1.11.1:compile
15[INFO] | | +- org.apache.parquet:parquet-jackson:jar:1.11.1:compile
16[INFO] | | +- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile
17[INFO] | | \- commons-pool:commons-pool:jar:1.6:compile
18[INFO] | \- org.apache.parquet:parquet-format-structures:jar:1.11.1:compile
19[INFO] | \- javax.annotation:javax.annotation-api:jar:1.3.2:compile
20[INFO] +- org.apache.hadoop:hadoop-core:jar:1.2.1:compile
21[INFO] +- commons-logging:commons-logging:jar:1.2:compile
22[INFO] \- commons-configuration:commons-configuration:jar:1.6:compile
23[INFO] +- commons-collections:commons-collections:jar:3.2.1:compile
24[INFO] +- commons-lang:commons-lang:jar:2.4:compile
25[INFO] +- commons-digester:commons-digester:jar:1.8:compile
26[INFO] | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
27[INFO] \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
28[INFO] ------------------------------------------------------------------------
29[INFO] BUILD SUCCESS
30[INFO] ------------------------------------------------------------------------Java 代码如下
1package yanbin.blog;
2
3import org.apache.avro.Schema;
4import org.apache.avro.specific.SpecificRecordBase;
5import org.apache.hadoop.fs.Path;
6import org.apache.parquet.avro.AvroParquetWriter;
7import org.apache.parquet.hadoop.ParquetFileWriter;
8import org.apache.parquet.hadoop.ParquetWriter;
9import org.apache.parquet.hadoop.metadata.CompressionCodecName;
10import yanbin.blog.data.User;
11
12import java.io.IOException;
13import java.io.UncheckedIOException;
14import java.util.Arrays;
15import java.util.List;
16
17public class TestParquet {
18
19 public static void main(String[] args) {
20 List<User> users = Arrays.asList(
21 User.newBuilder().setId(1).setName("Scott").build(),
22 User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
23 writeToParquet(users);
24 }
25
26 public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
27 Schema avroSchema = avroObjects.get(0).getSchema();
28 String parquetFile = "./users.parquet";
29 Path path = new Path(parquetFile);
30 try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
31 .withSchema(avroSchema)
32 .withCompressionCodec(CompressionCodecName.SNAPPY)
33 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
34 .build()) {
35 avroObjects.forEach(r -> {
36 try {
37 writer.write(r);
38 } catch (IOException ex) {
39 throw new UncheckedIOException(ex);
40 }
41 });
42 } catch (IOException e) {
43 e.printStackTrace();
44 }
45 }
46}执行后在当前目录下生成了文件
users.parquet, 接下来看看这个文件中都有什么。类似于安装
avro-tools, 我们可以本 Mac OS X下用 brew install parquet-tools 安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。查看
users.parquet 中的数据$ parquet-tools cat --json users.parquet查看
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}
users.parquet 数据的 Schema$ parquet-tools schema users.parquet从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
}
LogicalType 的转换
Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给user.asvc 加上一个 date LogicalType 的 birthday 字段,整个 user.avsc 内容如下 1{
2 "namespace": "yanbin.blog.data",
3 "type": "record",
4 "name": "User",
5 "fields": [
6 {"name": "id", "type": "int"},
7 {"name": "name", "type": "string"},
8 {"name": "age", "type": ["null", "double"], "default": null},
9 {"name": "birthday", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null}
10 ]
11}再次用
avro-tools 命令编译为 User.java 文件,在 User 类中 birthday 的类型是 java.time.LocalDate。
再试图用之前的代码来转换带有 birthday 值的 Avro 对象,把前面 TestParquet 的 main 方法修改如下:1 public static void main(String[] args) {
2 List<User> users = Arrays.asList(
3 User.newBuilder().setId(1).setName("Scott").setBirthday(LocalDate.now()).build(),
4 User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
5 writeToParquet(users);
6 }执行后提示错误
Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number为了处理这个错误,我还跟到了源代码中,停在
at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
org.apache.avro.generic.GenericData 类中方法public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)的代码行
Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);处给
conversionsByClass 添加个值也能解决这个问题,调试时执行代码this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))后来仔细检查
AvroParquetWriter 类可以调用 withDataModel(GenericData) 来添加 Conversion, 于是 writeToParquet() 方法的实现如下 1 public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
2 Schema avroSchema = avroObjects.get(0).getSchema();
3 String parquetFile = "./users.parquet";
4 Path path = new Path(parquetFile);
5 GenericData genericData = GenericData.get();
6 genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
7 try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
8 .withDataModel(genericData)
9 .withSchema(avroSchema)
10 .withCompressionCodec(CompressionCodecName.SNAPPY)
11 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
12 .build()) {
13 avroObjects.forEach(r -> {
14 try {
15 writer.write(r);
16 } catch (IOException ex) {
17 throw new UncheckedIOException(ex);
18 }
19 });
20 } catch (IOException e) {
21 e.printStackTrace();
22 }
23 }执行 TestParquet 类产生新的
users.parquet 文件,再次查看它的数据和 Schema$ parquet-tools cat --json users.parquet其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5} $ parquet-tools schema users.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
必要是想必可以实现自己的 Conversion 类其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。
如何在内存中完成 Avro 转换为 Parquet
以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet 内容的字节数组而不借助于磁盘文件。突破口应该是在 `AvroParquetWriter.builder()` 这个方法上,它有两个重载方法,分别是1 public static <T> Builder<T> builder(Path file) {
2 return new Builder<T>(file);
3 }
4
5 public static <T> Builder<T> builder(OutputFile file) {
6 return new Builder<T>(file);
7 }Path 是一个
org.apache.hadoop.fs.Path 类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile,
看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile, 要实现内存中存储字节类容的 OutputFile 肯定是可行的。继续追踪 OutputFile 的 create 方法的返回值为
PositionOutputStream, 它有两个实现 DelegatingPositionOutputStream 和
HadoopPositionOutputStream, 前者是一个适配器. DelegatingPositionOutputStream 的构造函数需要一个 OutputStream,
把它换成一个 ByteArrayOutpuStream 就能在内存中处理了。最后只要实现 PositionOutputStream 的 long getPos()
返回 ByteArrayOutputStream 的当前位置就行。具体实现需要创建一个
InMemoryOutputFile 和 InMemoryPositionOutputStream 类, 写在一个类文件
InMemoryOutputFile.java 中 1package yanbin.blog;
2
3import org.apache.parquet.io.DelegatingPositionOutputStream;
4import org.apache.parquet.io.OutputFile;
5import org.apache.parquet.io.PositionOutputStream;
6
7import java.io.ByteArrayOutputStream;
8import java.io.IOException;
9import java.io.OutputStream;
10
11public class InMemoryOutputFile implements OutputFile {
12 private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
13
14 @Override
15 public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE 会调用此方法
16 return new InMemoryPositionOutputStream(baos);
17 }
18
19 @Override
20 public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
21 return null;
22 }
23
24 @Override
25 public boolean supportsBlockSize() {
26 return false;
27 }
28
29 @Override
30 public long defaultBlockSize() {
31 return 0;
32 }
33
34 public byte[] toArray() {
35 return baos.toByteArray();
36 }
37
38 private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
39
40 public InMemoryPositionOutputStream(OutputStream outputStream) {
41 super(outputStream);
42 }
43
44 @Override
45 public long getPos() throws IOException {
46 return ((ByteArrayOutputStream) this.getStream()).size();
47 }
48 }
49}应用
InMemoryOutputFile 的 writeToParquet() 方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件 1 public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
2 Schema avroSchema = avroObjects.get(0).getSchema();
3 GenericData genericData = GenericData.get();
4 genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
5 InMemoryOutputFile outputFile = new InMemoryOutputFile();
6 try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
7 .withDataModel(genericData)
8 .withSchema(avroSchema)
9 .withCompressionCodec(CompressionCodecName.SNAPPY)
10 .withWriteMode(ParquetFileWriter.Mode.CREATE) // 内存中处理什么 Mode 无所谓
11 .build()) {
12 avroObjects.forEach(r -> {
13 try {
14 writer.write(r);
15 } catch (IOException ex) {
16 throw new UncheckedIOException(ex);
17 }
18 });
19 } catch (IOException e) {
20 e.printStackTrace();
21 }
22
23 Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
24 }执行
TestParquest 后生成 users-memory.parquet 文件,现在是有点激动人心的时刻,验证内存中的内容是否正确$ ls -l *.parquet文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。 永久链接 https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。