使用 Java 转换 Apache Avro 为 Parquet 数据格式

Avro 和 Parquet  是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。 Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。


既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有

  1. 转换 Avro 数据为 Parquet 文件
  2. 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
  3. 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)

本文例子中所选择 Avro 版本是当前最新的 1.10.1

创建 Avro Schema 并编译成 Java 对象

先来创建一个 Avro schema user.avsc, 并编译成 Java 代码。Schema 定义如下
 1{
 2  "namespace": "yanbin.blog.data",
 3  "type": "record",
 4  "name": "User",
 5  "fields": [
 6    {"name": "id", "type": "int"},
 7    {"name": "name", "type": "string"},
 8    {"name": "age", "type": ["null", "double"], "default": null}
 9  ]
10}

编译成 Java 代码

本人在 Mac OS X 平台下用 brew install avro-tools 安装的命令来编译
$ avro-tools compile -string schema user.avsc ./
或者用下载的  avro-tools jar 包来编译
java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./
或者是用配置的 Maven 插件来编译生成 yanbin.blog.data.User 类。

创建 Avro 对象并转换成 Parquet 格式

把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖
 1<dependencies>
 2    <dependency>
 3        <groupId>org.apache.avro</groupId>
 4        <artifactId>avro</artifactId>
 5        <version>1.10.1</version>
 6    </dependency>
 7    <dependency>
 8        <groupId>org.apache.parquet</groupId>
 9        <artifactId>parquet-avro</artifactId>
10        <version>1.11.1</version>
11    </dependency>
12    <dependency>
13        <groupId>org.apache.hadoop</groupId>
14        <artifactId>hadoop-core</artifactId>
15        <version>1.2.1</version>
16        <exclusions> <!-- hadoop-core 可说是引入了一堆的垃圾,排除所有 -->
17            <exclusion>
18                <groupId>*</groupId>
19                <artifactId>*</artifactId>
20            </exclusion>
21        </exclusions>
22    </dependency>
23    <!-- 补充 hadoop-core 排除的但需要用到的两个包 -->
24    <dependency>
25        <groupId>commons-logging</groupId>
26        <artifactId>commons-logging</artifactId>
27        <version>1.2</version>
28    </dependency>
29    <dependency>
30        <groupId>commons-configuration</groupId>
31        <artifactId>commons-configuration</artifactId>
32        <version>1.6</version>
33    </dependency>
34</dependencies>

如此,整个项目的依赖就比较干净,用 mvn dependency:tree 命令显示如下:
 1[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ test-parquet ---
 2[INFO] blog.yanbin:test-parquet:jar:1.0-SNAPSHOT
 3[INFO] +- org.apache.avro:avro:jar:1.10.1:compile
 4[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.11.3:compile
 5[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.3:compile
 6[INFO] |  |  \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.3:compile
 7[INFO] |  +- org.apache.commons:commons-compress:jar:1.20:compile
 8[INFO] |  \- org.slf4j:slf4j-api:jar:1.7.30:compile
 9[INFO] +- org.apache.parquet:parquet-avro:jar:1.11.1:compile
10[INFO] |  +- org.apache.parquet:parquet-column:jar:1.11.1:compile
11[INFO] |  |  +- org.apache.parquet:parquet-common:jar:1.11.1:compile
12[INFO] |  |  |  \- org.apache.yetus:audience-annotations:jar:0.11.0:compile
13[INFO] |  |  \- org.apache.parquet:parquet-encoding:jar:1.11.1:compile
14[INFO] |  +- org.apache.parquet:parquet-hadoop:jar:1.11.1:compile
15[INFO] |  |  +- org.apache.parquet:parquet-jackson:jar:1.11.1:compile
16[INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile
17[INFO] |  |  \- commons-pool:commons-pool:jar:1.6:compile
18[INFO] |  \- org.apache.parquet:parquet-format-structures:jar:1.11.1:compile
19[INFO] |     \- javax.annotation:javax.annotation-api:jar:1.3.2:compile
20[INFO] +- org.apache.hadoop:hadoop-core:jar:1.2.1:compile
21[INFO] +- commons-logging:commons-logging:jar:1.2:compile
22[INFO] \- commons-configuration:commons-configuration:jar:1.6:compile
23[INFO]    +- commons-collections:commons-collections:jar:3.2.1:compile
24[INFO]    +- commons-lang:commons-lang:jar:2.4:compile
25[INFO]    +- commons-digester:commons-digester:jar:1.8:compile
26[INFO]    |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
27[INFO]    \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
28[INFO] ------------------------------------------------------------------------
29[INFO] BUILD SUCCESS
30[INFO] ------------------------------------------------------------------------

Java 代码如下
 1package yanbin.blog;
 2
 3import org.apache.avro.Schema;
 4import org.apache.avro.specific.SpecificRecordBase;
 5import org.apache.hadoop.fs.Path;
 6import org.apache.parquet.avro.AvroParquetWriter;
 7import org.apache.parquet.hadoop.ParquetFileWriter;
 8import org.apache.parquet.hadoop.ParquetWriter;
 9import org.apache.parquet.hadoop.metadata.CompressionCodecName;
10import yanbin.blog.data.User;
11
12import java.io.IOException;
13import java.io.UncheckedIOException;
14import java.util.Arrays;
15import java.util.List;
16
17public class TestParquet {
18
19    public static void main(String[] args) {
20        List<User> users = Arrays.asList(
21                User.newBuilder().setId(1).setName("Scott").build(),
22                User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
23        writeToParquet(users);
24    }
25
26    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
27        Schema avroSchema = avroObjects.get(0).getSchema();
28        String parquetFile = "./users.parquet";
29        Path path = new Path(parquetFile);
30        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
31                .withSchema(avroSchema)
32                .withCompressionCodec(CompressionCodecName.SNAPPY)
33                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
34                .build()) {
35            avroObjects.forEach(r -> {
36                try {
37                    writer.write(r);
38                } catch (IOException ex) {
39                    throw new UncheckedIOException(ex);
40                }
41            });
42        } catch (IOException e) {
43            e.printStackTrace();
44        }
45    }
46}

执行后在当前目录下生成了文件 users.parquet, 接下来看看这个文件中都有什么。

类似于安装 avro-tools, 我们可以本 Mac OS X下用 brew install parquet-tools 安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。

查看 users.parquet 中的数据
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}
查看 users.parquet 数据的 Schema
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
}
从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.

LogicalType 的转换

Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给 user.asvc 加上一个 date LogicalType 的 birthday 字段,整个 user.avsc 内容如下
 1{
 2  "namespace": "yanbin.blog.data",
 3  "type": "record",
 4  "name": "User",
 5  "fields": [
 6    {"name": "id", "type": "int"},
 7    {"name": "name", "type": "string"},
 8    {"name": "age", "type": ["null", "double"], "default": null},
 9    {"name": "birthday", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null}
10  ]
11}

再次用 avro-tools 命令编译为 User.java 文件,在 User 类中 birthday 的类型是 java.time.LocalDate。 再试图用之前的代码来转换带有 birthday 值的 Avro 对象,把前面 TestParquetmain 方法修改如下:
1    public static void main(String[] args) {
2        List<User> users = Arrays.asList(
3                User.newBuilder().setId(1).setName("Scott").setBirthday(LocalDate.now()).build(),
4                User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build());
5        writeToParquet(users);
6    }

执行后提示错误
Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
为了处理这个错误,我还跟到了源代码中,停在 org.apache.avro.generic.GenericData 类中方法
public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)
的代码行

Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);
处给 conversionsByClass 添加个值也能解决这个问题,调试时执行代码
this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))
后来仔细检查 AvroParquetWriter  类可以调用 withDataModel(GenericData)  来添加 Conversion, 于是 writeToParquet() 方法的实现如下
 1    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) {
 2        Schema avroSchema = avroObjects.get(0).getSchema();
 3        String parquetFile = "./users.parquet";
 4        Path path = new Path(parquetFile);
 5        GenericData genericData = GenericData.get();
 6        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
 7        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
 8                .withDataModel(genericData)
 9                .withSchema(avroSchema)
10                .withCompressionCodec(CompressionCodecName.SNAPPY)
11                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
12                .build()) {
13            avroObjects.forEach(r -> {
14                try {
15                    writer.write(r);
16                } catch (IOException ex) {
17                    throw new UncheckedIOException(ex);
18                }
19            });
20        } catch (IOException e) {
21            e.printStackTrace();
22        }
23    }

执行 TestParquet  类产生新的 users.parquet  文件,再次查看它的数据和 Schema
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5} $ parquet-tools schema users.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}
其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个

必要是想必可以实现自己的 Conversion 类

其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。

如何在内存中完成 Avro 转换为 Parquet 

以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet  内容的字节数组而不借助于磁盘文件。突破口应该是在 `AvroParquetWriter.builder()` 这个方法上,它有两个重载方法,分别是

1  public static <T> Builder<T> builder(Path file) {
2    return new Builder<T>(file);
3  }
4
5  public static <T> Builder<T> builder(OutputFile file) {
6    return new Builder<T>(file);
7  }

Path 是一个 org.apache.hadoop.fs.Path 类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile, 看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile, 要实现内存中存储字节类容的 OutputFile 肯定是可行的。

继续追踪 OutputFile 的 create 方法的返回值为 PositionOutputStream, 它有两个实现 DelegatingPositionOutputStreamHadoopPositionOutputStream, 前者是一个适配器. DelegatingPositionOutputStream 的构造函数需要一个  OutputStream, 把它换成一个 ByteArrayOutpuStream 就能在内存中处理了。最后只要实现 PositionOutputStream  的 long getPos() 返回 ByteArrayOutputStream  的当前位置就行。

具体实现需要创建一个  InMemoryOutputFileInMemoryPositionOutputStream  类, 写在一个类文件  InMemoryOutputFile.java 中
 1package yanbin.blog;
 2
 3import org.apache.parquet.io.DelegatingPositionOutputStream;
 4import org.apache.parquet.io.OutputFile;
 5import org.apache.parquet.io.PositionOutputStream;
 6
 7import java.io.ByteArrayOutputStream;
 8import java.io.IOException;
 9import java.io.OutputStream;
10
11public class InMemoryOutputFile implements OutputFile {
12    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
13
14    @Override
15    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE 会调用此方法
16        return new InMemoryPositionOutputStream(baos);
17    }
18
19    @Override
20    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
21        return null;
22    }
23
24    @Override
25    public boolean supportsBlockSize() {
26        return false;
27    }
28
29    @Override
30    public long defaultBlockSize() {
31        return 0;
32    }
33
34    public byte[] toArray() {
35        return baos.toByteArray();
36    }
37
38    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
39
40        public InMemoryPositionOutputStream(OutputStream outputStream) {
41            super(outputStream);
42        }
43
44        @Override
45        public long getPos() throws IOException {
46            return ((ByteArrayOutputStream) this.getStream()).size();
47        }
48    }
49}

应用 InMemoryOutputFile  的 writeToParquet() 方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件
 1    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
 2        Schema avroSchema = avroObjects.get(0).getSchema();
 3        GenericData genericData = GenericData.get();
 4        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
 5        InMemoryOutputFile outputFile = new InMemoryOutputFile();
 6        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
 7                .withDataModel(genericData)
 8                .withSchema(avroSchema)
 9                .withCompressionCodec(CompressionCodecName.SNAPPY)
10                .withWriteMode(ParquetFileWriter.Mode.CREATE) // 内存中处理什么 Mode 无所谓
11                .build()) {
12            avroObjects.forEach(r -> {
13                try {
14                    writer.write(r);
15                } catch (IOException ex) {
16                    throw new UncheckedIOException(ex);
17                }
18            });
19        } catch (IOException e) {
20            e.printStackTrace();
21        }
22
23        Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
24    }

执行 TestParquest 后生成  users-memory.parquet 文件,现在是有点激动人心的时刻,验证内存中的内容是否正确
$ ls -l *.parquet
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
  required int32 id;
  required binary name (STRING);
  optional double age;
  optional int32 birthday (DATE);
}
文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。

爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。 永久链接 https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。