Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3184 fixed CTAS for s3 sources #6159

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ namespace {
const TString createTableName = !isAtomicOperation
? tableName
: (TStringBuilder()
<< "/Root/.tmp/sessions/"
<< CanonizePath(AppData()->TenantName)
<< "/.tmp/sessions/"
<< sessionCtx->GetSessionId()
<< tmpTableName);
<< CanonizePath(tmpTableName));

create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/ut/federated_query/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
NYql::NConnector::IClient::TPtr connectorClient,
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
std::optional<NKikimrConfig::TAppConfig> appConfig,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory)
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const TString& domainRoot)
{
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableExternalDataSources(true);
Expand Down Expand Up @@ -53,7 +54,9 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
.SetFeatureFlags(featureFlags)
.SetFederatedQuerySetupFactory(federatedQuerySetupFactory)
.SetKqpSettings({})
.SetS3ActorsFactory(std::move(s3ActorsFactory));
.SetS3ActorsFactory(std::move(s3ActorsFactory))
.SetWithSampleTables(false)
.SetDomainRoot(domainRoot);

settings = settings.SetAppConfig(appConfig.value());

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/federated_query/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
NYql::NConnector::IClient::TPtr connectorClient = nullptr,
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = nullptr,
std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr);
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr,
const TString& domainRoot = "Root");
}
169 changes: 169 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,175 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(ExecuteScriptWithThinFile) {
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
}

std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
const TString bucket = "test_bucket3";
const TString object = "test_object";

NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig, "TestDomain");

CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);
CREATE EXTERNAL TABLE `{external_table}` (
key Utf8 NOT NULL,
value Utf8 NOT NULL
) WITH (
DATA_SOURCE="{external_source}",
LOCATION="{object}",
FORMAT="json_each_row"
);)",
"external_source"_a = externalDataSourceName,
"external_table"_a = externalTableName,
"location"_a = GetBucketLocation(bucket),
"object"_a = object
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

return kikimr;
}

void ValidateResult(const TExecuteQueryResult& result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL_C(result.GetResultSets().size(), 1, "Unexpected result sets count");

TResultSetParser resultSet(result.GetResultSet(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);

UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");

UNIT_ASSERT(resultSet.TryNextRow());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");

}

void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
{
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}

{
const TString query = TStringBuilder() << "SELECT key, value FROM `" << olapTable << "` ORDER BY key;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
AS SELECT *
FROM `{external_source}`.`/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
);)",
"destination"_a = oltpTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const TString olapTable = "DestinationOlap";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
WITH (STORE = COLUMN)
AS SELECT *
FROM `{external_source}`.`/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
);)",
"destination"_a = olapTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ValidateTables(client, oltpTable, olapTable);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
AS SELECT *
FROM `{external_table}`;)",
"destination"_a = oltpTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const TString olapTable = "DestinationOlap";
{
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
PRIMARY KEY (key, value)
)
WITH (STORE = COLUMN)
AS SELECT *
FROM `{external_table}`;)",
"destination"_a = olapTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

ValidateTables(client, oltpTable, olapTable);
}
}

} // namespace NKikimr::NKqp
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/federated_query/s3/s3_recipe_ut_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace NTestUtils {

extern const TString TEST_SCHEMA_IDS = R"(["StructType";[["key";["DataType";"Utf8";];];];])";

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig) {
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory());
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig, const TString& domainRoot) {
return NKikimr::NKqp::NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, appConfig, NYql::NDq::CreateS3ActorsFactory(), domainRoot);
}

Aws::S3::S3Client MakeS3Client() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace NTestUtils {
extern const TString TEST_SCHEMA;
extern const TString TEST_SCHEMA_IDS;

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt);
std::shared_ptr<NKikimr::NKqp::TKikimrRunner> MakeKikimrRunner(std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt, const TString& domainRoot = "Root");

Aws::S3::S3Client MakeS3Client();

Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableTopicDiskSubDomainQuota)
FEATURE_FLAG_SETTER(EnablePQConfigTransactionsAtSchemeShard)
FEATURE_FLAG_SETTER(EnableScriptExecutionOperations)
FEATURE_FLAG_SETTER(EnableExternalDataSources)
FEATURE_FLAG_SETTER(EnableForceImmediateEffectsExecution)
FEATURE_FLAG_SETTER(EnableTopicSplitMerge)
FEATURE_FLAG_SETTER(EnableTempTables)
Expand Down
1 change: 0 additions & 1 deletion ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ FeatureFlags {
EnableExternalDataSources: true
EnableScriptExecutionOperations: true
EnableExternalSourceSchemaInference: true
EnableResourcePools: true
}

KQPConfig {
Expand Down
48 changes: 35 additions & 13 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,9 @@ struct TExecutionOptions {
};


void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
NKqpRun::TKqpRunner runner(runnerOptions);

if (executionOptions.SchemeQuery) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl;
if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) {
Expand Down Expand Up @@ -136,20 +133,45 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage();
}
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
while (true) {
TString command;
Cin >> command;

if (command == "exit") {
break;
}
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
void RunAsDaemon() {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Started reading commands" << colors.Default() << Endl;
while (true) {
TString command;
Cin >> command;

if (command == "exit") {
break;
}
Cerr << colors.Red() << TInstant::Now().ToIsoStringLocal() << " Invalid command '" << command << "'" << colors.Default() << Endl;
}
}


void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl;
NKqpRun::TKqpRunner runner(runnerOptions);

try {
RunArgumentQueries(executionOptions, runner);
} catch (const yexception& exception) {
if (runnerOptions.YdbSettings.MonitoringEnabled) {
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
} else {
throw exception;
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
RunAsDaemon();
}

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of kqp runner..." << colors.Default() << Endl;
}

Expand Down
Loading