Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support server-side cursors for running queries #100

Merged
2 commits merged into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ parameter `--scale-factor` is required.

## Optional Parameters

Parameter | Description
--------------------------|-------------------------------------
`use-server-side-cursors` | Use server-side cursors for executing the queries.

The optional parameters differ by benchmark.
The ones for TPC-H, TPC-DS, and SSB are described in this section.
The parameters supported by HTAP are described in a separate section below.
Expand Down
4 changes: 4 additions & 0 deletions run_benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def parse_arguments(argv, benchmarks):
'Supply with DB, e.g. postgresql://postgres@localhost/dbname'
))

common_parser.add_argument('--use-server-side-cursors', default=False, action='store_true',
required=False, help=('Use server-side cursors for executing the queries')
)

subparsers = common_parser.add_subparsers(dest='benchmark')
add_streams_parsers(subparsers, benchmarks)
htap.add_parser(subparsers)
Expand Down
23 changes: 18 additions & 5 deletions s64da_benchmark_toolkit/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def reset_config(self):
conn.cursor.execute('ALTER SYSTEM RESET ALL')
conn.cursor.execute('SELECT pg_reload_conf()')

def run_query(self, sql, timeout, auto_explain=False):
def run_query(self, sql, timeout, auto_explain=False, use_server_side_cursors=False):
status = Status.ERROR
query_result = None
plan = None
Expand All @@ -50,10 +50,23 @@ def run_query(self, sql, timeout, auto_explain=False):
if auto_explain:
DB.auto_explain_on(conn)

conn.cursor.execute(sql)
if conn.cursor.description is not None:
query_result_columns = [colname[0] for colname in conn.cursor.description]
query_result = query_result_columns, conn.cursor.fetchall()
cursor = conn.cursor;
lucvlaming marked this conversation as resolved.
Show resolved Hide resolved
if use_server_side_cursors:
# See https://github.com/psycopg/psycopg2/issues/941 for why
# starting a new connection is so weird.
conn.conn.rollback()
This conversation was marked as resolved.
Show resolved Hide resolved
conn.conn.autocommit = False
cursor = conn.server_side_cursor

cursor.execute(sql)
rows = cursor.fetchall()

if use_server_side_cursors:
conn.autocommit = True
This conversation was marked as resolved.
Show resolved Hide resolved

if rows is not None:
query_result_columns = [colname[0] for colname in cursor.description]
query_result = query_result_columns, rows
else:
query_result = None
status = Status.OK
Expand Down
3 changes: 3 additions & 0 deletions s64da_benchmark_toolkit/dbconn.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, dsn, statement_timeout=0, num_retries=120, retry_wait=1):
self.dsn = dsn
self.conn = None
self.cursor = None
self.server_side_cursor = None
self.statement_timeout = statement_timeout
self.num_retries = num_retries
self.retry_wait = retry_wait
Expand All @@ -25,6 +26,7 @@ def __enter__(self):
self.conn = psycopg2.connect(self.dsn, options=options)
self.conn.autocommit = True
self.cursor = self.conn.cursor()
self.server_side_cursor = self.conn.cursor('server-side-cursor')
break

except psycopg2.Error as exc:
Expand All @@ -34,6 +36,7 @@ def __enter__(self):

assert self.conn, 'There is no connection.'
assert self.cursor, 'There is no cursor.'
assert self.server_side_cursor, 'There is no server-side cursor.'

return self

Expand Down
3 changes: 2 additions & 1 deletion s64da_benchmark_toolkit/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, args, benchmark):
self.scale_factor = args.scale_factor
self.query_dir = self._get_query_dir()
self.explain_analyze = args.explain_analyze
self.use_server_side_cursors = args.use_server_side_cursors
self.reporting = Reporting(benchmark, args, self.config)

@staticmethod
Expand Down Expand Up @@ -127,7 +128,7 @@ def _run_query(self, stream_id, query_id):
query_sql = Streams.apply_sql_modifications(query_sql, (
('revenue0', f'revenue{stream_id}'),))
timeout = self.config.get('timeout', 0)
return self.db.run_query(query_sql, timeout, self.explain_analyze)
return self.db.run_query(query_sql, timeout, self.explain_analyze, self.use_server_side_cursors)

def _run_stream(self, reporting_queue, stream_id):
sequence = self.get_stream_sequence(stream_id)
Expand Down
1 change: 1 addition & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DefaultArgs:
csv_file = 'results.csv'
scale_factor = None
explain_analyze = False
use_server_side_cursors = False
check_correctness = False
netdata_output_file = 'foobar.dat'

Expand Down