In the previous post, 使用 Java 转换 Apache Avro 为 Parquet 数据格式 (Converting Apache Avro to the Parquet Data Format in Java), we converted Avro data into a Parquet file or an in-memory byte array, with LogicalType support. That implementation pulled in the hadoop-core dependency, and I noticed that its transitive dependencies are all very old; a look at the official Maven repository shows it is not just slightly dated.
A project that has gone untouched for that long must have a replacement. And indeed: hadoop-core was reportedly renamed to hadoop-common back in July 2009. I could not find an official announcement, only the StackOverflow question Differences between Hadoop-common, Hadoop-core and Hadoop-client?, which says as much. That explanation seems right; otherwise why would hadoop-core be stuck at version 1.2.1, and why can the classes from hadoop-core, such as org.apache.hadoop.fs.Path, be found in hadoop-common? One difference: the fs/s3 package that existed in hadoop-core-1.2.1 is gone, so that important S3 file system is no longer there.
So, as a follow-up to the previous post, let's use the still-alive hadoop-common package instead, and give separate pom.xml dependency configurations for converting Avro to a Parquet file and to an in-memory byte array. The code itself is identical to the previous post, 使用 Java 转换 Apache Avro 为 Parquet 数据格式.
Dependencies for converting Avro to a Parquet file
Dependency configuration in pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core4</artifactId>
        <version>4.1.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-configuration2</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>
```
Once again, here is the code that converts Avro to a Parquet file:
```java
public static <T extends SpecificRecordBase> void writeToParquetFile(List<T> avroObjects) {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());

    Path path = new Path("users.parquet");
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```
The AvroParquetWriter.builder() method used here needs the hadoop-common class org.apache.hadoop.fs.Path.
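As a quick illustration of how the method above would be called, here is a hedged usage sketch. The User class and its fields are assumptions: a SpecificRecordBase subclass generated from an Avro schema by the avro-maven-plugin, as in the previous post, not something defined in this post.

```java
// Hypothetical usage: User is assumed to be a class generated from an Avro schema
// (extends SpecificRecordBase, with a builder API); adjust to your own schema.
List<User> users = List.of(
        User.newBuilder().setName("Alice").setFavoriteNumber(7).build(),
        User.newBuilder().setName("Bob").setFavoriteNumber(42).build());

// Writes users.parquet into the working directory; an existing file is replaced
// because the writer above uses ParquetFileWriter.Mode.OVERWRITE.
writeToParquetFile(users);
```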
Dependencies for converting Avro to an in-memory byte array
pom.xml
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>5.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.1-jre</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
    </dependency>
</dependencies>
```
This needs a few fewer dependencies than writing a Parquet file does.
And here, once more, is the code that performs the conversion to a Parquet byte array entirely in memory:
```java
public static <T extends SpecificRecordBase> byte[] writeToParquetByteArray(List<T> avroObjects) throws IOException {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());

    InMemoryOutputFile outputFile = new InMemoryOutputFile();
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
    return outputFile.toArray();
}
```
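Before handing the byte array off to its destination, a cheap sanity check is possible: a Parquet file both starts and ends with the 4-byte ASCII magic PAR1. The helper below is only an illustrative sketch, not part of the original code.

```java
// Illustrative sanity check: a well-formed Parquet file starts and ends with
// the 4-byte ASCII magic "PAR1".
static boolean looksLikeParquet(byte[] bytes) {
    byte[] magic = "PAR1".getBytes(java.nio.charset.StandardCharsets.US_ASCII);
    if (bytes == null || bytes.length < 2 * magic.length) {
        return false;
    }
    for (int i = 0; i < magic.length; i++) {
        if (bytes[i] != magic[i] || bytes[bytes.length - magic.length + i] != magic[i]) {
            return false;
        }
    }
    return true;
}
```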
The InMemoryOutputFile class is repeated below:
```java
package yanbin.blog;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return new InMemoryPositionOutputStream(baos);
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        // Delegate to create() instead of returning null, so OVERWRITE mode also works.
        return create(blockSizeHint);
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

    public byte[] toArray() {
        return baos.toByteArray();
    }

    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        @Override
        public long getPos() throws IOException {
            // The number of bytes written so far is the current write position.
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
```
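If you want stronger confirmation that the in-memory output is a well-formed Parquet file, one option is to dump the byte array to a file and read it back with the regular file-based AvroParquetReader. The sketch below is an assumption-laden example, not part of the original post: it reuses the hypothetical users list from earlier, the file name is arbitrary, and reading back may require the fuller dependency list from the first pom.xml rather than the trimmed one.

```java
// Hedged verification sketch: persist the in-memory bytes, then read them back
// with the ordinary file-based AvroParquetReader.
byte[] parquetBytes = writeToParquetByteArray(users);  // users: hypothetical Avro records
java.nio.file.Files.write(java.nio.file.Paths.get("users-in-memory.parquet"), parquetBytes);

GenericData genericData = GenericData.get();
genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());

try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(new Path("users-in-memory.parquet"))
        .withDataModel(genericData)
        .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
}
```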
From now on, prefer the hadoop-common library whenever possible.