Skip to content

Commit

Permalink
Merge aec0602 into d8b29c9
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 5, 2024
2 parents d8b29c9 + aec0602 commit b3e994b
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 35 deletions.
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/size_calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
}

NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
std::optional<TFirstLastSpecialKeys> specialKeys;
std::optional<TString> specialKeys;
if (context.GetFieldsForSpecialKeys().size()) {
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
}
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
NArrow::GetBatchDataSize(batch), specialKeys);
}

TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/formats/arrow/size_calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ class TSerializedBatch {
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
YDB_READONLY(ui32, RawBytes, 0);
std::optional<TFirstLastSpecialKeys> SpecialKeys;
std::optional<TString> SpecialKeys;
public:
size_t GetSize() const {
return Data.size();
}

const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
const TString& GetSpecialKeysSafe() const {
AFL_VERIFY(SpecialKeys);
return *SpecialKeys;
}
Expand All @@ -95,7 +95,7 @@ class TSerializedBatch {
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);

TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/background_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void TBackgroundController::CheckDeadlines() {
void TBackgroundController::CheckDeadlinesIndexation() {
for (auto&& i : ActiveIndexationTasks) {
if (TMonotonic::Now() - i.second > NOlap::TCompactionLimits::CompactionTimeout) {
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("task_id", i.first);
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_indexation")("task_id", i.first);
Y_DEBUG_ABORT_UNLESS(false);
}
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
Expand All @@ -16,9 +16,9 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta();
const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion();
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
Expand All @@ -36,7 +36,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
const auto& writeMeta = aggr->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
Expand Down Expand Up @@ -82,7 +82,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
Results.clear();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
const auto& writeMeta = aggr->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Expand Down Expand Up @@ -136,7 +136,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
i.DoSendReply(ctx);
}
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());

for (auto&& aggr : baseAggregations) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
const auto& writeMeta = aggr->GetWriteMeta();

if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void TActor::Bootstrap() {
void TActor::Flush() {
if (Aggregations.size()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "flush_writing")("size", SumSize)("count", Aggregations.size());
auto action = Aggregations.front()->GetWriteData()->GetBlobsAction();
auto action = Aggregations.front()->GetBlobsAction();
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, action, std::move(Aggregations));
if (action->NeedDraftTransaction()) {
TActorContext::AsActorContext().Send(ParentActorId, std::make_unique<NColumnShard::TEvPrivate::TEvWriteDraft>(writeController));
Expand Down Expand Up @@ -48,7 +48,7 @@ void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) {
auto* evBase = ev->Get();
AFL_VERIFY(evBase->GetWriteData()->GetBlobsAction()->GetStorageId() == NOlap::IStoragesManager::DefaultStorageId);
SumSize += evBase->GetWriteData()->GetSize();
Aggregations.emplace_back(std::make_shared<NOlap::TWriteAggregation>(evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite())));
Aggregations.emplace_back(std::make_shared<NOlap::TWriteAggregation>(*evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite())));
if (SumSize > 4 * 1024 * 1024 || Aggregations.size() > 750 || !FlushDuration) {
Flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) {

void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
}
}

void TWritingBuffer::InitStartSending(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
aggr->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
}
}

void TWritingBuffer::InitReplyReceived(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle6StartInstant(instant);
aggr->MutableWriteMeta().SetWriteMiddle6StartInstant(instant);
}
}

Expand Down
31 changes: 25 additions & 6 deletions ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,42 @@ class TWritingBlob {

class TWriteAggregation {
private:
YDB_READONLY_DEF(std::shared_ptr<NEvWrite::TWriteData>, WriteData);
NEvWrite::TWriteMeta WriteMeta;
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, Size, 0);
YDB_ACCESSOR_DEF(std::vector<TWideSerializedBatch>, SplittedBlobs);
YDB_READONLY_DEF(TVector<TWriteId>, WriteIds);
YDB_READONLY_DEF(std::shared_ptr<NOlap::IBlobsWritingAction>, BlobsAction);
public:
const NEvWrite::TWriteMeta& GetWriteMeta() const {
return WriteMeta;
}

NEvWrite::TWriteMeta& MutableWriteMeta() {
return WriteMeta;
}

void AddWriteId(const TWriteId& id) {
WriteIds.emplace_back(id);
}

TWriteAggregation(const std::shared_ptr<NEvWrite::TWriteData>& writeData, std::vector<NArrow::TSerializedBatch>&& splittedBlobs)
: WriteData(writeData) {
TWriteAggregation(const NEvWrite::TWriteData& writeData, std::vector<NArrow::TSerializedBatch>&& splittedBlobs)
: WriteMeta(writeData.GetWriteMeta())
, SchemaVersion(writeData.GetData()->GetSchemaVersion())
, Size(writeData.GetSize())
, BlobsAction(writeData.GetBlobsAction())
{
for (auto&& s : splittedBlobs) {
SplittedBlobs.emplace_back(std::move(s), *this);
}
}

TWriteAggregation(const std::shared_ptr<NEvWrite::TWriteData>& writeData)
: WriteData(writeData) {
TWriteAggregation(const NEvWrite::TWriteData& writeData)
: WriteMeta(writeData.GetWriteMeta())
, SchemaVersion(writeData.GetData()->GetSchemaVersion())
, Size(writeData.GetSize())
, BlobsAction(writeData.GetBlobsAction())
{
}
};

Expand All @@ -120,7 +139,7 @@ class TWritingBuffer: public TMoveOnly {
{
AFL_VERIFY(BlobsAction);
for (auto&& aggr : Aggregations) {
SumSize += aggr->GetWriteData()->GetSize();
SumSize += aggr->GetSize();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace NKikimr::NOlap {

void TBuildBatchesTask::ReplyError(const TString& message) {
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(writeDataPtr) });
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto result = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(
NKikimrProto::EReplyStatus::CORRUPTED, std::move(buffer), message);
TActorContext::AsActorContext().Send(ParentActorId, result.release());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnDataChunk(const std::sh
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "restore_data_problems")
("write_id", WriteData.GetWriteMeta().GetWriteId())("tablet_id", TabletId)("message", result.GetErrorMessage());
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(writeDataPtr) });
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto evResult = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED,
std::move(buffer), result.GetErrorMessage());
TActorContext::AsActorContext().Send(ParentActorId, evResult.release());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::

void TBuildSlicesTask::ReplyError(const TString& message) {
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(writeDataPtr) });
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
auto result = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(
NKikimrProto::EReplyStatus::CORRUPTED, std::move(buffer), message);
TActorContext::AsActorContext().Send(ParentActorId, result.release());
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/tx/data_events/write_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,4 @@ TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data, c
Y_ABORT_UNLESS(BlobsAction);
}

const NKikimr::NEvWrite::IDataContainer& TWriteData::GetDataVerified() const {
AFL_VERIFY(Data);
return *Data;
}

}
2 changes: 0 additions & 2 deletions ydb/core/tx/data_events/write_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ class TWriteData {
public:
TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data, const std::shared_ptr<arrow::Schema>& primaryKeySchema, const std::shared_ptr<NOlap::IBlobsWritingAction>& blobsAction);

const IDataContainer& GetDataVerified() const;

const TWriteMeta& GetWriteMeta() const {
return WriteMeta;
}
Expand Down

0 comments on commit b3e994b

Please sign in to comment.