diff --git a/pixels-cache/pom.xml b/pixels-cache/pom.xml index a8d2e61a2..f0b021d6f 100644 --- a/pixels-cache/pom.xml +++ b/pixels-cache/pom.xml @@ -60,11 +60,6 @@ true - - com.google.code.findbugs - jsr305 - true - diff --git a/pixels-daemon/pom.xml b/pixels-daemon/pom.xml index 41be6421b..3e080254e 100644 --- a/pixels-daemon/pom.xml +++ b/pixels-daemon/pom.xml @@ -30,14 +30,6 @@ true - - com.alibaba fastjson diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsConf.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsConf.java index 3ad9e5efa..050a29de8 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsConf.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsConf.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,7 +25,8 @@ * Define the configuration properties that Pixels understands. * refer: [OrcConf](https://github.com/apache/orc/blob/master/java/core/src/java/org/apache/orc/OrcConf.java) */ -public enum PixelsConf { +public enum PixelsConf +{ ROW_INDEX_STRIDE("pixels.row.index.stride", "hive.exec.pixels.default.row.index.stride", 10000, "Define the default Pixels index stride in number of rows. (Stride is the\n" + @@ -73,101 +74,125 @@ public enum PixelsConf { PixelsConf(String attribute, String hiveConfName, Object defaultValue, - String description) { + String description) + { this.attribute = attribute; this.hiveConfName = hiveConfName; this.defaultValue = defaultValue; this.description = description; } - public String getAttribute() { + public String getAttribute() + { return attribute; } - public String getHiveConfName() { + public String getHiveConfName() + { return hiveConfName; } - public Object getDefaultValue() { + public Object getDefaultValue() + { return defaultValue; } - public String getDescription() { + public String getDescription() + { return description; } - private String lookupValue(Properties tbl, Configuration conf) { + private String lookupValue(Properties tbl, Configuration conf) + { String result = null; - if (tbl != null) { + if (tbl != null) + { result = tbl.getProperty(attribute); } - if (result == null && conf != null) { + if (result == null && conf != null) + { result = conf.get(attribute); - if (result == null && hiveConfName != null) { + if (result == null && hiveConfName != null) + { result = conf.get(hiveConfName); } } return result; } - public long getLong(Properties tbl, Configuration conf) { + public long getLong(Properties tbl, Configuration conf) + { String value = lookupValue(tbl, conf); - if (value != null) { + if (value != null) + { return Long.parseLong(value); } return ((Number) defaultValue).longValue(); } - public long getLong(Configuration conf) { + public long getLong(Configuration conf) + { return getLong(null, conf); } - public void setLong(Configuration conf, long value) { + public void setLong(Configuration conf, long value) + { conf.setLong(attribute, value); } - public String getString(Properties tbl, Configuration conf) { + public String getString(Properties tbl, Configuration conf) + { String value = lookupValue(tbl, conf); return value == null ? (String) defaultValue : value; } - public String getString(Configuration conf) { + public String getString(Configuration conf) + { return getString(null, conf); } - public void setString(Configuration conf, String value) { + public void setString(Configuration conf, String value) + { conf.set(attribute, value); } - public boolean getBoolean(Properties tbl, Configuration conf) { + public boolean getBoolean(Properties tbl, Configuration conf) + { String value = lookupValue(tbl, conf); - if (value != null) { + if (value != null) + { return Boolean.parseBoolean(value); } return (Boolean) defaultValue; } - public boolean getBoolean(Configuration conf) { + public boolean getBoolean(Configuration conf) + { return getBoolean(null, conf); } - public void setBoolean(Configuration conf, boolean value) { + public void setBoolean(Configuration conf, boolean value) + { conf.setBoolean(attribute, value); } - public double getDouble(Properties tbl, Configuration conf) { + public double getDouble(Properties tbl, Configuration conf) + { String value = lookupValue(tbl, conf); - if (value != null) { + if (value != null) + { return Double.parseDouble(value); } return ((Number) defaultValue).doubleValue(); } - public double getDouble(Configuration conf) { + public double getDouble(Configuration conf) + { return getDouble(null, conf); } - public void setDouble(Configuration conf, double value) { + public void setDouble(Configuration conf, double value) + { conf.setDouble(attribute, value); } } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsFile.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsFile.java index 8940e10b6..11eae518d 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsFile.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsFile.java @@ -37,13 +37,16 @@ * Contains factory methods to read or write PIXELS files. * refer: [OrcFile](https://github.com/apache/orc/blob/master/java/core/src/java/org/apache/orc/OrcFile.java) */ -public class PixelsFile { +public class PixelsFile +{ private static Logger log = LogManager.getLogger(PixelsFile.class); - protected PixelsFile() { + protected PixelsFile() + { } - public static class ReaderOptions { + public static class ReaderOptions + { private final Configuration conf; private FileSystem filesystem; private PixelsReaderOption option; @@ -51,11 +54,14 @@ public static class ReaderOptions { private long offset = 0L; private long length = 9223372036854775807L; - public ReaderOptions(Configuration conf, FileSplit split) { + public ReaderOptions(Configuration conf, FileSplit split) + { this.conf = conf; - try { + try + { this.filesystem = FileSystem.get(conf); - } catch (IOException e) { + } catch (IOException e) + { e.printStackTrace(); } this.option = new PixelsReaderOption(); @@ -66,28 +72,35 @@ public ReaderOptions(Configuration conf, FileSplit split) { // this.option.includeCols(new String[]{}); } - public ReaderOptions filesystem(FileSystem fs) { + public ReaderOptions filesystem(FileSystem fs) + { this.filesystem = fs; return this; } - public Configuration getConfiguration() { + public Configuration getConfiguration() + { return conf; } - public FileSystem getFilesystem() { + public FileSystem getFilesystem() + { return filesystem; } - public PixelsReaderOption getOption() { + public PixelsReaderOption getOption() + { return option; } - public ReaderOptions setOption(TypeDescription schema) { - if (!ColumnProjectionUtils.isReadAllColumns(conf)) { + public ReaderOptions setOption(TypeDescription schema) + { + if (!ColumnProjectionUtils.isReadAllColumns(conf)) + { included = ColumnProjectionUtils.getReadColumnIDs(conf); log.info("genIncludedColumns:" + included.toString()); - } else { + } else + { log.info("genIncludedColumns:null"); } @@ -99,38 +112,46 @@ public ReaderOptions setOption(TypeDescription schema) { return this; } - public ReaderOptions filesystem(JobConf conf) { - try { + public ReaderOptions filesystem(JobConf conf) + { + try + { this.filesystem = FileSystem.get(conf); - } catch (IOException e) { + } catch (IOException e) + { e.printStackTrace(); } return this; } - public ReaderOptions include(List included) { + public ReaderOptions include(List included) + { this.included = included; return this; } - public ReaderOptions range(long offset, long length) { + public ReaderOptions range(long offset, long length) + { this.offset = offset; this.length = length; return this; } - - public List getIncluded() { + + public List getIncluded() + { return included; } } - public static ReaderOptions readerOptions(Configuration conf, FileSplit split) { + public static ReaderOptions readerOptions(Configuration conf, FileSplit split) + { return new ReaderOptions(conf, split); } public static PixelsReader createReader(Path path, - ReaderOptions options) throws IOException { + ReaderOptions options) throws IOException + { FileSystem fs = options.getFilesystem(); return PixelsReaderImpl.newBuilder() .setFS(fs) @@ -143,7 +164,8 @@ public static PixelsReader createReader(Path path, /** * Options for creating PIXELS file writers. */ - public static class WriterOptions implements Cloneable { + public static class WriterOptions implements Cloneable + { private final Configuration configuration; private FileSystem fileSystemValue = null; private TypeDescription schema = null; @@ -155,7 +177,8 @@ public static class WriterOptions implements Cloneable { private boolean encodingStrategy; private int compressionStrategy; - protected WriterOptions(Properties tableProperties, Configuration conf) { + protected WriterOptions(Properties tableProperties, Configuration conf) + { configuration = conf; stripeSizeValue = PixelsConf.STRIPE_SIZE.getLong(tableProperties, conf); blockSizeValue = PixelsConf.BLOCK_SIZE.getLong(tableProperties, conf); @@ -173,10 +196,13 @@ protected WriterOptions(Properties tableProperties, Configuration conf) { /** * @return a SHALLOW clone */ - public WriterOptions clone() { - try { + public WriterOptions clone() + { + try + { return (WriterOptions) super.clone(); - } catch (CloneNotSupportedException ex) { + } catch (CloneNotSupportedException ex) + { throw new AssertionError("Expected super.clone() to work"); } } @@ -185,7 +211,8 @@ public WriterOptions clone() { * Provide the filesystem for the path, if the client has it available. * If it is not provided, it will be found from the path. */ - public WriterOptions fileSystem(FileSystem value) { + public WriterOptions fileSystem(FileSystem value) + { fileSystemValue = value; return this; } @@ -195,7 +222,8 @@ public WriterOptions fileSystem(FileSystem value) { * stripe in memory until this memory limit is reached and the stripe * is flushed to the HDFS file and the next stripe started. */ - public WriterOptions stripeSize(long value) { + public WriterOptions stripeSize(long value) + { stripeSizeValue = value; return this; } @@ -204,7 +232,8 @@ public WriterOptions stripeSize(long value) { * Set the file system block size for the file. For optimal performance, * set the block size to be multiple factors of stripe size. */ - public WriterOptions blockSize(long value) { + public WriterOptions blockSize(long value) + { blockSizeValue = value; return this; } @@ -214,7 +243,8 @@ public WriterOptions blockSize(long value) { * 1000 to prevent the index from overwhelming the data. If the stride is * set to 0, no indexes will be included in the file. */ - public WriterOptions rowIndexStride(int value) { + public WriterOptions rowIndexStride(int value) + { rowIndexStrideValue = value; return this; } @@ -226,7 +256,8 @@ public WriterOptions rowIndexStride(int value) { * writing and memory utilization. To enforce writer to use the requested * buffer size use enforceBufferSize(). */ - public WriterOptions blockReplication(short value) { + public WriterOptions blockReplication(short value) + { blockReplication = value; return this; } @@ -236,7 +267,8 @@ public WriterOptions blockReplication(short value) { * straddling blocks. Padding improves locality and thus the speed of * reading, but costs space. */ - public WriterOptions blockPadding(boolean value) { + public WriterOptions blockPadding(boolean value) + { blockPaddingValue = value; return this; } @@ -244,12 +276,14 @@ public WriterOptions blockPadding(boolean value) { /** * Sets the encoding strategy that is used to encode the data. */ - public WriterOptions encodingStrategy(boolean strategy) { + public WriterOptions encodingStrategy(boolean strategy) + { encodingStrategy = strategy; return this; } - public WriterOptions compressionStrategy(int strategy) { + public WriterOptions compressionStrategy(int strategy) + { compressionStrategy = strategy; return this; } @@ -260,48 +294,59 @@ public WriterOptions compressionStrategy(int strategy) { * @param schema the schema for the file. * @return this */ - public WriterOptions setSchema(TypeDescription schema) { + public WriterOptions setSchema(TypeDescription schema) + { this.schema = schema; return this; } - public boolean getBlockPadding() { + public boolean getBlockPadding() + { return blockPaddingValue; } - public long getBlockSize() { + public long getBlockSize() + { return blockSizeValue; } - public FileSystem getFileSystem() { + public FileSystem getFileSystem() + { return fileSystemValue; } - public Configuration getConfiguration() { + public Configuration getConfiguration() + { return configuration; } - public TypeDescription getSchema() { + public TypeDescription getSchema() + { return schema; } - public long getStripeSize() { + public long getStripeSize() + { return stripeSizeValue; } - public short getBlockReplication() { + public short getBlockReplication() + { return blockReplication; } - public int getRowIndexStride() { + public int getRowIndexStride() + { return rowIndexStrideValue; } - public int getCompressionStrategy() { + public int getCompressionStrategy() + { return compressionStrategy; } - public boolean getEncodingStrategy() { + public boolean getEncodingStrategy() + { return encodingStrategy; } @@ -313,7 +358,8 @@ public boolean getEncodingStrategy() { * @param conf the configuration to use for values * @return A WriterOptions object that can be modified */ - public static WriterOptions writerOptions(Configuration conf) { + public static WriterOptions writerOptions(Configuration conf) + { return new WriterOptions(null, conf); } @@ -326,7 +372,8 @@ public static WriterOptions writerOptions(Configuration conf) { * @return a WriterOptions object that can be modified */ public static WriterOptions writerOptions(Properties tableProperties, - Configuration conf) { + Configuration conf) + { return new WriterOptions(tableProperties, conf); } @@ -341,7 +388,8 @@ public static WriterOptions writerOptions(Properties tableProperties, */ public static PixelsWriter createWriter(Path path, WriterOptions opts - ) throws IOException { + ) throws IOException + { FileSystem fs = opts.getFileSystem() == null ? path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem(); return diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsSerDe.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsSerDe.java index ddbf9bc72..a9bb8d612 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsSerDe.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsSerDe.java @@ -43,14 +43,16 @@ * @author: tao * @date: Create in 2018-12-11 15:29 **/ -public class PixelsSerDe extends AbstractSerDe { +public class PixelsSerDe extends AbstractSerDe +{ private static Logger log = LogManager.getLogger(PixelsSerDe.class); private final PixelsSerdeRow row = new PixelsSerdeRow(); private ObjectInspector inspector = null; @Override - public void initialize(@Nullable Configuration configuration, Properties table) throws SerDeException { + public void initialize(@Nullable Configuration configuration, Properties table) throws SerDeException + { List included = ColumnProjectionUtils.getReadColumnIDs(configuration); log.info("configuration:" + included.toString()); @@ -63,14 +65,19 @@ public void initialize(@Nullable Configuration configuration, Properties table) // Parse the configuration parameters ArrayList columnNames = new ArrayList<>(); - if (columnNameProperty != null && columnNameProperty.length() > 0) { + if (columnNameProperty != null && columnNameProperty.length() > 0) + { Collections.addAll(columnNames, columnNameProperty.split(columnNameDelimiter)); } - if (columnTypeProperty != null) { + + if (columnTypeProperty == null) + { // Default type: all string StringBuilder sb = new StringBuilder(); - for (int i = 0; i < columnNames.size(); i++) { - if (i > 0) { + for (int i = 0; i < columnNames.size(); i++) + { + if (i > 0) + { sb.append(":"); } sb.append("string"); @@ -90,52 +97,62 @@ public void initialize(@Nullable Configuration configuration, Properties table) } @Override - public Class getSerializedClass() { + public Class getSerializedClass() + { return PixelsSerdeRow.class; } @Override - public Writable serialize(Object realRow, ObjectInspector objectInspector) throws SerDeException { + public Writable serialize(Object realRow, ObjectInspector objectInspector) throws SerDeException + { row.realRow = realRow; row.inspector = inspector; return row; } @Override - public SerDeStats getSerDeStats() { + public SerDeStats getSerDeStats() + { return null; } @Override - public Object deserialize(Writable writable) throws SerDeException { + public Object deserialize(Writable writable) throws SerDeException + { // log.info("deserialize"); return writable; } @Override - public ObjectInspector getObjectInspector() throws SerDeException { + public ObjectInspector getObjectInspector() throws SerDeException + { return inspector; } - public class PixelsSerdeRow implements Writable { + public class PixelsSerdeRow implements Writable + { Object realRow; ObjectInspector inspector; @Override - public void write(DataOutput dataOutput) throws IOException { + public void write(DataOutput dataOutput) throws IOException + { throw new UnsupportedOperationException("can't write the bundle"); } @Override - public void readFields(DataInput dataInput) throws IOException { + public void readFields(DataInput dataInput) throws IOException + { throw new UnsupportedOperationException("can't read the bundle"); } - ObjectInspector getInspector() { + ObjectInspector getInspector() + { return inspector; } - Object getRow() { + Object getRow() + { return realRow; } } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsStruct.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsStruct.java index 3390c0a71..6ab6fb7b0 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsStruct.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/PixelsStruct.java @@ -40,24 +40,29 @@ * @author: tao * @date: Create in 2018-12-12 22:35 **/ -final public class PixelsStruct implements Writable { +final public class PixelsStruct implements Writable +{ private static Logger log = LogManager.getLogger(PixelsStruct.class); private Object[] fields; - public PixelsStruct(int children) { + public PixelsStruct(int children) + { fields = new Object[children]; } - public Object getFieldValue(int fieldIndex) { + public Object getFieldValue(int fieldIndex) + { return fields[fieldIndex]; } - public void setFieldValue(int fieldIndex, Object value) { + public void setFieldValue(int fieldIndex, Object value) + { fields[fieldIndex] = value; } - public int getNumFields() { + public int getNumFields() + { return fields.length; } @@ -67,8 +72,10 @@ public int getNumFields() { * * @param numFields the new number of fields */ - public void setNumFields(int numFields) { - if (fields.length != numFields) { + public void setNumFields(int numFields) + { + if (fields.length != numFields) + { Object[] oldFields = fields; fields = new Object[numFields]; System.arraycopy(oldFields, 0, fields, 0, @@ -81,36 +88,48 @@ public void setNumFields(int numFields) { * * @param other the value to point to */ - void linkFields(PixelsStruct other) { + void linkFields(PixelsStruct other) + { fields = other.fields; } @Override - public void write(DataOutput dataOutput) throws IOException { + public void write(DataOutput dataOutput) throws IOException + { throw new UnsupportedOperationException("write unsupported"); } @Override - public void readFields(DataInput dataInput) throws IOException { + public void readFields(DataInput dataInput) throws IOException + { throw new UnsupportedOperationException("readFields unsupported"); } @Override - public boolean equals(Object other) { - if (other == null || other.getClass() != PixelsStruct.class) { + public boolean equals(Object other) + { + if (other == null || other.getClass() != PixelsStruct.class) + { return false; - } else { + } else + { PixelsStruct oth = (PixelsStruct) other; - if (fields.length != oth.fields.length) { + if (fields.length != oth.fields.length) + { return false; } - for (int i = 0; i < fields.length; ++i) { - if (fields[i] == null) { - if (oth.fields[i] != null) { + for (int i = 0; i < fields.length; ++i) + { + if (fields[i] == null) + { + if (oth.fields[i] != null) + { return false; } - } else { - if (!fields[i].equals(oth.fields[i])) { + } else + { + if (!fields[i].equals(oth.fields[i])) + { return false; } } @@ -120,10 +139,13 @@ public boolean equals(Object other) { } @Override - public int hashCode() { + public int hashCode() + { int result = fields.length; - for (Object field : fields) { - if (field != null) { + for (Object field : fields) + { + if (field != null) + { result ^= field.hashCode(); } } @@ -131,11 +153,14 @@ public int hashCode() { } @Override - public String toString() { + public String toString() + { StringBuilder buffer = new StringBuilder(); buffer.append("{"); - for (int i = 0; i < fields.length; ++i) { - if (i != 0) { + for (int i = 0; i < fields.length; ++i) + { + if (i != 0) + { buffer.append(", "); } buffer.append(fields[i]); @@ -147,8 +172,10 @@ public String toString() { /* Routines for stubbing into Writables */ - public static Object createValue(TypeDescription type, int[] colIndexs) { - switch (type.getCategory()) { + public static Object createValue(TypeDescription type, int[] colIndexs) + { + switch (type.getCategory()) + { case BOOLEAN: return new BooleanWritable(); case BYTE: @@ -171,7 +198,8 @@ public static Object createValue(TypeDescription type, int[] colIndexs) { return new Text(); case DATE: return new DateWritable(); - case STRUCT: { + case STRUCT: + { PixelsStruct result = new PixelsStruct(colIndexs.length); int c = 0; List child = type.getChildren(); @@ -181,7 +209,6 @@ public static Object createValue(TypeDescription type, int[] colIndexs) { // for (TypeDescription child : type.getChildren()) { // result.setFieldValue(c++, createValue(child, colIndexs)); // } - log.info("result:" + result); return result; } default: @@ -189,64 +216,77 @@ public static Object createValue(TypeDescription type, int[] colIndexs) { } } - static class Field implements StructField { + static class Field implements StructField + { private final String name; private final ObjectInspector inspector; private final int offset; - Field(String name, ObjectInspector inspector, int offset) { + Field(String name, ObjectInspector inspector, int offset) + { this.name = name; this.inspector = inspector; this.offset = offset; } @Override - public String getFieldName() { + public String getFieldName() + { return name; } @Override - public ObjectInspector getFieldObjectInspector() { + public ObjectInspector getFieldObjectInspector() + { return inspector; } @Override - public int getFieldID() { + public int getFieldID() + { return offset; } @Override - public String getFieldComment() { + public String getFieldComment() + { return null; } } - static class PixelsStructInspector extends SettableStructObjectInspector { + static class PixelsStructInspector extends SettableStructObjectInspector + { private List fields; - protected PixelsStructInspector() { + protected PixelsStructInspector() + { super(); } - PixelsStructInspector(List fields) { + PixelsStructInspector(List fields) + { this.fields = fields; } - PixelsStructInspector(StructTypeInfo info) { + PixelsStructInspector(StructTypeInfo info) + { ArrayList fieldNames = info.getAllStructFieldNames(); ArrayList fieldTypes = info.getAllStructFieldTypeInfos(); - fields = new ArrayList(fieldNames.size()); - for (int i = 0; i < fieldNames.size(); ++i) { + fields = new ArrayList<>(fieldNames.size()); + for (int i = 0; i < fieldNames.size(); ++i) + { fields.add(new Field(fieldNames.get(i), createObjectInspector(fieldTypes.get(i)), i)); } } - PixelsStructInspector(int columnId, List types) { + PixelsStructInspector(int columnId, List types) + { PixelsProto.Type type = types.get(columnId); int fieldCount = type.getSubtypesCount(); - fields = new ArrayList(fieldCount); - for (int i = 0; i < fieldCount; ++i) { + fields = new ArrayList<>(fieldCount); + for (int i = 0; i < fieldCount; ++i) + { int fieldType = type.getSubtypes(i); fields.add(new Field(type.getName(), createObjectInspector(fieldType, types), i)); @@ -254,14 +294,18 @@ protected PixelsStructInspector() { } @Override - public List getAllStructFieldRefs() { + public List getAllStructFieldRefs() + { return fields; } @Override - public StructField getStructFieldRef(String s) { - for (StructField field : fields) { - if (field.getFieldName().equalsIgnoreCase(s)) { + public StructField getStructFieldRef(String s) + { + for (StructField field : fields) + { + if (field.getFieldName().equalsIgnoreCase(s)) + { return field; } } @@ -269,13 +313,16 @@ public StructField getStructFieldRef(String s) { } @Override - public Object getStructFieldData(Object object, StructField field) { - if (object == null) { + public Object getStructFieldData(Object object, StructField field) + { + if (object == null) + { return null; } int offset = ((Field) field).offset; PixelsStruct struct = (PixelsStruct) object; - if (offset >= struct.fields.length) { + if (offset >= struct.fields.length) + { return null; } @@ -283,25 +330,31 @@ public Object getStructFieldData(Object object, StructField field) { } @Override - public List getStructFieldsDataAsList(Object object) { - if (object == null) { + public List getStructFieldsDataAsList(Object object) + { + if (object == null) + { return null; } PixelsStruct struct = (PixelsStruct) object; - List result = new ArrayList(struct.fields.length); - for (Object child : struct.fields) { + List result = new ArrayList<>(struct.fields.length); + for (Object child : struct.fields) + { result.add(child); } return result; } @Override - public String getTypeName() { + public String getTypeName() + { StringBuilder buffer = new StringBuilder(); buffer.append("struct<"); - for (int i = 0; i < fields.size(); ++i) { + for (int i = 0; i < fields.size(); ++i) + { StructField field = fields.get(i); - if (i != 0) { + if (i != 0) + { buffer.append(","); } buffer.append(field.getFieldName()); @@ -313,22 +366,26 @@ public String getTypeName() { } @Override - public ObjectInspector.Category getCategory() { + public ObjectInspector.Category getCategory() + { return ObjectInspector.Category.STRUCT; } @Override - public Object create() { + public Object create() + { return new PixelsStruct(0); } @Override public Object setStructFieldData(Object struct, StructField field, - Object fieldValue) { + Object fieldValue) + { PixelsStruct pixelsStruct = (PixelsStruct) struct; int offset = ((Field) field).offset; // if the offset is bigger than our current number of fields, grow it - if (pixelsStruct.getNumFields() <= offset) { + if (pixelsStruct.getNumFields() <= offset) + { pixelsStruct.setNumFields(offset + 1); } pixelsStruct.setFieldValue(offset, fieldValue); @@ -336,22 +393,29 @@ public Object setStructFieldData(Object struct, StructField field, } @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { + public boolean equals(Object o) + { + if (o == null || o.getClass() != getClass()) + { return false; - } else if (o == this) { + } else if (o == this) + { return true; - } else { + } else + { List other = ((PixelsStructInspector) o).fields; - if (other.size() != fields.size()) { + if (other.size() != fields.size()) + { return false; } - for (int i = 0; i < fields.size(); ++i) { + for (int i = 0; i < fields.size(); ++i) + { StructField left = other.get(i); StructField right = fields.get(i); if (!(left.getFieldName().equalsIgnoreCase(right.getFieldName()) && left.getFieldObjectInspector().equals - (right.getFieldObjectInspector()))) { + (right.getFieldObjectInspector()))) + { return false; } } @@ -361,97 +425,118 @@ public boolean equals(Object o) { } static class PixelsMapObjectInspector - implements MapObjectInspector, SettableMapObjectInspector { + implements MapObjectInspector, SettableMapObjectInspector + { private ObjectInspector key; private ObjectInspector value; - private PixelsMapObjectInspector() { + private PixelsMapObjectInspector() + { super(); } - PixelsMapObjectInspector(MapTypeInfo info) { + PixelsMapObjectInspector(MapTypeInfo info) + { key = createObjectInspector(info.getMapKeyTypeInfo()); value = createObjectInspector(info.getMapValueTypeInfo()); } - PixelsMapObjectInspector(int columnId, List types) { + PixelsMapObjectInspector(int columnId, List types) + { PixelsProto.Type type = types.get(columnId); key = createObjectInspector(type.getSubtypes(0), types); value = createObjectInspector(type.getSubtypes(1), types); } @Override - public ObjectInspector getMapKeyObjectInspector() { + public ObjectInspector getMapKeyObjectInspector() + { return key; } @Override - public ObjectInspector getMapValueObjectInspector() { + public ObjectInspector getMapValueObjectInspector() + { return value; } @Override - public Object getMapValueElement(Object map, Object key) { + public Object getMapValueElement(Object map, Object key) + { return ((map == null || key == null) ? null : ((Map) map).get(key)); } @Override @SuppressWarnings("unchecked") - public Map getMap(Object map) { - if (map == null) { + public Map getMap(Object map) + { + if (map == null) + { return null; } return (Map) map; } @Override - public int getMapSize(Object map) { - if (map == null) { + public int getMapSize(Object map) + { + if (map == null) + { return -1; } return ((Map) map).size(); } @Override - public String getTypeName() { + public String getTypeName() + { return "map<" + key.getTypeName() + "," + value.getTypeName() + ">"; } @Override - public Category getCategory() { + public Category getCategory() + { return Category.MAP; } @Override - public Object create() { + public Object create() + { return new LinkedHashMap(); } @Override - public Object put(Object map, Object key, Object value) { + public Object put(Object map, Object key, Object value) + { ((Map) map).put(key, value); return map; } @Override - public Object remove(Object map, Object key) { + public Object remove(Object map, Object key) + { ((Map) map).remove(key); return map; } @Override - public Object clear(Object map) { + public Object clear(Object map) + { ((Map) map).clear(); return map; } @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { + public boolean equals(Object o) + { + if (o == null || o.getClass() != getClass()) + { return false; - } else if (o == this) { + } else if (o == this) + { return true; - } else { + } else + { PixelsMapObjectInspector other = (PixelsMapObjectInspector) o; return other.key.equals(key) && other.value.equals(value); } @@ -459,38 +544,47 @@ public boolean equals(Object o) { } static class PixelsListObjectInspector - implements ListObjectInspector, SettableListObjectInspector { + implements ListObjectInspector, SettableListObjectInspector + { private ObjectInspector child; - private PixelsListObjectInspector() { + private PixelsListObjectInspector() + { super(); } - PixelsListObjectInspector(ListTypeInfo info) { + PixelsListObjectInspector(ListTypeInfo info) + { child = createObjectInspector(info.getListElementTypeInfo()); } - PixelsListObjectInspector(int columnId, List types) { + PixelsListObjectInspector(int columnId, List types) + { PixelsProto.Type type = types.get(columnId); child = createObjectInspector(type.getSubtypes(0), types); } @Override - public ObjectInspector getListElementObjectInspector() { + public ObjectInspector getListElementObjectInspector() + { return child; } @Override - public Object getListElement(Object list, int i) { - if (list == null || i < 0 || i >= getListLength(list)) { + public Object getListElement(Object list, int i) + { + if (list == null || i < 0 || i >= getListLength(list)) + { return null; } return ((List) list).get(i); } @Override - public int getListLength(Object list) { - if (list == null) { + public int getListLength(Object list) + { + if (list == null) + { return -1; } return ((List) list).size(); @@ -498,36 +592,44 @@ public int getListLength(Object list) { @Override @SuppressWarnings("unchecked") - public List getList(Object list) { - if (list == null) { + public List getList(Object list) + { + if (list == null) + { return null; } return (List) list; } @Override - public String getTypeName() { + public String getTypeName() + { return "array<" + child.getTypeName() + ">"; } @Override - public ObjectInspector.Category getCategory() { + public ObjectInspector.Category getCategory() + { return ObjectInspector.Category.LIST; } @Override - public Object create(int size) { + public Object create(int size) + { ArrayList result = new ArrayList(size); - for (int i = 0; i < size; ++i) { + for (int i = 0; i < size; ++i) + { result.add(null); } return result; } @Override - public Object set(Object list, int index, Object element) { + public Object set(Object list, int index, Object element) + { List l = (List) list; - for (int i = l.size(); i < index + 1; ++i) { + for (int i = l.size(); i < index + 1; ++i) + { l.add(null); } l.set(index, element); @@ -535,28 +637,36 @@ public Object set(Object list, int index, Object element) { } @Override - public Object resize(Object list, int newSize) { + public Object resize(Object list, int newSize) + { ((ArrayList) list).ensureCapacity(newSize); return list; } @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { + public boolean equals(Object o) + { + if (o == null || o.getClass() != getClass()) + { return false; - } else if (o == this) { + } else if (o == this) + { return true; - } else { + } else + { ObjectInspector other = ((PixelsListObjectInspector) o).child; return other.equals(child); } } } - static public ObjectInspector createObjectInspector(TypeInfo info) { - switch (info.getCategory()) { + static public ObjectInspector createObjectInspector(TypeInfo info) + { + switch (info.getCategory()) + { case PRIMITIVE: - switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) { + switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) + { case FLOAT: return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; case DOUBLE: @@ -605,9 +715,11 @@ static public ObjectInspector createObjectInspector(TypeInfo info) { } public static ObjectInspector createObjectInspector(int columnId, - List types) { + List types) + { PixelsProto.Type type = types.get(columnId); - switch (type.getKind()) { + switch (type.getKind()) + { case FLOAT: return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; case DOUBLE: @@ -627,14 +739,16 @@ public static ObjectInspector createObjectInspector(int columnId, case STRING: return PrimitiveObjectInspectorFactory.writableStringObjectInspector; case CHAR: - if (!type.hasMaximumLength()) { + if (!type.hasMaximumLength()) + { throw new UnsupportedOperationException( "Illegal use of char type without length in PIXELS type definition."); } return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( TypeInfoFactory.getCharTypeInfo(type.getMaximumLength())); case VARCHAR: - if (!type.hasMaximumLength()) { + if (!type.hasMaximumLength()) + { throw new UnsupportedOperationException( "Illegal use of varchar type without length in PIXELS type definition."); } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsInputFormat.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsInputFormat.java index bd0151cbc..3bd1832a9 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsInputFormat.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsInputFormat.java @@ -57,7 +57,8 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException public RecordReader getRecordReader(InputSplit inputSplit, JobConf conf, - Reporter reporter) throws IOException { + Reporter reporter) throws IOException + { FileSplit split = (FileSplit) inputSplit; PixelsFile.ReaderOptions option = PixelsFile.readerOptions(conf, split); log.info(split.toString()); diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsKey.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsKey.java index 69003d58d..bb951e6ff 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsKey.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsKey.java @@ -36,31 +36,38 @@ * refer: [OrcKey](https://github.com/apache/orc/blob/master/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java) */ public final class PixelsKey - implements WritableComparable, JobConfigurable { + implements WritableComparable, JobConfigurable +{ public WritableComparable key; - public PixelsKey(WritableComparable key) { + public PixelsKey(WritableComparable key) + { this.key = key; } - public PixelsKey() { + public PixelsKey() + { key = null; } @Override - public void write(DataOutput dataOutput) throws IOException { + public void write(DataOutput dataOutput) throws IOException + { key.write(dataOutput); } @Override - public void readFields(DataInput dataInput) throws IOException { + public void readFields(DataInput dataInput) throws IOException + { key.readFields(dataInput); } @Override - public void configure(JobConf conf) { - if (key == null) { + public void configure(JobConf conf) + { + if (key == null) + { TypeDescription schema = TypeDescription.fromString(PixelsConf.MAPRED_SHUFFLE_KEY_SCHEMA .getString(conf)); @@ -69,23 +76,29 @@ public void configure(JobConf conf) { } @Override - public int compareTo(PixelsKey o) { + public int compareTo(PixelsKey o) + { return key.compareTo(o.key); } @Override - public boolean equals(Object o) { - if (o == null || key == null) { + public boolean equals(Object o) + { + if (o == null || key == null) + { return false; - } else if (o.getClass() != getClass()) { + } else if (o.getClass() != getClass()) + { return false; - } else { + } else + { return key.equals(((PixelsKey) o).key); } } @Override - public int hashCode() { + public int hashCode() + { return key.hashCode(); } } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordReader.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordReader.java index dce100281..c38b0ef6d 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordReader.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordReader.java @@ -40,7 +40,8 @@ * @param the root type of the file */ public class PixelsMapredRecordReader - implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader { + implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader +{ private static Logger log = LogManager.getLogger(PixelsMapredRecordReader.class); private static final int BATCH_SIZE = 10000; @@ -49,14 +50,21 @@ public class PixelsMapredRecordReader private VectorizedRowBatch batch; private int rowInBatch; private List included; + private List columnTypes; + private int numColumn; private final SerDeStats stats; private final PixelsReader file; public PixelsMapredRecordReader(PixelsReader fileReader, - PixelsFile.ReaderOptions options) throws IOException { + PixelsFile.ReaderOptions options) throws IOException + { this.file = fileReader; this.batchReader = fileReader.read(options.getOption()); this.schema = fileReader.getFileSchema(); + // schema should be of struct type. + assert schema.getCategory() == TypeDescription.Category.STRUCT; + this.columnTypes = schema.getChildren(); + this.numColumn = columnTypes.size(); this.batch = batchReader.readBatch(BATCH_SIZE); this.rowInBatch = 0; this.included = options.getIncluded(); @@ -69,11 +77,14 @@ public PixelsMapredRecordReader(PixelsReader fileReader, * @return true if we have rows available. * @throws IOException */ - boolean ensureBatch() throws IOException { - if (rowInBatch >= batch.size) { + boolean ensureBatch() throws IOException + { + if (rowInBatch >= batch.size) + { rowInBatch = 0; batch = batchReader.readBatch(BATCH_SIZE); - if (this.batch.size <= 0 || this.batch.endOfFile) { + if (this.batch.size <= 0 || this.batch.endOfFile) + { return false; } } @@ -81,261 +92,324 @@ boolean ensureBatch() throws IOException { } @Override - public boolean next(NullWritable key, PixelsStruct value) throws IOException { - if (!ensureBatch()) { + public boolean next(NullWritable key, PixelsStruct value) throws IOException + { + // value is created by createValue, is should not be null. + assert value != null; + + if (!ensureBatch()) + { return false; } - // get the length of IncludedColumns - if (this.included.size() == 0) { + + if (this.included.size() == 0) + { rowInBatch += 1; return true; } - if (schema.getCategory() == TypeDescription.Category.STRUCT) { - List children = schema.getChildren(); - int numberOfChildren = this.included.size(); - PixelsStruct result; - if (value == null) { - result = new PixelsStruct(numberOfChildren); - } else { - result = value; - } - for (int i = 0; i < numberOfChildren; ++i) { - result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch, - children.get(included.get(i)), result.getFieldValue(i))); - } - } else { - nextValue(batch.cols[0], rowInBatch, schema, value); + + int numberOfIncluded = this.included.size(); + for (int i = 0; i < numberOfIncluded; ++i) + { + value.setFieldValue(included.get(i), nextValue(batch.cols[i], rowInBatch, + columnTypes.get(included.get(i)), value.getFieldValue(included.get(i)))); } + rowInBatch += 1; return true; } @Override - public NullWritable createKey() { + public NullWritable createKey() + { return NullWritable.get(); } @Override - public PixelsStruct createValue() { - return new PixelsStruct(this.included.size()); + public PixelsStruct createValue() + { + return new PixelsStruct(this.numColumn); } @Override - public long getPos() throws IOException { + public long getPos() throws IOException + { return 0; } @Override - public void close() throws IOException { + public void close() throws IOException + { batchReader.close(); } // todo get progress @Override - public float getProgress() throws IOException { + public float getProgress() throws IOException + { return 0; } static BooleanWritable nextBoolean(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { BooleanWritable result; - if (previous == null || previous.getClass() != BooleanWritable.class) { + if (previous == null || previous.getClass() != BooleanWritable.class) + { result = new BooleanWritable(); - } else { + } else + { result = (BooleanWritable) previous; } result.set(((ByteColumnVector) vector).vector[row] != 0); return result; - } else { + } else + { return null; } } static ByteWritable nextByte(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { ByteWritable result; - if (previous == null || previous.getClass() != ByteWritable.class) { + if (previous == null || previous.getClass() != ByteWritable.class) + { result = new ByteWritable(); - } else { + } else + { result = (ByteWritable) previous; } result.set((byte) ((LongColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static ShortWritable nextShort(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { ShortWritable result; - if (previous == null || previous.getClass() != ShortWritable.class) { + if (previous == null || previous.getClass() != ShortWritable.class) + { result = new ShortWritable(); - } else { + } else + { result = (ShortWritable) previous; } result.set((short) ((LongColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static IntWritable nextInt(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { IntWritable result; - if (previous == null || previous.getClass() != IntWritable.class) { + if (previous == null || previous.getClass() != IntWritable.class) + { result = new IntWritable(); - } else { + } else + { result = (IntWritable) previous; } result.set((int) ((LongColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static LongWritable nextLong(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { LongWritable result; - if (previous == null || previous.getClass() != LongWritable.class) { + if (previous == null || previous.getClass() != LongWritable.class) + { result = new LongWritable(); - } else { + } else + { result = (LongWritable) previous; } result.set(((LongColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static FloatWritable nextFloat(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { FloatWritable result; - if (previous == null || previous.getClass() != FloatWritable.class) { + if (previous == null || previous.getClass() != FloatWritable.class) + { result = new FloatWritable(); - } else { + } else + { result = (FloatWritable) previous; } result.set((float) ((DoubleColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static DoubleWritable nextDouble(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { DoubleWritable result; - if (previous == null || previous.getClass() != DoubleWritable.class) { + if (previous == null || previous.getClass() != DoubleWritable.class) + { result = new DoubleWritable(); - } else { + } else + { result = (DoubleWritable) previous; } result.set(((DoubleColumnVector) vector).vector[row]); return result; - } else { + } else + { return null; } } static Text nextString(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { Text result; - if (previous == null || previous.getClass() != Text.class) { + if (previous == null || previous.getClass() != Text.class) + { result = new Text(); - } else { + } else + { result = (Text) previous; } BinaryColumnVector bytes = (BinaryColumnVector) vector; result.set(bytes.vector[row], bytes.start[row], bytes.lens[row]); return result; - } else { + } else + { return null; } } static BytesWritable nextBinary(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { BytesWritable result; - if (previous == null || previous.getClass() != BytesWritable.class) { + if (previous == null || previous.getClass() != BytesWritable.class) + { result = new BytesWritable(); - } else { + } else + { result = (BytesWritable) previous; } BinaryColumnVector bytes = (BinaryColumnVector) vector; result.set(bytes.vector[row], bytes.start[row], bytes.lens[row]); return result; - } else { + } else + { return null; } } static DateWritable nextDate(ColumnVector vector, int row, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { DateWritable result; - if (previous == null || previous.getClass() != DateWritable.class) { + if (previous == null || previous.getClass() != DateWritable.class) + { result = new DateWritable(); - } else { + } else + { result = (DateWritable) previous; } int date = (int) ((LongColumnVector) vector).vector[row]; result.set(date); return result; - } else { + } else + { return null; } } @@ -343,26 +417,33 @@ static DateWritable nextDate(ColumnVector vector, PixelsStruct nextStruct(ColumnVector vector, int row, TypeDescription schema, - Object previous) { - if (vector.isRepeating) { + Object previous) + { + if (vector.isRepeating) + { row = 0; } - if (vector.noNulls || !vector.isNull[row]) { + if (vector.noNulls || !vector.isNull[row]) + { PixelsStruct result; List childrenTypes = schema.getChildren(); int numChildren = childrenTypes.size(); - if (previous == null || previous.getClass() != PixelsStruct.class) { + if (previous == null || previous.getClass() != PixelsStruct.class) + { result = new PixelsStruct(numChildren); - } else { + } else + { result = (PixelsStruct) previous; } StructColumnVector struct = (StructColumnVector) vector; - for (int f = 0; f < numChildren; ++f) { + for (int f = 0; f < numChildren; ++f) + { result.setFieldValue(f, nextValue(struct.fields[f], row, childrenTypes.get(f), result.getFieldValue(f))); } return result; - } else { + } else + { return null; } } @@ -370,8 +451,10 @@ PixelsStruct nextStruct(ColumnVector vector, public Object nextValue(ColumnVector vector, int row, TypeDescription schema, - Object previous) { - switch (schema.getCategory()) { + Object previous) + { + switch (schema.getCategory()) + { case BOOLEAN: return nextBoolean(vector, row, previous); case BYTE: @@ -402,7 +485,8 @@ public Object nextValue(ColumnVector vector, } @Override - public SerDeStats getStats() { + public SerDeStats getStats() + { stats.setRawDataSize(file.getCompressionBlockSize()); stats.setRowCount(file.getNumberOfRows()); return stats; diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordWriter.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordWriter.java index 49fd2faea..7ffc27a05 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordWriter.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsMapredRecordWriter.java @@ -44,14 +44,16 @@ * @param the root type of the file */ public class PixelsMapredRecordWriter - implements RecordWriter { + implements RecordWriter +{ private final PixelsWriter writer; private final VectorizedRowBatch batch; private final TypeDescription schema; private final ObjectInspector inspector; private final StructField[] fields; - public PixelsMapredRecordWriter(PixelsWriter writer) { + public PixelsMapredRecordWriter(PixelsWriter writer) + { this.writer = writer; schema = writer.getSchema(); this.inspector = null; @@ -59,42 +61,51 @@ public PixelsMapredRecordWriter(PixelsWriter writer) { this.fields = initializeFieldsFromOi(inspector); } - private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) { - if (inspector instanceof StructObjectInspector) { + private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) + { + if (inspector instanceof StructObjectInspector) + { List fieldList = ((StructObjectInspector) inspector).getAllStructFieldRefs(); StructField[] fields = new StructField[fieldList.size()]; fieldList.toArray(fields); return fields; - } else { + } else + { return null; } } - static void setLongValue(ColumnVector vector, int row, long value) { + static void setLongValue(ColumnVector vector, int row, long value) + { ((LongColumnVector) vector).vector[row] = value; } - static void setDoubleValue(ColumnVector vector, int row, double value) { + static void setDoubleValue(ColumnVector vector, int row, double value) + { ((DoubleColumnVector) vector).vector[row] = Double.doubleToLongBits(value); } static void setBinaryValue(ColumnVector vector, int row, - BinaryComparable value) { + BinaryComparable value) + { ((BinaryColumnVector) vector).setVal(row, value.getBytes(), 0, value.getLength()); } static void setBinaryValue(ColumnVector vector, int row, - BinaryComparable value, int maxLength) { + BinaryComparable value, int maxLength) + { ((BinaryColumnVector) vector).setVal(row, value.getBytes(), 0, Math.min(maxLength, value.getLength())); } private static final ThreadLocal SPACE_BUFFER = - new ThreadLocal() { + new ThreadLocal() + { @Override - protected byte[] initialValue() { + protected byte[] initialValue() + { byte[] result = new byte[100]; Arrays.fill(result, (byte) ' '); return result; @@ -104,14 +115,18 @@ protected byte[] initialValue() { static void setCharValue(BinaryColumnVector vector, int row, Text value, - int length) { + int length) + { // we need to trim or pad the string with spaces to required length int actualLength = value.getLength(); - if (actualLength >= length) { + if (actualLength >= length) + { setBinaryValue(vector, row, value, length); - } else { + } else + { byte[] spaces = SPACE_BUFFER.get(); - if (length - actualLength > spaces.length) { + if (length - actualLength > spaces.length) + { spaces = new byte[length - actualLength]; Arrays.fill(spaces, (byte) ' '); SPACE_BUFFER.set(spaces); @@ -124,9 +139,11 @@ static void setCharValue(BinaryColumnVector vector, static void setStructValue(TypeDescription schema, StructColumnVector vector, int row, - PixelsStruct value) { + PixelsStruct value) + { List children = schema.getChildren(); - for (int c = 0; c < value.getNumFields(); ++c) { + for (int c = 0; c < value.getNumFields(); ++c) + { setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c)); } } @@ -134,12 +151,16 @@ static void setStructValue(TypeDescription schema, public static void setColumn(TypeDescription schema, ColumnVector vector, int row, - Object value) { - if (value == null) { + Object value) + { + if (value == null) + { vector.noNulls = false; vector.isNull[row] = true; - } else { - switch (schema.getCategory()) { + } else + { + switch (schema.getCategory()) + { case BOOLEAN: setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0); break; @@ -188,90 +209,108 @@ public static void setColumn(TypeDescription schema, } static void setColumn(int rowId, ColumnVector column, - ObjectInspector inspector, Object obj) { - if (obj == null) { + ObjectInspector inspector, Object obj) + { + if (obj == null) + { column.noNulls = false; column.isNull[rowId] = true; - } else { - switch (inspector.getCategory()) { + } else + { + switch (inspector.getCategory()) + { case PRIMITIVE: switch (((PrimitiveObjectInspector) inspector) - .getPrimitiveCategory()) { - case BOOLEAN: { + .getPrimitiveCategory()) + { + case BOOLEAN: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((BooleanObjectInspector) inspector).get(obj) ? 1 : 0; break; } - case BYTE: { + case BYTE: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((ByteObjectInspector) inspector).get(obj); break; } - case SHORT: { + case SHORT: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((ShortObjectInspector) inspector).get(obj); break; } - case INT: { + case INT: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((IntObjectInspector) inspector).get(obj); break; } - case LONG: { + case LONG: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((LongObjectInspector) inspector).get(obj); break; } - case FLOAT: { + case FLOAT: + { DoubleColumnVector vector = (DoubleColumnVector) column; vector.vector[rowId] = Float.floatToIntBits(((FloatObjectInspector) inspector).get(obj)); break; } - case DOUBLE: { + case DOUBLE: + { DoubleColumnVector vector = (DoubleColumnVector) column; vector.vector[rowId] = Double.doubleToLongBits(((DoubleObjectInspector) inspector).get(obj)); break; } - case BINARY: { + case BINARY: + { BinaryColumnVector vector = (BinaryColumnVector) column; BytesWritable blob = ((BinaryObjectInspector) inspector) .getPrimitiveWritableObject(obj); vector.setVal(rowId, blob.getBytes(), 0, blob.getLength()); break; } - case STRING: { + case STRING: + { BinaryColumnVector vector = (BinaryColumnVector) column; Text blob = ((StringObjectInspector) inspector) .getPrimitiveWritableObject(obj); vector.setVal(rowId, blob.getBytes(), 0, blob.getLength()); break; } - case VARCHAR: { + case VARCHAR: + { BinaryColumnVector vector = (BinaryColumnVector) column; Text blob = ((HiveVarcharObjectInspector) inspector) .getPrimitiveWritableObject(obj).getTextValue(); vector.setVal(rowId, blob.getBytes(), 0, blob.getLength()); break; } - case CHAR: { + case CHAR: + { BinaryColumnVector vector = (BinaryColumnVector) column; Text blob = ((HiveCharObjectInspector) inspector) .getPrimitiveWritableObject(obj).getTextValue(); vector.setVal(rowId, blob.getBytes(), 0, blob.getLength()); break; } - case TIMESTAMP: { + case TIMESTAMP: + { TimestampColumnVector vector = (TimestampColumnVector) column; Timestamp ts = ((TimestampObjectInspector) inspector) .getPrimitiveJavaObject(obj); vector.set(rowId, ts); break; } - case DATE: { + case DATE: + { LongColumnVector vector = (LongColumnVector) column; vector.vector[rowId] = ((DateObjectInspector) inspector) .getPrimitiveWritableObject(obj).getDays(); @@ -279,11 +318,13 @@ static void setColumn(int rowId, ColumnVector column, } } break; - case STRUCT: { + case STRUCT: + { StructColumnVector vector = (StructColumnVector) column; StructObjectInspector oi = (StructObjectInspector) inspector; List fields = oi.getAllStructFieldRefs(); - for (int c = 0; c < vector.fields.length; ++c) { + for (int c = 0; c < vector.fields.length; ++c) + { StructField field = fields.get(c); setColumn(rowId, vector.fields[c], field.getFieldObjectInspector(), oi.getStructFieldData(obj, field)); @@ -298,9 +339,11 @@ static void setColumn(int rowId, ColumnVector column, } @Override - public void write(NullWritable nullWritable, PixelsSerDe.PixelsSerdeRow row) throws IOException { + public void write(NullWritable nullWritable, PixelsSerDe.PixelsSerdeRow row) throws IOException + { // if the batch is full, write it out. - if (batch.size == batch.getMaxSize()) { + if (batch.size == batch.getMaxSize()) + { writer.addRowBatch(batch); batch.reset(); } @@ -308,21 +351,26 @@ public void write(NullWritable nullWritable, PixelsSerDe.PixelsSerdeRow row) thr // add the new row int rowId = batch.size++; // skip over the PixelsKey or PixelsValue - if (fields != null) { + if (fields != null) + { StructObjectInspector soi = (StructObjectInspector) inspector; - for (int i = 0; i < fields.length; ++i) { + for (int i = 0; i < fields.length; ++i) + { setColumn(rowId, batch.cols[i], fields[i].getFieldObjectInspector(), soi.getStructFieldData(row, fields[i])); } - } else { + } else + { setColumn(rowId, batch.cols[0], inspector, row); } } @Override - public void close(Reporter reporter) throws IOException { - if (batch.size != 0) { + public void close(Reporter reporter) throws IOException + { + if (batch.size != 0) + { writer.addRowBatch(batch); batch.reset(); } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsOutputFormat.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsOutputFormat.java index 1985710eb..ed39e5f39 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsOutputFormat.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsOutputFormat.java @@ -51,7 +51,8 @@ public class PixelsOutputFormat * @param conf the job configuration * @return a new options object */ - public static PixelsFile.WriterOptions buildOptions(Configuration conf) { + public static PixelsFile.WriterOptions buildOptions(Configuration conf) + { return PixelsFile.writerOptions(conf) .setSchema(TypeDescription.fromString(PixelsConf.MAPRED_OUTPUT_SCHEMA .getString(conf))) @@ -68,7 +69,8 @@ public RecordWriter getRecordWriter(Fi JobConf conf, String name, Progressable progressable - ) throws IOException { + ) throws IOException + { Path path = getTaskOutputPath(conf, name); PixelsWriter writer = PixelsFile.createWriter(path, buildOptions(conf).fileSystem(fileSystem)); @@ -76,7 +78,8 @@ public RecordWriter getRecordWriter(Fi } @Override - public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class aClass, boolean b, Properties properties, Progressable progressable) throws IOException { + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class aClass, boolean b, Properties properties, Progressable progressable) throws IOException + { return null; } } diff --git a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsValue.java b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsValue.java index b61158a69..b293056cb 100644 --- a/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsValue.java +++ b/pixels-hive/src/main/java/cn/edu/ruc/iir/pixels/hive/mapred/PixelsValue.java @@ -36,31 +36,38 @@ * string of the type. * refer: [OrcValue](https://github.com/apache/orc/blob/master/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java) */ -public final class PixelsValue implements Writable, JobConfigurable { +public final class PixelsValue implements Writable, JobConfigurable +{ public WritableComparable value; - public PixelsValue(WritableComparable value) { + public PixelsValue(WritableComparable value) + { this.value = value; } - public PixelsValue() { + public PixelsValue() + { value = null; } @Override - public void write(DataOutput dataOutput) throws IOException { + public void write(DataOutput dataOutput) throws IOException + { value.write(dataOutput); } @Override - public void readFields(DataInput dataInput) throws IOException { + public void readFields(DataInput dataInput) throws IOException + { value.readFields(dataInput); } @Override - public void configure(JobConf conf) { - if (value == null) { + public void configure(JobConf conf) + { + if (value == null) + { TypeDescription schema = TypeDescription.fromString(PixelsConf.MAPRED_SHUFFLE_VALUE_SCHEMA .getString(conf));