Skip to content

Commit

Permalink
Merging inference updates (ydb-platform#7929)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Aug 16, 2024
1 parent 9e45694 commit f179f77
Show file tree
Hide file tree
Showing 24 changed files with 346 additions and 122 deletions.
11 changes: 6 additions & 5 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
for (const auto& entry : entries.Objects) {
if (entry.Size > 0) {
return entry.Path;
return entry;
}
}
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
Expand All @@ -349,30 +349,31 @@ struct TObjectStorageExternalSource : public IExternalSource {
meta->Attributes.erase("withinfer");

auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
auto promise = NThreading::NewPromise<TMetadataResult>();
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
if (!response.Status.IsSuccess()) {
metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
return;
}
TMetadataResult result;
meta->Changed = true;
meta->Schema.clear_column();
for (const auto& column : response.Fields) {
auto& destColumn = *meta->Schema.add_column();
destColumn = column;
}
TMetadataResult result;
result.SetSuccess();
result.Metadata = meta;
metaPromise.SetValue(std::move(result));
};
auto [path, size, _] = entryFut.GetValue();
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
arrowInferencinatorId,
new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
promise,
std::move(schemaToMetadata)
));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/external_sources/object_storage/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
};

// Request to infer the schema of a single object-storage file.
// Path is the object key inside the bucket; Size is the object size in bytes
// (the fetcher uses it to locate the parquet footer when only a suffix of the
// file needs to be downloaded).
struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvInferFileSchema> {
    explicit TEvInferFileSchema(TString&& path, ui64 size)
        : Path{std::move(path)}
        , Size{size}
    {}

    TString Path;
    ui64 Size = 0;
};

struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
Expand Down
142 changes: 132 additions & 10 deletions ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,32 @@
#include <arrow/csv/chunker.h>
#include <arrow/csv/options.h>
#include <arrow/io/memory.h>
#include <arrow/util/endian.h>

#include <util/generic/guid.h>
#include <util/generic/size_literals.h>

#include <ydb/core/external_sources/object_storage/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
static constexpr uint64_t PrefixSize = 10_MB;
public:
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
: S3FetcherId_{s3FetcherId}
, Format_{format}
{
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));

auto decompression = params.FindPtr("compression");
if (decompression) {
DecompressionFormat_ = *decompression;
}
}

void Bootstrap() {
Expand All @@ -40,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
const auto& request = *ev->Get();
TRequest localRequest{
.Path = request.Path,
.RequestId = {},
.RequestId = TGUID::Create(),
.Requester = ev->Sender,
.MetadataRequest = false,
};
CreateGuid(&localRequest.RequestId);

switch (Format_) {
case EFileFormat::CsvWithNames:
case EFileFormat::TsvWithNames: {
HandleAsPrefixFile(std::move(localRequest), ctx);
RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
break;
}
case EFileFormat::Parquet: {
localRequest.MetadataRequest = true;
RequestPartialFile(std::move(localRequest), ctx, request.Size - 8, request.Size - 4);
break;
}
default: {
Expand All @@ -67,6 +80,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

const auto& request = requestIt->second;

TString data = std::move(response.Data);
if (DecompressionFormat_) {
auto decompressedData = DecompressFile(data, request, ctx);
if (!decompressedData) {
return;
}
data = std::move(*decompressedData);
}

std::shared_ptr<arrow::io::RandomAccessFile> file;
switch (Format_) {
case EFileFormat::CsvWithNames:
Expand All @@ -76,7 +98,16 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
if (Format_ == EFileFormat::TsvWithNames) {
options.delimiter = '\t';
}
file = CleanupCsvFile(response.Data, request, options, ctx);
file = CleanupCsvFile(data, request, options, ctx);
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
case EFileFormat::Parquet: {
if (request.MetadataRequest) {
HandleMetadataSizeRequest(data, request, ctx);
return;
}
file = BuildParquetFileFromMetadata(data, request, ctx);
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
Expand Down Expand Up @@ -104,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
uint64_t From = 0;
uint64_t To = 0;
NActors::TActorId Requester;
bool MetadataRequest;
};

// Reading file

void HandleAsPrefixFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
void RequestPartialFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to) {
auto path = insertedRequest.Path;
insertedRequest.From = 0;
insertedRequest.To = 10_MB;
insertedRequest.From = from;
insertedRequest.To = to;
auto it = InflightRequests_.try_emplace(path, std::move(insertedRequest));
Y_ABORT_UNLESS(it.second, "couldn't insert request for path: %s", insertedRequest.RequestId.AsGuidString().c_str());

Expand All @@ -135,6 +167,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

// Cutting file

    // Decompresses a downloaded chunk with the codec named in
    // DecompressionFormat_, returning at most 10MB of decompressed data.
    // Returns Nothing() after reporting an error to the requester when the
    // codec is unknown or decompression throws.
    TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
        try {
            NDB::ReadBufferFromString dataBuffer(data);
            // MakeDecompressor returns null for codec names it does not know.
            auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
            if (!decompressorBuffer) {
                auto error = MakeError(
                    request.Path,
                    NFq::TIssuesIds::INTERNAL_ERROR,
                    TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
                );
                SendError(ctx, error);
                return {};
            }

            // Pull decompressed bytes chunk by chunk, capped at 10MB — the same
            // prefix limit used when requesting the raw file.
            TStringBuilder decompressedData;
            while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
                decompressorBuffer->nextIfAtEnd();
                // Read no more than what the buffer has ready and what the cap allows.
                size_t maxDecompressedChunkSize = std::min(
                    decompressorBuffer->available(),
                    10_MB - decompressedData.size()
                );
                TString decompressedChunk{maxDecompressedChunkSize, ' '};
                decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
                decompressedData << decompressedChunk;
            }
            return std::move(decompressedData);
        } catch (const yexception& error) {
            // Truncated/corrupt archives surface here; report instead of crashing.
            auto errorEv = MakeError(
                request.Path,
                NFq::TIssuesIds::INTERNAL_ERROR,
                TStringBuilder{} << "couldn't decompress file, check compression params: " << error.what()
            );
            SendError(ctx, errorEv);
            return {};
        }
    }

std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
auto chunker = arrow::csv::MakeChunker(options);
std::shared_ptr<arrow::Buffer> whole, partial;
Expand Down Expand Up @@ -170,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
return std::make_shared<arrow::io::BufferReader>(std::move(whole));
}

void HandleMetadataSizeRequest(const TString& data, TRequest request, const NActors::TActorContext& ctx) {
uint32_t metadataSize = arrow::BitUtil::FromLittleEndian<uint32_t>(ReadUnaligned<uint32_t>(data.data()));

if (metadataSize > 10_MB) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
);
SendError(ctx, error);
return;
}

InflightRequests_.erase(request.Path);

TRequest localRequest{
.Path = request.Path,
.RequestId = TGUID::Create(),
.Requester = request.Requester,
.MetadataRequest = false,
};
RequestPartialFile(std::move(localRequest), ctx, request.From - metadataSize, request.To + 4);
}

std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
auto arrowData = std::make_shared<arrow::Buffer>(nullptr, 0);
arrow::BufferBuilder builder;
auto buildRes = builder.Append(data.data(), data.size());
if (!buildRes.ok()) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't read data from S3Fetcher: " << buildRes.ToString()
);
SendError(ctx, error);
return nullptr;
}

buildRes = builder.Finish(&arrowData);
if (!buildRes.ok()) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't copy data from S3Fetcher: " << buildRes.ToString()
);
SendError(ctx, error);
return nullptr;
}

return std::make_shared<arrow::io::BufferReader>(std::move(arrowData));
}

// Utility
void SendError(const NActors::TActorContext& ctx, TEvFileError* error) {
auto requestIt = InflightRequests_.find(error->Path);
Expand All @@ -183,10 +304,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
// Fields
NActors::TActorId S3FetcherId_;
EFileFormat Format_;
TMaybe<TString> DecompressionFormat_;
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
};

// Factory for the actor that downloads file fragments through the s3 fetcher
// and turns them into arrow-readable files for schema inference.
// `params` carries external source attributes (e.g. "compression").
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
    return new TArrowFileFetcher{s3FetcherId, format, params};
}
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

// Creates the arrow fetching actor; `params` carries external source
// attributes (e.g. "compression"). Defined in arrow_fetcher.cpp.
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@
#include <arrow/table.h>
#include <arrow/csv/options.h>
#include <arrow/csv/reader.h>
#include <parquet/arrow/reader.h>

#include <ydb/core/external_sources/object_storage/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/public/api/protos/ydb_value.pb.h>

#define LOG_E(name, stream) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_I(name, stream) \
LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_D(name, stream) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_T(name, stream) \
LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

Expand Down Expand Up @@ -202,12 +212,37 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
return table->fields();
}

// Infers column types from a parquet file's footer. Returns the arrow fields
// on success or a human-readable error string on failure.
std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
    parquet::arrow::FileReaderBuilder builder;
    // memory_map = false: the source is an in-memory buffer, not a local file.
    builder.properties(parquet::ArrowReaderProperties(false));
    auto openStatus = builder.Open(std::move(file));
    if (!openStatus.ok()) {
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
    }

    std::unique_ptr<parquet::arrow::FileReader> reader;
    auto readerStatus = builder.Build(&reader);
    if (!readerStatus.ok()) {
        // Report the build failure, not the (successful) open status.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << readerStatus.ToString();
    }

    std::shared_ptr<arrow::Schema> schema;
    auto schemaRes = reader->GetSchema(&schema);
    if (!schemaRes.ok()) {
        // Report the schema-extraction failure, not the (successful) open status.
        return TStringBuilder{} << "couldn't parse parquet file, check format params: " << schemaRes.ToString();
    }

    return schema->fields();
}

std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr<arrow::io::RandomAccessFile> file, const FormatConfig& config) {
switch (format) {
case EFileFormat::CsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const CsvConfig&>(config));
case EFileFormat::TsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const TsvConfig&>(config));
case EFileFormat::Parquet:
return InferParquetTypes(std::move(file));
case EFileFormat::Undefined:
default:
return std::variant<ArrowFields, TString>{std::in_place_type_t<TString>{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)};
Expand Down Expand Up @@ -240,7 +275,10 @@ std::unique_ptr<FormatConfig> MakeFormatConfig(EFileFormat format, const THashMa

class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencinator> {
public:
TArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params)
TArrowInferencinator(
NActors::TActorId arrowFetcher,
EFileFormat format,
const THashMap<TString, TString>& params)
: Format_{format}
, Config_{MakeFormatConfig(Format_, params)}
, ArrowFetcherId_{arrowFetcher}
Expand Down Expand Up @@ -270,7 +308,6 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
return;
}

auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
std::vector<Ydb::Column> ydbFields;
for (const auto& field : arrowFields) {
Expand All @@ -286,7 +323,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
}

void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
Cout << "TArrowInferencinator::HandleFileError" << Endl;
LOG_D("TArrowInferencinator", "HandleFileError: " << ev->Get()->Issues.ToOneLineString());
ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
}

Expand All @@ -297,7 +334,11 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
NActors::TActorId RequesterId_;
};

// Factory for the actor that infers a file schema from arrow files produced
// by the fetching actor. `params` carries format options (e.g. csv delimiter)
// used to build the per-format inference config.
NActors::IActor* CreateArrowInferencinator(
    NActors::TActorId arrowFetcher,
    EFileFormat format,
    const THashMap<TString, TString>& params) {

    return new TArrowInferencinator{arrowFetcher, format, params};
}
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
Loading

0 comments on commit f179f77

Please sign in to comment.