使用 Java 转换 Apache Avro 为 Parquet 数据格式

Avro 和 Parquet  是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。

既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有

  1. 转换 Avro 数据为 Parquet 文件
  2. 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
  3. 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)

本文例子中所选择 Avro 版本是当前最新的 1.10.1

创建 Avro Schema 并编译成 Java 对象

先来创建一个 Avro schema user.avsc, 并编译成 Java 代码。Schema 定义如下

编译成 Java 代码

本人在 Mac OS X 平台下用 brew install avro-tools 安装的命令来编译

$ avro-tools compile -string schema user.avsc ./

或者用下载的  avro-tools jar 包来编译

java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./

或者是用配置的 Maven 插件来编译生成 yanbin.blog.data.User 类。

创建 Avro 对象并转换成 Parquet 格式

把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖

如此,整个项目的依赖就比较干净,用 mvn dependency:tree 命令显示如下:

Java 代码如下

执行后在当前目录下生成了文件 users.parquet, 接下来看看这个文件中都有什么。

类似于安装 avro-tools, 我们可以本 Mac OS X下用 brew install parquet-tools 安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。

查看 users.parquet 中的数据

$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}

查看 users.parquet 数据的 Schema

$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
}

从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.

LogicalType 的转换

Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给 user.asvc 加上一个 date LogicalType 的 birthday 字段,整个 user.avsc 内容如下

再次用 avro-tools 命令编译为 User.java 文件,在 User 类中 birthday 的类型是 java.time.LocalDate。再试图用之前的代码来转换带有 birthday 值的 Avro 对象,把前面 TestParquetmain 方法修改如下:

执行后提示错误

Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)

为了处理这个错误,我还跟到了源代码中,停在 org.apache.avro.generic.GenericData 类中方法

public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)

的代码行

Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);

处给 conversionsByClass 添加个值也能解决这个问题,调试时执行代码

this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))

后来仔细检查 AvroParquetWriter  类可以调用 withDataModel(GenericData)  来添加 Conversion, 于是 writeToParquet() 方法的实现如下

执行 TestParquet  类产生新的 users.parquet  文件,再次查看它的数据和 Schema

$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}

其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个

必要是想必可以实现自己的 Conversion 类

其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。

如何在内存中完成 Avro 转换为 Parquet 

以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet  内容的字节数组而不借助于磁盘文件。突破口应该是在 AvroParquetWriter.builder() 这个方法上,它有两个重载方法,分别是

Path 是一个 org.apache.hadoop.fs.Path 类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile, 看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile, 要实现内存中存储字节类容的 OutputFile 肯定是可行的。

继续追踪 OutputFile 的 create 方法的返回值为 PositionOutputStream, 它有两个实现 DelegatingPositionOutputStreamHadoopPositionOutputStream, 前者是一个适配器. DelegatingPositionOutputStream 的构造函数需要一个  OutputStream,把它换成一个 ByteArrayOutpuStream 就能在内存中处理了。最后只要实现 PositionOutputStream  的 long getPos() 返回 ByteArrayOutputStream  的当前位置就行。

具体实现需要创建一个  InMemoryOutputFileInMemoryPositionOutputStream  类, 写在一个类文件  InMemoryOutputFile.java 中

应用 InMemoryOutputFile  的 writeToParquet() 方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件

执行 TestParquest 后生成  users-memory.parquet 文件,现在是有点激动人心的时刻,验证内存中的内容是否正确

$ ls -l *.parquet
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}

文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。

爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。

类别: Spark. 标签: , . 阅读(283). 订阅评论. TrackBack.
guest
0 Comments
Inline Feedbacks
View all comments
trackback

[…] 使用 Java 转换 Apache Avro 为 Parquet 数据格式 实现把 Avro 数据转换为 Parquet 文件或内存字节数组,并支持 […]

0
Would love your thoughts, please comment.x
()
x