Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Sep 6, 2024
1 parent 96dfeb9 commit ccc0f87
Show file tree
Hide file tree
Showing 17 changed files with 1,384 additions and 116 deletions.
10 changes: 6 additions & 4 deletions ydb/core/protos/out/out.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <ydb/public/api/protos/ydb_table.pb.h>

#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
#include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
Expand Down Expand Up @@ -254,6 +252,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value)
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
stream << IndexBuildState_State_Name(value);
// Stream output for NKikimrIndexBuilder::EBuildStatus: writes whatever
// NKikimrIndexBuilder::EBuildStatus_Name() returns for the value.
Y_DECLARE_OUT_SPEC(, NKikimrIndexBuilder::EBuildStatus, stream, value) {
stream << NKikimrIndexBuilder::EBuildStatus_Name(value);
}

// Stream output for NKikimrTxDataShard::TEvLocalKMeansRequest_EState: writes
// whatever TEvLocalKMeansRequest_EState_Name() returns for the value.
Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvLocalKMeansRequest_EState, stream, value) {
stream << NKikimrTxDataShard::TEvLocalKMeansRequest_EState_Name(value);
}
66 changes: 66 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "ydb/core/protos/subdomains.proto";
import "ydb/core/protos/query_stats.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
Expand Down Expand Up @@ -1486,6 +1487,71 @@ message TEvSampleKResponse {
repeated bytes Rows = 11;
}

// Request message for the datashard EvLocalKMeansRequest event.
// NOTE(review): the handler is not part of this file; the field notes below
// that go beyond the declared types are assumptions to confirm against it.
message TEvLocalKMeansRequest {
// Request identifier; a field with the same name and number exists in
// TEvLocalKMeansProgressResponse (presumably echoed back — confirm).
optional uint64 Id = 1;

// Target tablet and table path.
optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

// Snapshot coordinates (TxId / Step) — presumably the snapshot to read at.
optional uint64 SnapshotTxId = 4;
optional uint64 SnapshotStep = 5;

// Sequence number split into generation and round; the response carries
// RequestSeqNoGeneration / RequestSeqNoRound counterparts.
optional uint64 SeqNoGeneration = 6;
optional uint64 SeqNoRound = 7;

// Vector index settings from the public Ydb.Table API.
optional Ydb.Table.VectorIndexSettings Settings = 8;

// Seed — presumably for the random sampling step; K — number of clusters
// (see the Child comment below, which reserves ids in terms of K).
optional uint64 Seed = 9;
optional uint32 K = 10;

// Values used by both the Upload and State fields below.
enum EState {
SAMPLE = 1;
KMEANS = 2;
UPLOAD_MAIN_TO_TMP = 3;
UPLOAD_MAIN_TO_POSTING = 4;
UPLOAD_TMP_TO_TMP = 5;
UPLOAD_TMP_TO_POSTING = 6;
DONE = 7;
};
// Presumably one of the UPLOAD_* values selecting the upload variant —
// TODO confirm; the type itself does not restrict it.
optional EState Upload = 11;
// Invariant stated by the author: State != DONE
optional EState State = 12;
// Invariant stated by the author: State != KMEANS || DoneRounds < NeedsRounds
optional uint32 DoneRounds = 13;
optional uint32 NeedsRounds = 14;

// id of parent cluster
optional uint32 Parent = 15;
// [Child ... Child + K] ids reserved for our clusters
// NOTE(review): as written the range is inclusive on both ends, i.e. K + 1
// ids; confirm whether the half-open range [Child, Child + K) is intended.
optional uint32 Child = 16;

// Names of the level and posting tables — presumably upload targets; confirm.
optional string LevelName = 17;
optional string PostingName = 18;

// Name of the column holding the embedding, and additional data columns.
optional string EmbeddingColumn = 19;
repeated string DataColumns = 20;
}

// Response message for the datashard EvLocalKMeansProgressResponse event.
// NOTE(review): the sender is not part of this file; which request values are
// copied into which fields should be confirmed against it.
message TEvLocalKMeansProgressResponse {
// Same name and number as TEvLocalKMeansRequest.Id.
optional uint64 Id = 1;

// Tablet and table path, mirroring the request fields 2 and 3.
optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

// Counterparts of the request's SeqNoGeneration / SeqNoRound.
optional uint64 RequestSeqNoGeneration = 4;
optional uint64 RequestSeqNoRound = 5;

// Build status plus any issues attached to it.
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

// The commented-out fields below are not part of the schema yet; they
// presumably keep numbers 8-11 in mind for the planned slow path.
// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;

// optional TEvLocalKMeansRequest.EState State = 10;
// optional uint32 DoneRounds = 11;
}

message TEvCdcStreamScanRequest {
message TLimits {
optional uint32 BatchMaxBytes = 1 [default = 512000];
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/tablet_flat/flat_scan_lead.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ namespace NTable {

struct TLead {
// Convenience overload: positions the lead via To(key, seek) and then
// replaces the tag list via SetTags(tags).
void To(TTagsRef tags, TArrayRef<const TCell> key, ESeek seek)
{
To(key, seek);
SetTags(tags);
}

void To(TArrayRef<const TCell> key, ESeek seek)
{
Valid = true;
Tags.assign(tags.begin(), tags.end());
Relation = seek;
Key = TSerializedCellVec(key);
StopKey = { };
Expand All @@ -24,6 +29,10 @@ namespace NTable {
StopKeyInclusive = inclusive;
}

// Replaces the stored Tags with a copy of the elements of `tags`;
// no other member is touched.
void SetTags(TTagsRef tags) {
Tags.assign(tags.begin(), tags.end());
}

explicit operator bool() const noexcept
{
return Valid;
Expand All @@ -34,12 +43,12 @@ namespace NTable {
Valid = false;
}

bool Valid = false;
ESeek Relation = ESeek::Exact;
bool Valid = false;
bool StopKeyInclusive = true;
TVector<ui32> Tags;
TSerializedCellVec Key;
TSerializedCellVec StopKey;
bool StopKeyInclusive = true;
};

}
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/tx/datashard/buffer_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "ydb/core/scheme/scheme_tablecell.h"
#include "ydb/core/tx/datashard/upload_stats.h"
#include "ydb/core/tx/tx_proxy/upload_rows.h"

#include <memory>
#include <utility>

namespace NKikimr::NDataShard {

// Shorthand for the upload type and row containers declared in NTxProxy.
using TTypes = NTxProxy::TUploadTypes;
using TRows = NTxProxy::TUploadRows;

class TBufferData: public IStatHolder, public TNonCopyable {
public:
TBufferData()
: Rows{std::make_shared<TRows>()}
{
}

ui64 GetRows() const override final {
return Rows->size();
}

std::shared_ptr<TRows> GetRowsData() const {
return Rows;
}

ui64 GetBytes() const override final {
return ByteSize;
}

void FlushTo(TBufferData& other) {
Y_ABORT_UNLESS(this != &other);
Y_ABORT_UNLESS(other.IsEmpty());
other.Rows.swap(Rows);
other.ByteSize = std::exchange(ByteSize, 0);
other.LastKey = std::exchange(LastKey, {});
}

void Clear() {
Rows->clear();
ByteSize = 0;
LastKey = {};
}

void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
LastKey = std::move(key);
}

bool IsEmpty() const {
return Rows->empty();
}

size_t Size() const {
return Rows->size();
}

bool IsReachLimits(const TUploadLimits& Limits) {
// TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
}

auto&& ExtractLastKey() {
return std::move(LastKey);
}

const auto& GetLastKey() const {
return LastKey;
}

private:
std::shared_ptr<TRows> Rows;
ui64 ByteSize = 0;
TSerializedCellVec LastKey;
};

}
102 changes: 3 additions & 99 deletions ydb/core/tx/datashard/build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "range_ops.h"
#include "scan_common.h"
#include "upload_stats.h"
#include "buffer_data.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
Expand All @@ -27,31 +28,6 @@ namespace NKikimr::NDataShard {
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)

using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;
using TTypes = NTxProxy::TUploadTypes;
using TRows = NTxProxy::TUploadRows;

static TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
TColumnsTypes result;

for (const auto& it : tableInfo.Columns) {
result[it.second.Name] = it.second.Type;
}

return result;
}

static void ProtoYdbTypeFromTypeInfo(Ydb::Type* type, const NScheme::TTypeInfo typeInfo) {
if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
auto* typeDesc = typeInfo.GetTypeDesc();
auto* pg = type->mutable_pg_type();
pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
} else {
type->set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
}
}

static std::shared_ptr<TTypes> BuildTypes(const TUserTable& tableInfo, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings) {
auto types = GetAllTypes(tableInfo);

Expand Down Expand Up @@ -119,74 +95,6 @@ bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumn
return true;
}

class TBufferData: public IStatHolder, public TNonCopyable {
public:
TBufferData()
: Rows(new TRows)
{
}

ui64 GetRows() const override final {
return Rows->size();
}

std::shared_ptr<TRows> GetRowsData() const {
return Rows;
}

ui64 GetBytes() const override final {
return ByteSize;
}

void FlushTo(TBufferData& other) {
if (this == &other) {
return;
}

Y_ABORT_UNLESS(other.Rows);
Y_ABORT_UNLESS(other.IsEmpty());

other.Rows.swap(Rows);
other.ByteSize = ByteSize;
other.LastKey = std::move(LastKey);

Clear();
}

void Clear() {
Rows->clear();
ByteSize = 0;
LastKey = {};
}

void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
LastKey = std::move(key);
}

bool IsEmpty() const {
return Rows->empty();
}

bool IsReachLimits(const TUploadLimits& Limits) {
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
}

void ExtractLastKey(TSerializedCellVec& out) {
out = std::move(LastKey);
}

const TSerializedCellVec& GetLastKey() const {
return LastKey;
}

private:
std::shared_ptr<TRows> Rows;
ui64 ByteSize = 0;
TSerializedCellVec LastKey;
};

template <NKikimrServices::TActivity::EType Activity>
class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable::IScan {
using TThis = TBuildScanUpload<Activity>;
Expand Down Expand Up @@ -382,11 +290,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
<< " WriteBuf empty: " << WriteBuf.IsEmpty()
<< " " << Debug());

if (ReadBuf.IsEmpty()) {
return EScan::Feed;
}

if (WriteBuf.IsEmpty()) {
if (!ReadBuf.IsEmpty() && WriteBuf.IsEmpty()) {
ReadBuf.FlushTo(WriteBuf);
Upload();
}
Expand Down Expand Up @@ -433,7 +337,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable

if (UploadStatus.IsSuccess()) {
Stats.Aggr(&WriteBuf);
WriteBuf.ExtractLastKey(LastUploadedKey);
LastUploadedKey = WriteBuf.ExtractLastKey();

//send progress
TAutoPtr<TEvDataShard::TEvBuildIndexProgressResponse> progress = new TEvDataShard::TEvBuildIndexProgressResponse;
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ struct TEvDataShard {
EvSampleKRequest,
EvSampleKResponse,

EvLocalKMeansRequest,
EvLocalKMeansProgressResponse,

EvEnd
};

Expand Down Expand Up @@ -1454,6 +1457,18 @@ struct TEvDataShard {
TEvDataShard::EvSampleKResponse> {
};

// Event type binding the NKikimrTxDataShard::TEvLocalKMeansRequest protobuf
// record to the TEvDataShard::EvLocalKMeansRequest event id via TEventPB.
struct TEvLocalKMeansRequest
: public TEventPB<TEvLocalKMeansRequest,
NKikimrTxDataShard::TEvLocalKMeansRequest,
TEvDataShard::EvLocalKMeansRequest> {
};

// Event type binding the NKikimrTxDataShard::TEvLocalKMeansProgressResponse
// protobuf record to the TEvDataShard::EvLocalKMeansProgressResponse event id
// via TEventPB.
struct TEvLocalKMeansProgressResponse
: public TEventPB<TEvLocalKMeansProgressResponse,
NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
TEvDataShard::EvLocalKMeansProgressResponse> {
};

struct TEvKqpScan
: public TEventPB<TEvKqpScan,
NKikimrTxDataShard::TEvKqpScan,
Expand Down
Loading

0 comments on commit ccc0f87

Please sign in to comment.