diff --git a/tests/README.md b/tests/README.md index db57def7e51..a9c0e045632 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,5 +1,4 @@ -Nebula Graph Test Manual -======================== +# Nebula Graph Test Manual ## Usage @@ -102,7 +101,7 @@ You can find all nebula test cases in [tck/features](tck/features) and some open The test cases are organized in feature files and described in gherkin language. The structure of feature file is like following example: -#### Basic Case: +### Basic Case ```gherkin Feature: Basic match @@ -134,7 +133,8 @@ Feature: Basic match | "serve" | "Cavaliers" | ``` -#### Case With an Execution Plan: +### Case With an Execution Plan + ```gherkin Scenario: push edge props filter down When profiling query: @@ -166,6 +166,61 @@ The table in `Then` step must have the first header line even if there's no data Note that for cases that contain execution plans, it is mandatory to fill the `id` column. +### Case With a New Nebula Cluster + +In some special cases, we need to test in a new nebula cluster. + +e.g. 
+ +```gherkin +Feature: Nebula service termination test + Scenario: Basic termination test + Given a nebulacluster with 1 graphd and 1 metad and 1 storaged + When the cluster was terminated + Then no service should still running after 4s +``` + +```gherkin +Feature: Example + Scenario: test with disable authorize + Given a nebulacluster with 1 graphd and 1 metad and 1 storaged: + """ + graphd:enable_authorize=false + """ + When executing query: + """ + CREATE USER user1 WITH PASSWORD 'nebula'; + CREATE SPACE s1(vid_type=int) + """ + And wait 3 seconds + Then the execution should be successful + When executing query: + """ + GRANT ROLE god on s1 to user1 + """ + Then the execution should be successful + + Scenario: test with enable authorize + Given a nebulacluster with 1 graphd and 1 metad and 1 storaged: + """ + graphd:enable_authorize=true + """ + When executing query: + """ + CREATE USER user1 WITH PASSWORD 'nebula'; + CREATE SPACE s1(vid_type=int) + """ + And wait 3 seconds + Then the execution should be successful + When executing query: + """ + GRANT ROLE god on s1 to user1 + """ + Then an PermissionError should be raised at runtime: No permission to grant/revoke god user. +``` + +It would install a new nebula cluster, and create a session to connect this cluster. + ### Format In order to check your changed files for reviewers conveniently, please format your feature file before creating pull request. 
class NebulaProcess(object):
    """A single nebula daemon (metad / storaged / graphd) managed by NebulaService.

    Holds the four ports assigned to the daemon, the extra gflags to pass on
    the command line, and the pid recorded at launch time.
    """

    def __init__(self, name, ports, suffix_index=0, params=None):
        """
        name         -- daemon short name: 'metad', 'storaged' or 'graphd'
        ports        -- exactly 4 ports: [tcp, tcp_internal, http, https]
        suffix_index -- per-daemon index selecting logs{i}/pids{i}/data{i} dirs
        params       -- extra gflags merged into the generated command line
        """
        if params is None:
            params = {}
        assert len(ports) == 4, 'should have 4 ports but have {}'.format(len(ports))
        self.name = name
        self.tcp_port, self.tcp_internal_port, self.http_port, self.https_port = ports
        self.suffix_index = suffix_index
        self.params = params
        self.host = '127.0.0.1'
        self.pid = None

    def update_param(self, params):
        """Merge extra gflags into this daemon's parameter set."""
        self.params.update(params)

    def update_meta_server_addrs(self, address):
        """Point this daemon at the metad quorum ('host:port,host:port,...')."""
        self.update_param({'meta_server_addrs': address})

    def _format_nebula_command(self):
        """Build the shell command line used to launch the daemon."""
        process_params = {
            'log_dir': 'logs{}'.format(self.suffix_index),
            'pid_file': 'pids{}/nebula-{}.pid'.format(self.suffix_index, self.name),
            'port': self.tcp_port,
            'ws_http_port': self.http_port,
            'ws_h2_port': self.https_port,
        }
        # Only storaged/metad persist data; graphd gets no data_path.
        if self.name.upper() != 'GRAPHD':
            process_params['data_path'] = 'data{}/{}'.format(
                self.suffix_index, self.name
            )
        # Caller-supplied gflags override the defaults above.
        process_params.update(self.params)
        cmd = [
            'bin/nebula-{}'.format(self.name),
            '--flagfile',
            'conf/nebula-{}.conf'.format(self.name),
        ] + ['--{}={}'.format(key, value) for key, value in process_params.items()]

        return " ".join(cmd)

    def start(self):
        """Launch the daemon via a shell and record the launcher's pid.

        NOTE(review): p.pid is the pid of the launching shell, not the
        daemonized nebula process; NebulaService._collect_pids() later reads
        the real pids from the pid files -- confirm nothing relies on
        self.pid being the daemon pid.
        """
        cmd = self._format_nebula_command()
        print("exec: " + cmd)
        p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE)
        p.wait()
        if p.returncode != 0:
            print("error: " + bytes.decode(p.communicate()[0]))
        self.pid = p.pid

    def kill(self, sig):
        """Best-effort: send signal `sig` to the recorded pid."""
        if not self.is_alive():
            return
        try:
            os.kill(self.pid, sig)
        except OSError as err:
            print("stop nebula-{} {} failed: {}".format(self.name, self.pid, str(err)))

    def is_alive(self):
        """Return True if the recorded pid appears in `ps -eo pid,args` output."""
        if self.pid is None:
            return False

        process = subprocess.Popen(
            ['ps', '-eo', 'pid,args'], stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        stdout = process.communicate()
        for line in bytes.decode(stdout[0]).splitlines():
            p = line.lstrip().split(' ', 1)[0]
            if str(p) == str(self.pid):
                return True
        return False
+ self.debug_log = debug_log + self.ports_per_process = 4 + self.lock_file = os.path.join(TMP_DIR, "cluster_port.lock") + self.delimiter = "\n" + self._make_params(**kwargs) + self.init_process() + + def init_process(self): + process_count = self.metad_num + self.storaged_num + self.graphd_num + ports_count = process_count * self.ports_per_process + self.all_ports = self._find_free_port(ports_count) + index = 0 + + for suffix_index in range(self.metad_num): + metad = NebulaProcess( + "metad", + self.all_ports[index : index + self.ports_per_process], + suffix_index, + self.metad_param, + ) + self.metad_processes.append(metad) + index += self.ports_per_process + + for suffix_index in range(self.storaged_num): + storaged = NebulaProcess( + "storaged", + self.all_ports[index : index + self.ports_per_process], + suffix_index, + self.storaged_param, + ) + self.storaged_processes.append(storaged) + index += self.ports_per_process + + for suffix_index in range(self.graphd_num): + graphd = NebulaProcess( + "graphd", + self.all_ports[index : index + self.ports_per_process], + suffix_index, + self.graphd_param, + ) + self.graphd_processes.append(graphd) + index += self.ports_per_process + + self.all_processes = ( + self.metad_processes + self.storaged_processes + self.graphd_processes + ) + # update meta address + meta_server_addrs = ','.join( + [ + '{}:{}'.format(process.host, process.tcp_port) + for process in self.metad_processes + ] + ) + + for p in self.all_processes: + p.update_meta_server_addrs(meta_server_addrs) + + def _make_params(self, **kwargs): + _params = { + 'heartbeat_interval_secs': 1, + 'expired_time_factor': 60, + } + if self.ca_signed: + _params['ca_path'] = 'share/resources/test.ca.pem' + _params['cert_path'] = 'share/resources/test.derive.crt' + _params['key_path'] = 'share/resources/test.derive.key' + + else: + _params['ca_path'] = 'share/resources/test.ca.pem' + _params['cert_path'] = 'share/resources/test.ca.key' + _params['key_path'] = 
'share/resources/test.ca.password' + + if self.debug_log: + _params['v'] = '4' + + self.graphd_param = copy.copy(_params) + self.graphd_param['local_config'] = 'false' + self.graphd_param['enable_authorize'] = 'true' + self.graphd_param['system_memory_high_watermark_ratio'] = '0.95' + self.graphd_param['num_rows_to_check_memory'] = '4' + self.graphd_param['session_reclaim_interval_secs'] = '2' + self.storaged_param = copy.copy(_params) + self.storaged_param['local_config'] = 'false' + self.storaged_param['raft_heartbeat_interval_secs'] = '30' + self.storaged_param['skip_wait_in_rate_limiter'] = 'true' + self.metad_param = copy.copy(_params) + for p in [self.metad_param, self.storaged_param, self.graphd_param]: + p.update(kwargs) def set_work_dir(self, work_dir): self.work_dir = work_dir def _copy_nebula_conf(self): - graph_path = self.build_dir + '/bin' - graph_conf_path = self.src_dir + '/conf' - storage_path = self.build_dir + '/bin' - storage_conf_path = self.src_dir + '/conf' - - # graph - shutil.copy(graph_path + '/nebula-graphd', self.work_dir + '/bin/') - shutil.copy(graph_conf_path + '/nebula-graphd.conf.default', - self.work_dir + '/conf/nebula-graphd.conf') - # storage - shutil.copy(storage_path + '/nebula-storaged', self.work_dir + '/bin/') - shutil.copy(storage_conf_path + '/nebula-storaged.conf.default', - self.work_dir + '/conf/nebula-storaged.conf') - # meta - shutil.copy(storage_path + '/nebula-metad', self.work_dir + '/bin/') - shutil.copy(storage_conf_path + '/nebula-metad.conf.default', - self.work_dir + '/conf/nebula-metad.conf') + bin_path = self.build_dir + '/bin/' + conf_path = self.src_dir + '/conf/' + + for item in ['nebula-graphd', 'nebula-storaged', 'nebula-metad']: + shutil.copy(bin_path + item, self.work_dir + '/bin/') + shutil.copy( + conf_path + '{}.conf.default'.format(item), + self.work_dir + '/conf/{}.conf'.format(item), + ) # gflags.json resources_dir = self.work_dir + '/share/resources/' os.makedirs(resources_dir) - 
shutil.copy(self.build_dir + '/../resources/gflags.json', resources_dir) + shutil.copy(self.build_dir + '/../resources/gflags.json', resources_dir) # cert files - shutil.copy(self.src_dir + '/tests/cert/test.ca.key', - resources_dir) - shutil.copy(self.src_dir + '/tests/cert/test.ca.pem', - resources_dir) - shutil.copy(self.src_dir + '/tests/cert/test.ca.password', - resources_dir) - shutil.copy(self.src_dir + '/tests/cert/test.derive.key', - resources_dir) - shutil.copy(self.src_dir + '/tests/cert/test.derive.crt', - resources_dir) - - def _format_nebula_command(self, name, meta_port, ports, debug_log=True, ca_signed=False): - params = [ - "--meta_server_addrs={}", - "--port={}", - "--ws_http_port={}", - "--ws_h2_port={}", - "--heartbeat_interval_secs=1", - "--expired_time_factor=60", - ] - if ca_signed: - params.append('--ca_path=share/resources/test.ca.pem') - params.append('--cert_path=share/resources/test.derive.crt') - params.append('--key_path=share/resources/test.derive.key') - else: - params.append('--cert_path=share/resources/test.ca.pem') - params.append('--key_path=share/resources/test.ca.key') - params.append('--password_path=share/resources/test.ca.password') - - if name == 'graphd': - params.append('--local_config=false') - params.append('--enable_authorize=true') - params.append('--system_memory_high_watermark_ratio=0.95') - params.append('--num_rows_to_check_memory=4') - params.append('--session_reclaim_interval_secs=2') - if name == 'storaged': - params.append('--local_config=false') - params.append('--raft_heartbeat_interval_secs=30') - params.append('--skip_wait_in_rate_limiter=true') - if debug_log: - params.append('--v=4') - param_format = " ".join(params) - param = param_format.format("127.0.0.1:" + str(meta_port), ports[0], - ports[1], ports[2]) - command = NEBULA_START_COMMAND_FORMAT.format(name, name, param) - return command + shutil.copy(self.src_dir + '/tests/cert/test.ca.key', resources_dir) + shutil.copy(self.src_dir + 
'/tests/cert/test.ca.pem', resources_dir) + shutil.copy(self.src_dir + '/tests/cert/test.ca.password', resources_dir) + shutil.copy(self.src_dir + '/tests/cert/test.derive.key', resources_dir) + shutil.copy(self.src_dir + '/tests/cert/test.derive.crt', resources_dir) @staticmethod def is_port_in_use(port): @@ -106,26 +251,54 @@ def is_port_in_use(port): @staticmethod def get_free_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', random.randint(1024, 10000))) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] + for _ in range(30): + try: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(('', random.randint(10000, 20000))) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + except OSError as e: + pass # TODO(yee): Find free port range - @staticmethod - def _find_free_port(): - # tcp_port, http_port, https_port - ports = [] - for i in range(0, 2): - ports.append(NebulaService.get_free_port()) - while True: - port = NebulaService.get_free_port() - if port not in ports and all( - not NebulaService.is_port_in_use(port + i) - for i in range(-2, 3)): - ports.insert(0, port) - break - return ports + def _find_free_port(self, count): + assert count % self.ports_per_process == 0 + Path(self.lock_file).touch(exist_ok=True) + # thread safe + with open(self.lock_file, 'r+') as fl: + fcntl.flock(fl.fileno(), fcntl.LOCK_EX) + context = fl.read().strip() + lock_ports = [int(p) for p in context.split(self.delimiter) if p != ""] + + all_ports = [] + for i in range(count): + if i % self.ports_per_process == 0: + for _ in range(100): + tcp_port = NebulaService.get_free_port() + # force internal tcp port with port+1 + if all( + (tcp_port + i) not in all_ports + lock_ports + for i in range(0, 2) + ): + all_ports.append(tcp_port) + all_ports.append(tcp_port + 1) + break + + elif i % self.ports_per_process == 1: + continue + else: + for _ in 
range(100): + port = NebulaService.get_free_port() + if port not in all_ports + lock_ports: + all_ports.append(port) + break + fl.seek(0) + fl.truncate() + + fl.write(self.delimiter.join([str(p) for p in all_ports + lock_ports])) + fl.write(self.delimiter) + + return all_ports def _telnet_port(self, port): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sk: @@ -133,16 +306,22 @@ def _telnet_port(self, port): result = sk.connect_ex(('127.0.0.1', port)) return result == 0 - def install(self): + def install(self, work_dir=None): + if work_dir is not None: + self.work_dir = work_dir if os.path.exists(self.work_dir): shutil.rmtree(self.work_dir) os.mkdir(self.work_dir) print("work directory: " + self.work_dir) os.chdir(self.work_dir) - installed_files = ['logs', 'bin', 'conf', 'data', 'pids', 'scripts'] + installed_files = ['bin', 'conf', 'scripts'] for f in installed_files: os.mkdir(self.work_dir + '/' + f) self._copy_nebula_conf() + max_suffix = max([self.graphd_num, self.storaged_num, self.metad_num]) + for i in range(max_suffix): + os.mkdir(self.work_dir + '/logs{}'.format(i)) + os.mkdir(self.work_dir + '/pids{}'.format(i)) def _check_servers_status(self, ports): ports_status = {} @@ -164,57 +343,14 @@ def _check_servers_status(self, ports): time.sleep(1) return False - def start(self, debug_log="true", multi_graphd=False, ca_signed="false", **kwargs): + def start(self): os.chdir(self.work_dir) - - metad_ports = self._find_free_port() - all_ports = [metad_ports[0]] - graph_ports = [] - server_ports = [] - servers = [] - if multi_graphd: - servers = ['metad', 'storaged', 'graphd', 'graphd1'] - os.mkdir(self.work_dir + '/logs1') - os.mkdir(self.work_dir + '/pids1') - else: - servers = ['metad', 'storaged', 'graphd'] - for server_name in servers: - ports = [] - if server_name != 'metad': - while True: - ports = self._find_free_port() - if all((ports[0] + i) not in all_ports for i in range(-2, 3)): - all_ports += [ports[0]] - break - else: - ports = 
metad_ports - server_ports.append(ports[0]) - new_name = server_name if server_name != 'graphd1' else 'graphd' - command = [ - self._format_nebula_command(new_name, - metad_ports[0], - ports, - debug_log, - ca_signed=ca_signed) - ] - if server_name == 'graphd1': - command.append('--log_dir=logs1') - command.append('--pid_file=pids1/nebula-graphd.pid') - - for k,v in kwargs.items(): - command.append("--{}={}".format(k, v)) - - cmd = " ".join(command) - print("exec: " + cmd) - p = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE) - p.wait() - if p.returncode != 0: - print("error: " + bytes.decode(p.communicate()[0])) - elif server_name.find('graphd') != -1: - graph_ports.append(ports[0]) + start_time = time.time() + for p in self.all_processes: + p.start() # wait nebula start - start_time = time.time() + server_ports = [p.tcp_port for p in self.all_processes] if not self._check_servers_status(server_ports): self._collect_pids() self.kill_all(signal.SIGKILL) @@ -223,27 +359,40 @@ def start(self, debug_log="true", multi_graphd=False, ca_signed="false", **kwarg self._collect_pids() - return graph_ports + return [p.tcp_port for p in self.graphd_processes] def _collect_pids(self): - for pf in glob.glob(self.work_dir + '/pids/*.pid'): - with open(pf) as f: - self.pids[f.name] = int(f.readline()) - for pf in glob.glob(self.work_dir + '/pids1/*.pid'): + for pf in glob.glob(self.work_dir + '/pid*/*.pid'): with open(pf) as f: self.pids[f.name] = int(f.readline()) - def stop(self, cleanup): + def stop(self, cleanup=True): print("try to stop nebula services...") self._collect_pids() + if len(self.pids) == 0: + print("the cluster has been stopped and deleted.") + return self.kill_all(signal.SIGTERM) max_retries = 20 while self.is_procs_alive() and max_retries >= 0: time.sleep(1) - max_retries = max_retries-1 + max_retries = max_retries - 1 + + if self.is_procs_alive(): + self.kill_all(signal.SIGKILL) - self.kill_all(signal.SIGKILL) + # thread safe + with 
open(self.lock_file, 'r+') as fl: + fcntl.flock(fl.fileno(), fcntl.LOCK_EX) + context = fl.read().strip() + lock_ports = {int(p) for p in context.split(self.delimiter) if p != ""} + for p in self.all_ports: + lock_ports.remove(p) + fl.seek(0) + fl.truncate() + fl.write(self.delimiter.join([str(p) for p in lock_ports])) + fl.write(self.delimiter) if cleanup: shutil.rmtree(self.work_dir, ignore_errors=True) @@ -264,9 +413,9 @@ def is_procs_alive(self): return any(self.is_proc_alive(pid) for pid in self.pids) def is_proc_alive(self, pid): - process = subprocess.Popen(['ps', '-eo', 'pid,args'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + process = subprocess.Popen( + ['ps', '-eo', 'pid,args'], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) stdout = process.communicate() for line in bytes.decode(stdout[0]).splitlines(): p = line.lstrip().split(' ', 1)[0] diff --git a/tests/common/utils.py b/tests/common/utils.py index 6867835015f..69e284e8f8a 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -249,13 +249,11 @@ def step_to_string(step): def path_to_string(path): - return vertex_to_string(path.src) \ - + ''.join(map(step_to_string, path.steps)) + return vertex_to_string(path.src) + ''.join(map(step_to_string, path.steps)) def dataset_to_string(dataset): - column_names = ','.join( - map(lambda x: x.decode('utf-8'), dataset.column_names)) + column_names = ','.join(map(lambda x: x.decode('utf-8'), dataset.column_names)) rows = '\n'.join(map(row_to_string, dataset.rows)) return '\n'.join([column_names, rows]) diff --git a/tests/conftest.py b/tests/conftest.py index a635ad1a201..29426008b56 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,8 @@ from tests.common.configs import all_configs from tests.common.types import SpaceDesc from tests.common.utils import get_conn_pool -from tests.common.constants import NB_TMP_PATH, SPACE_TMP_PATH +from tests.common.constants import NB_TMP_PATH, SPACE_TMP_PATH, BUILD_DIR, NEBULA_HOME +from 
@pytest.fixture(scope="class")
def class_fixture_variables():
    """Class-scoped shared state for tests that boot their own cluster.

    Holds the connection pool, session and NebulaService instance created by
    the `a nebulacluster with ...` given-steps; releases them all when the
    test class finishes.
    """
    # cluster is the instance of NebulaService
    res = dict(
        pool=None,
        session=None,
        cluster=None,
    )
    yield res
    # teardown: release session, then pool, then stop the cluster
    if res["session"] is not None:
        res["session"].release()
    if res["pool"] is not None:
        res["pool"].close()
    if res["cluster"] is not None:
        _cluster = res["cluster"]
        assert isinstance(_cluster, NebulaService)
        _cluster.stop()
def init_parser():
    """Build the command-line option parser for nebula-test-run.py.

    Returns an optparse.OptionParser. Boolean-ish options use string
    'true'/'false' values compared via opt_is().
    NOTE(review): --debug defaults to the Python bool True instead of the
    string 'true' like its siblings -- confirm opt_is() handles both.
    """
    from optparse import OptionParser

    opt_parser = OptionParser()
    opt_parser.add_option(
        '--build_dir',
        dest='build_dir',
        default=BUILD_DIR,
        help='Build directory of nebula graph',
    )
    opt_parser.add_option(
        '--rm_dir',
        dest='rm_dir',
        default='true',
        help='Whether to remove the test folder',
    )
    opt_parser.add_option(
        '--user', dest='user', default='root', help='nebula graph user'
    )
    opt_parser.add_option(
        '--password', dest='password', default='nebula', help='nebula graph password'
    )
    opt_parser.add_option('--cmd', dest='cmd', default='', help='start or stop command')
    opt_parser.add_option(
        '--multi_graphd',
        dest='multi_graphd',
        default='false',
        help='Support multi graphds',
    )
    opt_parser.add_option(
        '--address', dest='address', default='', help='Address of the Nebula'
    )
    opt_parser.add_option(
        '--debug', dest='debug', default=True, help='Print verbose debug logs'
    )
    opt_parser.add_option(
        '--enable_ssl',
        dest='enable_ssl',
        default='false',
        help='Whether enable SSL for cluster.',
    )
    opt_parser.add_option(
        '--enable_graph_ssl',
        dest='enable_graph_ssl',
        default='false',
        help='Whether enable SSL for graph server.',
    )
    opt_parser.add_option(
        '--enable_meta_ssl',
        dest='enable_meta_ssl',
        default='false',
        help='Whether enable SSL for meta server.',
    )
    opt_parser.add_option(
        '--ca_signed',
        dest='ca_signed',
        default='false',
        help='Whether enable CA signed SSL/TLS mode.',
    )
    opt_parser.add_option(
        '--containerized',
        dest='containerized',
        default='false',
        help='run this process inside container',
    )
    return opt_parser
+@skip +Feature: Example + + Scenario: test with disable authorize + Given a nebulacluster with 1 graphd and 1 metad and 1 storaged: + """ + graphd:enable_authorize=false + """ + When executing query: + """ + CREATE USER user1 WITH PASSWORD 'nebula'; + CREATE SPACE s1(vid_type=int) + """ + And wait 3 seconds + Then the execution should be successful + When executing query: + """ + GRANT ROLE god on s1 to user1 + """ + Then the execution should be successful + + Scenario: test with enable authorize + Given a nebulacluster with 1 graphd and 1 metad and 1 storaged: + """ + graphd:enable_authorize=true + """ + When executing query: + """ + CREATE USER user1 WITH PASSWORD 'nebula'; + CREATE SPACE s1(vid_type=int) + """ + And wait 3 seconds + Then the execution should be successful + When executing query: + """ + GRANT ROLE god on s1 to user1 + """ + Then an PermissionError should be raised at runtime: No permission to grant/revoke god user. diff --git a/tests/tck/cluster/terminate.feature b/tests/tck/cluster/terminate.feature new file mode 100644 index 00000000000..0166f184a60 --- /dev/null +++ b/tests/tck/cluster/terminate.feature @@ -0,0 +1,10 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. 
@given(
    parse(
        'a nebulacluster with {graphd_num} graphd and {metad_num} metad and {storaged_num} storaged'
    )
)
def given_nebulacluster(
    request,
    graphd_num,
    metad_num,
    storaged_num,
    class_fixture_variables,
    pytestconfig,
):
    """Boot a fresh cluster with default per-daemon parameters."""
    given_nebulacluster_with_param(
        request,
        None,
        graphd_num,
        metad_num,
        storaged_num,
        class_fixture_variables,
        pytestconfig,
    )


@given(
    parse(
        'a nebulacluster with {graphd_num} graphd and {metad_num} metad and {storaged_num} storaged:\n{params}'
    )
)
def given_nebulacluster_with_param(
    request,
    params,
    graphd_num,
    metad_num,
    storaged_num,
    class_fixture_variables,
    pytestconfig,
):
    """Boot a fresh cluster applying 'daemon:key=value' overrides from the
    step's docstring block, then stash session/cluster/pool into
    class_fixture_variables so the class-scoped fixture can tear them down.
    """
    # fix: local variable was consistently misspelled 'grpahd_param'
    graphd_param, metad_param, storaged_param = {}, {}, {}
    if params is not None:
        for param in params.splitlines():
            module, config = param.strip().split(":")
            assert module.lower() in ["graphd", "storaged", "metad"]
            key, value = config.strip().split("=")
            if module.lower() == "graphd":
                graphd_param[key] = value
            elif module.lower() == "storaged":
                storaged_param[key] = value
            else:
                metad_param[key] = value

    user = pytestconfig.getoption("user")
    password = pytestconfig.getoption("password")
    build_dir = pytestconfig.getoption("build_dir")
    src_dir = pytestconfig.getoption("src_dir")
    nebula_svc = NebulaService(
        build_dir,
        src_dir,
        int(metad_num),
        int(storaged_num),
        int(graphd_num),
    )
    for process in nebula_svc.graphd_processes:
        process.update_param(graphd_param)
    for process in nebula_svc.storaged_processes:
        process.update_param(storaged_param)
    for process in nebula_svc.metad_processes:
        process.update_param(metad_param)
    # unique per-scenario work dir: C<random-suffix><timestamp>
    work_dir = os.path.join(
        build_dir,
        "C" + space_generator() + time.strftime('%Y-%m-%dT%H-%M-%S', time.localtime()),
    )
    nebula_svc.install(work_dir)
    nebula_svc.start()
    graph_ip = nebula_svc.graphd_processes[0].host
    graph_port = nebula_svc.graphd_processes[0].tcp_port
    pool = get_conn_pool(graph_ip, graph_port)
    sess = pool.get_session(user, password)
    class_fixture_variables["session"] = sess
    class_fixture_variables["cluster"] = nebula_svc
    class_fixture_variables["pool"] = pool
graph_spaces["space_desc"] = space_desc graph_spaces["drop_space"] = True + @given("wait all indexes ready") @when("wait all indexes ready") @then("wait all indexes ready") @@ -290,7 +377,6 @@ def parse_list(s: str): return [int(num) for num in s.split(',')] - def hash_columns(ds, hashed_columns): if len(hashed_columns) == 0: return ds @@ -304,14 +390,14 @@ def hash_columns(ds, hashed_columns): def cmp_dataset( - request, - graph_spaces, - result, - order: bool, - strict: bool, - contains=CmpType.EQUAL, - first_n_records=-1, - hashed_columns=[], + request, + graph_spaces, + result, + order: bool, + strict: bool, + contains=CmpType.EQUAL, + first_n_records=-1, + hashed_columns=[], ): rs = graph_spaces['result_set'] ngql = graph_spaces['ngql'] @@ -323,12 +409,14 @@ def cmp_dataset( graph_spaces.get("variables", {}), ) ds = hash_columns(ds, hashed_columns) - dscmp = DataSetComparator(strict=strict, - order=order, - contains=contains, - first_n_records=first_n_records, - decode_type=rs._decode_type, - vid_fn=vid_fn) + dscmp = DataSetComparator( + strict=strict, + order=order, + contains=contains, + first_n_records=first_n_records, + decode_type=rs._decode_type, + vid_fn=vid_fn, + ) def dsp(ds): printer = DataSetPrinter(rs._decode_type, vid_fn=vid_fn) @@ -344,7 +432,9 @@ def rowp(ds, i): return f'{i}: |' + ss + '|' if rs._data_set_wrapper is None: - assert not ds.column_names and not ds.rows, f"Expected result must be empty table: ||" + assert ( + not ds.column_names and not ds.rows + ), f"Expected result must be empty table: ||" rds = rs._data_set_wrapper._data_set res, i = dscmp(rds, ds) @@ -379,9 +469,20 @@ def result_should_be_in_order(request, result, graph_spaces): cmp_dataset(request, graph_spaces, result, order=True, strict=True) -@then(parse("the result should be, in order, and the columns {hashed_columns} should be hashed:\n{result}")) +@then( + parse( + "the result should be, in order, and the columns {hashed_columns} should be hashed:\n{result}" + ) +) def 
result_should_be_in_order_and_hash(request, result, graph_spaces, hashed_columns): - cmp_dataset(request, graph_spaces, result, order=True, strict=True, hashed_columns=parse_list(hashed_columns)) + cmp_dataset( + request, + graph_spaces, + result, + order=True, + strict=True, + hashed_columns=parse_list(hashed_columns), + ) @then(parse("the result should be, in order, with relax comparison:\n{result}")) @@ -389,9 +490,22 @@ def result_should_be_in_order_relax_cmp(request, result, graph_spaces): cmp_dataset(request, graph_spaces, result, order=True, strict=False) -@then(parse("the result should be, in order, with relax comparison, and the columns {hashed_columns} should be hashed:\n{result}")) -def result_should_be_in_order_relax_cmp_and_hash(request, result, graph_spaces, hashed_columns): - cmp_dataset(request, graph_spaces, result, order=True, strict=False, hashed_columns=parse_list(hashed_columns)) +@then( + parse( + "the result should be, in order, with relax comparison, and the columns {hashed_columns} should be hashed:\n{result}" + ) +) +def result_should_be_in_order_relax_cmp_and_hash( + request, result, graph_spaces, hashed_columns +): + cmp_dataset( + request, + graph_spaces, + result, + order=True, + strict=False, + hashed_columns=parse_list(hashed_columns), + ) @then(parse("the result should be, in any order:\n{result}")) @@ -399,9 +513,20 @@ def result_should_be(request, result, graph_spaces): cmp_dataset(request, graph_spaces, result, order=False, strict=True) -@then(parse("the result should be, in any order, and the columns {hashed_columns} should be hashed:\n{result}")) +@then( + parse( + "the result should be, in any order, and the columns {hashed_columns} should be hashed:\n{result}" + ) +) def result_should_be_and_hash(request, result, graph_spaces, hashed_columns): - cmp_dataset(request, graph_spaces, result, order=False, strict=True, hashed_columns=parse_list(hashed_columns)) + cmp_dataset( + request, + graph_spaces, + result, + order=False, + 
strict=True, + hashed_columns=parse_list(hashed_columns), + ) @then(parse("the result should be, in any order, with relax comparison:\n{result}")) @@ -409,40 +534,61 @@ def result_should_be_relax_cmp(request, result, graph_spaces): cmp_dataset(request, graph_spaces, result, order=False, strict=False) -@then(parse("the result should be, in any order, with relax comparison, and the columns {hashed_columns} should be hashed:\n{result}")) +@then( + parse( + "the result should be, in any order, with relax comparison, and the columns {hashed_columns} should be hashed:\n{result}" + ) +) def result_should_be_relax_cmp_and_hash(request, result, graph_spaces, hashed_columns): - cmp_dataset(request, graph_spaces, result, order=False, strict=False, hashed_columns=parse_list(hashed_columns)) + cmp_dataset( + request, + graph_spaces, + result, + order=False, + strict=False, + hashed_columns=parse_list(hashed_columns), + ) @then(parse("the result should contain:\n{result}")) def result_should_contain(request, result, graph_spaces): - cmp_dataset(request, - graph_spaces, - result, - order=False, - strict=True, - contains=CmpType.CONTAINS) + cmp_dataset( + request, + graph_spaces, + result, + order=False, + strict=True, + contains=CmpType.CONTAINS, + ) @then(parse("the result should not contain:\n{result}")) def result_should_not_contain(request, result, graph_spaces): - cmp_dataset(request, - graph_spaces, - result, - order=False, - strict=True, - contains=CmpType.NOT_CONTAINS) + cmp_dataset( + request, + graph_spaces, + result, + order=False, + strict=True, + contains=CmpType.NOT_CONTAINS, + ) -@then(parse("the result should contain, and the columns {hashed_columns} should be hashed:\n{result}")) +@then( + parse( + "the result should contain, and the columns {hashed_columns} should be hashed:\n{result}" + ) +) def result_should_contain_and_hash(request, result, graph_spaces, hashed_columns): - cmp_dataset(request, - graph_spaces, - result, - order=False, - strict=True, - 
contains=True, - hashed_columns=parse_list(hashed_columns)) + cmp_dataset( + request, + graph_spaces, + result, + order=False, + strict=True, + contains=True, + hashed_columns=parse_list(hashed_columns), + ) @then("no side effects") @@ -457,7 +603,11 @@ def execution_should_be_succ(graph_spaces): check_resp(rs, stmt) -@then(rparse(r"(?Pa|an) (?P\w+) should be raised at (?P