Skip to content

Commit

Permalink
Merge pull request #100 from swarm64/feature/SDB-9484-support-using-named-cursor
Browse files Browse the repository at this point in the history

Support server-side cursors for running queries
  • Loading branch information
David Geier authored Jan 25, 2021
2 parents a1788e9 + a6033eb commit d0d5596
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ parameter `--scale-factor` is required.

## Optional Parameters

Parameter | Description
--------------------------|-------------------------------------
`use-server-side-cursors` | Use server-side cursors for executing the queries.

The optional parameters differ by benchmark.
The ones for TPC-H, TPC-DS, and SSB are described in this section.
The parameters supported by HTAP are described in a separate section below.
Expand Down
4 changes: 4 additions & 0 deletions run_benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def parse_arguments(argv, benchmarks):
'Supply with DB, e.g. postgresql://postgres@localhost/dbname'
))

common_parser.add_argument('--use-server-side-cursors', default=False, action='store_true',
required=False, help=('Use server-side cursors for executing the queries')
)

subparsers = common_parser.add_subparsers(dest='benchmark')
add_streams_parsers(subparsers, benchmarks)
htap.add_parser(subparsers)
Expand Down
23 changes: 18 additions & 5 deletions s64da_benchmark_toolkit/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def reset_config(self):
conn.cursor.execute('ALTER SYSTEM RESET ALL')
conn.cursor.execute('SELECT pg_reload_conf()')

def run_query(self, sql, timeout, auto_explain=False):
def run_query(self, sql, timeout, auto_explain=False, use_server_side_cursors=False):
status = Status.ERROR
query_result = None
plan = None
Expand All @@ -50,10 +50,23 @@ def run_query(self, sql, timeout, auto_explain=False):
if auto_explain:
DB.auto_explain_on(conn)

conn.cursor.execute(sql)
if conn.cursor.description is not None:
query_result_columns = [colname[0] for colname in conn.cursor.description]
query_result = query_result_columns, conn.cursor.fetchall()
cursor = conn.cursor;
if use_server_side_cursors:
# See https://github.com/psycopg/psycopg2/issues/941 for why
# starting a new connection is so weird.
conn.conn.rollback()
conn.conn.autocommit = False
cursor = conn.server_side_cursor

cursor.execute(sql)
rows = cursor.fetchall()

if use_server_side_cursors:
conn.autocommit = True

if rows is not None:
query_result_columns = [colname[0] for colname in cursor.description]
query_result = query_result_columns, rows
else:
query_result = None
status = Status.OK
Expand Down
3 changes: 3 additions & 0 deletions s64da_benchmark_toolkit/dbconn.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, dsn, statement_timeout=0, num_retries=120, retry_wait=1):
self.dsn = dsn
self.conn = None
self.cursor = None
self.server_side_cursor = None
self.statement_timeout = statement_timeout
self.num_retries = num_retries
self.retry_wait = retry_wait
Expand All @@ -25,6 +26,7 @@ def __enter__(self):
self.conn = psycopg2.connect(self.dsn, options=options)
self.conn.autocommit = True
self.cursor = self.conn.cursor()
self.server_side_cursor = self.conn.cursor('server-side-cursor')
break

except psycopg2.Error as exc:
Expand All @@ -34,6 +36,7 @@ def __enter__(self):

assert self.conn, 'There is no connection.'
assert self.cursor, 'There is no cursor.'
assert self.server_side_cursor, 'There is no server-side cursor.'

return self

Expand Down
3 changes: 2 additions & 1 deletion s64da_benchmark_toolkit/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, args, benchmark):
self.scale_factor = args.scale_factor
self.query_dir = self._get_query_dir()
self.explain_analyze = args.explain_analyze
self.use_server_side_cursors = args.use_server_side_cursors
self.reporting = Reporting(benchmark, args, self.config)

@staticmethod
Expand Down Expand Up @@ -127,7 +128,7 @@ def _run_query(self, stream_id, query_id):
query_sql = Streams.apply_sql_modifications(query_sql, (
('revenue0', f'revenue{stream_id}'),))
timeout = self.config.get('timeout', 0)
return self.db.run_query(query_sql, timeout, self.explain_analyze)
return self.db.run_query(query_sql, timeout, self.explain_analyze, self.use_server_side_cursors)

def _run_stream(self, reporting_queue, stream_id):
sequence = self.get_stream_sequence(stream_id)
Expand Down
1 change: 1 addition & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DefaultArgs:
csv_file = 'results.csv'
scale_factor = None
explain_analyze = False
use_server_side_cursors = False
check_correctness = False
netdata_output_file = 'foobar.dat'

Expand Down

0 comments on commit d0d5596

Please sign in to comment.