Converting Apache Avro to Parquet Data Format in Java (Dependency Update)

In the previous post, Converting Apache Avro to Parquet Data Format in Java, we converted Avro data into a Parquet file or an in-memory byte array, with LogicalType support. That implementation used the hadoop-core dependency, and I noticed that all of its transitive dependencies are very old; a look at the official Maven repository shows the artifact itself is not just a little old, but seriously outdated.


A project that has gone untouched for this long must have a replacement. Sure enough, hadoop-core was reportedly renamed to hadoop-common back in July 2009. I couldn't find an official announcement; only the StackOverflow question Differences between Hadoop-common, Hadoop-core and Hadoop-client? says so. That explanation seems right, though: otherwise why would hadoop-core stay frozen at version 1.2.1, and why can the classes from hadoop-core, such as org.apache.hadoop.fs.Path, all be found in hadoop-common? One thing that did disappear is the fs/s3 package that was in hadoop-core-1.2.1, so the all-important S3 filesystem is gone (S3 support lives in the separate hadoop-aws module these days).

So, following up on the previous post, here is how to convert Avro to a Parquet file or to an in-memory byte array using the very-much-alive hadoop-common package, with a separate pom.xml dependency configuration for each case. The code itself is the same as in the earlier post, Converting Apache Avro to Parquet Data Format in Java.

Dependencies for converting Avro to a Parquet file

Dependency configuration in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core4</artifactId>
        <version>4.1.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-configuration2</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

Here, once again, is the code that converts Avro to a Parquet file:
public static <T extends SpecificRecordBase> void writeToParquetFile(List<T> avroObjects) {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    // register the conversion so Avro date LogicalType values are written correctly
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
    Path path = new Path("users.parquet");  // org.apache.hadoop.fs.Path from hadoop-common
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)  // overwrite users.parquet if it exists
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The AvroParquetWriter.builder() call here takes the hadoop-common class org.apache.hadoop.fs.Path.
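As a quick sanity check that is not part of the original post, the generated users.parquet can be read back with AvroParquetReader, which likewise goes through org.apache.hadoop.fs.Path, so the same hadoop-common dependencies apply. A minimal sketch (the method name readBackParquetFile is made up):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;

public static void readBackParquetFile() throws IOException {
    // read users.parquet written by writeToParquetFile() above and print every record
    try (ParquetReader<GenericRecord> reader =
                 AvroParquetReader.<GenericRecord>builder(new Path("users.parquet")).build()) {
        GenericRecord record;
        while ((record = reader.read()) != null) {
            System.out.println(record);
        }
    }
}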

Dependencies for converting Avro to an in-memory byte array

pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>

This saves a few dependencies compared with generating a Parquet file above.

And once more, the code that performs the conversion to a Parquet byte array entirely in memory:
public static <T extends SpecificRecordBase> byte[] writeToParquetByteArray(List<T> avroObjects) throws IOException {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
    InMemoryOutputFile outputFile = new InMemoryOutputFile();  // collects the Parquet output in memory
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }

    return outputFile.toArray();
}
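A hypothetical usage example, where the avroUsers list and the output file name are made up for illustration: the returned byte array can simply be written to a local file, or just as easily handed to S3, Kafka, etc. without ever touching the disk.

// avroUsers is assumed to be a List of Avro-generated SpecificRecordBase objects
byte[] parquetBytes = writeToParquetByteArray(avroUsers);
java.nio.file.Files.write(java.nio.file.Paths.get("users-from-bytes.parquet"), parquetBytes);  // persist the in-memory Parquet data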

The InMemoryOutputFile class is repeated below:
package yanbin.blog;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return new InMemoryPositionOutputStream(baos);
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        // delegate to create(); only exercised when the writer is built with OVERWRITE mode
        return create(blockSizeHint);
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

    public byte[] toArray() {
        return baos.toByteArray();
    }

    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {

        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        @Override
        public long getPos() throws IOException {
            // the number of bytes written so far is the current position
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
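As a complementary sketch that is not part of the original post, a hypothetical InMemoryInputFile (the class name and the whole implementation are mine) could implement org.apache.parquet.io.InputFile on top of such a byte array, so the in-memory Parquet data can be read back without a filesystem:

package yanbin.blog;

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class InMemoryInputFile implements InputFile {
    private final byte[] data;

    public InMemoryInputFile(byte[] data) {
        this.data = data;
    }

    @Override
    public long getLength() throws IOException {
        return data.length;
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        return new DelegatingSeekableInputStream(bais) {
            @Override
            public long getPos() {
                // bytes consumed so far = total length - bytes still available in the array
                return data.length - bais.available();
            }

            @Override
            public void seek(long newPos) {
                bais.reset();       // back to the beginning of the byte array
                bais.skip(newPos);  // then skip forward to the requested position
            }
        };
    }
}

With such a class, the byte array produced by writeToParquetByteArray() could be read back through AvroParquetReader.builder(new InMemoryInputFile(bytes)), mirroring the write side above.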

From now on, prefer the hadoop-common library. Permalink https://yanbin.blog/convert-apache-avro-to-parquet-in-java-use-hadoop-common-instead-of-hadoop-core/, from 隔叶黄莺 Yanbin's Blog
[Copyright Notice] This article is licensed under the Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0) license.