diff --git a/.github/scripts/tests/transform-ya-junit.py.orig b/.github/scripts/tests/transform-ya-junit.py.orig deleted file mode 100644 index 569e229cc714..000000000000 --- a/.github/scripts/tests/transform-ya-junit.py.orig +++ /dev/null @@ -1,283 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import re -import json -import os -import sys -import urllib.parse -import zipfile -<<<<<<< HEAD -from typing import Set -======= ->>>>>>> stable-24-1 -from xml.etree import ElementTree as ET -from mute_utils import mute_target, pattern_to_re -from junit_utils import add_junit_link_property, is_faulty_testcase - - -def log_print(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) - - -class YaMuteCheck: - def __init__(self): - self.regexps = set() - self.regexps = [] - - def load(self, fn): - with open(fn, "r") as fp: - for line in fp: - line = line.strip() - try: - testsuite, testcase = line.split(" ", maxsplit=1) - except ValueError: - log_print(f"SKIP INVALID MUTE CONFIG LINE: {line!r}") - continue - self.populate(testsuite, testcase) - - def populate(self, testsuite, testcase): - check = [] - - for p in (pattern_to_re(testsuite), pattern_to_re(testcase)): - try: - check.append(re.compile(p)) - except re.error: - log_print(f"Unable to compile regex {p!r}") - return - - self.regexps.append(tuple(check)) - - def __call__(self, suite_name, test_name): - for ps, pt in self.regexps: - if ps.match(suite_name) and pt.match(test_name): - return True - return False - - -class YTestReportTrace: - def __init__(self, out_root): - self.out_root = out_root - self.traces = {} -<<<<<<< HEAD - self.logs_dir = set() -======= - self.logs_dir = None ->>>>>>> stable-24-1 - - def abs_path(self, path): - return path.replace("$(BUILD_ROOT)", self.out_root) - - def load(self, subdir): - test_results_dir = os.path.join(self.out_root, f"{subdir}/test-results/") - - if not os.path.isdir(test_results_dir): - log_print(f"Directory {test_results_dir} doesn't exist") - return - - # find the test result - for folder in os.listdir(test_results_dir): - fn = os.path.join(self.out_root, test_results_dir, folder, "ytest.report.trace") - - if not os.path.isfile(fn): - continue - - with open(fn, "r") as fp: - for line in fp: - event = json.loads(line.strip()) - if event["name"] == "subtest-finished": - event = event["value"] - cls = event["class"] - subtest = event["subtest"] - cls = cls.replace("::", ".") - self.traces[(cls, subtest)] = event - logs_dir = self.abs_path(event['logs']['logsdir']) -<<<<<<< HEAD - self.logs_dir.add(logs_dir) -======= - self.logs_dir = logs_dir - break ->>>>>>> stable-24-1 - - def has(self, cls, name): - return (cls, name) in self.traces - - def get_logs(self, cls, name): - trace = self.traces.get((cls, name)) - - if not trace: - return {} - - logs = trace["logs"] - - result = {} - for k, path in logs.items(): - if k == "logsdir": - continue - - result[k] = self.abs_path(path) - - return result - - -def filter_empty_logs(logs): - result = {} - for k, v in logs.items(): - if not os.path.isfile(v) or os.stat(v).st_size == 0: - continue - result[k] = v - return result - - -def save_log(build_root, fn, out_dir, log_url_prefix, trunc_size): - fpath = os.path.relpath(fn, build_root) - - if out_dir is not None: - out_fn = os.path.join(out_dir, fpath) - fsize = os.stat(fn).st_size - - out_fn_dir = os.path.dirname(out_fn) - - if not os.path.isdir(out_fn_dir): - os.makedirs(out_fn_dir, 0o700) - - if trunc_size and fsize > trunc_size: - with open(fn, "rb") as in_fp: - in_fp.seek(fsize - trunc_size) - log_print(f"truncate {out_fn} to {trunc_size}") - with open(out_fn, "wb") as out_fp: - while 1: - buf = in_fp.read(8192) - if not buf: - break - out_fp.write(buf) - else: - os.symlink(fn, out_fn) - quoted_fpath = urllib.parse.quote(fpath) - return f"{log_url_prefix}{quoted_fpath}" - - -<<<<<<< HEAD -def save_zip(suite_name, out_dir, url_prefix, logs_dir: Set[str]): -======= -def save_zip(suite_name, out_dir, url_prefix, logs_dir): ->>>>>>> stable-24-1 - arc_name = f"{suite_name.replace('/', '-')}.zip" - - arc_fn = os.path.join(out_dir, arc_name) - - zf = zipfile.ZipFile(arc_fn, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) - -<<<<<<< HEAD - for path in logs_dir: - # path is .../test-results/black/testing_out_stuff - log_print(f"put {path} into {arc_name}") - test_type = os.path.basename(os.path.dirname(path)) - for root, dirs, files in os.walk(path): - for f in files: - filename = os.path.join(root, f) - zf.write(filename, os.path.join(test_type, os.path.relpath(filename, path))) -======= - log_print(f"put {logs_dir} into {arc_name}") - for root, dirs, files in os.walk(logs_dir): - for f in files: - filename = os.path.join(root, f) - zf.write(filename, os.path.relpath(filename, logs_dir)) ->>>>>>> stable-24-1 - zf.close() - - quoted_fpath = urllib.parse.quote(arc_name) - return f"{url_prefix}{quoted_fpath}" - - -def transform(fp, mute_check: YaMuteCheck, ya_out_dir, save_inplace, log_url_prefix, log_out_dir, log_trunc_size, - test_stuff_out, test_stuff_prefix): - tree = ET.parse(fp) - root = tree.getroot() - - for suite in root.findall("testsuite"): - suite_name = suite.get("name") - traces = YTestReportTrace(ya_out_dir) - traces.load(suite_name) - - has_fail_tests = False - - for case in suite.findall("testcase"): - test_name = case.get("name") - case.set("classname", suite_name) - - is_fail = is_faulty_testcase(case) - has_fail_tests |= is_fail - - if mute_check(suite_name, test_name): - log_print("mute", suite_name, test_name) - mute_target(case) - - if is_fail and "." in test_name: - test_cls, test_method = test_name.rsplit(".", maxsplit=1) - logs = filter_empty_logs(traces.get_logs(test_cls, test_method)) - - if logs: - log_print(f"add {list(logs.keys())!r} properties for {test_cls}.{test_method}") - for name, fn in logs.items(): - url = save_log(ya_out_dir, fn, log_out_dir, log_url_prefix, log_trunc_size) - add_junit_link_property(case, name, url) - - if has_fail_tests: - if not traces.logs_dir: - log_print(f"no logsdir for {suite_name}") - continue - - url = save_zip(suite_name, test_stuff_out, test_stuff_prefix, traces.logs_dir) - - for case in suite.findall("testcase"): - add_junit_link_property(case, 'logsdir', url) - - if save_inplace: - tree.write(fp.name) - else: - ET.indent(root) - print(ET.tostring(root, encoding="unicode")) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "-i", action="store_true", dest="save_inplace", default=False, help="modify input file in-place" - ) - parser.add_argument("-m", help="muted test list") - parser.add_argument("--log-url-prefix", default="./", help="url prefix for logs") - parser.add_argument("--log-out-dir", help="symlink logs to specific directory") - parser.add_argument( - "--log-truncate-size", - dest="log_trunc_size", - type=int, - default=134217728, - help="truncate log after specific size, 0 disables truncation", - ) - parser.add_argument("--ya-out", help="ya make output dir (for searching logs and artifacts)") - parser.add_argument('--test-stuff-out', help='output folder for archive testing_out_stuff') - parser.add_argument('--test-stuff-prefix', help='url prefix for testing_out_stuff') - parser.add_argument("in_file", type=argparse.FileType("r")) - - args = parser.parse_args() - - mute_check = YaMuteCheck() - - if args.m: - mute_check.load(args.m) - - transform( - args.in_file, - mute_check, - args.ya_out, - args.save_inplace, - args.log_url_prefix, - args.log_out_dir, - args.log_trunc_size, - args.test_stuff_out, - args.test_stuff_prefix, - ) - - -if __name__ == "__main__": - main() diff --git a/.github/workflows/pr_check.yml.orig b/.github/workflows/pr_check.yml.orig deleted file mode 100644 index 86ffa113425a..000000000000 --- a/.github/workflows/pr_check.yml.orig +++ /dev/null @@ -1,227 +0,0 @@ -name: PR-check -on: - pull_request_target: - branches: - - 'main' - - 'stable-*' - - 'stream-nb-*' -<<<<<<< HEAD - - '*-stable-*' -======= ->>>>>>> stable-24-1 - paths-ignore: - - 'ydb/docs/**' - - '*' - types: - - 'opened' - - 'synchronize' - - 'reopened' - - 'labeled' -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number }} - cancel-in-progress: true -jobs: - check-running-allowed: - if: ${{vars.CHECKS_SWITCH != '' && fromJSON(vars.CHECKS_SWITCH).pr_check == true}} - runs-on: ubuntu-latest - outputs: - result: ${{ steps.check-ownership-membership.outputs.result == 'true' && steps.check-is-mergeable.outputs.result == 'true' }} - commit_sha: ${{ steps.check-is-mergeable.outputs.commit_sha }} - steps: - - name: Check if running tests is allowed - id: check-ownership-membership - uses: actions/github-script@v6 - with: - github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} - script: | - const labels = context.payload.pull_request.labels; - const okToTestLabel = labels.find( - label => label.name == 'ok-to-test' - ); - - console.log("okToTestLabel=%o", okToTestLabel !== undefined); - - if (okToTestLabel !== undefined) { - return true; - } - - // This is used primarily in forks. Repository owner - // should be allowed to run anything. - const userLogin = context.payload.pull_request.user.login; - - // How to interpret membership status code: - // https://docs.github.com/rest/collaborators/collaborators#check-if-a-user-is-a-repository-collaborator - const isRepoCollaborator = async function () { - try { - const response = await github.rest.repos.checkCollaborator({ - owner: context.payload.repository.owner.login, - repo: context.payload.repository.name, - username: userLogin, - }); - return response.status == 204; - } catch (error) { - if (error.status && error.status == 404) { - return false; - } - throw error; - } - } - - if (context.payload.repository.owner.login == userLogin) { - console.log("You are the repository owner!"); - return true; - } - - if (await isRepoCollaborator()) { - console.log("You are a collaborator!"); - return true; - } - - return false; - - name: comment-if-waiting-on-ok - if: steps.check-ownership-membership.outputs.result == 'false' && - github.event.action == 'opened' - uses: actions/github-script@v6 - with: - script: | - let externalContributorLabel = 'external'; - - github.rest.issues.createComment({ - issue_number: context.issue.number, - owner: context.repo.owner, - repo: context.repo.repo, - body: 'Hi! Thank you for contributing!\nThe tests on this PR will run after a maintainer adds an `ok-to-test` label to this PR manually. Thank you for your patience!' - }); - - github.rest.issues.addLabels({ - ...context.repo, - issue_number: context.issue.number, - labels: [externalContributorLabel] - }); - - - name: cleanup-test-label - uses: actions/github-script@v6 - with: - script: | - let labelsToRemove = ['ok-to-test', 'rebase-and-check']; - const prNumber = context.payload.pull_request.number; - const prLabels = new Set(context.payload.pull_request.labels.map(l => l.name)); - for await (const label of labelsToRemove.filter(l => prLabels.has(l))) { - core.info(`remove label=${label} for pr=${prNumber}`); - try { - const result = await github.rest.issues.removeLabel({ - ...context.repo, - issue_number: prNumber, - name: label - }); - } catch(error) { - // ignore the 404 error that arises - // when the label did not exist for the - // organization member - if (error.status && error.status != 404) { - throw error; - } - } - } - - name: check is mergeable - id: check-is-mergeable - if: steps.check-ownership-membership.outputs.result == 'true' - uses: actions/github-script@v6 - with: - result-encoding: string - script: | - let pr = context.payload.pull_request; - const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); - const header = `\n`; - - const fail_msg = header + ':red_circle: Unable to merge your PR into the `main` branch. ' - + 'Please rebase or merge it with the `main` branch.' - - let i = 0; - - while (pr.mergeable == null || i >= 60) { - console.log("get pull-request status"); - - let result = await github.rest.pulls.get({ - ...context.repo, - pull_number: pr.number - }) - - pr = result.data; - - if (pr.mergeable == null) { - await delay(5000); - } - - i += 1; - } - - console.log("pr.mergeable=%o", pr.mergeable); - - const { data: comments } = await github.rest.issues.listComments({ - issue_number: context.issue.number, - owner: context.repo.owner, - repo: context.repo.repo - }); - - const commentToUpdate = comments.find(comment => comment.body.startsWith(header)); - - if (pr.mergeable === false) { - let commentParams = { - ...context.repo, - issue_number: context.issue.number, - body: fail_msg - }; - - if (commentToUpdate) { - await github.rest.issues.updateComment({ - ...commentParams, - comment_id: commentToUpdate.id, - }); - } else { - await github.rest.issues.createComment({...commentParams}); - } - core.setFailed("Merge conflict detected"); - return false; - } else if (commentToUpdate) { - await github.rest.issues.deleteComment({ - ...context.repo, - issue_number: context.issue.number, - comment_id: commentToUpdate.id, - }); - } - core.info(`commit_sha=${pr.commit_sha}`); - core.setOutput('commit_sha', pr.merge_commit_sha); - return true; - build_and_test: - needs: - - check-running-allowed - if: needs.check-running-allowed.outputs.result == 'true' && needs.check-running-allowed.outputs.commit_sha != '' - strategy: - fail-fast: false - matrix: - build_preset: ["relwithdebinfo", "release-asan", "release-clang14"] - runs-on: [ self-hosted, auto-provisioned, "${{ format('build-preset-{0}', matrix.build_preset) }}" ] - name: Build and test ${{ matrix.build_preset }} - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - ref: ${{ needs.check-running-allowed.outputs.commit_sha }} - fetch-depth: 2 - - name: Build and test - uses: ./.github/actions/build_and_test_ya - with: - build_preset: ${{ matrix.build_preset }} - build_target: "ydb/" - increment: true - run_tests: ${{ contains(fromJSON('["relwithdebinfo", "release-asan"]'), matrix.build_preset) }} - test_size: "small,medium" - test_type: "unittest,py3test,py2test,pytest" - test_threads: 52 - put_build_results_to_cache: true - run_tests_if_build_fails: false - secs: ${{ format('{{"TESTMO_TOKEN":"{0}","AWS_KEY_ID":"{1}","AWS_KEY_VALUE":"{2}","REMOTE_CACHE_USERNAME":"{3}","REMOTE_CACHE_PASSWORD":"{4}"}}', - secrets.TESTMO_TOKEN, secrets.AWS_KEY_ID, secrets.AWS_KEY_VALUE, secrets.REMOTE_CACHE_USERNAME, secrets.REMOTE_CACHE_PASSWORD ) }} - vars: ${{ format('{{"AWS_BUCKET":"{0}","AWS_ENDPOINT":"{1}","REMOTE_CACHE_URL":"{2}","TESTMO_URL":"{3}","TESTMO_PROJECT_ID":"{4}"}}', - vars.AWS_BUCKET, vars.AWS_ENDPOINT, vars.REMOTE_CACHE_URL_YA, vars.TESTMO_URL, vars.TESTMO_PROJECT_ID ) }} diff --git a/.github/workflows/sync_cmakebuild.yml.orig b/.github/workflows/sync_cmakebuild.yml.orig deleted file mode 100644 index 3c23c8e255e5..000000000000 --- a/.github/workflows/sync_cmakebuild.yml.orig +++ /dev/null @@ -1,51 +0,0 @@ -name: Sync cmakebuild with main -on: - schedule: -<<<<<<< HEAD - - cron: "0 * * * *" # At minute 0 every hour -======= - - cron: "0 * * * *" ->>>>>>> stable-24-1 - workflow_dispatch: -concurrency: - group: ${{ github.workflow }} - cancel-in-progress: true -env: - REPO: ${{ github.repository }} - TOKEN: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} -jobs: - sync: - runs-on: ubuntu-latest - steps: - - name: Sync - run: | - mainsha=$(curl -s -H "Accept: application/vnd.github.VERSION.sha" https://api.github.com/repos/$REPO/commits/main) - echo "Main sha: ${mainsha}" - lastsha=$(curl -s https://raw.githubusercontent.com/$REPO/cmakebuild/ydb/ci/cmakegen.txt) - echo "Last sha: ${lastsha}" - if [ "${mainsha}" == "${lastsha}" ];then - echo "No new commits on the main branch to merge, exiting" - exit 0 - fi - git clone -b main --shallow-exclude cmakebuild https://$TOKEN@github.com/$REPO.git ydb - git config --global user.email "alex@ydb.tech" - git config --global user.name "Alexander Smirnov" - cd ydb - git fetch --depth `expr $(git rev-list --count HEAD) + 1` - # Depth 10: 1st with cmake generation, 2nd with previous merge, 3rd is common between main and cmakebuild, - # others for possible commits directly on cmakebuild branch - git fetch origin cmakebuild:cmakebuild --depth 10 - mainsha=$(git rev-parse HEAD) - git checkout cmakebuild - prevsha=$(git rev-parse HEAD) - git merge main --no-edit - currsha=$(git rev-parse HEAD) - if [ ${prevsha} == ${currsha} ];then - echo "Merge did not bring any changes, exiting" - exit - fi - ./generate_cmake -k - echo ${mainsha} > ydb/ci/cmakegen.txt - git add . - git commit -m "Generate cmake for ${mainsha}" - git push --set-upstream origin cmakebuild diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 041bddf51230..2b1c3dc78104 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -182,7 +182,7 @@ class TKqpCompileActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState)); + FederatedQuerySetup, UserToken, QueryServiceConfig, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState)); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; diff --git a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp index f395537d6945..7e921e3c3578 100644 --- a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp +++ b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp @@ -8,6 +8,7 @@ #include #include +#include namespace NKikimr { namespace NKqp { @@ -28,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr IModuleResolver::TPtr moduleResolver; UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); - auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, false, false, nullptr, actorSystem); + auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, NKikimrConfig::TQueryServiceConfig(), nullptr, false, false, nullptr, actorSystem); auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings()); result.Issues().PrintTo(Cerr); UNIT_ASSERT(result.Success()); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index ce2167fea459..5c206c40aab8 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -955,6 +955,7 @@ class TKqpHost : public IKqpHost { TKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr) @@ -972,6 +973,7 @@ class TKqpHost : public IKqpHost { , FakeWorld(ExprCtx->NewWorld(TPosition())) , ExecuteCtx(MakeIntrusive()) , ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem()) + , QueryServiceConfig(queryServiceConfig) { if (funcRegistry) { FuncRegistry = funcRegistry; @@ -1605,10 +1607,15 @@ class TKqpHost : public IKqpHost { || settingName == "Warning" || settingName == "UseBlocks" || settingName == "BlockEngine" + || settingName == "TimeOrderRecoverDelay" + || settingName == "TimeOrderRecoverAhead" + || settingName == "TimeOrderRecoverRowLimit" + || settingName == "MatchRecognizeStream" ; }; auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings); TypesCtx->AddDataSource(ConfigProviderName, configProvider); + TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize(); YQL_ENSURE(TypesCtx->Initialize(*ExprCtx)); @@ -1701,6 +1708,7 @@ class TKqpHost : public IKqpHost { TKqpTempTablesState::TConstPtr TempTablesState; NActors::TActorSystem* ActorSystem = nullptr; + NKikimrConfig::TQueryServiceConfig QueryServiceConfig; }; } // namespace @@ -1721,11 +1729,12 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem) { - return MakeIntrusive(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry, - keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem); + return MakeIntrusive(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, queryServiceConfig, + funcRegistry, keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 3b8022221dff..60f56d0ae047 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -109,7 +109,8 @@ class IKqpHost : public TThrRefBase { TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver, - std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, + std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index cbe50d797ad3..951cb2c3b0cd 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -54,7 +55,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable)); AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend)); AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend)); - + AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize)); AddHandler(1, &TCoFlatMap::Match, HNDL(LatePushExtractedPredicateToReadTable)); AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead)); AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead)); @@ -255,6 +256,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { return output; } + TMaybeNode MatchRecognize(TExprBase node, TExprContext& ctx) { + auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx); + if (output) { + DumpAppliedRule("MatchRecognize", node.Ptr(), output, ctx); + } + return output; + } + template TMaybeNode ApplyExtractMembersToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 518f166726b6..5bffeca30c03 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -187,7 +187,7 @@ class TKqpWorkerActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); + FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), QueryServiceConfig, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); auto& queryRequest = QueryState->RequestEv; QueryState->ProxyRequestId = proxyRequestId; diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig deleted file mode 100644 index 42113a808dfe..000000000000 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp.orig +++ /dev/null @@ -1,1773 +0,0 @@ -#include "s3_recipe_ut_helpers.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace NKikimr::NKqp { - -using namespace NYdb; -using namespace NYdb::NQuery; -using namespace NKikimr::NKqp::NFederatedQueryTest; -using namespace NTestUtils; -using namespace fmt::literals; - -Y_UNIT_TEST_SUITE(KqpFederatedQuery) { - Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket1"; - const TString object = "test_object"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString sql = fmt::format(R"( - PRAGMA s3.AtomicUploadCommit = "1"; --Check that pragmas are OK - SELECT * FROM `{external_table}` - )", "external_table"_a=externalTableName); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - - Y_UNIT_TEST(ExecuteQueryWithExternalTableResolve) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket_execute_query"; - const TString object = "test_object"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString sql = fmt::format(R"( - SELECT * FROM `{external_table}` - )", "external_table"_a=externalTableName); - - auto db = kikimr->GetQueryClient(); - auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - - size_t currentRow = 0; - while (true) { - auto part = executeQueryIterator.ReadNext().ExtractValueSync(); - if (!part.IsSuccess()) { - UNIT_ASSERT_C(part.EOS(), part.GetIssues().ToString()); - break; - } - - if (!part.HasResultSet()) { - continue; - } - - auto result = part.GetResultSet(); - - TResultSetParser resultSet(result); - while (resultSet.TryNextRow()) { - if (currentRow == 0) { - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - } else if (currentRow == 1) { - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } else { - UNIT_ASSERT(false); - } - ++currentRow; - } - UNIT_ASSERT(currentRow > 0); - } - } - - Y_UNIT_TEST(ExecuteScriptWithS3ReadNotCached) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket1"; - const TString object = "test_object"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto settings = TExecuteScriptSettings().StatsMode(Ydb::Query::STATS_MODE_BASIC); - - const TString sql = fmt::format(R"( - SELECT * FROM `{external_table}` - )", "external_table"_a=externalTableName); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); - - scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); - readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); - } - - Y_UNIT_TEST(ExecuteScriptWithDataSource) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString bucket = "test_bucket3"; - - CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket) - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - SELECT * FROM `{external_source}`.`/` WITH ( - format="json_each_row", - schema( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) - ) - )", "external_source"_a = externalDataSourceName)).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - - Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdb) { - const TString externalDataSourceName = "/Root/external_data_source_2"; - const TString ydbTable = "/Root/ydb_table"; - const TString bucket = "test_bucket4"; - - CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - { - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE TABLE `{ydb_table}` ( - key Utf8, - value Utf8, - PRIMARY KEY (key) - ); - )", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket), - "ydb_table"_a = ydbTable - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - { - const TString query = fmt::format(R"( - REPLACE INTO `{ydb_table}` (key, value) VALUES - ("1", "one"), - ("2", "two") - )", - "ydb_table"_a = ydbTable - ); - auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( - format="json_each_row", - schema( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) - ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key - ORDER BY key - )" - , "external_source"_a = externalDataSourceName - , "ydb_table"_a = ydbTable)).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); - } - - Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPragma) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket5"; - const TString object = "test_object"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - PRAGMA s3.JsonListSizeLimit = "10"; - PRAGMA s3.SourceCoroActor = 'true'; - PRAGMA kikimr.OptEnableOlapPushdown = "false"; - SELECT * FROM `{external_table}` - )", "external_table"_a=externalTableName)).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - - Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdbCheckPragma) { - const TString externalDataSourceName = "/Root/external_data_source_2"; - const TString ydbTable = "/Root/ydb_table"; - const TString bucket = "test_bucket6"; - - CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - { - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE TABLE `{ydb_table}` ( - key Utf8, - value Utf8, - PRIMARY KEY (key) - ); - )", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket), - "ydb_table"_a = ydbTable - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - { - const TString query = fmt::format(R"( - REPLACE INTO `{ydb_table}` (key, value) VALUES - ("1", "one"), - ("2", "two") - )", - "ydb_table"_a = ydbTable - ); - auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - PRAGMA s3.JsonListSizeLimit = "10"; - PRAGMA s3.SourceCoroActor = 'true'; - PRAGMA kikimr.OptEnableOlapPushdown = "false"; - SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( - format="json_each_row", - schema( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) - ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key - ORDER BY key - )" - , "external_source"_a = externalDataSourceName - , "ydb_table"_a = ydbTable)).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); - } - - Y_UNIT_TEST(ExecuteScriptWithDataSourceAndTablePathPrefix) { - const TString externalDataSourceName = "external_data_source"; - const TString bucket = "test_bucket7"; - - CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)", - "external_source"_a = externalDataSourceName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/" - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - SELECT * FROM `{external_source}`.`*` WITH ( - format="json_each_row", - schema( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) - ) - )", "external_source"_a = externalDataSourceName)).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - - std::pair ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::EBindingsMode mode) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket1"; - const TString object = "test_object"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto appConfig = std::make_optional(); - appConfig->MutableTableServiceConfig()->SetBindingsMode(mode); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, nullptr, appConfig); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString sql = fmt::format(R"( - SELECT * FROM bindings.`{external_table}` - )", "external_table"_a=externalTableName); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - TFetchScriptResultsResult results(TStatus(EStatus::SUCCESS, {})); - if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) { - results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - } - return {readyOp, results}; - } - - Y_UNIT_TEST(ExecuteScriptWithDifferentBindingsMode) { - { - auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DROP); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), ""); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - - } - - { - auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DROP_WITH_WARNING); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:31: Warning: Please remove 'bindings.' from your query, the support for this syntax will be dropped soon, code: 4538\n"); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - - { - auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_ENABLED); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); - UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:40: Error: Table binding `/Root/test_binding_resolve` is not defined\n"); - } - - { - auto [readyOp, results] = ExecuteScriptOverBinding(NKikimrConfig::TTableServiceConfig::BM_DISABLED); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); - UNIT_ASSERT_VALUES_EQUAL(readyOp.Status().GetIssues().ToString(), "
:2:31: Error: Please remove 'bindings.' from your query, the support for this syntax has ended, code: 4601\n"); - } - } - - Y_UNIT_TEST(InsertIntoBucket) { - const TString readDataSourceName = "/Root/read_data_source"; - const TString readTableName = "/Root/read_binding"; - const TString readBucket = "test_bucket_read"; - const TString readObject = "test_object_read"; - const TString writeDataSourceName = "/Root/write_data_source"; - const TString writeTableName = "/Root/write_binding"; - const TString writeBucket = "test_bucket_write"; - const TString writeObject = "test_object_write/"; - - { - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); - CreateBucket(writeBucket, s3Client); - } - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{read_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{read_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{read_source}", - LOCATION="{read_object}", - FORMAT="json_each_row" - ); - - CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{write_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{write_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{write_source}", - LOCATION="{write_object}", - FORMAT="tsv_with_names" - ); - )", - "read_source"_a = readDataSourceName, - "read_table"_a = readTableName, - "read_location"_a = GetBucketLocation(readBucket), - "read_object"_a = readObject, - "write_source"_a = writeDataSourceName, - "write_table"_a = writeTableName, - "write_location"_a = GetBucketLocation(writeBucket), - "write_object"_a = writeObject - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString sql = fmt::format(R"( - INSERT INTO `{write_table}` - SELECT * FROM `{read_table}` - )", - "read_table"_a=readTableName, - "write_table"_a = writeTableName); - - auto db = kikimr->GetQueryClient(); - auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - resultFuture.Wait(); - UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - - TString content = GetAllObjects(writeBucket); - UNIT_ASSERT_STRING_CONTAINS(content, "key\tvalue\n"); // tsv header - UNIT_ASSERT_STRING_CONTAINS(content, "1\ttrololo\n"); - UNIT_ASSERT_STRING_CONTAINS(content, "2\thello world\n"); - } - - void ExecuteInsertQuery(TQueryClient& client, const TString& writeTableName, const TString& readTableName, bool expectCached) { - const TString sql = fmt::format(R"( - INSERT INTO `{write_table}` - SELECT * FROM `{read_table}`; - )", - "write_table"_a = writeTableName, - "read_table"_a = readTableName); - auto settings = TExecuteQuerySettings().StatsMode(EStatsMode::Basic); - auto resultFuture = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings); - resultFuture.Wait(); - UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - auto& stats = NYdb::TProtoAccessor::GetProto(*resultFuture.GetValueSync().GetStats()); - UNIT_ASSERT_EQUAL_C(stats.compilation().from_cache(), expectCached, "expected: " << expectCached); - } - - Y_UNIT_TEST(InsertIntoBucketCaching) { - const TString writeDataSourceName = "/Root/write_data_source"; - const TString writeTableName = "/Root/write_binding"; - const TString writeBucket = "test_bucket_cache"; - const TString writeObject = "test_object_write/"; - const TString writeAnotherObject = "test_another_object_write/"; - const TString readTableName = "/Root/read_table"; - { - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucket(writeBucket, s3Client); - } - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - { - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{write_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{write_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{write_source}", - LOCATION="{write_object}", - FORMAT="tsv_with_names" - ); - - CREATE TABLE `{read_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL, - PRIMARY KEY (key) - ); - )", - "write_source"_a = writeDataSourceName, - "write_table"_a = writeTableName, - "write_location"_a = GetBucketLocation(writeBucket), - "write_object"_a = writeObject, - "read_table"_a = readTableName); - - - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - { - const TString query = fmt::format(R"( - REPLACE INTO `{read_table}` (key, value) VALUES - ("1", "one") - )", - "read_table"_a = readTableName); - auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - auto db = kikimr->GetQueryClient(); - ExecuteInsertQuery(db, writeTableName, readTableName, false); - ExecuteInsertQuery(db, writeTableName, readTableName, true); - { - const TString modifyQuery = fmt::format(R"( - DROP EXTERNAL TABLE `{write_table}`; - CREATE EXTERNAL TABLE `{write_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{write_source}", - LOCATION="{write_object}", - FORMAT="tsv_with_names" - ); - )", - "write_table"_a = writeTableName, - "write_object"_a = writeAnotherObject, - "write_source"_a = writeDataSourceName); - auto result = session.ExecuteSchemeQuery(modifyQuery).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - ExecuteInsertQuery(db, writeTableName, readTableName, false); - - UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3); - } - - Y_UNIT_TEST(UpdateExternalTable) { - const TString readDataSourceName = "/Root/read_data_source"; - const TString readTableName = "/Root/read_binding"; - const TString readBucket = "test_bucket_read"; - const TString readObject = "test_object_read"; - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{read_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{read_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{read_source}", - LOCATION="{read_object}", - FORMAT="json_each_row" - ); - )", - "read_source"_a = readDataSourceName, - "read_table"_a = readTableName, - "read_location"_a = GetBucketLocation(readBucket), - "read_object"_a = readObject - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString sql = R"( - UPDATE `/Root/read_binding` - SET key = "abc"u - )"; - - auto db = kikimr->GetQueryClient(); - auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - resultFuture.Wait(); - UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - - UNIT_ASSERT_NO_DIFF(resultFuture.GetValueSync().GetIssues().ToString(), "
: Error: Pre type annotation, code: 1020\n" - "
:3:27: Error: Write mode 'update' is not supported for external entities\n"); - } - - Y_UNIT_TEST(JoinTwoSources) { - const TString dataSource = "/Root/data_source"; - const TString bucket = "test_bucket_mixed"; - const TString dataTable = "/Root/data"; - const TString dataObject = "data"; - const TString keysTable = "/Root/keys"; - const TString keysObject = "keys"; - - { - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucket(bucket, s3Client); - UploadObject(bucket, dataObject, TEST_CONTENT, s3Client); - UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client); - } - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{data_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{bucket_location}", - AUTH_METHOD="NONE" - ); - - CREATE EXTERNAL TABLE `{data_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{data_source}", - LOCATION="{data_object}", - FORMAT="json_each_row" - ); - - CREATE EXTERNAL TABLE `{keys_table}` ( - key Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{data_source}", - LOCATION="{keys_object}", - FORMAT="json_each_row" - ); - )", - "data_source"_a = dataSource, - "bucket_location"_a = GetBucketLocation(bucket), - "data_table"_a = dataTable, - "data_object"_a = dataObject, - "keys_table"_a = keysTable, - "keys_object"_a = keysObject - ); - auto schemeQueryesult = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(schemeQueryesult.GetStatus() == NYdb::EStatus::SUCCESS, schemeQueryesult.GetIssues().ToString()); - - const TString sql = fmt::format(R"( - SELECT * FROM `{data_table}` - WHERE key IN ( - SELECT key FROM `{keys_table}` - ) - )", - "data_table"_a = dataTable, - "keys_table"_a = keysTable); - - auto db = kikimr->GetQueryClient(); - auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - resultFuture.Wait(); - UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - auto result = resultFuture.GetValueSync().GetResultSetParser(0); - UNIT_ASSERT_VALUES_EQUAL(result.RowsCount(), 1); - UNIT_ASSERT(result.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("key").GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), "trololo"); - UNIT_ASSERT(!result.TryNextRow()); - } - - Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPartitionedBy) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket1"; - const TString object = "year=1/month=2/test_object"; - const TString content = "data,year,month\ntest,1,2"; - - CreateBucketWithObject(bucket, object, content); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - data STRING NOT NULL, - year UINT32 NOT NULL, - month UINT32 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="/", - FORMAT="csv_with_names", - PARTITIONED_BY="[year, month]" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket) - ); - - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto db = kikimr->GetQueryClient(); - const TString sql = fmt::format(R"( - SELECT * FROM `{external_table}` - )", "external_table"_a = externalTableName); - - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); - } - - Y_UNIT_TEST(ExecuteScriptWithEmptyCustomPartitioning) { - const TString bucket = "test_bucket1"; - const TString object = "year=2021/test_object"; - - CreateBucketWithObject(bucket, object, ""); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)", - "location"_a = GetBucketLocation(bucket) - ); - - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto db = kikimr->GetQueryClient(); - const TString sql = R"( - $projection = @@ { - "projection.enabled" : "true", - "storage.location.template" : "/${date}", - "projection.date.type" : "date", - "projection.date.min" : "2022-11-02", - "projection.date.max" : "2023-12-02", - "projection.date.interval" : "1", - "projection.date.format" : "/year=%Y", - "projection.date.unit" : "YEARS" - } @@; - - SELECT * - FROM `/Root/external_data_source`.`/` - WITH ( - FORMAT="raw", - - SCHEMA=( - `data` String NOT NULL, - `date` Date NOT NULL - ), - - partitioned_by=(`date`), - projection=$projection - ) - )"; - - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 0); - } - - Y_UNIT_TEST(ExecuteScriptWithTruncatedMultiplyResults) { - const TString bucket = "test_bucket"; - CreateBucket(bucket); - - constexpr size_t NUMBER_OF_OBJECTS = 20; - constexpr size_t ROWS_LIMIT = NUMBER_OF_OBJECTS / 2; - - const TString object = "/test_object"; - const TString content = "test content"; - for (size_t i = 0; i < NUMBER_OF_OBJECTS; ++i) { - UploadObject(bucket, object + ToString(i), content); - } - - NKikimrConfig::TAppConfig appCfg; - appCfg.MutableQueryServiceConfig()->set_scriptresultrowslimit(ROWS_LIMIT); - appCfg.MutableTableServiceConfig()->MutableQueryLimits()->set_resultrowslimit(ROWS_LIMIT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, nullptr, appCfg); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)", - "location"_a = GetBucketLocation(bucket) - ); - - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto db = kikimr->GetQueryClient(); - const TString sql = R"( - SELECT `data` - FROM `/Root/external_data_source`.`/` - WITH ( - FORMAT="raw", - SCHEMA=( - `data` String NOT NULL - ) - ) - )"; - - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_LIMIT); - - for (size_t i = 0; i < ROWS_LIMIT; ++i) { - resultSet.TryNextRow(); - UNIT_ASSERT_VALUES_EQUAL(resultSet.GetValue(0).GetProto().bytes_value(), content); - } - } - - Y_UNIT_TEST(ForbiddenCallablesForYdbTables) { - const TString readDataSourceName = "/Root/read_data_source"; - const TString readTableName = "/Root/read_table"; - const TString readBucket = "test_read_bucket_forbidden_callables"; - const TString readObject = "test_object_forbidden_callables"; - const TString writeDataSourceName = "/Root/write_data_source"; - const TString writeTableName = "/Root/write_table"; - const TString writeBucket = "test_write_bucket_forbidden_callables"; - const TString writeObject = "test_object_forbidden_callables/"; - const TString writeYdbTable = "/Root/test_ydb_table"; - - { - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); - CreateBucket(writeBucket, s3Client); - } - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{read_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{read_table}` ( - key Utf8, -- Nullable - value Utf8 -- Nullable - ) WITH ( - DATA_SOURCE="{read_source}", - LOCATION="{read_object}", - FORMAT="json_each_row" - ); - - CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{write_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{write_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{write_source}", - LOCATION="{write_object}", - FORMAT="json_each_row" - ); - - CREATE TABLE `{write_ydb_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL, - PRIMARY KEY (key) - ); - )", - "read_source"_a = readDataSourceName, - "read_table"_a = readTableName, - "read_location"_a = GetBucketLocation(readBucket), - "read_object"_a = readObject, - "write_source"_a = writeDataSourceName, - "write_table"_a = writeTableName, - "write_location"_a = GetBucketLocation(writeBucket), - "write_object"_a = writeObject, - "write_ydb_table"_a = writeYdbTable - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - // Forbidden callable like Unwrap is allowed in S3-only queries, - // but not allowed in mixed queries. - { - const TString sql = fmt::format(R"( - INSERT INTO `{write_table}` - SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}` - )", - "read_table"_a=readTableName, - "write_table"_a = writeTableName); - - auto db = kikimr->GetQueryClient(); - auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - resultFuture.Wait(); - UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - } - - // Unwrap is used in query with effect applied to YDB table. - { - const TString sql = fmt::format(R"( - INSERT INTO `{write_table}` - SELECT Unwrap(key) AS key, Unwrap(value) AS value FROM `{read_table}`; - - DELETE FROM `{write_ydb_table}` - WHERE key = "42"; - )", - "read_table"_a=readTableName, - "write_table"_a = writeTableName, - "write_ydb_table"_a = writeYdbTable); - - auto db = kikimr->GetQueryClient(); - auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); - resultFuture.Wait(); - UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap"); - } - } - - Y_UNIT_TEST(ExecuteScriptWithLocationWithoutSlashAtTheEnd) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString externalTableName = "/Root/test_binding_resolve"; - const TString bucket = "test_bucket_with_location_without_slash_at_the_end"; - const TString object = "year=1/month=2/test_object"; - const TString content = "data,year,month\ntest,1,2"; - - CreateBucketWithObject(bucket, object, content); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{external_table}` ( - data STRING NOT NULL, - year UINT32 NOT NULL, - month UINT32 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="/", - FORMAT="csv_with_names", - PARTITIONED_BY="[year, month]" - );)", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket - ); - - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - auto db = kikimr->GetQueryClient(); - const TString sql = fmt::format(R"( - SELECT * FROM `{external_table}` - )", "external_table"_a = externalTableName); - - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); - } - - TString CreateSimpleGenericQuery(std::shared_ptr kikimr, const TString& bucket) { - using namespace fmt::literals; - const TString externalDataSourceName = "/Root/external_data_source"; - const TString object = "test_object"; - const TString content = "key\n1"; - - CreateBucketWithObject(bucket, object, content); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket) - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - return fmt::format(R"( - SELECT * FROM `{external_source}`.`/` WITH ( - FORMAT="csv_with_names", - SCHEMA ( - key Int NOT NULL - ) - ) - )", "external_source"_a=externalDataSourceName); - } - - Y_UNIT_TEST(ExecuteScriptWithGenericAutoDetection) { - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_execute_generic_auto_detection"); - - auto driver = kikimr->GetDriver(); - NScripting::TScriptingClient yqlScriptClient(driver); - - auto scriptResult = yqlScriptClient.ExecuteYqlScript(sql).GetValueSync(); - UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); - - TResultSetParser resultSet(scriptResult.GetResultSet(0)); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("key").GetInt32(), 1); - } - - Y_UNIT_TEST(ExplainScriptWithGenericAutoDetection) { - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - const TString sql = CreateSimpleGenericQuery(kikimr, "test_bucket_explain_generic_auto_detection"); - - auto driver = kikimr->GetDriver(); - NScripting::TScriptingClient yqlScriptClient(driver); - - NScripting::TExplainYqlRequestSettings settings; - settings.Mode(NScripting::ExplainYqlRequestMode::Plan); - - auto scriptResult = yqlScriptClient.ExplainYqlScript(sql, settings).GetValueSync(); - UNIT_ASSERT_C(scriptResult.IsSuccess(), scriptResult.GetIssues().ToString()); - UNIT_ASSERT(scriptResult.GetPlan()); - } - - Y_UNIT_TEST(ReadFromDataSourceWithoutTable) { - const TString externalDataSourceName = "/Root/external_data_source"; - const TString bucket = "test_bucket_inline_desc"; - const TString object = "test_object_inline_desc"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"sql( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - );)sql", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - { - const TString sql = fmt::format(R"sql( - SELECT * FROM `{external_data_source}`; - )sql", - "external_data_source"_a=externalDataSourceName); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Failed); - UNIT_ASSERT_STRING_CONTAINS(readyOp.Status().GetIssues().ToString(), "Attempt to read from external data source"); - } - - // select using inline syntax is well - { - const TString sql = fmt::format(R"sql( - SELECT * FROM `{external_data_source}`.`{obj_path}` - WITH ( - SCHEMA = ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ), - FORMAT = "json_each_row" - ) - )sql", - "external_data_source"_a=externalDataSourceName, - "obj_path"_a = object); - - auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); - - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); - } - } - - Y_UNIT_TEST(InsertIntoDataSourceWithoutTable) { - const TString readDataSourceName = "/Root/read_data_source"; - const TString readTableName = "/Root/read_binding"; - const TString readBucket = "test_bucket_read_insert_into_data_source"; - const TString readObject = "test_object_read"; - const TString writeDataSourceName = "/Root/write_data_source"; - const TString writeTableName = "/Root/write_binding"; - const TString writeBucket = "test_bucket_write_insert_into_data_source"; - const TString writeObject = "test_object_write/"; - - { - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucketWithObject(readBucket, readObject, TEST_CONTENT, s3Client); - CreateBucket(writeBucket, s3Client); - } - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"sql( - CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{read_location}", - AUTH_METHOD="NONE" - ); - CREATE EXTERNAL TABLE `{read_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{read_source}", - LOCATION="{read_object}", - FORMAT="json_each_row" - ); - - CREATE EXTERNAL DATA SOURCE `{write_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{write_location}", - AUTH_METHOD="NONE" - ); - )sql", - "read_source"_a = readDataSourceName, - "read_table"_a = readTableName, - "read_location"_a = GetBucketLocation(readBucket), - "read_object"_a = readObject, - "write_source"_a = writeDataSourceName, - "write_table"_a = writeTableName, - "write_location"_a = GetBucketLocation(writeBucket) - ); - auto schemeResult = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString()); - - { - const TString sql = fmt::format(R"sql( - INSERT INTO `{write_source}` - SELECT * FROM `{read_table}` - )sql", - "read_table"_a=readTableName, - "write_source"_a = writeDataSourceName); - - auto db = kikimr->GetQueryClient(); - auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); - UNIT_ASSERT_EQUAL_C(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR, static_cast(result.GetStatus()) << ", " << result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Attempt to write to external data source"); - } - - // insert with inline syntax is well - { - Cerr << "Run inplace insert" << Endl; - const TString sql = fmt::format(R"sql( - INSERT INTO `{write_source}`.`{write_object}` - WITH ( - SCHEMA = ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ), - FORMAT = "json_each_row" - ) - SELECT * FROM `{read_table}` - )sql", - "read_table"_a=readTableName, - "write_source"_a = writeDataSourceName, - "write_object"_a = writeObject); - - auto db = kikimr->GetQueryClient(); - auto result = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - } - - Y_UNIT_TEST(SpecifyExternalTableInsteadOfExternalDataSource) { - const TString externalDataSourceName = "external_data_source"; - const TString externalTableName = "external_table"; - const TString bucket = "test_bucket_specify_external_table"; - const TString object = "test_object_specify_external_table"; - - CreateBucketWithObject(bucket, object, TEST_CONTENT); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query = fmt::format(R"sql( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - - CREATE EXTERNAL TABLE `{external_table}` ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ) WITH ( - DATA_SOURCE="{external_source}", - LOCATION="{object}", - FORMAT="json_each_row" - ); - )sql", - "external_source"_a = externalDataSourceName, - "external_table"_a = externalTableName, - "location"_a = GetBucketLocation(bucket), - "object"_a = object - ); - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - { - const TString sql = fmt::format(R"sql( - SELECT * FROM `{external_table}`.`{object}` - WITH ( - SCHEMA = ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ), - FORMAT = "json_each_row" - ); - )sql", - "external_table"_a=externalTableName, - "object"_a = object); - - auto db = kikimr->GetQueryClient(); - auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_EQUAL_C(queryExecutionOperation.GetStatus(), EStatus::BAD_REQUEST, static_cast(queryExecutionOperation.GetStatus()) << ", " << queryExecutionOperation.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); - } - - { - const TString sql = fmt::format(R"sql( - INSERT INTO `{external_table}`.`{object}` - WITH ( - SCHEMA = ( - key Utf8 NOT NULL, - value Utf8 NOT NULL - ), - FORMAT = "json_each_row" - ) - SELECT * FROM `{external_table}` WHERE key = '42'; - )sql", - "external_table"_a=externalTableName, - "object"_a = object); - - auto db = kikimr->GetQueryClient(); - auto queryExecutionOperation = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_EQUAL_C(queryExecutionOperation.GetStatus(), EStatus::BAD_REQUEST, static_cast(queryExecutionOperation.GetStatus()) << ", " << queryExecutionOperation.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); - } - } - - Y_UNIT_TEST(QueryWithNoDataInS3) { - const TString externalDataSourceName = "tpc_h_s3_storage_connection"; - const TString bucket = "test_bucket_no_data"; - - Aws::S3::S3Client s3Client = MakeS3Client(); - CreateBucket(bucket, s3Client); - // Uncomment if you want to compare with query with data - //UploadObject(bucket, "l/l", R"json({"l_extendedprice": 0.0, "l_discount": 1.0, "l_partkey": 1})json", s3Client); - //UploadObject(bucket, "p/p", R"json({"p_partkey": 1, "p_type": "t"})json", s3Client); - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto client = kikimr->GetQueryClient(); - - { - const TString query = fmt::format(R"sql( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ); - )sql", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket) - ); - auto result = client.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { - // YQ-2750 - const TString query = fmt::format(R"sql( - $border = Date("1994-08-01"); - select - 100.00 * sum(case - when StartsWith(p.p_type, 'PROMO') - then l.l_extendedprice * (1 - l.l_discount) - else 0 - end) / sum(l.l_extendedprice * (1 - l.l_discount)) as promo_revenue - from - {external_source}.`l/` with ( schema ( - l_extendedprice double, - l_discount double, - l_partkey int64, - l_shipdate date - ), - format = "json_each_row" - ) as l - join - {external_source}.`p/` with ( schema ( - p_partkey int64, - p_type string - ), - format = "json_each_row" - ) as p - on - l.l_partkey = p.p_partkey - where - cast(l.l_shipdate as timestamp) >= $border - and cast(l.l_shipdate as timestamp) < ($border + Interval("P31D")); - )sql", - "external_source"_a = externalDataSourceName - ); - auto result = client.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto rs = result.GetResultSetParser(0); - UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), 1); - rs.TryNextRow(); - TMaybe sum = rs.ColumnParser(0).GetOptionalDouble(); - UNIT_ASSERT(!sum); - } - } -<<<<<<< HEAD - - void ExecuteSelectQuery(const TString& bucket, size_t fileSize, size_t numberRows) { - using namespace fmt::literals; - - // Create test file - TString content = "id,data\n"; - const TString rowContent(fileSize / numberRows, 'a'); - for (size_t i = 0; i < numberRows; ++i) { - content += TStringBuilder() << ToString(i) << "," << rowContent << "\n"; - } - - // Upload test file - CreateBucketWithObject(bucket, "test_object", content); - - // Create external data source - const TString externalDataSourceName = "/Root/external_data_source"; - - auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tableClient = kikimr->GetTableClient(); - auto session = tableClient.CreateSession().GetValueSync().GetSession(); - const TString schemeQuery = fmt::format(R"( - CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}", - AUTH_METHOD="NONE" - ))", - "external_source"_a = externalDataSourceName, - "location"_a = GetBucketLocation(bucket) - ); - - const auto schemeResult = session.ExecuteSchemeQuery(schemeQuery).GetValueSync(); - UNIT_ASSERT_C(schemeResult.GetStatus() == NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToString()); - - // Execute test query - auto queryClient = kikimr->GetQueryClient(); - const TString query = fmt::format(R"( - SELECT * FROM `{external_source}`.`/` WITH ( - FORMAT="csv_with_names", - SCHEMA ( - id Uint64 NOT NULL, - data String NOT NULL - ) - ))", - "external_source"_a=externalDataSourceName - ); - - auto scriptExecutionOperation = queryClient.ExecuteScript(query).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - - // Validate query results - TFetchScriptResultsSettings settings; - settings.RowsLimit(0); - size_t rowsFetched = 0; - while (true) { - TFetchScriptResultsResult results = queryClient.FetchScriptResults(scriptExecutionOperation.Id(), 0, settings).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); - - while (resultSet.TryNextRow()) { - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("id").GetUint64(), rowsFetched++); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), rowContent); - } - - if (!results.GetNextFetchToken()) { - break; - } - - settings.FetchToken(results.GetNextFetchToken()); - } - UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows); - - // Test forget operation - TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20)); - NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver()); - while (TInstant::Now() < forgetOperationTimeout) { - auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); - if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) { - return; - } - - UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString()); - - if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) { - // Wait until last forget is not finished - Sleep(TDuration::Seconds(30)); - } - } - UNIT_ASSERT_C(false, "Forget operation timeout"); - } - - Y_UNIT_TEST(ExecuteScriptWithLargeStrings) { - ExecuteSelectQuery("test_bucket_execute_script_with_large_strings", 100_MB, 100); - } - - Y_UNIT_TEST(ExecuteScriptWithLargeFile) { - ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000); - } -======= ->>>>>>> stable-24-1 -} - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 71b6d3ff747a..7be6ece0d76d 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -55,7 +55,7 @@ TIntrusivePtr CreateKikimrQueryProcessor(TIntrusivePtr ga auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}}); return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, - federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem); + federatedQuerySetup, nullptr, NKikimrConfig::TQueryServiceConfig(), funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem); } NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) { diff --git a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp index 8cd9b54a6857..edf5740b7ef2 100644 --- a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp @@ -84,6 +84,93 @@ Y_UNIT_TEST_SUITE(KqpPragma) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString()); } + + Y_UNIT_TEST(MatchRecognizeWithTimeOrderRecoverer) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + + auto result = client.ExecuteYqlScript(R"( + PRAGMA FeatureR010="prototype"; + + CREATE TABLE `/Root/NewTable` ( + dt Uint64, + value String, + PRIMARY KEY (dt) + ); + COMMIT; + + INSERT INTO `/Root/NewTable` (dt, value) VALUES + (1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4'); + COMMIT; + + SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`) + MATCH_RECOGNIZE( + ORDER BY CAST(dt as Timestamp) + MEASURES + LAST(V1.dt) as v1, + LAST(V4.dt) as v4 + ONE ROW PER MATCH + PATTERN (V1 V* V4) + DEFINE + V1 as V1.value = "value1", + V as True, + V4 as V4.value = "value4" + ); + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];[4u]]; + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(MatchRecognizeWithoutTimeOrderRecoverer) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + + auto result = client.ExecuteYqlScript(R"( + PRAGMA FeatureR010="prototype"; + PRAGMA config.flags("MatchRecognizeStream", "disable"); + + CREATE TABLE `/Root/NewTable` ( + dt Uint64, + value String, + PRIMARY KEY (dt) + ); + COMMIT; + + INSERT INTO `/Root/NewTable` (dt, value) VALUES + (1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4'); + COMMIT; + + SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`) + MATCH_RECOGNIZE( + ORDER BY CAST(dt as Timestamp) + MEASURES + LAST(V1.dt) as v1, + LAST(V4.dt) as v4 + ONE ROW PER MATCH + PATTERN (V1 V* V4) + DEFINE + V1 as V1.value = "value1", + V as True, + V4 as V4.value = "value4" + ); + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];[4u]]; + ])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namspace NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8cb9cf5442a4..7856f1fd6734 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1052,6 +1052,7 @@ message TQueryServiceConfig { optional NYql.TGenericGatewayConfig Generic = 11; optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12; optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled + optional bool EnableMatchRecognize = 20 [default = false]; } // Config describes immediate controls and allows diff --git a/ydb/library/yql/core/yql_opt_match_recognize.cpp b/ydb/library/yql/core/yql_opt_match_recognize.cpp index 5e99874b5415..b7f05afa37ab 100644 --- a/ydb/library/yql/core/yql_opt_match_recognize.cpp +++ b/ydb/library/yql/core/yql_opt_match_recognize.cpp @@ -146,7 +146,7 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx); TExprNode::TPtr result; if (isStreaming) { - YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams"); + YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE"); const auto reordered = ctx.Builder(pos) .Lambda() .Param("partition") @@ -216,37 +216,15 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext& .Seal() .Build(); } else { //non-streaming - if (partitionColumns->ChildrenSize() != 0) { - result = ctx.Builder(pos) - .Callable("PartitionsByKeys") - .Add(0, input) - .Add(1, partitionKeySelector) - .Add(2, sortOrder) - .Add(3, sortKey) - .Add(4, matchRecognize) - .Seal() - .Build(); - } else { - if (sortOrder->IsCallable("Void")) { - result = ctx.Builder(pos) - .Apply(matchRecognize) - .With(0, input) - .Seal() - .Build();; - } else { - result = ctx.Builder(pos) - .Apply(matchRecognize) - .With(0) - .Callable("Sort") - .Add(0, input) - .Add(1, sortOrder) - .Add(2, sortKey) - .Seal() - .Done() - .Seal() - .Build(); - } - } + result = ctx.Builder(pos) + .Callable("PartitionsByKeys") + .Add(0, input) + .Add(1, partitionKeySelector) + .Add(2, sortOrder) + .Add(3, sortKey) + .Add(4, matchRecognize) + .Seal() + .Build(); } YQL_CLOG(INFO, Core) << "Expanded MatchRecognize"; return result; diff --git a/ydb/library/yql/tests/sql/dq_file/part16/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part16/canondata/result.json index 320fd2a7a715..afbba816045f 100644 --- a/ydb/library/yql/tests/sql/dq_file/part16/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part16/canondata/result.json @@ -1811,9 +1811,9 @@ ], "test.test[match_recognize-test_type-default.txt-Debug]": [ { - "checksum": "413daca8efd9dda687246a807bf6d461", - "size": 3206, - "uri": "https://{canondata_backend}/1599023/6ea95a71ae6e3995d639ef495d263a106e521882/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched" + "checksum": "d5867efa618053b3a7c823ca3c65ac62", + "size": 3444, + "uri": "https://{canondata_backend}/1597364/713adc15e86f968fe8142e7d100829cc68d825bb/resource.tar.gz#test.test_match_recognize-test_type-default.txt-Debug_/opt.yql_patched" } ], "test.test[match_recognize-test_type-default.txt-Plan]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json index 127b9d24af13..e94137d171bd 100644 --- a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json @@ -1572,9 +1572,9 @@ ], "test.test[match_recognize-alerts-default.txt-Debug]": [ { - "checksum": "eb440da9e5a9e63e803c9f13e41929ed", - "size": 5588, - "uri": "https://{canondata_backend}/1936273/7c78e1e45ae282daee686c006624daa21a7c6ca6/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched" + "checksum": "36044ee7a7ae01d0b976600d0fb6112e", + "size": 5756, + "uri": "https://{canondata_backend}/212715/6b165614609e516560d0c88a6f8ecd84824ba506/resource.tar.gz#test.test_match_recognize-alerts-default.txt-Debug_/opt.yql_patched" } ], "test.test[match_recognize-alerts-default.txt-Plan]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json index a65c9617027f..bcf923e43781 100644 --- a/ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part19/canondata/result.json @@ -1691,6 +1691,28 @@ } ], "test.test[limit-empty_input_after_limit-default.txt-Results]": [], + "test.test[match_recognize-alerts_without_order-default.txt-Analyze]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Analyze_/plan.txt" + } + ], + "test.test[match_recognize-alerts_without_order-default.txt-Debug]": [ + { + "checksum": "9583095b37a579b3d8f782ca1410d648", + "size": 5669, + "uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[match_recognize-alerts_without_order-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1597364/fc135efcabe2a4c94deee0dd810e591fa1b56eef/resource.tar.gz#test.test_match_recognize-alerts_without_order-default.txt-Plan_/plan.txt" + } + ], + "test.test[match_recognize-alerts_without_order-default.txt-Results]": [], "test.test[optimizers-unused_columns_group_one_of_multi--Analyze]": [ { "checksum": "ffcfe803a5b4bbfe9af72cc128197217", diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index 52447344f9c8..4f8b05c8587f 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -9736,6 +9736,13 @@ "uri": "https://{canondata_backend}/1946324/07f86b02802add9cba590f938304abe892044623/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql" } ], + "test_sql2yql.test[match_recognize-alerts_without_order]": [ + { + "checksum": "ef289fff70d333859534243df7451fab", + "size": 8759, + "uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql" + } + ], "test_sql2yql.test[match_recognize-permute]": [ { "checksum": "9d2b3a8c5c3d29384d1d5967ca15caf4", @@ -9766,9 +9773,9 @@ ], "test_sql2yql.test[match_recognize-test_type]": [ { - "checksum": "b2b6731a78df7ac091d8513f85d3435e", - "size": 9549, - "uri": "https://{canondata_backend}/1946324/07f86b02802add9cba590f938304abe892044623/resource.tar.gz#test_sql2yql.test_match_recognize-test_type_/sql.yql" + "checksum": "64d8e2edbef833724049eac26329ff12", + "size": 10144, + "uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql2yql.test_match_recognize-test_type_/sql.yql" } ], "test_sql2yql.test[match_recognize-test_type_predicate]": [ @@ -27215,6 +27222,13 @@ "uri": "https://{canondata_backend}/1937001/da4215d5087e56eec0224ec5e7754dafd0b2bdcf/resource.tar.gz#test_sql_format.test_match_recognize-alerts_/formatted.sql" } ], + "test_sql_format.test[match_recognize-alerts_without_order]": [ + { + "checksum": "779c2c3a4eab619646509ce5008863e8", + "size": 2906, + "uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql" + } + ], "test_sql_format.test[match_recognize-permute]": [ { "checksum": "998e6752ce413cc78e952b9958dfab74", @@ -27245,9 +27259,9 @@ ], "test_sql_format.test[match_recognize-test_type]": [ { - "checksum": "3fcf6d53720604b982ad58beed055a26", - "size": 1127, - "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_match_recognize-test_type_/formatted.sql" + "checksum": "36104b385f3b9986c22f409931b80564", + "size": 1302, + "uri": "https://{canondata_backend}/1031349/a543dabda3236eb2bb759444c05037e62724fa5f/resource.tar.gz#test_sql_format.test_match_recognize-test_type_/formatted.sql" } ], "test_sql_format.test[match_recognize-test_type_predicate]": [ diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql b/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql new file mode 100644 index 000000000000..7d92f0f18c7b --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/match_recognize/alerts_without_order.sql @@ -0,0 +1,59 @@ +$osquery_data = [ +<|dt:1688910000, host:"fqdn1", ev_type:"someEv", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910050, host:"fqdn2", ev_type:"login", ev_status:"success", user:"", vpn:true, |>, +<|dt:1688910100, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:true, |>, +<|dt:1688910220, host:"fqdn1", ev_type:"login", ev_status:"success", user:"", vpn:false, |>, +<|dt:1688910300, host:"fqdn1", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910400, host:"fqdn2", ev_type:"delete_all", ev_status:"", user:"", vpn:false, |>, +<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>, +<|dt:1688910500, host:"fqdn1", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>, +<|dt:1688910600, host:"fqdn", ev_type:"someEv", ev_status:"", user:"user1", vpn:false, |>, +<|dt:1688910800, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user1", vpn:false, |>, +<|dt:1688910900, host:"fqdn2", ev_type:"login", ev_status:"failed", user:"user2", vpn:false, |>, +<|dt:1688911000, host:"fqdn2", ev_type:"login", ev_status:"success", user:"user1", vpn:false, |>, +]; + +pragma FeatureR010="prototype"; +pragma config.flags("MatchRecognizeStream", "disable"); + +SELECT * +FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( + MEASURES + LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt, + LAST(LOGIN_SUCCESS_REMOTE.host) as remote_login_host, + LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user, + LAST(LOGIN_SUCCESS_REMOTE.dt) as t, + FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin, + FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end, + LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login + + ONE ROW PER MATCH + PATTERN ( + LOGIN_SUCCESS_REMOTE ANY_ROW* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | + (LOGIN_FAILED_SAME_USER ANY_ROW*){2,} LOGIN_SUCCESS_SAME_USER + ) + DEFINE + LOGIN_SUCCESS_REMOTE as + LOGIN_SUCCESS_REMOTE.ev_type = "login" and + LOGIN_SUCCESS_REMOTE.ev_status = "success" and + LOGIN_SUCCESS_REMOTE.vpn = true, + SUSPICIOUS_ACTION_SOON as + SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and + SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and + SUSPICIOUS_ACTION_SOON.dt - LAST(LOGIN_SUCCESS_REMOTE.dt) < 1000, + SUSPICIOUS_ACTION_TIMEOUT as + SUSPICIOUS_ACTION_TIMEOUT.dt - LAST(LOGIN_SUCCESS_REMOTE.dt) >= 1000, + + LOGIN_FAILED_SAME_USER as + LOGIN_FAILED_SAME_USER.ev_type = "login" and + LOGIN_FAILED_SAME_USER.ev_status <> "success" and + (LAST(LOGIN_FAILED_SAME_USER.user) IS NULL + or LAST(LOGIN_FAILED_SAME_USER.user) = LOGIN_FAILED_SAME_USER.user + ), + LOGIN_SUCCESS_SAME_USER as + LOGIN_SUCCESS_SAME_USER.ev_type = "login" and + LOGIN_SUCCESS_SAME_USER.ev_status = "success" and + LOGIN_SUCCESS_SAME_USER.user = LAST(LOGIN_FAILED_SAME_USER.user) +) AS MATCHED +; + diff --git a/ydb/library/yql/tests/sql/suites/match_recognize/test_type.sql b/ydb/library/yql/tests/sql/suites/match_recognize/test_type.sql index 1a5cdeaaf6bb..a92b5672e488 100644 --- a/ydb/library/yql/tests/sql/suites/match_recognize/test_type.sql +++ b/ydb/library/yql/tests/sql/suites/match_recognize/test_type.sql @@ -7,6 +7,7 @@ $data = [<|dt:4, host:"fqdn1", key:14|>]; -- NoPartitionNoMeasure select * from AS_TABLE($data) MATCH_RECOGNIZE( + ORDER BY CAST(dt as Timestamp) ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN ( @@ -18,6 +19,7 @@ select * from AS_TABLE($data) MATCH_RECOGNIZE( --NoPartitionStringMeasure select * from AS_TABLE($data) MATCH_RECOGNIZE( + ORDER BY CAST(dt as Timestamp) MEASURES "SomeString" as Measure1 ONE ROW PER MATCH @@ -32,6 +34,7 @@ select * from AS_TABLE($data) MATCH_RECOGNIZE( --IntPartitionColNoMeasure select * from AS_TABLE($data) MATCH_RECOGNIZE( PARTITION BY dt + ORDER BY CAST(dt as Timestamp) ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN ( @@ -44,6 +47,7 @@ select * from AS_TABLE($data) MATCH_RECOGNIZE( --StringPartitionColStringMeasure select * from AS_TABLE($data) MATCH_RECOGNIZE( PARTITION BY host + ORDER BY CAST(dt as Timestamp) MEASURES "SomeString" as Measure1 ONE ROW PER MATCH @@ -58,6 +62,7 @@ select * from AS_TABLE($data) MATCH_RECOGNIZE( --TwoPartitionColsTwoMeasures select * from AS_TABLE($data) MATCH_RECOGNIZE( PARTITION BY host, dt + ORDER BY CAST(dt as Timestamp) MEASURES "SomeString" as S, 345 as I diff --git a/ydb/library/yql/tools/dqrun/examples/gateways.conf b/ydb/library/yql/tools/dqrun/examples/gateways.conf index 4b1cfaee565f..fa0835e367b3 100644 --- a/ydb/library/yql/tools/dqrun/examples/gateways.conf +++ b/ydb/library/yql/tools/dqrun/examples/gateways.conf @@ -91,6 +91,12 @@ HttpGateway { DownloadBufferBytesLimit: 131072 } +YqlCore { + Flags { + Name: "_EnableMatchRecognize" + } +} + SqlCore { TranslationFlags: ["FlexibleTypes", "DisableAnsiOptionalAs", "EmitAggApply"] } diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index bd8b36376df6..374d0e35de4b 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -29,6 +29,7 @@ QueryServiceConfig { QueryArtifactsCompressionMethod: "zstd_6" ScriptResultRowsLimit: 0 ScriptResultSizeLimit: 10485760 + EnableMatchRecognize: true Generic { MdbGateway: "https://mdb.api.cloud.yandex.net:443"