diff --git a/README.md b/README.md
index a116b50..ee02b0c 100644
--- a/README.md
+++ b/README.md
@@ -86,6 +86,7 @@ Start a benchmark:
     ./run_benchmark \
         --dsn postgresql://postgres@localhost/ \
         [--benchmark] \
+        [--use-server-side-cursors]
 
 This runs the benchmark with the default runtime restriction per query.
 
@@ -106,6 +107,10 @@ parameter `--scale-factor` is required.
 
 ## Optional Parameters
 
+Parameter | Description
+--------------------------|-------------------------------------
+`use-server-side-cursors` | Use server-side cursors for executing the queries.
+
 The optional parameters differ by benchmark. The ones for TPC-H, TPC-DS, and
 SSB are described in this section. The parameters supported by HTAP are
 described in a separate section below.
diff --git a/run_benchmark b/run_benchmark
index 8dbb9b7..977a4f6 100755
--- a/run_benchmark
+++ b/run_benchmark
@@ -103,6 +103,10 @@ def parse_arguments(argv, benchmarks):
         'Supply with DB, e.g. postgresql://postgres@localhost/dbname'
     ))
 
+    common_parser.add_argument('--use-server-side-cursors', default=False, action='store_true',
+        required=False, help=('Use server-side cursors for executing the queries')
+    )
+
     subparsers = common_parser.add_subparsers(dest='benchmark')
     add_streams_parsers(subparsers, benchmarks)
     htap.add_parser(subparsers)
diff --git a/s64da_benchmark_toolkit/db.py b/s64da_benchmark_toolkit/db.py
index 6b97f81..201642e 100644
--- a/s64da_benchmark_toolkit/db.py
+++ b/s64da_benchmark_toolkit/db.py
@@ -39,7 +39,7 @@ def reset_config(self):
             conn.cursor.execute('ALTER SYSTEM RESET ALL')
             conn.cursor.execute('SELECT pg_reload_conf()')
 
-    def run_query(self, sql, timeout, auto_explain=False):
+    def run_query(self, sql, timeout, auto_explain=False, use_server_side_cursors=False):
         status = Status.ERROR
         query_result = None
         plan = None
@@ -50,10 +50,23 @@ def run_query(self, sql, timeout, auto_explain=False):
 
             if auto_explain:
                 DB.auto_explain_on(conn)
 
-            conn.cursor.execute(sql)
-            if conn.cursor.description is not None:
-
query_result_columns = [colname[0] for colname in conn.cursor.description]
-                query_result = query_result_columns, conn.cursor.fetchall()
+            cursor = conn.cursor
+            if use_server_side_cursors:
+                # See https://github.com/psycopg/psycopg2/issues/941 for why
+                # starting a new connection is so weird.
+                conn.conn.rollback()
+                conn.conn.autocommit = False
+                cursor = conn.server_side_cursor
+
+            cursor.execute(sql)
+            rows = cursor.fetchall() if cursor.description is not None else None
+
+            if use_server_side_cursors:
+                conn.conn.autocommit = True
+
+            if rows is not None:
+                query_result_columns = [colname[0] for colname in cursor.description]
+                query_result = query_result_columns, rows
             else:
                 query_result = None
             status = Status.OK
diff --git a/s64da_benchmark_toolkit/dbconn.py b/s64da_benchmark_toolkit/dbconn.py
index 9f8f3cb..a109bff 100644
--- a/s64da_benchmark_toolkit/dbconn.py
+++ b/s64da_benchmark_toolkit/dbconn.py
@@ -13,6 +13,7 @@ def __init__(self, dsn, statement_timeout=0, num_retries=120, retry_wait=1):
         self.dsn = dsn
         self.conn = None
         self.cursor = None
+        self.server_side_cursor = None
         self.statement_timeout = statement_timeout
         self.num_retries = num_retries
         self.retry_wait = retry_wait
@@ -25,6 +26,7 @@ def __enter__(self):
                 self.conn = psycopg2.connect(self.dsn, options=options)
                 self.conn.autocommit = True
                 self.cursor = self.conn.cursor()
+                self.server_side_cursor = self.conn.cursor('server-side-cursor')
                 break
 
             except psycopg2.Error as exc:
@@ -34,6 +36,7 @@ def __enter__(self):
 
         assert self.conn, 'There is no connection.'
         assert self.cursor, 'There is no cursor.'
+        assert self.server_side_cursor, 'There is no server-side cursor.'
         return self
diff --git a/s64da_benchmark_toolkit/streams.py b/s64da_benchmark_toolkit/streams.py
index 06c142e..de5ab43 100644
--- a/s64da_benchmark_toolkit/streams.py
+++ b/s64da_benchmark_toolkit/streams.py
@@ -49,6 +49,7 @@ def __init__(self, args, benchmark):
         self.scale_factor = args.scale_factor
         self.query_dir = self._get_query_dir()
         self.explain_analyze = args.explain_analyze
+        self.use_server_side_cursors = args.use_server_side_cursors
         self.reporting = Reporting(benchmark, args, self.config)
 
     @staticmethod
@@ -127,7 +128,7 @@ def _run_query(self, stream_id, query_id):
         query_sql = Streams.apply_sql_modifications(query_sql, (
             ('revenue0', f'revenue{stream_id}'),))
         timeout = self.config.get('timeout', 0)
-        return self.db.run_query(query_sql, timeout, self.explain_analyze)
+        return self.db.run_query(query_sql, timeout, self.explain_analyze, self.use_server_side_cursors)
 
     def _run_stream(self, reporting_queue, stream_id):
         sequence = self.get_stream_sequence(stream_id)
diff --git a/tests/test_streams.py b/tests/test_streams.py
index 0d0f123..9225bf9 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -16,6 +16,7 @@ class DefaultArgs:
     csv_file = 'results.csv'
     scale_factor = None
     explain_analyze = False
+    use_server_side_cursors = False
     check_correctness = False
     netdata_output_file = 'foobar.dat'