diff --git a/.github/scripts/tests/transform-ya-junit.py.orig b/.github/scripts/tests/transform-ya-junit.py.orig new file mode 100644 index 000000000000..569e229cc714 --- /dev/null +++ b/.github/scripts/tests/transform-ya-junit.py.orig @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +import argparse +import re +import json +import os +import sys +import urllib.parse +import zipfile +<<<<<<< HEAD +from typing import Set +======= +>>>>>>> stable-24-1 +from xml.etree import ElementTree as ET +from mute_utils import mute_target, pattern_to_re +from junit_utils import add_junit_link_property, is_faulty_testcase + + +def log_print(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + +class YaMuteCheck: + def __init__(self): + self.regexps = set() + self.regexps = [] + + def load(self, fn): + with open(fn, "r") as fp: + for line in fp: + line = line.strip() + try: + testsuite, testcase = line.split(" ", maxsplit=1) + except ValueError: + log_print(f"SKIP INVALID MUTE CONFIG LINE: {line!r}") + continue + self.populate(testsuite, testcase) + + def populate(self, testsuite, testcase): + check = [] + + for p in (pattern_to_re(testsuite), pattern_to_re(testcase)): + try: + check.append(re.compile(p)) + except re.error: + log_print(f"Unable to compile regex {p!r}") + return + + self.regexps.append(tuple(check)) + + def __call__(self, suite_name, test_name): + for ps, pt in self.regexps: + if ps.match(suite_name) and pt.match(test_name): + return True + return False + + +class YTestReportTrace: + def __init__(self, out_root): + self.out_root = out_root + self.traces = {} +<<<<<<< HEAD + self.logs_dir = set() +======= + self.logs_dir = None +>>>>>>> stable-24-1 + + def abs_path(self, path): + return path.replace("$(BUILD_ROOT)", self.out_root) + + def load(self, subdir): + test_results_dir = os.path.join(self.out_root, f"{subdir}/test-results/") + + if not os.path.isdir(test_results_dir): + log_print(f"Directory {test_results_dir} doesn't exist") + return + + # find the test result + for folder in os.listdir(test_results_dir): + fn = os.path.join(self.out_root, test_results_dir, folder, "ytest.report.trace") + + if not os.path.isfile(fn): + continue + + with open(fn, "r") as fp: + for line in fp: + event = json.loads(line.strip()) + if event["name"] == "subtest-finished": + event = event["value"] + cls = event["class"] + subtest = event["subtest"] + cls = cls.replace("::", ".") + self.traces[(cls, subtest)] = event + logs_dir = self.abs_path(event['logs']['logsdir']) +<<<<<<< HEAD + self.logs_dir.add(logs_dir) +======= + self.logs_dir = logs_dir + break +>>>>>>> stable-24-1 + + def has(self, cls, name): + return (cls, name) in self.traces + + def get_logs(self, cls, name): + trace = self.traces.get((cls, name)) + + if not trace: + return {} + + logs = trace["logs"] + + result = {} + for k, path in logs.items(): + if k == "logsdir": + continue + + result[k] = self.abs_path(path) + + return result + + +def filter_empty_logs(logs): + result = {} + for k, v in logs.items(): + if not os.path.isfile(v) or os.stat(v).st_size == 0: + continue + result[k] = v + return result + + +def save_log(build_root, fn, out_dir, log_url_prefix, trunc_size): + fpath = os.path.relpath(fn, build_root) + + if out_dir is not None: + out_fn = os.path.join(out_dir, fpath) + fsize = os.stat(fn).st_size + + out_fn_dir = os.path.dirname(out_fn) + + if not os.path.isdir(out_fn_dir): + os.makedirs(out_fn_dir, 0o700) + + if trunc_size and fsize > trunc_size: + with open(fn, "rb") as in_fp: + in_fp.seek(fsize - trunc_size) + log_print(f"truncate {out_fn} to {trunc_size}") + with open(out_fn, "wb") as out_fp: + while 1: + buf = in_fp.read(8192) + if not buf: + break + out_fp.write(buf) + else: + os.symlink(fn, out_fn) + quoted_fpath = urllib.parse.quote(fpath) + return f"{log_url_prefix}{quoted_fpath}" + + +<<<<<<< HEAD +def save_zip(suite_name, out_dir, url_prefix, logs_dir: Set[str]): +======= +def save_zip(suite_name, out_dir, url_prefix, logs_dir): +>>>>>>> stable-24-1 + arc_name = f"{suite_name.replace('/', '-')}.zip" + + arc_fn = os.path.join(out_dir, arc_name) + + zf = zipfile.ZipFile(arc_fn, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) + +<<<<<<< HEAD + for path in logs_dir: + # path is .../test-results/black/testing_out_stuff + log_print(f"put {path} into {arc_name}") + test_type = os.path.basename(os.path.dirname(path)) + for root, dirs, files in os.walk(path): + for f in files: + filename = os.path.join(root, f) + zf.write(filename, os.path.join(test_type, os.path.relpath(filename, path))) +======= + log_print(f"put {logs_dir} into {arc_name}") + for root, dirs, files in os.walk(logs_dir): + for f in files: + filename = os.path.join(root, f) + zf.write(filename, os.path.relpath(filename, logs_dir)) +>>>>>>> stable-24-1 + zf.close() + + quoted_fpath = urllib.parse.quote(arc_name) + return f"{url_prefix}{quoted_fpath}" + + +def transform(fp, mute_check: YaMuteCheck, ya_out_dir, save_inplace, log_url_prefix, log_out_dir, log_trunc_size, + test_stuff_out, test_stuff_prefix): + tree = ET.parse(fp) + root = tree.getroot() + + for suite in root.findall("testsuite"): + suite_name = suite.get("name") + traces = YTestReportTrace(ya_out_dir) + traces.load(suite_name) + + has_fail_tests = False + + for case in suite.findall("testcase"): + test_name = case.get("name") + case.set("classname", suite_name) + + is_fail = is_faulty_testcase(case) + has_fail_tests |= is_fail + + if mute_check(suite_name, test_name): + log_print("mute", suite_name, test_name) + mute_target(case) + + if is_fail and "." in test_name: + test_cls, test_method = test_name.rsplit(".", maxsplit=1) + logs = filter_empty_logs(traces.get_logs(test_cls, test_method)) + + if logs: + log_print(f"add {list(logs.keys())!r} properties for {test_cls}.{test_method}") + for name, fn in logs.items(): + url = save_log(ya_out_dir, fn, log_out_dir, log_url_prefix, log_trunc_size) + add_junit_link_property(case, name, url) + + if has_fail_tests: + if not traces.logs_dir: + log_print(f"no logsdir for {suite_name}") + continue + + url = save_zip(suite_name, test_stuff_out, test_stuff_prefix, traces.logs_dir) + + for case in suite.findall("testcase"): + add_junit_link_property(case, 'logsdir', url) + + if save_inplace: + tree.write(fp.name) + else: + ET.indent(root) + print(ET.tostring(root, encoding="unicode")) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-i", action="store_true", dest="save_inplace", default=False, help="modify input file in-place" + ) + parser.add_argument("-m", help="muted test list") + parser.add_argument("--log-url-prefix", default="./", help="url prefix for logs") + parser.add_argument("--log-out-dir", help="symlink logs to specific directory") + parser.add_argument( + "--log-truncate-size", + dest="log_trunc_size", + type=int, + default=134217728, + help="truncate log after specific size, 0 disables truncation", + ) + parser.add_argument("--ya-out", help="ya make output dir (for searching logs and artifacts)") + parser.add_argument('--test-stuff-out', help='output folder for archive testing_out_stuff') + parser.add_argument('--test-stuff-prefix', help='url prefix for testing_out_stuff') + parser.add_argument("in_file", type=argparse.FileType("r")) + + args = parser.parse_args() + + mute_check = YaMuteCheck() + + if args.m: + mute_check.load(args.m) + + transform( + args.in_file, + mute_check, + args.ya_out, + args.save_inplace, + args.log_url_prefix, + args.log_out_dir, + args.log_trunc_size, + args.test_stuff_out, + args.test_stuff_prefix, + ) + + +if __name__ == "__main__": + main() diff --git a/.github/workflows/pr_check.yml.orig b/.github/workflows/pr_check.yml.orig new file mode 100644 index 000000000000..86ffa113425a --- /dev/null +++ b/.github/workflows/pr_check.yml.orig @@ -0,0 +1,227 @@ +name: PR-check +on: + pull_request_target: + branches: + - 'main' + - 'stable-*' + - 'stream-nb-*' +<<<<<<< HEAD + - '*-stable-*' +======= +>>>>>>> stable-24-1 + paths-ignore: + - 'ydb/docs/**' + - '*' + types: + - 'opened' + - 'synchronize' + - 'reopened' + - 'labeled' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number }} + cancel-in-progress: true +jobs: + check-running-allowed: + if: ${{vars.CHECKS_SWITCH != '' && fromJSON(vars.CHECKS_SWITCH).pr_check == true}} + runs-on: ubuntu-latest + outputs: + result: ${{ steps.check-ownership-membership.outputs.result == 'true' && steps.check-is-mergeable.outputs.result == 'true' }} + commit_sha: ${{ steps.check-is-mergeable.outputs.commit_sha }} + steps: + - name: Check if running tests is allowed + id: check-ownership-membership + uses: actions/github-script@v6 + with: + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + script: | + const labels = context.payload.pull_request.labels; + const okToTestLabel = labels.find( + label => label.name == 'ok-to-test' + ); + + console.log("okToTestLabel=%o", okToTestLabel !== undefined); + + if (okToTestLabel !== undefined) { + return true; + } + + // This is used primarily in forks. Repository owner + // should be allowed to run anything. + const userLogin = context.payload.pull_request.user.login; + + // How to interpret membership status code: + // https://docs.github.com/rest/collaborators/collaborators#check-if-a-user-is-a-repository-collaborator + const isRepoCollaborator = async function () { + try { + const response = await github.rest.repos.checkCollaborator({ + owner: context.payload.repository.owner.login, + repo: context.payload.repository.name, + username: userLogin, + }); + return response.status == 204; + } catch (error) { + if (error.status && error.status == 404) { + return false; + } + throw error; + } + } + + if (context.payload.repository.owner.login == userLogin) { + console.log("You are the repository owner!"); + return true; + } + + if (await isRepoCollaborator()) { + console.log("You are a collaborator!"); + return true; + } + + return false; + - name: comment-if-waiting-on-ok + if: steps.check-ownership-membership.outputs.result == 'false' && + github.event.action == 'opened' + uses: actions/github-script@v6 + with: + script: | + let externalContributorLabel = 'external'; + + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: 'Hi! Thank you for contributing!\nThe tests on this PR will run after a maintainer adds an `ok-to-test` label to this PR manually. Thank you for your patience!' + }); + + github.rest.issues.addLabels({ + ...context.repo, + issue_number: context.issue.number, + labels: [externalContributorLabel] + }); + + - name: cleanup-test-label + uses: actions/github-script@v6 + with: + script: | + let labelsToRemove = ['ok-to-test', 'rebase-and-check']; + const prNumber = context.payload.pull_request.number; + const prLabels = new Set(context.payload.pull_request.labels.map(l => l.name)); + for await (const label of labelsToRemove.filter(l => prLabels.has(l))) { + core.info(`remove label=${label} for pr=${prNumber}`); + try { + const result = await github.rest.issues.removeLabel({ + ...context.repo, + issue_number: prNumber, + name: label + }); + } catch(error) { + // ignore the 404 error that arises + // when the label did not exist for the + // organization member + if (error.status && error.status != 404) { + throw error; + } + } + } + - name: check is mergeable + id: check-is-mergeable + if: steps.check-ownership-membership.outputs.result == 'true' + uses: actions/github-script@v6 + with: + result-encoding: string + script: | + let pr = context.payload.pull_request; + const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); + const header = `\n`; + + const fail_msg = header + ':red_circle: Unable to merge your PR into the `main` branch. ' + + 'Please rebase or merge it with the `main` branch.' + + let i = 0; + + while (pr.mergeable == null || i >= 60) { + console.log("get pull-request status"); + + let result = await github.rest.pulls.get({ + ...context.repo, + pull_number: pr.number + }) + + pr = result.data; + + if (pr.mergeable == null) { + await delay(5000); + } + + i += 1; + } + + console.log("pr.mergeable=%o", pr.mergeable); + + const { data: comments } = await github.rest.issues.listComments({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo + }); + + const commentToUpdate = comments.find(comment => comment.body.startsWith(header)); + + if (pr.mergeable === false) { + let commentParams = { + ...context.repo, + issue_number: context.issue.number, + body: fail_msg + }; + + if (commentToUpdate) { + await github.rest.issues.updateComment({ + ...commentParams, + comment_id: commentToUpdate.id, + }); + } else { + await github.rest.issues.createComment({...commentParams}); + } + core.setFailed("Merge conflict detected"); + return false; + } else if (commentToUpdate) { + await github.rest.issues.deleteComment({ + ...context.repo, + issue_number: context.issue.number, + comment_id: commentToUpdate.id, + }); + } + core.info(`commit_sha=${pr.commit_sha}`); + core.setOutput('commit_sha', pr.merge_commit_sha); + return true; + build_and_test: + needs: + - check-running-allowed + if: needs.check-running-allowed.outputs.result == 'true' && needs.check-running-allowed.outputs.commit_sha != '' + strategy: + fail-fast: false + matrix: + build_preset: ["relwithdebinfo", "release-asan", "release-clang14"] + runs-on: [ self-hosted, auto-provisioned, "${{ format('build-preset-{0}', matrix.build_preset) }}" ] + name: Build and test ${{ matrix.build_preset }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + ref: ${{ needs.check-running-allowed.outputs.commit_sha }} + fetch-depth: 2 + - name: Build and test + uses: ./.github/actions/build_and_test_ya + with: + build_preset: ${{ matrix.build_preset }} + build_target: "ydb/" + increment: true + run_tests: ${{ contains(fromJSON('["relwithdebinfo", "release-asan"]'), matrix.build_preset) }} + test_size: "small,medium" + test_type: "unittest,py3test,py2test,pytest" + test_threads: 52 + put_build_results_to_cache: true + run_tests_if_build_fails: false + secs: ${{ format('{{"TESTMO_TOKEN":"{0}","AWS_KEY_ID":"{1}","AWS_KEY_VALUE":"{2}","REMOTE_CACHE_USERNAME":"{3}","REMOTE_CACHE_PASSWORD":"{4}"}}', + secrets.TESTMO_TOKEN, secrets.AWS_KEY_ID, secrets.AWS_KEY_VALUE, secrets.REMOTE_CACHE_USERNAME, secrets.REMOTE_CACHE_PASSWORD ) }} + vars: ${{ format('{{"AWS_BUCKET":"{0}","AWS_ENDPOINT":"{1}","REMOTE_CACHE_URL":"{2}","TESTMO_URL":"{3}","TESTMO_PROJECT_ID":"{4}"}}', + vars.AWS_BUCKET, vars.AWS_ENDPOINT, vars.REMOTE_CACHE_URL_YA, vars.TESTMO_URL, vars.TESTMO_PROJECT_ID ) }} diff --git a/.github/workflows/sync_cmakebuild.yml.orig b/.github/workflows/sync_cmakebuild.yml.orig new file mode 100644 index 000000000000..3c23c8e255e5 --- /dev/null +++ b/.github/workflows/sync_cmakebuild.yml.orig @@ -0,0 +1,51 @@ +name: Sync cmakebuild with main +on: + schedule: +<<<<<<< HEAD + - cron: "0 * * * *" # At minute 0 every hour +======= + - cron: "0 * * * *" +>>>>>>> stable-24-1 + workflow_dispatch: +concurrency: + group: ${{ github.workflow }} + cancel-in-progress: true +env: + REPO: ${{ github.repository }} + TOKEN: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} +jobs: + sync: + runs-on: ubuntu-latest + steps: + - name: Sync + run: | + mainsha=$(curl -s -H "Accept: application/vnd.github.VERSION.sha" https://api.github.com/repos/$REPO/commits/main) + echo "Main sha: ${mainsha}" + lastsha=$(curl -s https://raw.githubusercontent.com/$REPO/cmakebuild/ydb/ci/cmakegen.txt) + echo "Last sha: ${lastsha}" + if [ "${mainsha}" == "${lastsha}" ];then + echo "No new commits on the main branch to merge, exiting" + exit 0 + fi + git clone -b main --shallow-exclude cmakebuild https://$TOKEN@github.com/$REPO.git ydb + git config --global user.email "alex@ydb.tech" + git config --global user.name "Alexander Smirnov" + cd ydb + git fetch --depth `expr $(git rev-list --count HEAD) + 1` + # Depth 10: 1st with cmake generation, 2nd with previous merge, 3rd is common between main and cmakebuild, + # others for possible commits directly on cmakebuild branch + git fetch origin cmakebuild:cmakebuild --depth 10 + mainsha=$(git rev-parse HEAD) + git checkout cmakebuild + prevsha=$(git rev-parse HEAD) + git merge main --no-edit + currsha=$(git rev-parse HEAD) + if [ ${prevsha} == ${currsha} ];then + echo "Merge did not bring any changes, exiting" + exit + fi + ./generate_cmake -k + echo ${mainsha} > ydb/ci/cmakegen.txt + git add . + git commit -m "Generate cmake for ${mainsha}" + git push --set-upstream origin cmakebuild diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig new file mode 100644 index 000000000000..42113a808dfe --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig @@ -0,0 +1,1773 @@ +#include "s3_recipe_ut_helpers.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; +using namespace NKikimr::NKqp::NFederatedQueryTest; +using namespace NTestUtils; +using namespace fmt::literals; + +Y_UNIT_TEST_SUITE(KqpFederatedQuery) { + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + PRAGMA s3.AtomicUploadCommit = "1"; --Check that pragmas are OK + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + Y_UNIT_TEST(ExecuteQueryWithExternalTableResolve) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket_execute_query"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr->GetQueryClient(); + auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + size_t currentRow = 0; + while (true) { + auto part = executeQueryIterator.ReadNext().ExtractValueSync(); + if (!part.IsSuccess()) { + UNIT_ASSERT_C(part.EOS(), part.GetIssues().ToString()); + break; + } + + if (!part.HasResultSet()) { + continue; + } + + auto result = part.GetResultSet(); + + TResultSetParser resultSet(result); + while (resultSet.TryNextRow()) { + if (currentRow == 0) { + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + } else if (currentRow == 1) { + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } else { + UNIT_ASSERT(false); + } + ++currentRow; + } + UNIT_ASSERT(currentRow > 0); + } + } + + Y_UNIT_TEST(ExecuteScriptWithS3ReadNotCached) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto settings = TExecuteScriptSettings().StatsMode(Ydb::Query::STATS_MODE_BASIC); + + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); + + scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); + readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSource) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString bucket = "test_bucket3"; + + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + SELECT * FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) + )", "external_source"_a = externalDataSourceName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdb) { + const TString externalDataSourceName = "/Root/external_data_source_2"; + const TString ydbTable = "/Root/ydb_table"; + const TString bucket = "test_bucket4"; + + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE TABLE `{ydb_table}` ( + key Utf8, + value Utf8, + PRIMARY KEY (key) + ); + )", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket), + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{ydb_table}` (key, value) VALUES + ("1", "one"), + ("2", "two") + )", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key + ORDER BY key + )" + , "external_source"_a = externalDataSourceName + , "ydb_table"_a = ydbTable)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); + } + + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPragma) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket5"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + PRAGMA s3.JsonListSizeLimit = "10"; + PRAGMA s3.SourceCoroActor = 'true'; + PRAGMA kikimr.OptEnableOlapPushdown = "false"; + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdbCheckPragma) { + const TString externalDataSourceName = "/Root/external_data_source_2"; + const TString ydbTable = "/Root/ydb_table"; + const TString bucket = "test_bucket6"; + + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE TABLE `{ydb_table}` ( + key Utf8, + value Utf8, + PRIMARY KEY (key) + ); + )", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket), + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{ydb_table}` (key, value) VALUES + ("1", "one"), + ("2", "two") + )", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + PRAGMA s3.JsonListSizeLimit = "10"; + PRAGMA s3.SourceCoroActor = 'true'; + PRAGMA kikimr.OptEnableOlapPushdown = "false"; + SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key + ORDER BY key + )" + , "external_source"_a = externalDataSourceName + , "ydb_table"_a = ydbTable)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSourceAndTablePathPrefix) { + const TString externalDataSourceName = "external_data_source"; + const TString bucket = "test_bucket7"; + + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "external_source"_a = externalDataSourceName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/" + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + SELECT * FROM `{external_source}`.`*` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) + )", "external_source"_a = externalDataSourceName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + std::pair ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::EBindingsMode mode) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto appConfig = std::make_optional(); + appConfig->MutableTableServiceConfig()->SetBindingsMode(mode); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, nullptr, appConfig); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + SELECT * FROM bindings.`{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + TFetchScriptResultsResult results(TStatus(EStatus::SUCCESS, {})); + if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) { + results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + } + return {readyOp, results}; + } + + Y_UNIT_TEST(ExecuteScriptWithDifferentBindingsMode) { + { + auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DROP); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), ""); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + + } + + { + auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DROP_WITH_WARNING); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:31: Warning: Please remove 'bindings.' from your query, the support for this syntax will be dropped soon, code: 4538\n"); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + { + auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_ENABLED); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); + UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:40: Error: Table binding `/Root/test_binding_resolve` is not defined\n"); + } + + { + auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DISABLED); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); + UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:31: Error: Please remove 'bindings.' from your query, the support for this syntax has ended, code: 4601\n"); + } + } + + Y_UNIT_TEST(InsertIntoBucket) { + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_binding"; + const TString readBucket = "test_bucket_read"; + const TString readObject = "test_object_read"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_write"; + const TString writeObject = "test_object_write/"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + )", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT * FROM `{read_table}` + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + + TString content = GetAllObjects(writeBucket); + UNIT_ASSERT_STRING_CONTAINS(content, "key\tvalue\n"); // tsv header + UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n"); + UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n"); + } + + void ExecuteInsertQuery(TQueryClient& client, const TString& writeTableName, const TString& readTableName, bool expectCached) { + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT * FROM `{read_table}`; + )", + "write_table"_a = writeTableName, + "read_table"_a = readTableName); + auto settings = TExecuteQuerySettings().StatsMode(EStatsMode::Basic); + auto resultFuture = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + auto& stats = NYdb::TProtoAccessor::GetProto(*resultFuture.GetValueSync().GetStats()); + UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached); + } + + Y_UNIT_TEST(InsertIntoBucketCaching) { + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_cache"; + const TString writeObject = "test_object_write/"; + const TString writeAnotherObject = "test_another_object_write/"; + const TString readTableName = "/Root/read_table"; + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + + CREATE TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY (key) + ); + )", + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject, + "read_table"_a = readTableName); + + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{read_table}` (key, value) VALUES + ("1", "one") + )", + "read_table"_a = readTableName); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr->GetQueryClient(); + ExecuteInsertQuery(db, writeTableName, readTableName, false); + ExecuteInsertQuery(db, writeTableName, readTableName, true); + { + const TString modifyQuery = fmt::format(R"( + DROP EXTERNAL TABLE `{write_table}`; + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="tsv_with_names" + ); + )", + "write_table"_a = writeTableName, + "write_object"_a = writeAnotherObject, + "write_source"_a = writeDataSourceName); + auto result = session.ExecuteSchemeQuery(modifyQuery).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + ExecuteInsertQuery(db, writeTableName, readTableName, false); + + UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3); + } + + Y_UNIT_TEST(UpdateExternalTable) { + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_binding"; + const TString readBucket = "test_bucket_read"; + const TString readObject = "test_object_read"; + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + )", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = R"( + UPDATE `/Root/read_binding` + SET key = "abc"u + )"; + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + + UNIT_ASSERT_NO_DIFF(resultFuture.GetValueSync().GetIssues().ToString(), "
: Error: Pre type annotation, code: 1020\n" + "
:3:27: Error: Write mode 'update' is not supported for external entities\n"); + } + + Y_UNIT_TEST(JoinTwoSources) { + const TString dataSource = "/Root/data_source"; + const TString bucket = "test_bucket_mixed"; + const TString dataTable = "/Root/data"; + const TString dataObject = "data"; + const TString keysTable = "/Root/keys"; + const TString keysObject = "keys"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(bucket, s3Client); + UploadObject(bucket, dataObject, TEST_CONTENT, s3Client); + UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{data_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{bucket_location}", + AUTH_METHOD="NONE" + ); + + CREATE EXTERNAL TABLE `{data_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{data_source}", + LOCATION="{data_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL TABLE `{keys_table}` ( + key Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{data_source}", + LOCATION="{keys_object}", + FORMAT="json_each_row" + ); + )", + "data_source"_a = dataSource, + "bucket_location"_a = GetBucketLocation(bucket), + "data_table"_a = dataTable, + "data_object"_a = dataObject, + "keys_table"_a = keysTable, + "keys_object"_a = keysObject + ); + auto schemeQueryesult = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(schemeQueryesult.GetStatus() == NYdb::EStatus::SUCCESS, schemeQueryesult.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + SELECT * FROM `{data_table}` + WHERE key IN ( + SELECT key FROM `{keys_table}` + ) + )", + "data_table"_a = dataTable, + "keys_table"_a = keysTable); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + auto result = resultFuture.GetValueSync().GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(result.RowsCount(), 1); + UNIT_ASSERT(result.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("key").GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), "trololo"); + UNIT_ASSERT(!result.TryNextRow()); + } + + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPartitionedBy) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "year=1/month=2/test_object"; + const TString content = "data,year,month\ntest,1,2"; + + CreateBucketWithObject(bucket, object, content); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + data STRING NOT NULL, + year UINT32 NOT NULL, + month UINT32 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="/", + FORMAT="csv_with_names", + PARTITIONED_BY="[year, month]" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket) + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a = externalTableName); + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); + } + + Y_UNIT_TEST(ExecuteScriptWithEmptyCustomPartitioning) { + const TString bucket = "test_bucket1"; + const TString object = "year=2021/test_object"; + + CreateBucketWithObject(bucket, object, ""); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "location"_a = GetBucketLocation(bucket) + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + const TString sql = R"( + $projection = @@ { + "projection.enabled" : "true", + "storage.location.template" : "/${date}", + "projection.date.type" : "date", + "projection.date.min" : "2022-11-02", + "projection.date.max" : "2023-12-02", + "projection.date.interval" : "1", + "projection.date.format" : "/year=%Y", + "projection.date.unit" : "YEARS" + } @@; + + SELECT * + FROM `/Root/external_data_source`.`/` + WITH ( + FORMAT="raw", + + SCHEMA=( + `data` String NOT NULL, + `date` Date NOT NULL + ), + + partitioned_by=(`date`), + projection=$projection + ) + )"; + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 0); + } + + Y_UNIT_TEST(ExecuteScriptWithTruncatedMultiplyResults) { + const TString bucket = "test_bucket"; + CreateBucket(bucket); + + constexpr size_t NUMBER_OF_OBJECTS = 20; + constexpr size_t ROWS_LIMIT = NUMBER_OF_OBJECTS / 2; + + const TString object = "/test_object"; + const TString content = "test content"; + for (size_t i = 0; i < NUMBER_OF_OBJECTS; ++i) { + UploadObject(bucket, object + ToString(i), content); + } + + NKikimrConfig::TAppConfig appCfg; + appCfg.MutableQueryServiceConfig()->set_scriptresultrowslimit(ROWS_LIMIT); + appCfg.MutableTableServiceConfig()->MutableQueryLimits()->set_resultrowslimit(ROWS_LIMIT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, nullptr, appCfg); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "location"_a = GetBucketLocation(bucket) + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + const TString sql = R"( + SELECT `data` + FROM `/Root/external_data_source`.`/` + WITH ( + FORMAT="raw", + SCHEMA=( + `data` String NOT NULL + ) + ) + )"; + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_LIMIT); + + for (size_t i = 0; i < ROWS_LIMIT; ++i) { + resultSet.TryNextRow(); + UNIT_ASSERT_VALUES_EQUAL(resultSet.GetValue(0).GetProto().bytes_value(), content); + } + } + + Y_UNIT_TEST(ForbiddenCallablesForYdbTables) { + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_table"; + const TString readBucket = "test_read_bucket_forbidden_callables"; + const TString readObject = "test_object_forbidden_callables"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_table"; + const TString writeBucket = "test_write_bucket_forbidden_callables"; + const TString writeObject = "test_object_forbidden_callables/"; + const TString writeYdbTable = "/Root/test_ydb_table"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8, -- Nullable + value Utf8 -- Nullable + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{write_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{write_source}", + LOCATION="{write_object}", + FORMAT="json_each_row" + ); + + CREATE TABLE `{write_ydb_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY (key) + ); + )", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket), + "write_object"_a = writeObject, + "write_ydb_table"_a = writeYdbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + // Forbidden callable like Unwrap is allowed in S3-only queries, + // but not allowed in mixed queries. + { + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}` + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + } + + // Unwrap is used in query with effect applied to YDB table. + { + const TString sql = fmt::format(R"( + INSERT INTO `{write_table}` + SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}`; + + DELETE FROM `{write_ydb_table}` + WHERE key = "42"; + )", + "read_table"_a=readTableName, + "write_table"_a = writeTableName, + "write_ydb_table"_a = writeYdbTable); + + auto db = kikimr->GetQueryClient(); + auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); + resultFuture.Wait(); + UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap"); + } + } + + Y_UNIT_TEST(ExecuteScriptWithLocationWithoutSlashAtTheEnd) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket_with_location_without_slash_at_the_end"; + const TString object = "year=1/month=2/test_object"; + const TString content = "data,year,month\ntest,1,2"; + + CreateBucketWithObject(bucket, object, content); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + data STRING NOT NULL, + year UINT32 NOT NULL, + month UINT32 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="/", + FORMAT="csv_with_names", + PARTITIONED_BY="[year, month]" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a = externalTableName); + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); + } + + TString CreateSimpleGenericQuery(std::shared_ptr kikimr, const TString& bucket) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString object = "test_object"; + const TString content = "key\n1"; + + CreateBucketWithObject(bucket, object, content); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + return fmt::format(R"( + SELECT * FROM `{external_source}`.`/` WITH ( + FORMAT="csv_with_names", + SCHEMA ( + key Int NOT NULL + ) + ) + )", "external_source"_a=externalDataSourceName); + } + + Y_UNIT_TEST(ExecuteScriptWithGenericAutoDetection) { + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_execute_generic_auto_detection"); + + auto driver = kikimr->GetDriver(); + NScripting::TScriptingClient yqlScriptClient(driver); + + auto scriptResult = yqlScriptClient.ExecuteYqlScript(sql).GetValueSync(); + UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); + + TResultSetParser resultSet(scriptResult.GetResultSet(0)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("key").GetInt32(), 1); + } + + Y_UNIT_TEST(ExplainScriptWithGenericAutoDetection) { + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_explain_generic_auto_detection"); + + auto driver = kikimr->GetDriver(); + NScripting::TScriptingClient yqlScriptClient(driver); + + NScripting::TExplainYqlRequestSettings settings; + settings.Mode(NScripting::ExplainYqlRequestMode::Plan); + + auto scriptResult = yqlScriptClient.ExplainYqlScript(sql, settings).GetValueSync(); + UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); + UNIT_ASSERT(scriptResult.GetPlan()); + } + + Y_UNIT_TEST(ReadFromDataSourceWithoutTable) { + const TString externalDataSourceName = "/Root/external_data_source"; + const TString bucket = "test_bucket_inline_desc"; + const TString object = "test_object_inline_desc"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)sql", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_data_source}`; + )sql", + "external_data_source"_a=externalDataSourceName); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); + UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Attempt to read from external data source"); + } + + // select using inline syntax is well + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_data_source}`.`{obj_path}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + )sql", + "external_data_source"_a=externalDataSourceName, + "obj_path"_a = object); + + auto db = kikimr->GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + } + + Y_UNIT_TEST(InsertIntoDataSourceWithoutTable) { + const TString readDataSourceName = "/Root/read_data_source"; + const TString readTableName = "/Root/read_binding"; + const TString readBucket = "test_bucket_read_insert_into_data_source"; + const TString readObject = "test_object_read"; + const TString writeDataSourceName = "/Root/write_data_source"; + const TString writeTableName = "/Root/write_binding"; + const TString writeBucket = "test_bucket_write_insert_into_data_source"; + const TString writeObject = "test_object_write/"; + + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); + CreateBucket(writeBucket, s3Client); + } + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{read_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{read_source}", + LOCATION="{read_object}", + FORMAT="json_each_row" + ); + + CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{write_location}", + AUTH_METHOD="NONE" + ); + )sql", + "read_source"_a = readDataSourceName, + "read_table"_a = readTableName, + "read_location"_a = GetBucketLocation(readBucket), + "read_object"_a = readObject, + "write_source"_a = writeDataSourceName, + "write_table"_a = writeTableName, + "write_location"_a = GetBucketLocation(writeBucket) + ); + auto schemeResult = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + INSERT INTO `{write_source}` + SELECT * FROM `{read_table}` + )sql", + "read_table"_a=readTableName, + "write_source"_a = writeDataSourceName); + + auto db = kikimr->GetQueryClient(); + auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, static_cast(result.GetStatus()) << ", " << result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Attempt to write to external data source"); + } + + // insert with inline syntax is well + { + Cerr << "Run inplace insert" << Endl; + const TString sql = fmt::format(R"sql( + INSERT INTO `{write_source}`.`{write_object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + SELECT * FROM `{read_table}` + )sql", + "read_table"_a=readTableName, + "write_source"_a = writeDataSourceName, + "write_object"_a = writeObject); + + auto db = kikimr->GetQueryClient(); + auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(SpecifyExternalTableInsteadOfExternalDataSource) { + const TString externalDataSourceName = "external_data_source"; + const TString externalTableName = "external_table"; + const TString bucket = "test_bucket_specify_external_table"; + const TString object = "test_object_specify_external_table"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + ); + )sql", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket), + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + { + const TString sql = fmt::format(R"sql( + SELECT * FROM `{external_table}`.`{object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ); + )sql", + "external_table"_a=externalTableName, + "object"_a = object); + + auto db = kikimr->GetQueryClient(); + auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_EQUAL_C(queryExecutionOperation.GetStatus(), EStatus::BAD_REQUEST, static_cast(queryExecutionOperation.GetStatus()) << ", " << queryExecutionOperation.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); + } + + { + const TString sql = fmt::format(R"sql( + INSERT INTO `{external_table}`.`{object}` + WITH ( + SCHEMA = ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ), + FORMAT = "json_each_row" + ) + SELECT * FROM `{external_table}` WHERE key = '42'; + )sql", + "external_table"_a=externalTableName, + "object"_a = object); + + auto db = kikimr->GetQueryClient(); + auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_EQUAL_C(queryExecutionOperation.GetStatus(), EStatus::BAD_REQUEST, static_cast(queryExecutionOperation.GetStatus()) << ", " << queryExecutionOperation.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); + } + } + + Y_UNIT_TEST(QueryWithNoDataInS3) { + const TString externalDataSourceName = "tpc_h_s3_storage_connection"; + const TString bucket = "test_bucket_no_data"; + + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(bucket, s3Client); + // Uncomment if you want to compare with query with data + //UploadObject(bucket, "l/l", R"json({"l_extendedprice": 0.0, "l_discount": 1.0, "l_partkey": 1})json", s3Client); + //UploadObject(bucket, "p/p", R"json({"p_partkey": 1, "p_type": "t"})json", s3Client); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto client = kikimr->GetQueryClient(); + + { + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + )sql", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + auto result = client.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + // YQ-2750 + const TString query = fmt::format(R"sql( + $border = Date("1994-08-01"); + select + 100.00 * sum(case + when StartsWith(p.p_type, 'PROMO') + then l.l_extendedprice * (1 - l.l_discount) + else 0 + end) / sum(l.l_extendedprice * (1 - l.l_discount)) as promo_revenue + from + {external_source}.`l/` with ( schema ( + l_extendedprice double, + l_discount double, + l_partkey int64, + l_shipdate date + ), + format = "json_each_row" + ) as l + join + {external_source}.`p/` with ( schema ( + p_partkey int64, + p_type string + ), + format = "json_each_row" + ) as p + on + l.l_partkey = p.p_partkey + where + cast(l.l_shipdate as timestamp) >= $border + and cast(l.l_shipdate as timestamp) < ($border + Interval("P31D")); + )sql", + "external_source"_a = externalDataSourceName + ); + auto result = client.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto rs = result.GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), 1); + rs.TryNextRow(); + TMaybe sum = rs.ColumnParser(0).GetOptionalDouble(); + UNIT_ASSERT(!sum); + } + } +<<<<<<< HEAD + + void ExecuteSelectQuery(const TString& bucket, size_t fileSize, size_t numberRows) { + using namespace fmt::literals; + + // Create test file + TString content = "id,data\n"; + const TString rowContent(fileSize / numberRows, 'a'); + for (size_t i = 0; i < numberRows; ++i) { + content += TStringBuilder() << ToString(i) << "," << rowContent << "\n"; + } + + // Upload test file + CreateBucketWithObject(bucket, "test_object", content); + + // Create external data source + const TString externalDataSourceName = "/Root/external_data_source"; + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tableClient = kikimr->GetTableClient(); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + const TString schemeQuery = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ))", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + + const auto schemeResult = session.ExecuteSchemeQuery(schemeQuery).GetValueSync(); + UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + + // Execute test query + auto queryClient = kikimr->GetQueryClient(); + const TString query = fmt::format(R"( + SELECT * FROM `{external_source}`.`/` WITH ( + FORMAT="csv_with_names", + SCHEMA ( + id Uint64 NOT NULL, + data String NOT NULL + ) + ))", + "external_source"_a=externalDataSourceName + ); + + auto scriptExecutionOperation = queryClient.ExecuteScript(query).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + + // Validate query results + TFetchScriptResultsSettings settings; + settings.RowsLimit(0); + size_t rowsFetched = 0; + while (true) { + TFetchScriptResultsResult results = queryClient.FetchScriptResults(scriptExecutionOperation.Id(), 0, settings).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + + while (resultSet.TryNextRow()) { + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("id").GetUint64(), rowsFetched++); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), rowContent); + } + + if (!results.GetNextFetchToken()) { + break; + } + + settings.FetchToken(results.GetNextFetchToken()); + } + UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows); + + // Test forget operation + TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20)); + NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver()); + while (TInstant::Now() < forgetOperationTimeout) { + auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) { + return; + } + + UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString()); + + if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) { + // Wait until last forget is not finished + Sleep(TDuration::Seconds(30)); + } + } + UNIT_ASSERT_C(false, "Forget operation timeout"); + } + + Y_UNIT_TEST(ExecuteScriptWithLargeStrings) { + ExecuteSelectQuery("test_bucket_execute_script_with_large_strings", 100_MB, 100); + } + + Y_UNIT_TEST(ExecuteScriptWithLargeFile) { + ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000); + } +======= +>>>>>>> stable-24-1 +} + +} // namespace NKikimr::NKqp