Avro 和 Parquet 是处理数据时常用的两种编码格式,它们同为 Hadoop 大家庭中的成员。这两种格式都是自我描述的,即在数据文件中带有 Schema。Avro 广泛的应用于数据的序列化,如 Kafka,它是基于行的格式,可被流式处理,而 Parquet 是列式存储格式的,适合于基于列的查询,不能用于流式处理。
既然是一个系统中可能同时用到了这两种数据存储格式,那么就可能有它们之间相互转换的需求。本文探索如何从 Avro 转换为 Parquet 格式数据,以 Java 语言为例,所涉及到的话题有
- 转换 Avro 数据为 Parquet 文件
- 如何支持 Avro 的 LogicalType 类型到 Parquet 的转换, 以 date 类型为例
- 实现转换 Avro 数据为 Parquet 字节数组(内存中完成 Avro 到 Parquet 的转换)
本文例子中所选择 Avro 版本是当前最新的 1.10.1
创建 Avro Schema 并编译成 Java 对象
先来创建一个 Avro schema user.avsc
, 并编译成 Java 代码。Schema 定义如下
1 2 3 4 5 6 7 8 9 10 |
{ "namespace": "yanbin.blog.data", "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": ["null", "double"], "default": null} ] } |
编译成 Java 代码
本人在 Mac OS X 平台下用 brew install avro-tools
安装的命令来编译
$ avro-tools compile -string schema user.avsc ./
或者用下载的 avro-tools jar 包来编译
java -jar /path/to/avro-tools-1.10.1.jar -string compile schema user.avsc ./
或者是用配置的 Maven 插件来编译生成 yanbin.blog.data.User
类。
创建 Avro 对象并转换成 Parquet 格式
把在当前目录中生成的 User 类引入到 Java 项目中,本例用 Maven 来管理依赖,在 pom.xml 中引入最基本的依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
<dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> <exclusions> <!-- hadoop-core 可说是引入了一堆的垃圾,排除所有 --> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> <!-- 补充 hadoop-core 排除的但需要用到的两个包 --> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> <version>1.6</version> </dependency> </dependencies> |
如此,整个项目的依赖就比较干净,用 mvn dependency:tree
命令显示如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ test-parquet --- [INFO] blog.yanbin:test-parquet:jar:1.0-SNAPSHOT [INFO] +- org.apache.avro:avro:jar:1.10.1:compile [INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.11.3:compile [INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.3:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.3:compile [INFO] | +- org.apache.commons:commons-compress:jar:1.20:compile [INFO] | \- org.slf4j:slf4j-api:jar:1.7.30:compile [INFO] +- org.apache.parquet:parquet-avro:jar:1.11.1:compile [INFO] | +- org.apache.parquet:parquet-column:jar:1.11.1:compile [INFO] | | +- org.apache.parquet:parquet-common:jar:1.11.1:compile [INFO] | | | \- org.apache.yetus:audience-annotations:jar:0.11.0:compile [INFO] | | \- org.apache.parquet:parquet-encoding:jar:1.11.1:compile [INFO] | +- org.apache.parquet:parquet-hadoop:jar:1.11.1:compile [INFO] | | +- org.apache.parquet:parquet-jackson:jar:1.11.1:compile [INFO] | | +- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile [INFO] | | \- commons-pool:commons-pool:jar:1.6:compile [INFO] | \- org.apache.parquet:parquet-format-structures:jar:1.11.1:compile [INFO] | \- javax.annotation:javax.annotation-api:jar:1.3.2:compile [INFO] +- org.apache.hadoop:hadoop-core:jar:1.2.1:compile [INFO] +- commons-logging:commons-logging:jar:1.2:compile [INFO] \- commons-configuration:commons-configuration:jar:1.6:compile [INFO] +- commons-collections:commons-collections:jar:3.2.1:compile [INFO] +- commons-lang:commons-lang:jar:2.4:compile [INFO] +- commons-digester:commons-digester:jar:1.8:compile [INFO] | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile [INFO] \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ |
Java 代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package yanbin.blog; import org.apache.avro.Schema; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import yanbin.blog.data.User; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; public class TestParquet { public static void main(String[] args) { List<User> users = Arrays.asList( User.newBuilder().setId(1).setName("Scott").build(), User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build()); writeToParquet(users); } public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) { Schema avroSchema = avroObjects.get(0).getSchema(); String parquetFile = "./users.parquet"; Path path = new Path(parquetFile); try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path) .withSchema(avroSchema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build()) { avroObjects.forEach(r -> { try { writer.write(r); } catch (IOException ex) { throw new UncheckedIOException(ex); } }); } catch (IOException e) { e.printStackTrace(); } } } |
执行后在当前目录下生成了文件 users.parquet
, 接下来看看这个文件中都有什么。
类似于安装 avro-tools
, 我们可以本 Mac OS X下用 brew install parquet-tools
安装 parquet 的命令,其他平台使用各自的包管理工具来安装 parquet-tools 命令。
查看 users.parquet
中的数据
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott"}
{"id":2,"name":"Tiger","age":20.5}
查看 users.parquet
数据的 Schema
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
}
从中可以看到 Avro Schema 到 Parquet Schema 的映射,"int" 为 "int32", "string" 为 binary, 有 default 的字段在 Parquet 中会加上个 optional.
LogicalType 的转换
Avro 和 Parquet 的数据类型都有自己的 LogicType 概念,下面给 user.asvc
加上一个 date
LogicalType 的 birthday 字段,整个 user.avsc
内容如下
1 2 3 4 5 6 7 8 9 10 11 |
{ "namespace": "yanbin.blog.data", "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": ["null", "double"], "default": null}, {"name": "birthday", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null} ] } |
再次用 avro-tools
命令编译为 User.java
文件,在 User 类中 birthday
的类型是 java.time.LocalDate
。再试图用之前的代码来转换带有 birthday
值的 Avro 对象,把前面 TestParquet
的 main
方法修改如下:
1 2 3 4 5 6 |
public static void main(String[] args) { List<User> users = Arrays.asList( User.newBuilder().setId(1).setName("Scott").setBirthday(LocalDate.now()).build(), User.newBuilder().setId(2).setName("Tiger").setAge(20.5).build()); writeToParquet(users); } |
执行后提示错误
Exception in thread "main" java.lang.ClassCastException: java.time.LocalDate cannot be cast to java.lang.Number
at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:323)
at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:275)
at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
为了处理这个错误,我还跟到了源代码中,停在 org.apache.avro.generic.GenericData
类中方法
public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType)
的代码行
Map<String, Conversion<?>> conversions = (Map)this.conversionsByClass.get(datumClass);
处给 conversionsByClass
添加个值也能解决这个问题,调试时执行代码
this.conversionsByClass.put(LocalDate.class, ImmutableMap.of("date", new TimeConversions.DateConversion()))
后来仔细检查 AvroParquetWriter
类可以调用 withDataModel(GenericData)
来添加 Conversion, 于是 writeToParquet()
方法的实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) { Schema avroSchema = avroObjects.get(0).getSchema(); String parquetFile = "./users.parquet"; Path path = new Path(parquetFile); GenericData genericData = GenericData.get(); genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path) .withDataModel(genericData) .withSchema(avroSchema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build()) { avroObjects.forEach(r -> { try { writer.write(r); } catch (IOException ex) { throw new UncheckedIOException(ex); } }); } catch (IOException e) { e.printStackTrace(); } } |
执行 TestParquet 类产生新的 users.parquet
文件,再次查看它的数据和 Schema
$ parquet-tools cat --json users.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$ parquet-tools schema users.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
其他的 LogicalType 应该也可以采用类似的方式来处理。如果我们查看 Conversion 的实现类有以下 9 个
其实 Parquet 也有自己的 LogicalType 定义,如 MAP,DECIMAL, DATE, TIME, TIMESTAMP 等,如何把 Avro 的 date 类型映射为 Parquet 的 DATE 类型也是个问题。
如何在内存中完成 Avro 转换为 Parquet
以上转换 Avro 为 Parquet 需要生成一个文件,是否能在内存中完成 Avro 到 Parquet 格式的转换呢?即要得到 Parquet 内容的字节数组而不借助于磁盘文件。突破口应该是在 AvroParquetWriter.builder()
这个方法上,它有两个重载方法,分别是
1 2 3 4 5 6 7 |
public static <T> Builder<T> builder(Path file) { return new Builder<T>(file); } public static <T> Builder<T> builder(OutputFile file) { return new Builder<T>(file); } |
Path 是一个 org.apache.hadoop.fs.Path
类, 而非 Java 的 Path,这个估计不行,OutputFile 是一个接口 org.apache.parquet.io.OutputFile
, 看起来有戏,它目前只有一个实现类 org.apache.parquet.hadoop.util.HadoopOutputFile
, 要实现内存中存储字节类容的 OutputFile
肯定是可行的。
继续追踪 OutputFile 的 create 方法的返回值为 PositionOutputStream
, 它有两个实现 DelegatingPositionOutputStream
和 HadoopPositionOutputStream
, 前者是一个适配器. DelegatingPositionOutputStream
的构造函数需要一个 OutputStream
,把它换成一个 ByteArrayOutpuStream
就能在内存中处理了。最后只要实现 PositionOutputStream
的 long getPos()
返回 ByteArrayOutputStream
的当前位置就行。
具体实现需要创建一个 InMemoryOutputFile
和 InMemoryPositionOutputStream
类, 写在一个类文件 InMemoryOutputFile.java
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
package yanbin.blog; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.PositionOutputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; public class InMemoryOutputFile implements OutputFile { private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @Override public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE 会调用此方法 return new InMemoryPositionOutputStream(baos); } @Override public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { return null; } @Override public boolean supportsBlockSize() { return false; } @Override public long defaultBlockSize() { return 0; } public byte[] toArray() { return baos.toByteArray(); } private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream { public InMemoryPositionOutputStream(OutputStream outputStream) { super(outputStream); } @Override public long getPos() throws IOException { return ((ByteArrayOutputStream) this.getStream()).size(); } } } |
应用 InMemoryOutputFile
的 writeToParquet()
方法,为验证内存中的内容是否正确,我们把 outputFile.toArray() 输出到 users-memory.parquet 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException { Schema avroSchema = avroObjects.get(0).getSchema(); GenericData genericData = GenericData.get(); genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); InMemoryOutputFile outputFile = new InMemoryOutputFile(); try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile) .withDataModel(genericData) .withSchema(avroSchema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withWriteMode(ParquetFileWriter.Mode.CREATE) // 内存中处理什么 Mode 无所谓 .build()) { avroObjects.forEach(r -> { try { writer.write(r); } catch (IOException ex) { throw new UncheckedIOException(ex); } }); } catch (IOException e) { e.printStackTrace(); } Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray()); } |
执行 TestParquest
后生成 users-memory.parquet
文件,现在是有点激动人心的时刻,验证内存中的内容是否正确
$ ls -l *.parquet
-rw-r--r-- 1 yanbin root 1173 Feb 23 23:19 users-memory.parquet
-rwxrwxrwx 1 yanbin root 1173 Feb 23 22:49 users.parquet
$
$ parquet-tools cat --json users-memory.parquet
{"id":1,"name":"Scott","birthday":18681}
{"id":2,"name":"Tiger","age":20.5}
$
$ parquet-tools schema users-memory.parquet
message yanbin.blog.data.User {
required int32 id;
required binary name (STRING);
optional double age;
optional int32 birthday (DATE);
}
文件大小相同,内容和 Schema 都无误,大功告成,洗洗睡了。
爬起来再被一句,内存中处理的话小心消耗内存,快速处理小文件不错,避免了磁盘 IO 操作,但大文件就会要命的。
[…] 使用 Java 转换 Apache Avro 为 Parquet 数据格式 实现把 Avro 数据转换为 Parquet 文件或内存字节数组,并支持 […]