Skip to content

Commit

Permalink
[yt] Mixed gateway (#11146)
Browse files Browse the repository at this point in the history
  • Loading branch information
rvu1024 authored Oct 31, 2024
1 parent 5f71cfb commit ed7c207
Show file tree
Hide file tree
Showing 19 changed files with 732 additions and 86 deletions.
107 changes: 98 additions & 9 deletions ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <util/system/fstat.h>
#include <util/string/split.h>
#include <util/string/builder.h>
#include <util/string/cast.h>
#include <util/folder/path.h>
#include <util/generic/yexception.h>
#include <util/generic/xrange.h>
Expand Down Expand Up @@ -189,9 +190,11 @@ class TFileTransformProvider {
const TString name(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
auto block = TUserDataStorage::FindUserDataBlock(UserDataBlocks, name);
MKQL_ENSURE(block, "File not found: " << name);
MKQL_ENSURE(block->Type == EUserDataType::PATH, "FilePath not supported for non-file data block, name: "
MKQL_ENSURE(block->Type == EUserDataType::PATH || block->FrozenFile, "File is not frozen, name: "
<< name << ", block type: " << block->Type);
return TProgramBuilder(env, *Services->GetFunctionRegistry()).NewDataLiteral<NUdf::EDataSlot::String>(block->Data);
return TProgramBuilder(env, *Services->GetFunctionRegistry()).NewDataLiteral<NUdf::EDataSlot::String>(
block->Type == EUserDataType::PATH ? block->Data : block->FrozenFile->GetPath().GetPath()
);
};
}

Expand All @@ -206,7 +209,7 @@ class TFileTransformProvider {
continue;
}

MKQL_ENSURE(x.second.Type == EUserDataType::PATH, "FilePath not supported for non-file data block, name: "
MKQL_ENSURE(x.second.Type == EUserDataType::PATH, "FolderPath not supported for non-file data block, name: "
<< x.first.Alias() << ", block type: " << x.second.Type);
auto newFolderPath = x.second.Data.substr(0, x.second.Data.size() - (x.first.Alias().size() - folderName.size()));
if (!folderPath) {
Expand Down Expand Up @@ -234,9 +237,8 @@ class TFileTransformProvider {
else if (block->Type == EUserDataType::RAW_INLINE_DATA) {
return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(block->Data);
}
else if (Services->GetFileStorage() && block->Type == EUserDataType::URL) {
auto link = Services->GetFileStorage()->PutUrl(block->Data, "");
auto content = TFileInput(link->GetPath()).ReadAll();
else if (block->FrozenFile && block->Type == EUserDataType::URL) {
auto content = TFileInput(block->FrozenFile->GetPath().GetPath()).ReadAll();
return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content);
} else {
MKQL_ENSURE(false, "Unsupported block type");
Expand Down Expand Up @@ -336,6 +338,23 @@ class TFileTransformProvider {
std::shared_ptr<THashMap<TString, TRuntimeNode>> ExtraArgs;
};

template <typename TType>
static inline TType OptionFromString(const TStringBuf value) {
if constexpr (std::is_same_v<TString, TType>) {
return TString{value};
} else if constexpr (std::is_same_v<NYT::TNode, TType>) {
return NYT::NodeFromYsonString(value);
} else {
return FromString<TType>(value);
}
}

// Identity converter: hands back a reference to the very object it was given,
// without copying. Used where a conversion callable is required but the value
// must be stored unchanged.
template <typename TType>
static inline auto NoOp(const TType& value) -> const TType& {
    return value;
}


///////////////////////////////////////////////////////////////////////////////////////////////////////

class TYtFileGateway : public IYtGateway {
Expand Down Expand Up @@ -844,8 +863,12 @@ class TYtFileGateway : public IYtGateway {

auto publish = TYtPublish(node);

auto mode = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::Mode);
bool append = mode && FromString<EYtWriteMode>(mode->Child(1)->Content()) == EYtWriteMode::Append;
EYtWriteMode mode = EYtWriteMode::Renew;
if (const auto modeSetting = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::Mode)) {
mode = FromString<EYtWriteMode>(modeSetting->Child(1)->Content());
}

bool append = mode == EYtWriteMode::Append;
auto cluster = TString{publish.DataSink().Cluster().Value()};

bool isAnonymous = NYql::HasSetting(publish.Publish().Settings().Ref(), EYtSettingType::Anonymous);
Expand Down Expand Up @@ -939,9 +962,65 @@ class TYtFileGateway : public IYtGateway {
columnGroupsSpec = NYT::NodeFromYsonString(setting->Tail().Content());
}
}
if (!append || !attrs.HasKey("schema") || !columnGroupsSpec.IsUndefined()) {
if (!append || !attrs.HasKey("schema") || !columnGroupsSpec.IsUndefined() || dstRowSpec->IsSorted()) {
attrs["schema"] = RowSpecToYTSchema(spec[YqlRowSpecAttribute], nativeYtTypeCompatibility, columnGroupsSpec).ToNode();
}

if (EYtWriteMode::Renew == mode || EYtWriteMode::RenewKeepMeta == mode) {
bool isTimestamp = false, isDuration = false;
TInstant stamp;
TDuration duration;
if (auto e = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::Expiration)) {
isDuration = TDuration::TryParse(e->Tail().Content(), duration);
if (!isDuration) {
isTimestamp = TInstant::TryParseIso8601(e->Tail().Content(), stamp);
}
}
const TMaybe<TInstant> deadline = options.Config()->ExpirationDeadline.Get(cluster);
const TMaybe<TDuration> interval = options.Config()->ExpirationInterval.Get(cluster);
if (deadline || isTimestamp) {
attrs["expiration_time"] = isTimestamp ? stamp.ToStringUpToSeconds() : deadline->ToStringUpToSeconds();
}
if (interval || isDuration) {
attrs["expiration_timeout"] = isDuration ? duration.MilliSeconds() : interval->MilliSeconds();
}
if (options.Config()->NightlyCompress.Get(cluster).GetOrElse(false)) {
attrs["force_nightly_compress"] = true;
}
}

// HANDLE_OPT(name, attr, conv) fills one attribute of the destination table:
//   1. start from the per-cluster config value: Temporary<name> when the table is
//      anonymous, Published<name> otherwise;
//   2. an explicit publish setting EYtSettingType::<name>, if present, overrides it
//      (parsed with OptionFromString into the option's value_type);
//   3. conv(value) is stored under attrs[attr] only when a value is set and it differs
//      from the cluster's Temporary<name> value.
// The macro declares a local variable dst<name> in the enclosing scope, so every name
// can be expanded at most once per scope.
#define HANDLE_OPT(name, attr, conv) \
    auto dst##name = isAnonymous \
        ? options.Config()->Temporary##name.Get(cluster) \
        : options.Config()->Published##name.Get(cluster); \
    if (auto s = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::name)) { \
        dst##name = OptionFromString<decltype(dst##name)::value_type>(s->Tail().Content()); \
    } \
    if (dst##name && dst##name != options.Config()->Temporary##name.Get(cluster)) { \
        attrs[attr] = conv(*dst##name); \
    }

HANDLE_OPT(CompressionCodec, "compression_codec", NoOp);
HANDLE_OPT(ErasureCodec, "erasure_codec", ToString);
HANDLE_OPT(ReplicationFactor, "replication_factor", static_cast<i64>);
HANDLE_OPT(Media, "media", NoOp);
HANDLE_OPT(PrimaryMedium, "primary_medium", NoOp);
// Undefine the helper under the name it was defined with, so that it does not
// stay visible in the rest of the translation unit.
#undef HANDLE_OPT

if (auto optimizeFor = options.Config()->OptimizeFor.Get(cluster)) {
if (dstRowSpec->GetType()->GetSize()) {
attrs["optimize_for"] = ToString(*optimizeFor);
}
}

if (auto ua = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::UserAttrs)) {
const NYT::TNode mapNode = NYT::NodeFromYsonString(ua->Tail().Content());
const auto& map = mapNode.AsMap();
for (auto it = map.cbegin(); it != map.cend(); ++it) {
attrs[it->first] = it->second;
}
}

TOFStream ofAttr(destFilePath + ".attr");
ofAttr.Write(NYT::NodeToYsonString(attrs, NYson::EYsonFormat::Pretty));
}
Expand Down Expand Up @@ -1037,6 +1116,16 @@ class TYtFileGateway : public IYtGateway {
return res;
}

// Stub implementation: the options are ignored and the method returns the future
// produced by MakeFuture<TDownloadTablesResult>() called without arguments
// (presumably an already-set future holding a default-constructed result — confirm).
TFuture<TDownloadTablesResult> DownloadTables(TDownloadTablesOptions&& /*options*/) final {
    return MakeFuture<TDownloadTablesResult>();
}

// Stub implementation: the options are ignored and the method returns the future
// produced by MakeFuture<TUploadTableResult>() called without arguments
// (presumably an already-set future holding a default-constructed result — confirm).
TFuture<TUploadTableResult> UploadTable(TUploadTableOptions&& /*options*/) final {
    return MakeFuture<TUploadTableResult>();
}

TFullResultTableResult PrepareFullResultTable(TFullResultTableOptions&& options) final {
try {
TString cluster = options.Cluster();
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/providers/yt/gateway/mixed/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Build description of the "mixed" YT gateway library
# (ydb/library/yql/providers/yt/gateway/mixed).
LIBRARY()

# The library is built from a single source file.
SRCS(
yql_yt_mixed.cpp
)

# Libraries this module depends on. Both the file and the native YT gateways
# are listed here, next to the shared YT provider/gateway code.
PEERDIR(
ydb/library/yql/utils/log
ydb/library/yql/providers/yt/provider
ydb/library/yql/providers/yt/gateway/file
ydb/library/yql/providers/yt/gateway/native
ydb/library/yql/providers/yt/gateway/lib
ydb/library/yql/providers/yt/common
ydb/library/yql/providers/common/provider

library/cpp/threading/future
library/cpp/yson/node
)

# NOTE(review): presumably pins the module to the latest YQL UDF ABI version — confirm.
YQL_LAST_ABI_VERSION()

END()
Loading

0 comments on commit ed7c207

Please sign in to comment.