Skip to content

Commit

Permalink
serialize special keys in separated thread (ydb-platform#6324)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jul 23, 2024
1 parent d90075c commit a566ec4
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/size_calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
}

NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
std::optional<TFirstLastSpecialKeys> specialKeys;
std::optional<TString> specialKeys;
if (context.GetFieldsForSpecialKeys().size()) {
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
}
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
NArrow::GetBatchDataSize(batch), specialKeys);
}

TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/formats/arrow/size_calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ class TSerializedBatch {
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
YDB_READONLY(ui32, RawBytes, 0);
std::optional<TFirstLastSpecialKeys> SpecialKeys;
std::optional<TString> SpecialKeys;
public:
size_t GetSize() const {
return Data.size();
}

const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
const TString& GetSpecialKeysSafe() const {
AFL_VERIFY(SpecialKeys);
return *SpecialKeys;
}
Expand All @@ -95,7 +95,7 @@ class TSerializedBatch {
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);

TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/background_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void TBackgroundController::CheckDeadlines() {
void TBackgroundController::CheckDeadlinesIndexation() {
for (auto&& i : ActiveIndexationTasks) {
if (TMonotonic::Now() - i.second > NOlap::TCompactionLimits::CompactionTimeout) {
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("task_id", i.first);
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_indexation")("task_id", i.first);
Y_DEBUG_ABORT_UNLESS(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
Expand Down

0 comments on commit a566ec4

Please sign in to comment.