使用 Java 转换 Apache Avro 为 Parquet 数据格式(依赖更新)
在上篇 使用 Java 转换 Apache Avro 为 Parquet 数据格式
实现把 Avro 数据转换为 Parquet 文件或内存字节数组,并支持 LogicalType。其中使用到了 hadoop-core 依赖,注意到它传递的依赖都非常老旧,
到官方 Maven 仓库一看才发现还不是一般的老
长时间无人问津的项目,那一定有它的替代品。对啦,据说 hadoop-core 在 2009 年 7 月份更名为 hadoop-common 了,没找到官方说明,只看到 StackOverflow 的
Differences between Hadoop-coomon, Hadoop-core and Hadoop-client? 是这么说的。 应该是这么个说法,不然为何 hadoop-core 一直停留在 1.2.1 的版本,
而且原来 hadoop-core 中的类在 hadoop-common 中可以找到,如类 org.apache.hadoop.fs.Path。不过在 hadoop-core-1.2.1 中的
好了,针对上一篇,我们用活着的 hadoop-coomon 包来实现把 Avro 文件转换为 Parquet 文件或内存字节数组分别不同的 pom.xml 依赖配置, 代码实现与前一篇 使用 Java 转换 Apache Avro 为 Parquet 数据格式 相同。
还是重复一下转换 Avro 为 Parquet 文件的代码
AvroParquetWriter.builder() 这个方法中要用到 hadoop-common 的类 org.apache.hadoop.fs.Path。
比前面生成 Parquet 文件要省几个依赖
再回顾一下内存中完成转换为 Parquet 字节数组的代码
InMemoryOutputFile 的内容再次重复如下
以后还是尽量用 hadoop-common 库吧。 永久链接 https://yanbin.blog/convert-apache-avro-to-parquet-in-java-use-hadoop-common-instead-of-hadoop-core/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
长时间无人问津的项目,那一定有它的替代品。对啦,据说 hadoop-core 在 2009 年 7 月份更名为 hadoop-common 了,没找到官方说明,只看到 StackOverflow 的
Differences between Hadoop-coomon, Hadoop-core and Hadoop-client? 是这么说的。 应该是这么个说法,不然为何 hadoop-core 一直停留在 1.2.1 的版本,
而且原来 hadoop-core 中的类在 hadoop-common 中可以找到,如类 org.apache.hadoop.fs.Path。不过在 hadoop-core-1.2.1 中的 fs/s3 包不见,
这么重要的 s3 文件系统没了。好了,针对上一篇,我们用活着的 hadoop-coomon 包来实现把 Avro 文件转换为 Parquet 文件或内存字节数组分别不同的 pom.xml 依赖配置, 代码实现与前一篇 使用 Java 转换 Apache Avro 为 Parquet 数据格式 相同。
把 Avro 转换为 Parquet 文件的依赖
pom.xml 中依赖配置 1<dependencies>
2 <dependency>
3 <groupId>org.apache.avro</groupId>
4 <artifactId>avro</artifactId>
5 <version>1.10.1</version>
6 </dependency>
7 <dependency>
8 <groupId>org.apache.parquet</groupId>
9 <artifactId>parquet-avro</artifactId>
10 <version>1.11.1</version>
11 </dependency>
12 <dependency>
13 <groupId>org.apache.hadoop</groupId>
14 <artifactId>hadoop-common</artifactId>
15 <version>3.3.0</version>
16 <exclusions>
17 <exclusion>
18 <groupId>*</groupId>
19 <artifactId>*</artifactId>
20 </exclusion>
21 </exclusions>
22 </dependency>
23 <dependency>
24 <groupId>org.apache.hadoop</groupId>
25 <artifactId>hadoop-auth</artifactId>
26 <version>3.3.0</version>
27 </dependency>
28 <dependency>
29 <groupId>commons-logging</groupId>
30 <artifactId>commons-logging</artifactId>
31 <version>1.2</version>
32 </dependency>
33 <dependency>
34 <groupId>org.apache.htrace</groupId>
35 <artifactId>htrace-core4</artifactId>
36 <version>4.1.0-incubating</version>
37 </dependency>
38 <dependency>
39 <groupId>org.apache.commons</groupId>
40 <artifactId>commons-configuration2</artifactId>
41 <version>2.1.1</version>
42 </dependency>
43 <dependency>
44 <groupId>com.fasterxml.woodstox</groupId>
45 <artifactId>woodstox-core</artifactId>
46 <version>5.0.3</version>
47 </dependency>
48 <dependency>
49 <groupId>com.google.guava</groupId>
50 <artifactId>guava</artifactId>
51 <version>30.1-jre</version>
52 </dependency>
53 <dependency>
54 <groupId>commons-collections</groupId>
55 <artifactId>commons-collections</artifactId>
56 <version>3.2.2</version>
57 </dependency>
58</dependencies>还是重复一下转换 Avro 为 Parquet 文件的代码
1public static <T extends SpecificRecordBase> void writeToParquetFile(List<T> avroObjects) {
2 Schema avroSchema = avroObjects.get(0).getSchema();
3 GenericData genericData = GenericData.get();
4 genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
5 Path path = new Path("users.parquet");
6 try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
7 .withDataModel(genericData)
8 .withSchema(avroSchema)
9 .withCompressionCodec(CompressionCodecName.SNAPPY)
10 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
11 .build()) {
12 avroObjects.forEach(r -> {
13 try {
14 writer.write(r);
15 } catch (IOException ex) {
16 throw new UncheckedIOException(ex);
17 }
18 });
19 } catch (IOException e) {
20 e.printStackTrace();
21 }
22}AvroParquetWriter.builder() 这个方法中要用到 hadoop-common 的类 org.apache.hadoop.fs.Path。
转换 Avro 为内存字节数组的依赖
pom.xml 1<dependencies>
2 <dependency>
3 <groupId>org.apache.avro</groupId>
4 <artifactId>avro</artifactId>
5 <version>1.10.1</version>
6 </dependency>
7 <dependency>
8 <groupId>org.apache.parquet</groupId>
9 <artifactId>parquet-avro</artifactId>
10 <version>1.11.1</version>
11 </dependency>
12 <dependency>
13 <groupId>org.apache.hadoop</groupId>
14 <artifactId>hadoop-common</artifactId>
15 <version>3.3.0</version>
16 <exclusions>
17 <exclusion>
18 <groupId>*</groupId>
19 <artifactId>*</artifactId>
20 </exclusion>
21 </exclusions>
22 </dependency>
23 <dependency>
24 <groupId>com.fasterxml.woodstox</groupId>
25 <artifactId>woodstox-core</artifactId>
26 <version>5.0.3</version>
27 </dependency>
28 <dependency>
29 <groupId>com.google.guava</groupId>
30 <artifactId>guava</artifactId>
31 <version>30.1-jre</version>
32 </dependency>
33 <dependency>
34 <groupId>commons-collections</groupId>
35 <artifactId>commons-collections</artifactId>
36 <version>3.2.2</version>
37 </dependency>
38</dependencies>比前面生成 Parquet 文件要省几个依赖
再回顾一下内存中完成转换为 Parquet 字节数组的代码
1public static <T extends SpecificRecordBase> byte[] writeToParquetByteArray(List<T> avroObjects) throws IOException {
2 Schema avroSchema = avroObjects.get(0).getSchema();
3 GenericData genericData = GenericData.get();
4 genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
5 InMemoryOutputFile outputFile = new InMemoryOutputFile();
6 try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
7 .withDataModel(genericData)
8 .withSchema(avroSchema)
9 .withCompressionCodec(CompressionCodecName.SNAPPY)
10 .withWriteMode(ParquetFileWriter.Mode.CREATE)
11 .build()) {
12 avroObjects.forEach(r -> {
13 try {
14 writer.write(r);
15 } catch (IOException ex) {
16 throw new UncheckedIOException(ex);
17 }
18 });
19 } catch (IOException e) {
20 e.printStackTrace();
21 }
22
23 return outputFile.toArray();
24}InMemoryOutputFile 的内容再次重复如下
1package yanbin.blog;
2
3import org.apache.parquet.io.DelegatingPositionOutputStream;
4import org.apache.parquet.io.OutputFile;
5import org.apache.parquet.io.PositionOutputStream;
6
7import java.io.ByteArrayOutputStream;
8import java.io.IOException;
9import java.io.OutputStream;
10
11public class InMemoryOutputFile implements OutputFile {
12 private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
13
14 @Override
15 public PositionOutputStream create(long blockSizeHint) throws IOException {
16 return new InMemoryPositionOutputStream(baos);
17 }
18
19 @Override
20 public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
21 return null;
22 }
23
24 @Override
25 public boolean supportsBlockSize() {
26 return false;
27 }
28
29 @Override
30 public long defaultBlockSize() {
31 return 0;
32 }
33
34 public byte[] toArray() {
35 return baos.toByteArray();
36 }
37
38 private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
39
40 public InMemoryPositionOutputStream(OutputStream outputStream) {
41 super(outputStream);
42 }
43
44 @Override
45 public long getPos() throws IOException {
46 return ((ByteArrayOutputStream) this.getStream()).size();
47 }
48 }
49}以后还是尽量用 hadoop-common 库吧。 永久链接 https://yanbin.blog/convert-apache-avro-to-parquet-in-java-use-hadoop-common-instead-of-hadoop-core/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。