Skip to content

Commit

Permalink
Convert BYTE_ARRAY to StringType or List<UInt8> depending on the logi…
Browse files Browse the repository at this point in the history
…cal type
  • Loading branch information
wesm committed Mar 26, 2016
1 parent b7b9ca9 commit 74d6bae
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 43 deletions.
8 changes: 4 additions & 4 deletions cpp/src/arrow/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
# arrow_parquet : Arrow <-> Parquet adapter

set(PARQUET_SRCS
schema.cc
schema.cc
)

set(PARQUET_LIBS
arrow
${PARQUET_SHARED_LIB}
arrow
${PARQUET_SHARED_LIB}
)

add_library(arrow_parquet STATIC
add_library(arrow_parquet SHARED
${PARQUET_SRCS}
)
target_link_libraries(arrow_parquet ${PARQUET_LIBS})
Expand Down
49 changes: 44 additions & 5 deletions cpp/src/arrow/parquet/parquet-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "arrow/parquet/schema.h"

#include "arrow/test-util.h"
#include "arrow/util/status.h"

namespace arrow {

namespace parquet {
Expand All @@ -28,21 +31,24 @@ using parquet_cpp::schema::NodePtr;
using parquet_cpp::schema::PrimitiveNode;

TEST(TestNodeConversion, Primitive) {
std::shared_ptr<Field> field;

NodePtr node = PrimitiveNode::Make("boolean", Repetition::REQUIRED,
parquet_cpp::Type::BOOLEAN);
std::shared_ptr<Field> field = NodeToField(node);

ASSERT_OK(NodeToField(node, &field));
ASSERT_EQ(field->name, "boolean");
ASSERT_TRUE(field->type->Equals(std::make_shared<BooleanType>()));
ASSERT_FALSE(field->nullable);

node = PrimitiveNode::Make("int32", Repetition::REQUIRED, parquet_cpp::Type::INT32);
field = NodeToField(node);
ASSERT_OK(NodeToField(node, &field));
ASSERT_EQ(field->name, "int32");
ASSERT_TRUE(field->type->Equals(std::make_shared<Int32Type>()));
ASSERT_FALSE(field->nullable);

node = PrimitiveNode::Make("int64", Repetition::REQUIRED, parquet_cpp::Type::INT64);
field = NodeToField(node);
ASSERT_OK(NodeToField(node, &field));
ASSERT_EQ(field->name, "int64");
ASSERT_TRUE(field->type->Equals(std::make_shared<Int64Type>()));
ASSERT_FALSE(field->nullable);
Expand All @@ -55,14 +61,14 @@ TEST(TestNodeConversion, Primitive) {

// case parquet_cpp::Type::FLOAT:
node = PrimitiveNode::Make("float", Repetition::REQUIRED, parquet_cpp::Type::FLOAT);
field = NodeToField(node);
ASSERT_OK(NodeToField(node, &field));
ASSERT_EQ(field->name, "float");
ASSERT_TRUE(field->type->Equals(std::make_shared<FloatType>()));
ASSERT_FALSE(field->nullable);

// case parquet_cpp::Type::DOUBLE:
node = PrimitiveNode::Make("double", Repetition::REQUIRED, parquet_cpp::Type::DOUBLE);
field = NodeToField(node);
ASSERT_OK(NodeToField(node, &field));
ASSERT_EQ(field->name, "double");
ASSERT_TRUE(field->type->Equals(std::make_shared<DoubleType>()));
ASSERT_FALSE(field->nullable);
Expand All @@ -80,6 +86,39 @@ TEST(TestNodeConversion, Primitive) {
// TODO: Assertions
}

const auto UINT8 = std::make_shared<UInt8Type>();

TEST(TestNodeConversion, Int96Timestamp) {
}

TEST(TestNodeConversion, ByteArray) {
std::shared_ptr<Field> field;

NodePtr node = PrimitiveNode::Make("field0", Repetition::OPTIONAL,
parquet_cpp::Type::BYTE_ARRAY);
ASSERT_OK(NodeToField(node, &field));

std::shared_ptr<DataType> ex_type = std::make_shared<ListType>(
std::make_shared<Field>("", UINT8));

ASSERT_EQ(field->name, "field0");
ASSERT_TRUE(field->type->Equals(ex_type));
ASSERT_TRUE(field->nullable);

node = PrimitiveNode::Make("field1", Repetition::OPTIONAL,
parquet_cpp::Type::BYTE_ARRAY,
parquet_cpp::LogicalType::UTF8);
ASSERT_OK(NodeToField(node, &field));
ex_type = std::make_shared<StringType>();

ASSERT_EQ(field->name, "field1");
ASSERT_TRUE(field->type->Equals(ex_type));
ASSERT_TRUE(field->nullable);
}

TEST(TestNodeConversion, FixedLenByteArray) {
}

TEST(TestNodeConversion, Logical) {
}

Expand Down
90 changes: 64 additions & 26 deletions cpp/src/arrow/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,109 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/parquet/schema.h"

#include <vector>

#include "arrow/parquet/schema.h"
#include "parquet/api/schema.h"

#include "arrow/util/status.h"
#include "arrow/types/decimal.h"

using parquet_cpp::schema::Node;
using parquet_cpp::schema::NodePtr;
using parquet_cpp::schema::GroupNode;
using parquet_cpp::schema::PrimitiveNode;

using parquet_cpp::LogicalType;

namespace arrow {

namespace parquet {

const auto BOOL = std::make_shared<BooleanType>();
const auto UINT8 = std::make_shared<UInt8Type>();
const auto INT32 = std::make_shared<Int32Type>();
const auto INT64 = std::make_shared<Int64Type>();
const auto FLOAT = std::make_shared<FloatType>();
const auto DOUBLE = std::make_shared<DoubleType>();
const auto UTF8 = std::make_shared<StringType>();
const auto BINARY = std::make_shared<ListType>(
std::make_shared<Field>("", UINT8));

TypePtr MakeDecimalType(const PrimitiveNode* node) {
int precision = node->decimal_metadata().precision;
int scale = node->decimal_metadata().scale;
return TypePtr(new DecimalType(precision, scale));
return std::make_shared<DecimalType>(precision, scale);
}

static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::UTF8:
*out = UTF8;
break;
default:
// BINARY
*out = BINARY;
break;
}
return Status::OK();
}

static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
default:
return Status::NotImplemented("unhandled type");
break;
}

return Status::OK();
}

// TODO: Logical Type Handling
std::shared_ptr<Field> NodeToField(const NodePtr& node) {
Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
TypePtr type;

if (node->is_group()) {
const GroupNode* group = static_cast<const GroupNode*>(node.get());
std::vector<std::shared_ptr<Field>> fields;
std::vector<std::shared_ptr<Field>> fields(group->field_count());
for (int i = 0; i < group->field_count(); i++) {
fields.push_back(NodeToField(group->field(i)));
RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
}
type = TypePtr(new StructType(fields));
type = std::make_shared<StructType>(fields);
} else {
// Primitive (leaf) node
const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());

switch (primitive->physical_type()) {
case parquet_cpp::Type::BOOLEAN:
type = TypePtr(new BooleanType());
type = BOOL;
break;
case parquet_cpp::Type::INT32:
type = TypePtr(new Int32Type());
type = INT32;
break;
case parquet_cpp::Type::INT64:
type = TypePtr(new Int64Type());
type = INT64;
break;
case parquet_cpp::Type::INT96:
// TODO: Do we have that type in Arrow?
// type = TypePtr(new Int96Type());
break;
return Status::NotImplemented("int96");
case parquet_cpp::Type::FLOAT:
type = TypePtr(new FloatType());
type = FLOAT;
break;
case parquet_cpp::Type::DOUBLE:
type = TypePtr(new DoubleType());
type = DOUBLE;
break;
case parquet_cpp::Type::BYTE_ARRAY:
// TODO: Do we have that type in Arrow?
// type = TypePtr(new Int96Type());
RETURN_NOT_OK(FromByteArray(primitive, &type));
break;
case parquet_cpp::Type::FIXED_LEN_BYTE_ARRAY:
switch (primitive->logical_type()) {
case parquet_cpp::LogicalType::DECIMAL:
type = MakeDecimalType(primitive);
break;
default:
// TODO: Do we have that type in Arrow?
break;
}
RETURN_NOT_OK(FromFLBA(primitive, &type));
break;
}
}
Expand All @@ -92,21 +126,25 @@ std::shared_ptr<Field> NodeToField(const NodePtr& node) {
type = TypePtr(new ListType(type));
}

return std::shared_ptr<Field>(new Field(node->name(), type, !node->is_required()));
*out = std::make_shared<Field>(node->name(), type, !node->is_required());

return Status::OK();
}

std::shared_ptr<Schema> FromParquetSchema(
const parquet_cpp::SchemaDescriptor* parquet_schema) {
Status FromParquetSchema(const parquet_cpp::SchemaDescriptor* parquet_schema,
std::shared_ptr<Schema>* out) {
std::vector<std::shared_ptr<Field>> fields;
const GroupNode* schema_node = static_cast<const GroupNode*>(
parquet_schema->schema().get());

// TODO: What to with the head node?
fields.resize(schema_node->field_count());
for (int i = 0; i < schema_node->field_count(); i++) {
fields.push_back(NodeToField(schema_node->field(i)));
RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
}

return std::shared_ptr<Schema>(new Schema(fields));
*out = std::make_shared<Schema>(fields);
return Status::OK();
}

} // namespace parquet
Expand Down
20 changes: 12 additions & 8 deletions cpp/src/arrow/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
#ifndef ARROW_PARQUET_SCHEMA_H
#define ARROW_PARQUET_SCHEMA_H

#include <arrow/schema.h>
#include <arrow/type.h>
#include <parquet/schema/descriptor.h>
#include <parquet/schema/types.h>

#include <memory>

#include "parquet/api/schema.h"

#include "arrow/schema.h"
#include "arrow/type.h"

namespace arrow {

class Status;

namespace parquet {

std::shared_ptr<Field> NodeToField(const parquet_cpp::schema::NodePtr& node);
std::shared_ptr<Schema> FromParquetSchema(
const parquet_cpp::SchemaDescriptor* parquet_schema);
Status NodeToField(const parquet_cpp::schema::NodePtr& node,
std::shared_ptr<Field>* out);

Status FromParquetSchema(const parquet_cpp::SchemaDescriptor* parquet_schema,
std::shared_ptr<Schema>* out);

} // namespace parquet

Expand Down

0 comments on commit 74d6bae

Please sign in to comment.