Skip to content

Commit

Permalink
apacheGH-32538: [C++][Parquet] Add JSON canonical extension type (apa…
Browse files Browse the repository at this point in the history
…che#13901)

Arrow now provides a canonical extension type for JSON data. This
extension is backed by utf8() by default; large_utf8() and utf8_view()
storage are also accepted. Parquet will recognize this extension
and appropriately propagate the LogicalType to the storage format.
* GitHub Issue: apache#32538

Lead-authored-by: Rok Mihevc <rok@mihevc.org>
Co-authored-by: Pradeep Gollakota <pgollakota@google.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
5 people authored Sep 11, 2024
1 parent 21f5968 commit 27acf8b
Show file tree
Hide file tree
Showing 20 changed files with 460 additions and 56 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ set(ARROW_SRCS
device_allocation_type_set.cc
extension_type.cc
extension/bool8.cc
extension/json.cc
extension/uuid.cc
pretty_print.cc
record_batch.cc
Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/array/validate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,10 +985,22 @@ Status ValidateArrayFull(const Array& array) { return ValidateArrayFull(*array.d

ARROW_EXPORT
Status ValidateUTF8(const ArrayData& data) {
DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW ||
data.type->id() == Type::LARGE_STRING);
UTF8DataValidator validator{data};
return VisitTypeInline(*data.type, &validator);
const auto& storage_type =
(data.type->id() == Type::EXTENSION)
? checked_cast<const ExtensionType&>(*data.type).storage_type()
: data.type;
DCHECK(storage_type->id() == Type::STRING || storage_type->id() == Type::STRING_VIEW ||
storage_type->id() == Type::LARGE_STRING);

if (data.type->id() == Type::EXTENSION) {
ArrayData ext_data(data);
ext_data.type = storage_type;
UTF8DataValidator validator{ext_data};
return VisitTypeInline(*storage_type, &validator);
} else {
UTF8DataValidator validator{data};
return VisitTypeInline(*storage_type, &validator);
}
}

ARROW_EXPORT
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/extension/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

set(CANONICAL_EXTENSION_TESTS bool8_test.cc uuid_test.cc)
set(CANONICAL_EXTENSION_TESTS bool8_test.cc json_test.cc uuid_test.cc)

if(ARROW_JSON)
list(APPEND CANONICAL_EXTENSION_TESTS fixed_shape_tensor_test.cc opaque_test.cc)
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/extension/fixed_shape_tensor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ TEST_F(TestExtensionType, RoundtripBatch) {
std::shared_ptr<RecordBatch> read_batch;
auto ext_field = field(/*name=*/"f0", /*type=*/ext_type_);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);

// Pass extension metadata and storage array, expect getting back extension array
Expand All @@ -216,7 +216,7 @@ TEST_F(TestExtensionType, RoundtripBatch) {
ext_field = field(/*name=*/"f0", /*type=*/element_type_, /*nullable=*/true,
/*metadata=*/ext_metadata);
auto batch2 = RecordBatch::Make(schema({ext_field}), fsla_arr->length(), {fsla_arr});
RoundtripBatch(batch2, &read_batch2);
ASSERT_OK(RoundtripBatch(batch2, &read_batch2));
CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true);
}

Expand Down Expand Up @@ -469,7 +469,7 @@ TEST_F(TestExtensionType, RoundtripBatchFromTensor) {
auto ext_field = field("f0", ext_type_, true, ext_metadata);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);
}

Expand Down
61 changes: 61 additions & 0 deletions cpp/src/arrow/extension/json.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/extension/json.h"

#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/logging.h"

namespace arrow::extension {

bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const {
return other.extension_name() == this->extension_name();
}

// Rebuild a JsonExtensionType on top of `storage_type`.
//
// Only string-like storage (STRING, STRING_VIEW, LARGE_STRING) is accepted;
// any other storage type yields Status::Invalid. The `serialized` payload is
// not inspected.
Result<std::shared_ptr<DataType>> JsonExtensionType::Deserialize(
    std::shared_ptr<DataType> storage_type, const std::string& serialized) const {
  switch (storage_type->id()) {
    case Type::STRING:
    case Type::STRING_VIEW:
    case Type::LARGE_STRING:
      return std::make_shared<JsonExtensionType>(storage_type);
    default:
      return Status::Invalid("Invalid storage type for JsonExtensionType: ",
                             storage_type->ToString());
  }
}

// The serialized form is always the empty string; Deserialize() rebuilds the
// type from the storage type alone.
std::string JsonExtensionType::Serialize() const {
  return std::string{};
}

std::shared_ptr<Array> JsonExtensionType::MakeArray(
std::shared_ptr<ArrayData> data) const {
DCHECK_EQ(data->type->id(), Type::EXTENSION);
DCHECK_EQ("arrow.json",
internal::checked_cast<const ExtensionType&>(*data->type).extension_name());
return std::make_shared<ExtensionArray>(data);
}

// Factory for the JSON extension type.
//
// `storage_type` must be one of the string-like types accepted by
// JsonExtensionType::Deserialize (STRING, STRING_VIEW, LARGE_STRING); any
// other type trips the ARROW_CHECK below.
std::shared_ptr<DataType> json(const std::shared_ptr<DataType> storage_type) {
  const auto storage_id = storage_type->id();
  // The check must be "is one of the allowed ids". Chaining `!=` with `||`
  // is always true (an id cannot equal all three values at once) and would
  // let every storage type through.
  ARROW_CHECK(storage_id == Type::STRING || storage_id == Type::STRING_VIEW ||
              storage_id == Type::LARGE_STRING)
      << "Invalid storage type for JsonExtensionType: " << storage_type->ToString();
  return std::make_shared<JsonExtensionType>(storage_type);
}

} // namespace arrow::extension
56 changes: 56 additions & 0 deletions cpp/src/arrow/extension/json.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdexcept>
#include <string>

#include "arrow/extension_type.h"
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow::extension {

/// \brief Concrete type class for variable-size JSON data, utf8-encoded.
///
/// The extension is identified by the name "arrow.json" and wraps a
/// string-like storage type supplied at construction. The storage type is
/// held by the ExtensionType base class and is available through
/// storage_type(); no separate copy is kept here.
class ARROW_EXPORT JsonExtensionType : public ExtensionType {
 public:
  /// \brief Construct a JSON extension type on top of the given storage type.
  explicit JsonExtensionType(const std::shared_ptr<DataType>& storage_type)
      : ExtensionType(storage_type) {}

  /// \brief Name under which this extension type is identified.
  std::string extension_name() const override { return "arrow.json"; }

  /// \brief Compare this extension type with another extension type.
  bool ExtensionEquals(const ExtensionType& other) const override;

  /// \brief Reconstruct a JSON extension type from a storage type and the
  /// serialized representation produced by Serialize().
  Result<std::shared_ptr<DataType>> Deserialize(
      std::shared_ptr<DataType> storage_type,
      const std::string& serialized_data) const override;

  /// \brief Return the serialized representation of this type.
  std::string Serialize() const override;

  /// \brief Wrap the given array data into an extension array.
  std::shared_ptr<Array> MakeArray(std::shared_ptr<ArrayData> data) const override;
};

/// \brief Return a JsonExtensionType instance.
///
/// \param storage_type storage type wrapped by the extension; defaults to
/// utf8(). NOTE(review): presumably limited to utf8(), large_utf8() and
/// utf8_view(), the storage types JsonExtensionType::Deserialize accepts --
/// confirm against the definition.
ARROW_EXPORT std::shared_ptr<DataType> json(
    std::shared_ptr<DataType> storage_type = utf8());

} // namespace arrow::extension
83 changes: 83 additions & 0 deletions cpp/src/arrow/extension/json_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/extension/json.h"

#include "arrow/array/validate.h"
#include "arrow/ipc/test_common.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "parquet/exception.h"

namespace arrow {

using arrow::ipc::test::RoundtripBatch;
using extension::json;

// Fixture for the JSON extension type tests; it adds no state or setup of its own.
class TestJsonExtensionType : public ::testing::Test {};

// Build an 8-element JSON extension array over `storage_type`, with one value
// of each JSON kind: null, integer, float, true, false, string, array, object.
std::shared_ptr<Array> ExampleJson(const std::shared_ptr<DataType>& storage_type) {
  // Each element is a string holding one serialized JSON document.
  const std::shared_ptr<Array> storage_values = ArrayFromJSON(storage_type, R"([
"null",
"1234",
"3.14159",
"true",
"false",
"\"a json string\"",
"[\"a\", \"json\", \"array\"]",
"{\"obj\": \"a simple json object\"}"
])");
  const auto json_type = arrow::extension::json(storage_type);
  return ExtensionType::WrapArray(json_type, storage_values);
}

// For every supported storage type, a JSON extension batch must come back
// from an IPC roundtrip unchanged (metadata included) and still validate.
TEST_F(TestJsonExtensionType, JsonRoundtrip) {
  for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) {
    std::shared_ptr<Array> ext_arr = ExampleJson(storage_type);
    // Take the row count from the array rather than repeating the size of
    // the ExampleJson() fixture here.
    auto batch = RecordBatch::Make(schema({field("f0", json(storage_type))}),
                                   ext_arr->length(), {ext_arr});

    std::shared_ptr<RecordBatch> read_batch;
    ASSERT_OK(RoundtripBatch(batch, &read_batch));
    ASSERT_OK(read_batch->ValidateFull());
    CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);

    auto read_ext_arr = read_batch->column(0);
    ASSERT_OK(internal::ValidateUTF8(*read_ext_arr));
    ASSERT_OK(read_ext_arr->ValidateFull());
  }
}

// Invalid UTF-8 in the storage must be reported by both ValidateFull() and
// ValidateUTF8() on the extension array; the IPC roundtrip of the same data
// is still expected to succeed.
TEST_F(TestJsonExtensionType, InvalidUTF8) {
  const std::string expected_error = "Invalid: Invalid UTF8 sequence at string index 0";

  for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) {
    auto json_type = json(storage_type);
    auto bad_storage = ArrayFromJSON(storage_type, "[\"Ⱥa\xFF\", \"\xe1\xbdⱤaA\"]");
    auto bad_ext_arr = ExtensionType::WrapArray(json_type, bad_storage);

    ASSERT_RAISES_WITH_MESSAGE(Invalid, expected_error, bad_ext_arr->ValidateFull());
    ASSERT_RAISES_WITH_MESSAGE(Invalid, expected_error,
                               arrow::internal::ValidateUTF8(*bad_ext_arr));

    std::shared_ptr<RecordBatch> read_batch;
    auto batch = RecordBatch::Make(schema({field("f0", json_type)}), 2, {bad_ext_arr});
    ASSERT_OK(RoundtripBatch(batch, &read_batch));
  }
}

} // namespace arrow
4 changes: 2 additions & 2 deletions cpp/src/arrow/extension/uuid_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) {
std::shared_ptr<RecordBatch> read_batch;
auto ext_field = field(/*name=*/"f0", /*type=*/ext_type);
auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, /*compare_metadata=*/true);

// Pass extension metadata and storage array, expect getting back extension array
Expand All @@ -65,7 +65,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) {
ext_field = field(/*name=*/"f0", /*type=*/exact_ext_type->storage_type(),
/*nullable=*/true, /*metadata=*/ext_metadata);
auto batch2 = RecordBatch::Make(schema({ext_field}), arr->length(), {arr});
RoundtripBatch(batch2, &read_batch2);
ASSERT_OK(RoundtripBatch(batch2, &read_batch2));
CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true);
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/extension_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# include "arrow/extension/fixed_shape_tensor.h"
# include "arrow/extension/opaque.h"
#endif
#include "arrow/extension/json.h"
#include "arrow/extension/uuid.h"
#include "arrow/status.h"
#include "arrow/type.h"
Expand Down Expand Up @@ -148,7 +149,8 @@ static void CreateGlobalRegistry() {
// Register canonical extension types

g_registry = std::make_shared<ExtensionTypeRegistryImpl>();
std::vector<std::shared_ptr<DataType>> ext_types{extension::bool8(), extension::uuid()};
std::vector<std::shared_ptr<DataType>> ext_types{extension::bool8(), extension::json(),
extension::uuid()};

#ifdef ARROW_JSON
ext_types.push_back(extension::fixed_shape_tensor(int64(), {}));
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/extension_type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ TEST_F(TestExtensionType, IpcRoundtrip) {
auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr});

std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);

// Wrap type in a ListArray and ensure it also makes it
auto offsets_arr = ArrayFromJSON(int32(), "[0, 0, 2, 4]");
ASSERT_OK_AND_ASSIGN(auto list_arr, ListArray::FromArrays(*offsets_arr, *ext_arr));
batch = RecordBatch::Make(schema({field("f0", list(uuid()))}), 3, {list_arr});
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);
}

Expand Down Expand Up @@ -289,7 +289,7 @@ TEST_F(TestExtensionType, ParametricTypes) {
4, {p1, p2, p3, p4});

std::shared_ptr<RecordBatch> read_batch;
RoundtripBatch(batch, &read_batch);
ASSERT_OK(RoundtripBatch(batch, &read_batch));
CompareBatch(*batch, *read_batch, false /* compare_metadata */);
}

Expand Down
17 changes: 9 additions & 8 deletions cpp/src/arrow/ipc/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,18 +1236,19 @@ Status MakeRandomTensor(const std::shared_ptr<DataType>& type,
return Tensor::Make(type, buf, shape, strides).Value(out);
}

void RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));
Status RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ARROW_ASSIGN_OR_RAISE(auto out_stream, io::BufferOutputStream::Create());
RETURN_NOT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());
ARROW_ASSIGN_OR_RAISE(auto complete_ipc_stream, out_stream->Finish());

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
ASSERT_OK(batch_reader->ReadNext(out));
ARROW_ASSIGN_OR_RAISE(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
RETURN_NOT_OK(batch_reader->ReadNext(out));
return Status::OK();
}

} // namespace test
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ Status MakeRandomTensor(const std::shared_ptr<DataType>& type,
const std::vector<int64_t>& shape, bool row_major_p,
std::shared_ptr<Tensor>* out, uint32_t seed = 0);

ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out);
ARROW_TESTING_EXPORT Status RoundtripBatch(const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out);

} // namespace test
} // namespace ipc
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "arrow/buffer.h"
#include "arrow/compute/api_vector.h"
#include "arrow/datum.h"
#include "arrow/extension/json.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/json_simple.h"
#include "arrow/ipc/reader.h"
Expand Down
Loading

0 comments on commit 27acf8b

Please sign in to comment.