Skip to content

Commit

Permalink
YQ-3184 fixed CTAS for s3 sources (#6159)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 4, 2024
1 parent 4d68f9b commit caa19d5
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 22 deletions.
5 changes: 3 additions & 2 deletions ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ namespace {
const TString createTableName = !isAtomicOperation
? tableName
: (TStringBuilder()
<< "/Root/.tmp/sessions/"
<< CanonizePath(AppData()->TenantName)
<< "/.tmp/sessions/"
<< sessionCtx->GetSessionId()
<< tmpTableName);
<< CanonizePath(tmpTableName));

create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
NYql::NConnector::IClient::TPtr connectorClient,
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
std::optional<NKikimrConfig::TAppConfig> appConfig,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const TString& domainRoot)
{
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableExternalDataSources(true);
Expand Down Expand Up @@ -53,7 +54,9 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
.SetFeatureFlags(featureFlags)
.SetFederatedQuerySetupFactory(federatedQuerySetupFactory)
.SetKqpSettings({})
.SetS3ActorsFactory(std::move(s3ActorsFactory));
.SetS3ActorsFactory(std::move(s3ActorsFactory))
.SetWithSampleTables(false)
.SetDomainRoot(domainRoot);

settings = settings.SetAppConfig(appConfig.value());

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/federated_query/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
NYql::NConnector::IClient::TPtr connectorClient = nullptr,
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = nullptr,
std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr);
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr,
const TString& domainRoot = "Root");
}
169 changes: 169 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,175 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(ExecuteScriptWithThinFile) {
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
}

std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
const TString bucket = "test_bucket3";
const TString object = "test_object";

NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig, "TestDomain");

CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);
CREATE EXTERNAL TABLE `{external_table}` (
key Utf8 NOT NULL,
value Utf8 NOT NULL
) WITH (
DATA_SOURCE="{external_source}",
LOCATION="{object}",
FORMAT="json_each_row"
);)",
"external_source"_a = externalDataSourceName,
"external_table"_a = externalTableName,
"location"_a = GetBucketLocation(bucket),
"object"_a = object
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

return kikimr;
}

void ValidateResult(const TExecuteQueryResult& result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL_C(result.GetResultSets().size(), 1, "Unexpected result sets count");

TResultSetParser resultSet(result.GetResultSet(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);

UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");

UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");

}

void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
{
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}

{
const TString query = TStringBuilder() << "SELECT key, value FROM `" << olapTable << "` ORDER BY key;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
AS SELECT *
FROM `{external_source}`.`/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
);)",
"destination"_a = oltpTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const TString olapTable = "DestinationOlap";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
WITH (STORE = COLUMN)
AS SELECT *
FROM `{external_source}`.`/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
);)",
"destination"_a = olapTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ValidateTables(client, oltpTable, olapTable);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
AS SELECT *
FROM `{external_table}`;)",
"destination"_a = oltpTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const TString olapTable = "DestinationOlap";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
WITH (STORE = COLUMN)
AS SELECT *
FROM `{external_table}`;)",
"destination"_a = olapTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ValidateTables(client, oltpTable, olapTable);
}
}

} // namespace NKikimr::NKqp
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace NTestUtils {

extern const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig) {
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory());
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot) {
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
}

Aws::S3::S3Client MakeS3Client() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace NTestUtils {
extern const TString TEST_SCHEMA;
extern const TString TEST_SCHEMA_IDS;

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt);
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");

Aws::S3::S3Client MakeS3Client();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableTopicDiskSubDomainQuota)
FEATURE_FLAG_SETTER(EnablePQConfigTransactionsAtSchemeShard)
FEATURE_FLAG_SETTER(EnableScriptExecutionOperations)
FEATURE_FLAG_SETTER(EnableExternalDataSources)
FEATURE_FLAG_SETTER(EnableForceImmediateEffectsExecution)
FEATURE_FLAG_SETTER(EnableTopicSplitMerge)
FEATURE_FLAG_SETTER(EnableTempTables)
Expand Down
1 change: 0 additions & 1 deletion ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ FeatureFlags {
EnableExternalDataSources: true
EnableScriptExecutionOperations: true
EnableExternalSourceSchemaInference: true
EnableResourcePools: true
}

KQPConfig {
Expand Down
48 changes: 35 additions & 13 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,9 @@ struct TExecutionOptions {
};


void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
NKqpRun::TKqpRunner runner(runnerOptions);

if (executionOptions.SchemeQuery) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl;
if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) {
Expand Down Expand Up @@ -136,20 +133,45 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage();
}
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
while (true) {
TString command;
Cin >> command;

if (command == "exit") {
break;
}
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
void RunAsDaemon() {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
while (true) {
TString command;
Cin >> command;

if (command == "exit") {
break;
}
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
}
}


void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
NKqpRun::TKqpRunner runner(runnerOptions);

try {
RunArgumentQueries(executionOptions, runner);
} catch (const yexception& exception) {
if (runnerOptions.YdbSettings.MonitoringEnabled) {
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
} else {
throw exception;
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
RunAsDaemon();
}

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of kqp runner..." << colors.Default() << Endl;
}

Expand Down

0 comments on commit caa19d5

Please sign in to comment.