Skip to content

Commit

Permalink
Big datetime in Datashard Export/Import (#5761)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Jun 21, 2024
1 parent 2db2848 commit fdd25a5
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 16 deletions.
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Interval:
serialized = cell.ToStream<i64>(out, ErrorString);
break;
case NScheme::NTypeIds::Date32:
serialized = cell.ToStream<i32>(out, ErrorString);
break;
case NScheme::NTypeIds::Datetime64:
case NScheme::NTypeIds::Timestamp64:
case NScheme::NTypeIds::Interval64:
serialized = cell.ToStream<i64>(out, ErrorString);
break;
case NScheme::NTypeIds::Decimal:
serialized = DecimalToStream(cell.AsValue<std::pair<ui64, i64>>(), out, ErrorString);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {

{
TInactiveZone inactive(activeZone);
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
}
});
Expand Down Expand Up @@ -694,7 +694,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {

{
TInactiveZone inactive(activeZone);
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
}
});
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@ partitioning_settings {
env.TestWaitNotification(runtime, txId);

// Write bad DyNumber
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down
14 changes: 4 additions & 10 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ namespace NSchemeShardUT_Private {
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds)
void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values)
{
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
Expand All @@ -2330,15 +2330,9 @@ namespace NSchemeShardUT_Private {
scheme.AddValueColumnIds(tag);
}

for (ui32 i : recordIds) {
auto key = TVector<TCell>{TCell::Make(i)};
auto value = TVector<TCell>{TCell::Make(i)};
Cerr << value[0].AsBuf().Size() << Endl;

auto& row = *ev->Record.AddRows();
row.SetKeyColumns(TSerializedCellVec::Serialize(key));
row.SetValueColumns(TSerializedCellVec::Serialize(value));
}
auto& row = *ev->Record.AddRows();
row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
row.SetValueColumns(TSerializedCellVec::Serialize(values));

const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ namespace NSchemeShardUT_Private {
void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds);
void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values);
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_);
app.SetEnableBorrowedSplitCompaction(opts.EnableBorrowedSplitCompaction_);
app.FeatureFlags.SetEnablePublicApiExternalBlobs(true);
app.FeatureFlags.SetEnableTableDatetime64(true);
app.SetEnableMoveIndex(opts.EnableMoveIndex_);
app.SetEnableChangefeedInitialScan(opts.EnableChangefeedInitialScan_);
app.SetEnableNotNullDataColumns(opts.EnableNotNullDataColumns_);
Expand Down
202 changes: 201 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/metering/metering.h>
#include <ydb/core/ydb_convert/table_description.h>

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>


#include <ydb/public/api/protos/ydb_import.pb.h>

#include <contrib/libs/zstd/include/zstd.h>
Expand Down Expand Up @@ -665,6 +671,10 @@ value {
<< "2020-08-12T12:34:56.000000Z," // datetime
<< "2020-08-12T12:34:56.123456Z," // timestamp
<< "-300500," // interval
<< "-18486," // negative date32
<< "-1597235696," // negative datetime64
<< "-1597235696123456," // negative timestamp64
<< "-300500," // negative interval64
<< "3.321," // decimal
<< ".3321e1," // dynumber
<< "\"" << CGIEscapeRet("lorem ipsum") << "\"," // string
Expand All @@ -676,19 +686,23 @@ value {

TString yson = TStringBuilder() << "[[[[["
<< "[%true];" // bool
<< "[\"" << -18486 << "\"];" // date32
<< "[\"" << TInstant::ParseIso8601("2020-08-12T00:00:00.000000Z").Days() << "\"];" // date
<< "[\"" << -1597235696 << "\"];" // datetime64
<< "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.000000Z").Seconds() << "\"];" // datetime
<< "[\"" << "3.321" << "\"];" // decimal
<< "[\"" << 1.1234 << "\"];" // double
<< "[\"" << ".3321e1" << "\"];" // dynumber
<< "[\"" << -1.123f << "\"];" // float
<< "[\"" << -100500 << "\"];" // int32
<< "[\"" << -200500 << "\"];" // int64
<< "[\"" << -300500 << "\"];" // interval64
<< "[\"" << -300500 << "\"];" // interval
<< "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json
<< "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc
<< "[\"" << 1 << "\"];" // key
<< "[\"" << "lorem ipsum" << "\"];" // string
<< "[\"" << -1597235696123456 << "\"];" // timestamp64
<< "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.123456Z").MicroSeconds() << "\"];" // timestamp
<< "[\"" << 100500 << "\"];" // uint32
<< "[\"" << 200500 << "\"];" // uint64
Expand All @@ -714,6 +728,10 @@ value {
Columns { Name: "datetime_value" Type: "Datetime" }
Columns { Name: "timestamp_value" Type: "Timestamp" }
Columns { Name: "interval_value" Type: "Interval" }
Columns { Name: "date32_value" Type: "Date32" }
Columns { Name: "datetime64_value" Type: "Datetime64" }
Columns { Name: "timestamp64_value" Type: "Timestamp64" }
Columns { Name: "interval64_value" Type: "Interval64" }
Columns { Name: "decimal_value" Type: "Decimal" }
Columns { Name: "dynumber_value" Type: "DyNumber" }
Columns { Name: "string_value" Type: "String" }
Expand All @@ -738,6 +756,10 @@ value {
"datetime_value",
"timestamp_value",
"interval_value",
"date32_value",
"datetime64_value",
"timestamp64_value",
"interval64_value",
"decimal_value",
"dynumber_value",
"string_value",
Expand Down Expand Up @@ -1106,6 +1128,184 @@ value {
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions());
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "int32_value" Type: "Int32" }
Columns { Name: "uint32_value" Type: "Uint32" }
Columns { Name: "int64_value" Type: "Int64" }
Columns { Name: "uint64_value" Type: "Uint64" }
Columns { Name: "uint8_value" Type: "Uint8" }
Columns { Name: "bool_value" Type: "Bool" }
Columns { Name: "double_value" Type: "Double" }
Columns { Name: "float_value" Type: "Float" }
Columns { Name: "date_value" Type: "Date" }
Columns { Name: "datetime_value" Type: "Datetime" }
Columns { Name: "timestamp_value" Type: "Timestamp" }
Columns { Name: "interval_value" Type: "Interval" }
Columns { Name: "date32_value" Type: "Date32" }
Columns { Name: "datetime64_value" Type: "Datetime64" }
Columns { Name: "timestamp64_value" Type: "Timestamp64" }
Columns { Name: "interval64_value" Type: "Interval64" }
Columns { Name: "decimal_value" Type: "Decimal" }
Columns { Name: "dynumber_value" Type: "DyNumber" }
Columns { Name: "string_value" Type: "String" }
Columns { Name: "utf8_value" Type: "Utf8" }
Columns { Name: "json_value" Type: "Json" }
Columns { Name: "jsondoc_value" Type: "JsonDocument" }
Columns { Name: "uuid_value" Type: "Uuid" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

const int partitionIdx = 0;

const TVector<TCell> keys = {TCell::Make(1ull)};

const TString string = "test string";
const TString json = R"({"key": "value"})";
auto binaryJson = NBinaryJson::SerializeToBinaryJson(json);
Y_ABORT_UNLESS(binaryJson.Defined());

const std::pair<ui64, ui64> decimal = NYql::NDecimal::MakePair(NYql::NDecimal::FromString("16.17", NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE));
const TString dynumber = *NDyNumber::ParseDyNumberString("18");

char uuid[16];
NUuid::ParseUuidToArray(TString("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), reinterpret_cast<ui16*>(uuid), false);

const TVector<TCell> values = {
TCell::Make<i32>(-1), // Int32
TCell::Make<ui32>(2), // Uint32
TCell::Make<i64>(-3), // Int64
TCell::Make<ui64>(4), // Uint64
TCell::Make<ui8>(5), // Uint8
TCell::Make<bool>(true), // Bool
TCell::Make<double>(6.66), // Double
TCell::Make<float>(7.77), // Float
TCell::Make<ui16>(8), // Date
TCell::Make<ui32>(9), // Datetime
TCell::Make<ui64>(10), // Timestamp
TCell::Make<i64>(-11), // Interval
TCell::Make<i32>(-12), // Date32
TCell::Make<i64>(-13), // Datetime64
TCell::Make<i64>(-14), // Timestamp64
TCell::Make<i64>(-15), // Interval64
TCell::Make<std::pair<ui64, ui64>>(decimal), // Decimal
TCell(dynumber.data(), dynumber.size()), // Dynumber
TCell(string.data(), string.size()), // String
TCell(string.data(), string.size()), // Utf8
TCell(json.data(), json.size()), // Json
TCell(binaryJson->Data(), binaryJson->Size()), // JsonDocument
TCell(uuid, sizeof(uuid)), // Uuid
};

const TVector<ui32> keyTags = {1};
TVector<ui32> valueTags(values.size());
std::iota(valueTags.begin(), valueTags.end(), 2);

UploadRow(runtime, "/MyRoot/Table", partitionIdx, keyTags, valueTags, keys, values);

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: "Backup1"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: "Backup1"
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");


TString expectedJson = TStringBuilder() << "[[[[["
<< "[%true];" // bool
<< "[\"" << -12 << "\"];" // date32
<< "[\"" << 8 << "\"];" // date
<< "[\"" << -13 << "\"];" // datetime64
<< "[\"" << 9 << "\"];" // datetime
<< "[\"" << "16.17" << "\"];" // decimal
<< "[\"" << 6.66 << "\"];" // double
<< "[\"" << ".18e2" << "\"];" // dynumber
<< "[\"" << 7.77f << "\"];" // float
<< "[\"" << -1 << "\"];" // int32
<< "[\"" << -3 << "\"];" // int64
<< "[\"" << -15 << "\"];" // interval64
<< "[\"" << -11 << "\"];" // interval
<< "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json
<< "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc
<< "[\"" << 1 << "\"];" // key
<< "[\"" << "test string" << "\"];" // string
<< "[\"" << -14 << "\"];" // timestamp64
<< "[\"" << 10 << "\"];" // timestamp
<< "[\"" << 2 << "\"];" // uint32
<< "[\"" << 4 << "\"];" // uint64
<< "[\"" << 5 << "\"];" // uint8
<< "[\"" << "test string" << "\"];" // utf8
<< "[[\"" << "wR7fZX2pskeuVjwCPabujA==" << "\"]]" // uuid
<< "]];\%false]]]";

const TReadKeyDesc readKeyDesc = {"key", "Uint64", "0"};

const TVector<TString> readColumns = {
"key",
"int32_value",
"uint32_value",
"int64_value",
"uint64_value",
"uint8_value",
"bool_value",
"double_value",
"float_value",
"date_value",
"datetime_value",
"timestamp_value",
"interval_value",
"date32_value",
"datetime64_value",
"timestamp64_value",
"interval64_value",
"decimal_value",
"dynumber_value",
"string_value",
"utf8_value",
"json_value",
"jsondoc_value",
"uuid_value",
};

auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns);
NKqp::CompareYson(expectedJson, contentOriginalTable);

auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", readKeyDesc, readColumns);
NKqp::CompareYson(expectedJson, contentRestoredTable);
}

Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
Expand All @@ -1119,7 +1319,7 @@ value {
)");
env.TestWaitNotification(runtime, txId);

UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {55555});
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(55555u)}, {TCell::Make(55555u)});

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down

0 comments on commit fdd25a5

Please sign in to comment.