Skip to content

Commit

Permalink
ARROW-5889: [C++][Parquet] Add property to indicate origin from converted type to TimestampLogicalType
Browse files Browse the repository at this point in the history

This patch:
- adds a new boolean property, is_from_converted_type(), to the TimestampLogicalType class; it is set to "true" if the LogicalType was created from a converted type of TIMESTAMP_MILLIS or TIMESTAMP_MICROS
- prevents writing the TimestampLogicalType in the Parquet schema if this new property is true
- changes the Arrow reader to ignore the is_adjusted_to_utc() property of the TimestampLogicalType if the type annotation came from a converted type

Author: TP Boudreau <tpboudreau@gmail.com>

Closes #4856 from tpboudreau/ARROW-5889 and squashes the following commits:

458f063 <TP Boudreau> Add property showing converted type origin to TimestampLogicalType
  • Loading branch information
tpboudreau authored and kszucs committed Jul 16, 2019
1 parent befd7df commit 9fb5c3d
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 49 deletions.
16 changes: 7 additions & 9 deletions cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,9 @@ static Status TypeFromFlatbuffer(const flatbuf::Field* field,
auto type_data = field->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded Field is null.");
"Type-pointer in custom metadata of flatbuffer-encoded Field is null.");
}
RETURN_NOT_OK(
ConcreteTypeFromFlatbuffer(field->type_type(), type_data, children, out));
RETURN_NOT_OK(ConcreteTypeFromFlatbuffer(field->type_type(), type_data, children, out));

// Look for extension metadata in custom_metadata field
// TODO(wesm): Should this be part of the Field Flatbuffers table?
Expand Down Expand Up @@ -766,8 +765,8 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, DictionaryMemo* dictiona
auto int_data = encoding->indexType();
if (int_data == nullptr) {
return Status::IOError(
"indexType-pointer in custom metadata of flatbuffer-encoded DictionaryEncoding "
"is null.");
"indexType-pointer in custom metadata of flatbuffer-encoded DictionaryEncoding "
"is null.");
}
RETURN_NOT_OK(IntFromFlatbuffer(int_data, &index_type));
type = ::arrow::dictionary(index_type, type, encoding->isOrdered());
Expand Down Expand Up @@ -1155,7 +1154,7 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
auto type_data = tensor->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded Tensor is null.");
"Type-pointer in custom metadata of flatbuffer-encoded Tensor is null.");
}
return ConcreteTypeFromFlatbuffer(tensor->type_type(), type_data, {}, type);
}
Expand Down Expand Up @@ -1204,10 +1203,9 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
auto type_data = sparse_tensor->type();
if (type_data == nullptr) {
return Status::IOError(
"Type-pointer in custom metadata of flatbuffer-encoded SparseTensor is null.");
"Type-pointer in custom metadata of flatbuffer-encoded SparseTensor is null.");
}
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {},
type);
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
}

// ----------------------------------------------------------------------
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
arrow_fields.push_back(
std::make_shared<Field>("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MICROS));
arrow_fields.push_back(std::make_shared<Field>(
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
"timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false));

parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
Expand Down Expand Up @@ -856,15 +856,18 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
LogicalType::Time(true, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1},
{"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true),
ParquetType::INT64, -1},
{"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO),
LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true),
ParquetType::INT64, -1},
// Parquet v1, values converted to microseconds
{"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true),
ParquetType::INT64, -1},
{"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,22 @@ static Status MakeArrowTime64(const LogicalType& logical_type,

static Status MakeArrowTimestamp(const LogicalType& logical_type,
std::shared_ptr<ArrowType>* out) {
static const char* utc = "UTC";
const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
const bool utc_normalized =
timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
static const char* utc_timezone = "UTC";
switch (timestamp.time_unit()) {
case LogicalType::TimeUnit::MILLIS:
*out = (timestamp.is_adjusted_to_utc()
? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc)
: ::arrow::timestamp(::arrow::TimeUnit::MILLI));
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::MILLI));
break;
case LogicalType::TimeUnit::MICROS:
*out = (timestamp.is_adjusted_to_utc()
? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc)
: ::arrow::timestamp(::arrow::TimeUnit::MICRO));
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::MICRO));
break;
case LogicalType::TimeUnit::NANOS:
*out = (timestamp.is_adjusted_to_utc()
? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc)
: ::arrow::timestamp(::arrow::TimeUnit::NANO));
*out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
: ::arrow::timestamp(::arrow::TimeUnit::NANO));
break;
default:
return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
Expand Down Expand Up @@ -530,9 +529,11 @@ static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp
switch (time_unit) {
case ::arrow::TimeUnit::MILLI:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true);
case ::arrow::TimeUnit::MICRO:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true);
case ::arrow::TimeUnit::NANO:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
Expand Down
72 changes: 59 additions & 13 deletions cpp/src/parquet/schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1398,28 +1398,34 @@ TEST(TestLogicalTypeOperation, LogicalTypeRepresentation) {
R"({"Type": "Time", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS),
"Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, "
"force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds", "force_set_converted_type": false})"},
"is_from_converted_type=false, force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds", )"
R"("is_from_converted_type": false, "force_set_converted_type": false})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
"Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, "
"force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds", "force_set_converted_type": false})"},
"is_from_converted_type=false, force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds", )"
R"("is_from_converted_type": false, "force_set_converted_type": false})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
"Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, "
"force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, true),
"is_from_converted_type=false, force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds", )"
R"("is_from_converted_type": false, "force_set_converted_type": false})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, true, true),
"Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, "
"force_set_converted_type=true)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds", "force_set_converted_type": true})"},
"is_from_converted_type=true, force_set_converted_type=true)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds", )"
R"("is_from_converted_type": true, "force_set_converted_type": true})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS),
"Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, "
"force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "force_set_converted_type": false})"},
"is_from_converted_type=false, force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", )"
R"("is_from_converted_type": false, "force_set_converted_type": false})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS),
"Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds, "
"force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
"is_from_converted_type=false, force_set_converted_type=false)",
R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds", )"
R"("is_from_converted_type": false, "force_set_converted_type": false})"},
{LogicalType::Interval(), "Interval", R"({"Type": "Interval"})"},
{LogicalType::Int(8, false), "Int(bitWidth=8, isSigned=false)",
R"({"Type": "Int", "bitWidth": 8, "isSigned": false})"},
Expand Down Expand Up @@ -1673,6 +1679,16 @@ struct SchemaElementConstructionArguments {
std::function<bool()> check_logicalType;
};

// One test case for TestSchemaElementConstruction::LegacyReconstruct(): the node is
// built from a converted type only (no LogicalType member is carried here).
// Members are initialized positionally by the test tables, so their order matters.
struct LegacySchemaElementConstructionArguments {
// Node name handed to PrimitiveNode::Make(); Inspect() compares it against the
// name of the serialized element.
std::string name;
// Physical type handed to PrimitiveNode::Make().
Type::type physical_type;
// Passed as the final argument of PrimitiveNode::Make(); the visible cases use -1.
// NOTE(review): presumably the type length for fixed-length types -- confirm.
int physical_length;
// Stored for Inspect(), which branches on it.
// NOTE(review): presumably "the serialized element should carry a converted type".
bool expect_converted_type;
// Converted type used to build the node; also stored for later inspection.
ConvertedType::type converted_type;
// Stored for later inspection.
// NOTE(review): presumably "the serialized element should carry a logicalType".
bool expect_logicalType;
// Callback stored for later inspection.
// NOTE(review): presumably invoked by Inspect() to validate the logicalType -- confirm.
std::function<bool()> check_logicalType;
};

class TestSchemaElementConstruction : public ::testing::Test {
public:
TestSchemaElementConstruction* Reconstruct(
Expand All @@ -1692,6 +1708,23 @@ class TestSchemaElementConstruction : public ::testing::Test {
return this;
}

TestSchemaElementConstruction* LegacyReconstruct(
    const LegacySchemaElementConstructionArguments& legacy_case) {
  // Build the primitive node from the converted type (no LogicalType is supplied
  // on this path) ...
  node_ = PrimitiveNode::Make(legacy_case.name,
                              Repetition::REQUIRED,
                              legacy_case.physical_type,
                              legacy_case.converted_type,
                              legacy_case.physical_length);

  // ... and serialize it into a freshly allocated Thrift SchemaElement.
  element_.reset(new format::SchemaElement);
  auto* thrift_element = element_.get();
  node_->ToParquet(thrift_element);

  // Keep the expectations of this case around so that Inspect() can use them.
  name_ = legacy_case.name;
  expect_converted_type_ = legacy_case.expect_converted_type;
  converted_type_ = legacy_case.converted_type;
  expect_logicalType_ = legacy_case.expect_logicalType;
  check_logicalType_ = legacy_case.check_logicalType;

  // Returning the fixture allows chaining: LegacyReconstruct(c)->Inspect().
  return this;
}

void Inspect() {
ASSERT_EQ(element_->name, name_);
if (expect_converted_type_) {
Expand Down Expand Up @@ -1777,6 +1810,17 @@ TEST_F(TestSchemaElementConstruction, SimpleCases) {
for (const SchemaElementConstructionArguments& c : cases) {
this->Reconstruct(c)->Inspect();
}

std::vector<LegacySchemaElementConstructionArguments> legacy_cases = {
{"timestamp_ms", Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, false,
check_nothing},
{"timestamp_us", Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, false,
check_nothing},
};

for (const LegacySchemaElementConstructionArguments& c : legacy_cases) {
this->LegacyReconstruct(c)->Inspect();
}
}

class TestDecimalSchemaElementConstruction : public TestSchemaElementConstruction {
Expand Down Expand Up @@ -1920,6 +1964,7 @@ TEST_F(TestTemporalSchemaElementConstruction, TemporalCases) {
Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
{"timestamp_F_ms_force",
LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true),
Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
{"timestamp_T_us", LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
Expand All @@ -1928,6 +1973,7 @@ TEST_F(TestTemporalSchemaElementConstruction, TemporalCases) {
Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
{"timestamp_F_us_force",
LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
/*is_from_converted_type=*/false,
/*force_set_converted_type=*/true),
Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
{"timestamp_T_ns", LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
Expand Down
Loading

0 comments on commit 9fb5c3d

Please sign in to comment.