YQ-2323: YQ Connector: use Connector docker image for YQL Generic provider integration tests (ydb-platform#604)

* YQ Connector: use Connector docker image instead of building it from scratch

* YQ Connector: tune performance of Generic provider integration tests
vitalyisaev2 authored Dec 20, 2023
1 parent 3242350 commit d7d4cd2
Showing 12 changed files with 155 additions and 86 deletions.
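
Both Python suites in this commit replace row-by-row INSERTs with a single multi-row statement built by `format_values_for_bulk_sql_insert`, newly imported from `utils.sql`. The helper's implementation is not among the files shown here; a minimal sketch of what such a function could look like, with naive quoting as an assumption:

```python
from typing import Any, Sequence


def format_values_for_bulk_sql_insert(data_in: Sequence[Sequence[Any]]) -> str:
    """Render rows as one multi-row VALUES list: (1, 'a'), (2, NULL), ..."""

    def render(val: Any) -> str:
        if val is None:
            return "NULL"
        if isinstance(val, str):
            return "'" + val.replace("'", "''") + "'"  # naive SQL escaping
        return str(val)

    return ", ".join(
        "(" + ", ".join(render(val) for val in row) + ")" for row in data_in
    )
```

Rendering every row into one VALUES list turns N client round trips into one, which is presumably where most of the integration-test speedup comes from.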
64 changes: 34 additions & 30 deletions ydb/library/yql/providers/generic/connector/tests/clickhouse.py
@@ -9,6 +9,7 @@
from utils.log import make_logger
from utils.schema import Schema
from utils.settings import Settings
from utils.sql import format_values_for_bulk_sql_insert

import test_cases.select_missing_database
import test_cases.select_missing_table
@@ -25,7 +26,7 @@ def prepare_table(
schema: Schema,
data_in: Sequence,
):
dbTable = f'{database.name}.{table_name}'
dbTable = f"{database.name}.{table_name}"

# create database
create_database_stmt = database.create(data_source_pb2.CLICKHOUSE)
@@ -43,26 +44,17 @@ def prepare_table(
return

# create table
create_table_stmt = f'CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory'
create_table_stmt = f"CREATE TABLE {dbTable} ({schema.yql_column_list(data_source_pb2.CLICKHOUSE)}) ENGINE = Memory"
LOGGER.debug(create_table_stmt)
client.command(create_table_stmt)

# write data
for row in data_in:
# prepare string with serialized data
values_dump = []
for val in row:
if isinstance(val, str):
values_dump.append(f"'{val}'")
elif val is None:
values_dump.append('NULL')
else:
values_dump.append(str(val))
values = ", ".join(values_dump)

insert_stmt = f"INSERT INTO {dbTable} (*) VALUES ({values})"
LOGGER.debug(insert_stmt)
client.command(insert_stmt)
values = format_values_for_bulk_sql_insert(data_in)
insert_stmt = f"INSERT INTO {dbTable} (*) VALUES {values}"
# TODO: these logs may be too big when working with big tables,
# dump insert statement via yatest into file.
LOGGER.debug(insert_stmt)
client.command(insert_stmt)


def select_positive(
@@ -82,21 +74,25 @@ def select_positive(

# NOTE: to assert equivalence we have to add explicit ORDER BY,
# because Clickhouse's output will be randomly ordered otherwise.
where_statement = ''
where_statement = ""
if test_case.select_where is not None:
where_statement = f'WHERE {test_case.select_where.filter_expression}'
order_by_expression = ''
where_statement = f"WHERE {test_case.select_where.filter_expression}"
order_by_expression = ""
order_by_column_name = test_case.select_what.order_by_column_name
if order_by_column_name:
order_by_expression = f'ORDER BY {order_by_column_name}'
yql_script = f'''
order_by_expression = f"ORDER BY {order_by_column_name}"
yql_script = f"""
{test_case.pragmas_sql_string}
SELECT {test_case.select_what.yql_select_names}
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
{where_statement}
{order_by_expression}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

@@ -115,11 +111,15 @@ def select_missing_database(
test_case: test_cases.select_missing_database.TestCase,
):
# select table from the database that does not exist
yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_database_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr

@@ -136,10 +136,14 @@ def select_missing_table(
LOGGER.debug(create_database_stmt)
client.command(create_database_stmt)

yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.clickhouse.cluster_name}.{test_case.qualified_table_name}
'''
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
"""
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_table_msg(data_source_pb2.CLICKHOUSE) in result.stderr, result.stderr
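
With the helper in place, the per-row loop in `prepare_table` collapses to a single `client.command()` call. A toy illustration of the statement the new code sends, using the sketch above and hypothetical data (the `(*)` column list is taken from the test code):

```python
data_in = [(1, "a"), (2, None), (3, "c")]  # hypothetical rows
values = format_values_for_bulk_sql_insert(data_in)
insert_stmt = f"INSERT INTO db.example (*) VALUES {values}"
# -> INSERT INTO db.example (*) VALUES (1, 'a'), (2, NULL), (3, 'c')
# The old code issued one INSERT per row here.
```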
@@ -1,20 +1,25 @@
version: '3.4'
services:
postgres:
image: "postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085"
postgresql:
image: postgres:15-bullseye@sha256:3411b9f2e5239cd7867f34fcf22fe964230f7d447a71d63c283e3593d3f84085
environment:
POSTGRES_DB: db
POSTGRES_USER: user
POSTGRES_PASSWORD: password
ports:
- "15432:5432"
- 15432:5432
clickhouse:
image: "clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06"
image: clickhouse/clickhouse-server:23-alpine@sha256:b078c1cd294632afa2aeba3530e7ba2e568513da23304354f455a25fab575c06
environment:
CLICKHOUSE_DB: db
CLICKHOUSE_USER: user
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_PASSWORD: password
ports:
- "19000:9000"
- "18123:8123"
- 19000:9000
- 18123:8123
fq-connector-go:
image: ghcr.io/ydb-platform/fq-connector-go:v0.0.6-rc.8@sha256:74ebae0530d916c1842a7fddfbddc6c018763a0401f2f627a44e8829692fe41f
ports:
- 50051:50051
network_mode: host
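
The new `fq-connector-go` service is the headline change: the tests now pull a prebuilt connector image pinned by digest instead of building it from scratch, and run it with host networking, presumably so dqrun can reach the gRPC endpoint on `localhost:50051`. The diff does not show how the suite waits for the container to come up; a minimal readiness probe under that assumption might look like:

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> None:
    """Poll until a TCP connection succeeds or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return
        except OSError:
            time.sleep(0.5)
    raise TimeoutError(f"{host}:{port} did not become reachable")


wait_for_port("localhost", 50051)  # fq-connector-go gRPC endpoint
```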
61 changes: 41 additions & 20 deletions ydb/library/yql/providers/generic/connector/tests/postgresql.py
@@ -9,6 +9,7 @@
from utils.postgresql import Client
from utils.schema import Schema
from utils.settings import Settings
from utils.sql import format_values_for_bulk_sql_insert

import test_cases.select_missing_database
import test_cases.select_missing_table
@@ -28,7 +29,7 @@ def prepare_table(
pg_schema: str = None,
):
# create database
with client.get_cursor('postgres') as (conn, cur):
with client.get_cursor("postgres") as (conn, cur):
database_exists_stmt = database.exists(data_source_pb2.POSTGRESQL)
LOGGER.debug(database_exists_stmt)
cur.execute(database_exists_stmt)
@@ -62,15 +63,19 @@ def prepare_table(
create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS {pg_schema}"
LOGGER.debug(create_schema_stmt)
cur.execute(create_schema_stmt)
table_name = f'{pg_schema}.{table_name}'
table_name = f"{pg_schema}.{table_name}"

create_table_stmt = f'CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})'
create_table_stmt = f"CREATE TABLE {table_name} ({schema.yql_column_list(data_source_pb2.POSTGRESQL)})"
LOGGER.debug(create_table_stmt)
cur.execute(create_table_stmt)

insert_stmt = f'INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES ({", ".join(["%s"] * len(data_in[0]))})'
values = format_values_for_bulk_sql_insert(data_in)

insert_stmt = f"INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES {values}"
# TODO: these logs may be too big when working with big tables,
# dump insert statement via yatest into file.
LOGGER.debug(insert_stmt)
cur.executemany(insert_stmt, data_in)
cur.execute(insert_stmt)

conn.commit()
cur.close()
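
The PostgreSQL suite gets the same treatment: `cur.executemany(...)`, which psycopg2 executes as one statement per row, is replaced by a single `cur.execute` over a pre-rendered multi-row VALUES list. For reference, the driver's own batching helper achieves a similar effect; this is an aside, not what the commit does, written with the names from `prepare_table` above:

```python
from psycopg2.extras import execute_values

# execute_values renders the rows into a multi-row VALUES clause client-side
# and executes it in pages (100 rows per statement by default).
execute_values(
    cur,
    f"INSERT INTO {table_name} ({schema.columns.names_with_commas}) VALUES %s",
    data_in,
)
```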
@@ -92,17 +97,21 @@ def select_positive(
)

# read data
where_statement = ''
where_statement = ""
if test_case.select_where is not None:
where_statement = f'WHERE {test_case.select_where.filter_expression}'
yql_script = f'''
where_statement = f"WHERE {test_case.select_where.filter_expression}"
yql_script = f"""
{test_case.pragmas_sql_string}
SELECT {test_case.select_what.yql_select_names}
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
{where_statement}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

@@ -122,12 +131,16 @@ def select_missing_database(
):
# select table from database that does not exist

yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_database_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr

@@ -140,7 +153,7 @@ def select_missing_table(
test_case: test_cases.select_missing_table.TestCase,
):
# create database but don't create table
with client.get_cursor('postgres') as (conn, cur):
with client.get_cursor("postgres") as (conn, cur):
database_exists_stmt = test_case.database.exists(data_source_pb2.POSTGRESQL)
LOGGER.debug(database_exists_stmt)
cur.execute(database_exists_stmt)
@@ -155,12 +168,16 @@ def select_missing_table(
cur.close()

# read data
yql_script = f'''
yql_script = f"""
SELECT *
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert test_case.database.missing_table_msg(data_source_pb2.POSTGRESQL) in result.stderr, result.stderr

@@ -182,12 +199,16 @@ def select_pg_schema(
)

# read data
yql_script = f'''
yql_script = f"""
SELECT {test_case.select_what.yql_select_names}
FROM {settings.postgresql.cluster_name}.{test_case.qualified_table_name}
'''
"""
LOGGER.debug(yql_script)
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
result = dqrun_runner.run(
test_dir=tmp_path,
script=yql_script,
generic_settings=test_case.generic_settings,
)

assert result.returncode == 0, result.stderr

@@ -384,7 +384,7 @@ def _pushdown(self) -> TestCase:

return [
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
@@ -412,7 +412,7 @@ def make_test_cases(self) -> Sequence[TestCase]:
for base_tc in base_test_cases:
for protocol in protocols:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)
return test_cases
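
The test-name fixes in this file replace the raw protobuf enum integer with its symbolic name: protoc-generated enum wrappers expose `Name()` and the inverse `Value()`, so a case is reported as `pushdown_POSTGRESQL` instead of an opaque `pushdown_<int>`. A small illustration, assuming `EDataSourceKind` is imported as elsewhere in these tests:

```python
# EDataSourceKind is the protobuf enum wrapper used throughout these tests.
name = EDataSourceKind.Name(EDataSourceKind.POSTGRESQL)  # 'POSTGRESQL'
assert EDataSourceKind.Value(name) == EDataSourceKind.POSTGRESQL
```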
@@ -216,7 +216,9 @@ def _large_table(self) -> Sequence[TestCase]:
# TODO: assert connector stats when it will be accessible
'''

table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity
# FIXME: uncomment to debug YQ-2729
# table_size = 2.5 * self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity
table_size = self.ss.connector.paging_bytes_per_page * self.ss.connector.paging_prefetch_queue_capacity / 1000

schema = Schema(
columns=ColumnList(
@@ -239,7 +241,7 @@ def _large_table(self) -> Sequence[TestCase]:
test_cases = []
for data_source_kind in data_source_kinds:
tc = TestCase(
name=f'large_table_{data_source_kind}',
name=f'large_table',
data_source_kind=data_source_kind,
data_in=data_in,
data_out_=data_in,
@@ -273,7 +275,7 @@ def make_test_cases(self, data_source_kind: EDataSourceKind) -> Sequence[TestCase]:
continue
for protocol in protocols[base_tc.data_source_kind]:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)

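
The large-table case previously generated 2.5 prefetch queues' worth of data to exercise the connector's paging (the old formula stays commented out for debugging YQ-2729); to keep CI fast it now generates a thousandth of a single queue fill. With purely illustrative settings (the real defaults live in `utils.settings` and are not shown in this diff):

```python
# Illustrative values only; real ones come from utils.settings.
paging_bytes_per_page = 4 * 1024 * 1024   # assume 4 MiB pages
paging_prefetch_queue_capacity = 2        # assume 2 pages in the queue

old_table_size = 2.5 * paging_bytes_per_page * paging_prefetch_queue_capacity
new_table_size = paging_bytes_per_page * paging_prefetch_queue_capacity / 1000

print(old_table_size)  # 20971520.0 bytes (~20 MiB)
print(new_table_size)  # 8388.608 bytes (~8 KiB)
```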
@@ -429,7 +429,7 @@ def _pushdown(self) -> TestCase:

return [
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out_1,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
@@ -440,7 +440,7 @@ def _pushdown(self) -> TestCase:
database=Database.make_for_data_source_kind(data_source_kind),
),
TestCase(
name=f'pushdown_{data_source_kind}',
name=f'pushdown_{EDataSourceKind.Name(data_source_kind)}',
data_in=data_in,
data_out_=data_out_2,
pragmas=dict({'generic.UsePredicatePushdown': 'true'}),
@@ -469,7 +469,7 @@ def make_test_cases(self) -> Sequence[TestCase]:
for base_tc in base_test_cases:
for protocol in protocols:
tc = replace(base_tc)
tc.name += f'_{protocol}'
tc.name += f'_{EProtocol.Name(protocol)}'
tc.protocol = protocol
test_cases.append(tc)
return test_cases
@@ -1,6 +1,7 @@
from pathlib import Path
import subprocess
from typing import Final
import json

import jinja2

@@ -213,6 +214,7 @@ def run(self, test_dir: Path, script: str, generic_settings: GenericSettings) ->
data_out = None
data_out_with_types = None
schema = None
unique_suffix = test_dir.name

if out.returncode == 0:
# Parse output
@@ -236,7 +238,6 @@ def run(self, test_dir: Path, script: str, generic_settings: GenericSettings) ->
for line in out.stderr.decode('utf-8').splitlines():
LOGGER.error(line)

unique_suffix = test_dir.name
err_file = yatest.common.output_path(f'dqrun-{unique_suffix}.err')
with open(err_file, "w") as f:
f.write(out.stderr.decode('utf-8'))
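
In the dqrun runner, `unique_suffix = test_dir.name` is hoisted above the returncode branch so the per-test suffix is available on the success path too (the new `json` import suggests parsed output is now also dumped to uniquely named files, though that hunk is collapsed here). The naming pattern, with an illustrative directory:

```python
from pathlib import Path

# Illustrative: the suffix is derived once, before branching on returncode.
test_dir = Path("/tmp/pushdown_POSTGRESQL_grpc")
unique_suffix = test_dir.name                 # 'pushdown_POSTGRESQL_grpc'
err_file_name = f"dqrun-{unique_suffix}.err"  # as in the failure branch
```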