diff --git a/maro/cli/grass/data.py b/maro/cli/grass/data.py index ca934c5a7..6a8469b4f 100644 --- a/maro/cli/grass/data.py +++ b/maro/cli/grass/data.py @@ -2,11 +2,12 @@ # Licensed under the MIT license. -from maro.cli.grass.utils.copy import copy_files_from_node, copy_files_to_node, sync_mkdir +from maro.cli.grass.utils.copy import copy_files_from_node, copy_files_to_node from maro.cli.utils.checkers import check_details_validity from maro.cli.utils.details import load_cluster_details from maro.cli.utils.lock import lock from maro.cli.utils.params import GlobalPaths +from maro.utils.exception.cli_exception import CliException @check_details_validity(mode='grass') @@ -17,14 +18,11 @@ def push_data(cluster_name: str, local_path: str, remote_path: str, **kwargs): admin_username = cluster_details['user']['admin_username'] master_public_ip_address = cluster_details['master']['public_ip_address'] - sync_mkdir( - remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}", - admin_username=admin_username, node_ip_address=master_public_ip_address - ) - + if not remote_path.startswith("/"): + raise CliException("Invalid remote path") copy_files_to_node( local_path=local_path, - remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}", + remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data{remote_path}", admin_username=admin_username, node_ip_address=master_public_ip_address ) @@ -37,8 +35,10 @@ def pull_data(cluster_name: str, local_path: str, remote_path: str, **kwargs): admin_username = cluster_details['user']['admin_username'] master_public_ip_address = cluster_details['master']['public_ip_address'] + if not remote_path.startswith("/"): + raise CliException("Invalid remote path") copy_files_from_node( local_dir=local_path, - remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}", + remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data{remote_path}", admin_username=admin_username, node_ip_address=master_public_ip_address ) diff --git a/maro/cli/grass/executors/grass_azure_executor.py b/maro/cli/grass/executors/grass_azure_executor.py index c239c6cd4..8b043ec04 100644 --- a/maro/cli/grass/executors/grass_azure_executor.py +++ b/maro/cli/grass/executors/grass_azure_executor.py @@ -16,11 +16,11 @@ from maro.cli.grass.executors.grass_executor import GrassExecutor from maro.cli.grass.utils.copy import copy_files_to_node, copy_files_from_node, sync_mkdir, copy_and_rename from maro.cli.grass.utils.hash import get_checksum -from maro.cli.utils.details import load_cluster_details, save_cluster_details, load_job_details, save_job_details, \ - load_schedule_details, save_schedule_details +from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details, + load_schedule_details, save_schedule_details) from maro.cli.utils.executors.azure_executor import AzureExecutor -from maro.cli.utils.naming import generate_cluster_id, generate_job_id, generate_component_id, generate_node_name, \ - get_valid_file_name +from maro.cli.utils.naming import (generate_cluster_id, generate_job_id, generate_component_id, generate_node_name, + get_valid_file_name) from maro.cli.utils.params import GlobalParams, GlobalPaths from maro.cli.utils.subprocess import SubProcess from maro.cli.utils.validation import validate_and_fill_dict @@ -209,13 +209,13 @@ def _init_master(self): # Copy required files copy_files_to_node( - local_path=f"{GlobalPaths.MARO_GRASS_LIB}/*", - remote_dir=GlobalPaths.MARO_GRASS_LIB, + local_path=GlobalPaths.MARO_GRASS_LIB, + remote_dir=GlobalPaths.MARO_LIB, admin_username=admin_username, node_ip_address=master_public_ip_address ) copy_files_to_node( - local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/*", - remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}", + local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}", + remote_dir=GlobalPaths.MARO_CLUSTERS, admin_username=admin_username, node_ip_address=master_public_ip_address ) @@ -505,7 +505,7 @@ def _init_node(self, node_name: str): admin_username=admin_username, node_ip_address=node_public_ip_address) copy_files_to_node( local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/details.yml", - remote_dir="~/details.yml", + remote_dir="~/", admin_username=admin_username, node_ip_address=node_public_ip_address) # Remote init node diff --git a/maro/cli/grass/utils/copy.py b/maro/cli/grass/utils/copy.py index d72441401..7e0a26289 100644 --- a/maro/cli/grass/utils/copy.py +++ b/maro/cli/grass/utils/copy.py @@ -5,6 +5,7 @@ import os import shutil +from maro.cli.utils.copy import get_reformatted_source_path, get_reformatted_target_dir from maro.cli.utils.subprocess import SubProcess from maro.utils.exception.cli_exception import CliException from maro.utils.logger import CliLogger @@ -12,8 +13,8 @@ logger = CliLogger(name=__name__) -def copy_files_to_node(local_path: str, remote_dir: str, admin_username: str, node_ip_address: str): - """Copy local files to node, automatically create folder if not exist +def copy_files_to_node(local_path: str, remote_dir: str, admin_username: str, node_ip_address: str) -> None: + """Copy local files to node, automatically create folder if not exist. Args: local_path (str): path of the local file @@ -21,13 +22,20 @@ def copy_files_to_node(local_path: str, remote_dir: str, admin_username: str, no admin_username (str) node_ip_address (str) """ - copy_scripts = f"rsync -e 'ssh -o StrictHostKeyChecking=no' " \ - f"-az {local_path} {admin_username}@{node_ip_address}:{remote_dir}" - _ = SubProcess.run(copy_scripts) + source_path = get_reformatted_source_path(local_path) + basename = os.path.basename(source_path) + folder_name = os.path.dirname(source_path) + target_dir = get_reformatted_target_dir(remote_dir) + mkdir_script = f"ssh -o StrictHostKeyChecking=no {admin_username}@{node_ip_address} 'mkdir -p {target_dir}'" + _ = SubProcess.run(mkdir_script) + copy_script = (f"tar czf - -C {folder_name} {basename} | " + f"ssh {admin_username}@{node_ip_address} 'tar xzf - -C {target_dir}'") + _ = SubProcess.run(copy_script) -def copy_files_from_node(local_dir: str, remote_path: str, admin_username: str, node_ip_address: str): - """Copy node files to local, automatically create folder if not exist + +def copy_files_from_node(local_dir: str, remote_path: str, admin_username: str, node_ip_address: str) -> None: + """Copy node files to local, automatically create folder if not exist. Args: local_dir (str): dir for local files @@ -35,13 +43,20 @@ def copy_files_from_node(local_dir: str, remote_path: str, admin_username: str, admin_username (str) node_ip_address (str) """ - copy_scripts = f"rsync -e 'ssh -o StrictHostKeyChecking=no' " \ - f"-az {admin_username}@{node_ip_address}:{remote_path} {local_dir}" - _ = SubProcess.run(copy_scripts) + source_path = get_reformatted_source_path(remote_path) + basename = os.path.basename(source_path) + folder_name = os.path.dirname(source_path) + target_dir = get_reformatted_target_dir(local_dir) + + mkdir_script = f"mkdir -p {target_dir}" + _ = SubProcess.run(mkdir_script) + copy_script = (f"ssh {admin_username}@{node_ip_address} 'tar czf - -C {folder_name} {basename}' | " + f"tar xzf - -C {target_dir}") + _ = SubProcess.run(copy_script) def sync_mkdir(remote_path: str, admin_username: str, node_ip_address: str): - """mkdir synchronously at local and remote + """Mkdir synchronously at local and remote. Args: remote_path (str): path of the remote file @@ -56,15 +71,12 @@ def sync_mkdir(remote_path: str, admin_username: str, node_ip_address: str): def copy_and_rename(source_path: str, target_dir: str, new_name: str = None): - """Copy and rename a file + """Copy and rename a file. Args: source_path (str): path of the source target_dir (str): dir of the target new_name (str): name of the new file, if None, will not do rename - - Raises: - CliException: """ source_path = os.path.expanduser(source_path) target_dir = os.path.expanduser(target_dir) diff --git a/maro/cli/k8s/executors/k8s_azure_executor.py b/maro/cli/k8s/executors/k8s_azure_executor.py index 3093f5f15..96c82cc28 100644 --- a/maro/cli/k8s/executors/k8s_azure_executor.py +++ b/maro/cli/k8s/executors/k8s_azure_executor.py @@ -9,8 +9,9 @@ import yaml -from maro.cli.utils.details import load_cluster_details, save_cluster_details, load_job_details, save_job_details, \ - load_schedule_details, save_schedule_details +from maro.cli.utils.copy import get_reformatted_source_path, get_reformatted_target_dir +from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details, + load_schedule_details, save_schedule_details) from maro.cli.utils.executors.azure_executor import AzureExecutor from maro.cli.utils.naming import generate_cluster_id, generate_job_id, generate_component_id, generate_name_with_md5 from maro.cli.utils.params import GlobalPaths @@ -435,9 +436,13 @@ def push_data(self, local_path: str, remote_dir: str): sas = self._check_and_get_account_sas() # Push data + source_path = get_reformatted_source_path(local_path) + target_dir = get_reformatted_target_dir(remote_dir) + if not target_dir.startswith("/"): + raise CliException("Invalid remote path") copy_command = f'azcopy copy ' \ - f'"{local_path}" ' \ - f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_dir}?{sas}" ' \ + f'"{source_path}" ' \ + f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \ f'--recursive=True' _ = SubProcess.run(copy_command) @@ -450,9 +455,15 @@ def pull_data(self, local_dir: str, remote_path: str): sas = self._check_and_get_account_sas() # Push data + source_path = get_reformatted_source_path(remote_path) + target_dir = get_reformatted_target_dir(local_dir) + mkdir_script = f"mkdir -p {target_dir}" + _ = SubProcess.run(mkdir_script) + if not source_path.startswith("/"): + raise CliException("Invalid remote path") copy_command = f'azcopy copy ' \ - f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \ - f'"{local_dir}" ' \ + f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \ + f'"{os.path.expanduser(target_dir)}" ' \ f'--recursive=True' _ = SubProcess.run(copy_command) @@ -707,9 +718,7 @@ def get_job_logs(self, job_name: str, export_dir: str = './'): job_id = job_details['id'] # Get pods details - command = "kubectl get pods -o json" - return_str = SubProcess.run(command) - pods_details = json.loads(return_str)['items'] + pods_details = self.get_pods_details() # Export logs for pod_details in pods_details: @@ -815,9 +824,7 @@ def status(self): return_status = {} # Get pods details - command = "kubectl get pods -o json" - return_str = SubProcess.run(command) - pods_details = json.loads(return_str)['items'] + pods_details = self.get_pods_details() for pod_details in pods_details: if pod_details['metadata']['labels']['app'] == 'maro-redis': @@ -866,3 +873,10 @@ def _check_and_load_k8s_context(self): config_dict = yaml.safe_load(config_str) if config_dict['current-context'] != f"{cluster_id}-aks": self._load_k8s_context() + + @staticmethod + def get_pods_details(): + # Get pods details + command = "kubectl get pods -o json" + return_str = SubProcess.run(command) + return json.loads(return_str)['items'] diff --git a/maro/cli/maro.py b/maro/cli/maro.py index e37315dc4..098e847f1 100644 --- a/maro/cli/maro.py +++ b/maro/cli/maro.py @@ -248,7 +248,7 @@ def load_parser_grass(prev_parser: ArgumentParser, global_parser: ArgumentParser from maro.cli.grass.data import push_data parser_data_push = parser_data_subparsers.add_parser( 'push', - help='Push local data to the cluster data storage', + help='Push the local data to the remote directory', examples=CliExamples.MARO_GRASS_DATA_PUSH, parents=[global_parser] ) @@ -264,7 +264,7 @@ def load_parser_grass(prev_parser: ArgumentParser, global_parser: ArgumentParser from maro.cli.grass.data import pull_data parser_data_pull = parser_data_subparsers.add_parser( 'pull', - help='Pull data in the cluster data storage to local', + help='Pull the remote data to the local directory', examples=CliExamples.MARO_GRASS_DATA_PULL, parents=[global_parser] ) @@ -598,7 +598,7 @@ def load_parser_k8s(prev_parser: ArgumentParser, global_parser: ArgumentParser) from maro.cli.k8s.data import push_data parser_data_push = parser_data_subparsers.add_parser( 'push', - help='Push local data to the cluster data storage', + help='Push the local data to the remote directory', examples=CliExamples.MARO_K8S_DATA_PUSH, parents=[global_parser] ) @@ -614,7 +614,7 @@ def load_parser_k8s(prev_parser: ArgumentParser, global_parser: ArgumentParser) from maro.cli.k8s.data import pull_data parser_data_pull = parser_data_subparsers.add_parser( 'pull', - help='Pull data in the cluster data storage to local', + help='Pull the remote data to the local directory', examples=CliExamples.MARO_K8S_DATA_PULL, parents=[global_parser] ) diff --git a/maro/cli/utils/copy.py b/maro/cli/utils/copy.py new file mode 100644 index 000000000..59d84dc55 --- /dev/null +++ b/maro/cli/utils/copy.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + + +def get_reformatted_source_path(path: str): + """Build source path without trailing '/'. + + Args: + path (str): Original path. + + Returns: + str: Reformatted path. + """ + if path.endswith("/"): + path = path[:-1] + return path + + +def get_reformatted_target_dir(path: str): + """Get reformatted target dir with trailing '/'. + + Args: + path: (str): Original path. + + Returns: + str: Reformatted path. + """ + if not path.endswith("/"): + path = path + "/" + return path diff --git a/maro/cli/utils/params.py b/maro/cli/utils/params.py index 3448c1556..fe57a71e8 100644 --- a/maro/cli/utils/params.py +++ b/maro/cli/utils/params.py @@ -11,7 +11,9 @@ class GlobalParams: class GlobalPaths: + MARO_LIB = '~/.maro/lib' MARO_GRASS_LIB = '~/.maro/lib/grass' MARO_K8S_LIB = '~/.maro/lib/k8s' MARO_CLUSTERS = '~/.maro/clusters' MARO_DATA = '~/.maro/data' + MARO_TEST = '~/.maro/test' diff --git a/tests/cli/config.yml b/tests/cli/config.yml new file mode 100644 index 000000000..f7184b6d0 --- /dev/null +++ b/tests/cli/config.yml @@ -0,0 +1,2 @@ +cloud/subscription: "" +user/admin_public_key: "" diff --git a/tests/cli/grass/test_data/README.md b/tests/cli/grass/test_data/README.md new file mode 100644 index 000000000..13125c62d --- /dev/null +++ b/tests/cli/grass/test_data/README.md @@ -0,0 +1 @@ +Just a file for testing data push/pull. diff --git a/tests/cli/grass/test_grass.py b/tests/cli/grass/test_grass.py new file mode 100644 index 000000000..e04c85a26 --- /dev/null +++ b/tests/cli/grass/test_grass.py @@ -0,0 +1,268 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + + +import json +import logging +import os +import time +import unittest +import uuid + +import yaml + +from maro.cli.utils.params import GlobalParams, GlobalPaths +from maro.cli.utils.subprocess import SubProcess + + +@unittest.skipUnless(os.environ.get("test_with_cli", False), "Require cli prerequisites.") +class TestGrass(unittest.TestCase): + """Tests for Grass Mode. + + Tests should be executed in specific order, + and the order in which the various tests will be run is determined by sorting the test method names with + respect to the built-in ordering for strings. + We use testXX (X is a digit) as prefix to specify the order of the tests. + Ref: https://docs.python.org/3.7/library/unittest.html#organizing-test-code + """ + @classmethod + def setUpClass(cls) -> None: + # Get and set params + GlobalParams.LOG_LEVEL = logging.DEBUG + cls.test_id = uuid.uuid4().hex[:8] + os.makedirs(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}"), exist_ok=True) + cls.file_path = os.path.abspath(__file__) + cls.dir_path = os.path.dirname(cls.file_path) + cls.deployment_template_path = os.path.normpath( + os.path.join(cls.dir_path, "../templates/test_grass_azure_create.yml")) + cls.deployment_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/test_grass_azure_create.yml") + cls.config_path = os.path.normpath(os.path.join(cls.dir_path, "../config.yml")) + + # Load config and save deployment + with open(cls.deployment_template_path) as fr: + deployment_details = yaml.safe_load(fr) + with open(cls.config_path) as fr: + config_details = yaml.safe_load(fr) + if config_details["cloud/subscription"] and config_details["user/admin_public_key"]: + deployment_details["cloud"]["subscription"] = config_details["cloud/subscription"] + deployment_details["user"]["admin_public_key"] = config_details["user/admin_public_key"] + else: + raise Exception("Invalid config") + with open(cls.deployment_path, "w") as fw: + yaml.safe_dump(deployment_details, fw) + + # Get params from deployments + cls.cluster_name = deployment_details["name"] + + # Pull "ubuntu" as testing image + command = "docker pull alpine:latest" + SubProcess.run(command) + command = "docker pull ubuntu:latest" + SubProcess.run(command) + + # Utils + + def _get_node_details(self) -> dict: + command = f"maro grass node list {self.cluster_name}" + return_str = SubProcess.run(command) + return json.loads(return_str) + + @staticmethod + def _gracefully_wait(secs: int = 10) -> None: + time.sleep(secs) + + # Tests + + def test10_create(self) -> None: + # Run cli command + command = f"maro grass create --debug {self.deployment_path}" + SubProcess.interactive_run(command) + + def test11_node1(self) -> None: + """Scale node spec Standard_D4s_v3 to 1. + + A Standard_D4s_v3 should be in running state. + + Returns: + None. + """ + # Run cli command + command = f"maro grass node scale {self.cluster_name} --debug Standard_D4s_v3 1" + SubProcess.interactive_run(command) + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 1) + for _, node_details in nodes_details.items(): + self.assertEqual("Running", node_details["state"]) + + def test12_image1(self) -> None: + """Push image alpine:latest to the cluster. + + The only Standard_D4s_v3 should have loaded the image alpine:latest. + + Returns: + None. + """ + # Run cli command + command = f"maro grass image push {self.cluster_name} --debug --image-name alpine:latest" + SubProcess.interactive_run(command) + self._gracefully_wait() + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 1) + for _, node_details in nodes_details.items(): + self.assertEqual("Running", node_details["state"]) + self.assertIn("alpine_latest", node_details["image_files"]) + + def test13_node2(self) -> None: + """Scale node spec Standard_D4s_v3 to 2. + + Two Standard_D4s_v3 should be in running state, and they should have loaded the image alpine:latest. + + Returns: + None. + """ + # Run cli command + command = f"maro grass node scale {self.cluster_name} --debug Standard_D4s_v3 2" + SubProcess.interactive_run(command) + self._gracefully_wait() + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 2) + for _, node_details in nodes_details.items(): + self.assertEqual("Running", node_details["state"]) + self.assertIn("alpine_latest", node_details["image_files"]) + + def test14_stop(self) -> None: + """Stop one Standard_D4s_v3. + + One Standard_D4s_v3 should be in running state, and the other should be in Stopped state. + + Returns: + None. + """ + # Run cli command + command = f"maro grass node stop {self.cluster_name} --debug Standard_D4s_v3 1" + SubProcess.interactive_run(command) + self._gracefully_wait() + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 2) + running_count = 0 + stopped_count = 0 + for _, node_details in nodes_details.items(): + if node_details["state"] == "Running": + running_count += 1 + if node_details["state"] == "Stopped": + stopped_count += 1 + self.assertEqual(running_count, 1) + self.assertEqual(stopped_count, 1) + + def test15_image2(self) -> None: + """Push image ubuntu:latest to the cluster. + + The only Running Standard_D4s_v3 should have loaded the image ubuntu:latest, + and the other should have not. + + Returns: + None. + """ + # Run cli command + command = f"maro grass image push {self.cluster_name} --debug --image-name ubuntu:latest" + SubProcess.interactive_run(command) + self._gracefully_wait() + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 2) + running_count = 0 + stopped_count = 0 + for _, node_details in nodes_details.items(): + if node_details["state"] == "Running": + running_count += 1 + self.assertIn("alpine_latest", node_details["image_files"]) + self.assertIn("ubuntu_latest", node_details["image_files"]) + if node_details["state"] == "Stopped": + stopped_count += 1 + self.assertIn("alpine_latest", node_details["image_files"]) + self.assertNotIn("ubuntu_latest", node_details["image_files"]) + self.assertEqual(running_count, 1) + self.assertEqual(stopped_count, 1) + + def test16_start(self) -> None: + """Start one Standard_D4s_v3. + + Two Standard_D4s_v3 should be in running state, + and they should have loaded the image alpine:latest and ubuntu:latest. + + Returns: + None. + """ + command = f"maro grass node start {self.cluster_name} --debug Standard_D4s_v3 1" + SubProcess.interactive_run(command) + self._gracefully_wait() + + # Check validity + nodes_details = self._get_node_details() + self.assertEqual(len(nodes_details), 2) + running_count = 0 + for _, node_details in nodes_details.items(): + if node_details["state"] == "Running": + running_count += 1 + self.assertIn("alpine_latest", node_details["image_files"]) + self.assertIn("ubuntu_latest", node_details["image_files"]) + self.assertEqual(running_count, 2) + + def test17_data(self) -> None: + # Push file to an existed folder + command = f"maro grass data push {self.cluster_name} --debug '{self.dir_path}/test_data/README.md' '/'" + SubProcess.interactive_run(command) + + # Push file to a new folder + command = f"maro grass data push {self.cluster_name} --debug '{self.dir_path}/test_data/README.md' '/F1'" + SubProcess.interactive_run(command) + + # Push folder to an existed folder + command = f"maro grass data push {self.cluster_name} --debug '{self.dir_path}/test_data/' '/'" + SubProcess.interactive_run(command) + + # Push folder to a new folder + command = f"maro grass data push {self.cluster_name} --debug '{self.dir_path}/test_data/' '/F2'" + SubProcess.interactive_run(command) + + # Pull file to an existed folder + command = f"maro grass data pull {self.cluster_name} --debug " \ + f"'/README.md' '{GlobalPaths.MARO_TEST}/{self.test_id}'" + SubProcess.interactive_run(command) + + # Pull file to a new folder + command = f"maro grass data pull {self.cluster_name} --debug " \ + f"'/F1/README.md' '{GlobalPaths.MARO_TEST}/{self.test_id}/F1'" + SubProcess.interactive_run(command) + + # Pull folder to an existed folder + command = f"maro grass data pull {self.cluster_name} --debug " \ + f"'/test_data' '{GlobalPaths.MARO_TEST}/{self.test_id}'" + SubProcess.interactive_run(command) + + # Pull folder to a new folder + command = f"maro grass data pull {self.cluster_name} --debug " \ + f"'/F2/test_data/' '{GlobalPaths.MARO_TEST}/{self.test_id}/F2/'" + SubProcess.interactive_run(command) + + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/README.md"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/F1/README.md"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_data"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/F2/test_data"))) + + def test30_delete(self) -> None: + command = f"maro grass delete --debug {self.cluster_name}" + SubProcess.interactive_run(command) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/cli/k8s/test_data/README.md b/tests/cli/k8s/test_data/README.md new file mode 100644 index 000000000..13125c62d --- /dev/null +++ b/tests/cli/k8s/test_data/README.md @@ -0,0 +1 @@ +Just a file for testing data push/pull. diff --git a/tests/cli/k8s/test_k8s.py b/tests/cli/k8s/test_k8s.py new file mode 100644 index 000000000..9605f4e83 --- /dev/null +++ b/tests/cli/k8s/test_k8s.py @@ -0,0 +1,163 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + + +import ast +import json +import logging +import os +import time +import unittest +import uuid + +import yaml + +from maro.cli.utils.params import GlobalParams, GlobalPaths +from maro.cli.utils.subprocess import SubProcess + + +@unittest.skipUnless(os.environ.get("test_with_cli", False), "Require cli prerequisites.") +class TestK8s(unittest.TestCase): + """Tests for K8s Mode. + + Tests should be executed in specific order, + and the order in which the various tests will be run is determined by sorting the test method names with + respect to the built-in ordering for strings. + We use testXX (X is a digit) as prefix to specify the order of the tests. + Ref: https://docs.python.org/3.7/library/unittest.html#organizing-test-code + """ + @classmethod + def setUpClass(cls) -> None: + # Get and set params + GlobalParams.LOG_LEVEL = logging.DEBUG + cls.test_id = uuid.uuid4().hex[:8] + os.makedirs(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}"), exist_ok=True) + cls.file_path = os.path.abspath(__file__) + cls.dir_path = os.path.dirname(cls.file_path) + cls.deployment_template_path = os.path.normpath( + os.path.join(cls.dir_path, "../templates/test_k8s_azure_create.yml")) + cls.deployment_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/test_k8s_azure_create.yml") + cls.config_path = os.path.normpath(os.path.join(cls.dir_path, "../config.yml")) + + # Load config and save deployment + with open(cls.deployment_template_path) as fr: + deployment_details = yaml.safe_load(fr) + with open(cls.config_path) as fr: + config_details = yaml.safe_load(fr) + if config_details["cloud/subscription"] and config_details["user/admin_public_key"]: + deployment_details["cloud"]["subscription"] = config_details["cloud/subscription"] + deployment_details["user"]["admin_public_key"] = config_details["user/admin_public_key"] + else: + raise Exception("Invalid config") + with open(cls.deployment_path, "w") as fw: + yaml.safe_dump(deployment_details, fw) + + # Get params from deployments + cls.cluster_name = deployment_details["name"] + + # Pull "ubuntu" as testing image + command = "docker pull alpine:latest" + SubProcess.run(command) + command = "docker pull ubuntu:latest" + SubProcess.run(command) + + # Utils + + def _get_node_details(self) -> dict: + command = f"maro k8s node list {self.cluster_name}" + return_str = SubProcess.run(command) + return json.loads(return_str) + + def _get_image_details(self) -> dict: + command = f"maro k8s image list {self.cluster_name}" + return_str = SubProcess.run(command) + return json.loads(return_str) + + @staticmethod + def _gracefully_wait(secs: int = 10) -> None: + time.sleep(secs) + + # Tests + + def test10_create(self) -> None: + # Run cli command + command = f"maro k8s create --debug {self.deployment_path}" + SubProcess.interactive_run(command) + + # Check validity + nodes_details = self._get_node_details() + self.assertIn("Standard_D2s_v3", nodes_details) + self.assertEqual(nodes_details["Standard_D2s_v3"], 1) + + def test11_node(self) -> None: + # Run cli command + command = f"maro k8s node scale {self.cluster_name} --debug Standard_D4s_v3 1" + SubProcess.interactive_run(command) + + # Check validity + nodes_details = self._get_node_details() + self.assertIn("Standard_D2s_v3", nodes_details) + self.assertIn("Standard_D4s_v3", nodes_details) + self.assertEqual(nodes_details["Standard_D2s_v3"], 1) + self.assertEqual(nodes_details["Standard_D4s_v3"], 1) + + def test12_image(self) -> None: + # Run cli command + command = f"maro k8s image push {self.cluster_name} --debug --image-name alpine:latest" + SubProcess.interactive_run(command) + + # Check validity + command = f"maro k8s image list {self.cluster_name}" + return_str = SubProcess.run(command) + images = ast.literal_eval(return_str) + self.assertIn("alpine", images) + + def test13_data(self) -> None: + # Push file to an existed folder + command = f"maro k8s data push {self.cluster_name} --debug '{self.dir_path}/test_data/README.md' '/'" + SubProcess.interactive_run(command) + + # Push file to a new folder + command = f"maro k8s data push {self.cluster_name} --debug '{self.dir_path}/test_data/README.md' '/F1'" + SubProcess.interactive_run(command) + + # Push folder to an existed folder + command = f"maro k8s data push {self.cluster_name} --debug '{self.dir_path}/test_data/' '/'" + SubProcess.interactive_run(command) + + # Push folder to a new folder + command = f"maro k8s data push {self.cluster_name} --debug '{self.dir_path}/test_data/' '/F2'" + SubProcess.interactive_run(command) + + # Pull file to an existed folder + command = f"maro k8s data pull {self.cluster_name} --debug " \ + f"'/README.md' '{GlobalPaths.MARO_TEST}/{self.test_id}'" + SubProcess.interactive_run(command) + + # Pull file to a new folder + command = f"maro k8s data pull {self.cluster_name} --debug " \ + f"'/F1/README.md' '{GlobalPaths.MARO_TEST}/{self.test_id}/F1'" + SubProcess.interactive_run(command) + + # Pull folder to an existed folder + command = f"maro k8s data pull {self.cluster_name} --debug " \ + f"'/test_data' '{GlobalPaths.MARO_TEST}/{self.test_id}'" + SubProcess.interactive_run(command) + + # Pull folder to a new folder + command = f"maro k8s data pull {self.cluster_name} --debug " \ + f"'/F2/test_data/' '{GlobalPaths.MARO_TEST}/{self.test_id}/F2/'" + SubProcess.interactive_run(command) + + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/README.md"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/F1/README.md"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_data"))) + self.assertTrue(os.path.exists(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/F2/test_data"))) + + def test30_delete(self) -> None: + command = f"maro k8s delete --debug {self.cluster_name}" + SubProcess.interactive_run(command) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/cli/speed/test_grass_copy.py b/tests/cli/speed/test_grass_copy.py new file mode 100644 index 000000000..8aa00540a --- /dev/null +++ b/tests/cli/speed/test_grass_copy.py @@ -0,0 +1,225 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + + +import json +import logging +import os +import shutil +import time +import unittest +import uuid + +import yaml + +from maro.cli.utils.details import load_cluster_details +from maro.cli.utils.params import GlobalParams, GlobalPaths +from maro.cli.utils.subprocess import SubProcess + +TEST_TO_TIME = {} + + +def record_speed(func): + def with_record_speed(*args, **kwargs): + start_time = time.time_ns() / (10 ** 9) + func(*args, **kwargs) + end_time = time.time_ns() / (10 ** 9) + print(f"{func.__name__}: {end_time - start_time} s") + TEST_TO_TIME[func.__name__] = end_time - start_time + + return with_record_speed + + +@unittest.skipUnless(os.environ.get("test_with_cli", False), "Require cli prerequisites.") +class TestGrassCopy(unittest.TestCase): + """Tests for Grass Copy. + + Tests should be executed in specific order, + and the order in which the various tests will be run is determined by sorting the test method names with + respect to the built-in ordering for strings. + Therefore, we use test_X (X is a digit) as prefix to specify the order of the tests. + Ref: https://docs.python.org/3.7/library/unittest.html#organizing-test-code + """ + test_id = None + cluster_name = None + + @classmethod + def setUpClass(cls) -> None: + # Set params + GlobalParams.LOG_LEVEL = logging.DEBUG + cls.test_id = uuid.uuid4().hex[:8] + os.makedirs(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}"), exist_ok=True) + cls.file_path = os.path.abspath(__file__) + cls.dir_path = os.path.dirname(cls.file_path) + cls.deployment_template_path = os.path.normpath( + os.path.join(cls.dir_path, "../templates/test_grass_azure_create.yml")) + cls.deployment_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/test_grass_azure_create.yml") + cls.config_path = os.path.normpath(os.path.join(cls.dir_path, "../config.yml")) + + # Load config and save deployment + with open(cls.deployment_template_path) as fr: + deployment_details = yaml.safe_load(fr) + with open(cls.config_path) as fr: + config_details = yaml.safe_load(fr) + if config_details["cloud/subscription"] and config_details["user/admin_public_key"]: + deployment_details["cloud"]["subscription"] = config_details["cloud/subscription"] + deployment_details["user"]["admin_public_key"] = config_details["user/admin_public_key"] + else: + raise Exception("Invalid config") + with open(cls.deployment_path, "w") as fw: + yaml.safe_dump(deployment_details, fw) + + # Get params from deployments + cls.cluster_name = deployment_details["name"] + + # Init test files + cls.local_big_file_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/big_file") + cls.local_small_files_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/small_files") + command = f"dd if=/dev/zero of={cls.local_big_file_path} bs=1 count=0 seek=1G" + SubProcess.run(command) + command = f"git clone git@github.com:microsoft/maro.git {cls.local_small_files_path}" + SubProcess.run(command) + + # Create cluster + command = f"maro grass create --debug {cls.deployment_path}" + SubProcess.interactive_run(command) + cluster_details = load_cluster_details(cluster_name=cls.cluster_name) + master_details = cls._get_master_details() + cls.admin_username = cluster_details["user"]["admin_username"] + cls.master_public_ip_address = master_details["public_ip_address"] + + @classmethod + def tearDownClass(cls) -> None: + # Delete cluster + command = f"maro grass delete --debug {cls.cluster_name}" + SubProcess.interactive_run(command) + + # Print result + print( + json.dumps( + TEST_TO_TIME, + indent=4, sort_keys=True + ) + ) + + # Delete tmp test folder + shutil.rmtree(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}")) + + # Utils + + @classmethod + def _get_master_details(cls) -> dict: + command = f"maro grass status {cls.cluster_name} master" + return_str = SubProcess.run(command) + return json.loads(return_str) + + # Tests + + @record_speed + def test_1_rsync_big_file_to_remote(self) -> None: + command = (f"ssh -o StrictHostKeyChecking=no " + f"{self.admin_username}@{self.master_public_ip_address} " + f"'mkdir -p ~/test/{self.test_id}/test_1_rsync_big_file_to_remote'") + _ = SubProcess.run(command) + command = (f"rsync -e 'ssh -o StrictHostKeyChecking=no' -az -r " + f"{self.local_big_file_path} " + f"{self.admin_username}@{self.master_public_ip_address}:" + f"~/test/{self.test_id}/test_1_rsync_big_file_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_1_rsync_small_files_to_remote(self) -> None: + command = (f"ssh -o StrictHostKeyChecking=no " + f"{self.admin_username}@{self.master_public_ip_address} " + f"'mkdir -p ~/test/{self.test_id}/test_1_rsync_small_files_to_remote'") + _ = SubProcess.run(command) + command = (f"rsync -e 'ssh -o StrictHostKeyChecking=no' -az -r " + f"{self.local_small_files_path} " + f"{self.admin_username}@{self.master_public_ip_address}:" + f"~/test/{self.test_id}/test_1_rsync_small_files_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_2_rsync_big_file_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_rsync_big_file_to_local" + _ = SubProcess.run(command) + command = (f"rsync -e 'ssh -o StrictHostKeyChecking=no' -az -r " + f"{self.admin_username}@{self.master_public_ip_address}:" + f"~/test/{self.test_id}/test_1_rsync_big_file_to_remote " + f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_2_rsync_big_file_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_rsync_big_file_to_local/test_1_rsync_big_file_to_remote/big_file"))) + + @record_speed + def test_2_rsync_small_files_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_rsync_small_files_to_local" + _ = SubProcess.run(command) + command = (f"rsync -e 'ssh -o StrictHostKeyChecking=no' -az -r " + f"{self.admin_username}@{self.master_public_ip_address}:" + f"~/test/{self.test_id}/test_1_rsync_small_files_to_remote " + f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_2_rsync_small_files_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_rsync_small_files_to_local/test_1_rsync_small_files_to_remote/small_files/README.md"))) + + @record_speed + def test_1_tar_ssh_big_file_to_remote(self) -> None: + command = (f"ssh -o StrictHostKeyChecking=no " + f"{self.admin_username}@{self.master_public_ip_address} " + f"'mkdir -p ~/test/{self.test_id}/test_1_tar_ssh_big_file_to_remote'") + _ = SubProcess.run(command) + + basename = os.path.basename(self.local_big_file_path) + dirname = os.path.dirname(self.local_big_file_path) + command = (f"tar cf - -C {dirname} {basename} | " + f"ssh {self.admin_username}@{self.master_public_ip_address} " + f"'tar xf - -C ~/test/{self.test_id}/test_1_tar_ssh_big_file_to_remote'") + SubProcess.interactive_run(command) + + @record_speed + def test_1_tar_ssh_small_files_to_remote(self) -> None: + command = (f"ssh -o StrictHostKeyChecking=no " + f"{self.admin_username}@{self.master_public_ip_address} " + f"'mkdir -p ~/test/{self.test_id}/test_1_tar_ssh_small_files_to_remote'") + _ = SubProcess.run(command) + + basename = os.path.basename(self.local_small_files_path) + dirname = os.path.dirname(self.local_small_files_path) + command = (f"tar cf - -C {dirname} {basename} | " + f"ssh {self.admin_username}@{self.master_public_ip_address} " + f"'tar xf - -C ~/test/{self.test_id}/test_1_tar_ssh_small_files_to_remote'") + SubProcess.interactive_run(command) + + @record_speed + def test_2_tar_ssh_big_file_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_tar_ssh_big_file_to_local" + _ = SubProcess.run(command) + + basename = os.path.basename(f"~/test/{self.test_id}/test_1_tar_ssh_big_file_to_remote") + dirname = os.path.dirname(f"~/test/{self.test_id}/test_1_tar_ssh_big_file_to_remote") + command = (f"ssh {self.admin_username}@{self.master_public_ip_address} 'tar cf - -C {dirname} {basename}' | " + f"tar xf - -C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_tar_ssh_big_file_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_tar_ssh_big_file_to_local/test_1_tar_ssh_big_file_to_remote/big_file"))) + + @record_speed + def test_2_tar_ssh_small_files_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_tar_ssh_small_files_to_local" + _ = SubProcess.run(command) + basename = os.path.basename(f"~/test/{self.test_id}/test_1_tar_ssh_small_files_to_remote") + dirname = os.path.dirname(f"~/test/{self.test_id}/test_1_tar_ssh_small_files_to_remote") + command = (f"ssh {self.admin_username}@{self.master_public_ip_address} 'tar cf - -C {dirname} {basename}' | " + f"tar xf - -C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_tar_ssh_small_files_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_tar_ssh_small_files_to_local/test_1_tar_ssh_small_files_to_remote/small_files/README.md"))) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/cli/speed/test_k8s_copy.py b/tests/cli/speed/test_k8s_copy.py new file mode 100644 index 000000000..c9c5f041e --- /dev/null +++ b/tests/cli/speed/test_k8s_copy.py @@ -0,0 +1,360 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + + +import json +import logging +import os +import shutil +import time +import unittest +import uuid + +import yaml + +from maro.cli.k8s.executors.k8s_azure_executor import K8sAzureExecutor +from maro.cli.utils.details import load_cluster_details +from maro.cli.utils.params import GlobalParams, GlobalPaths +from maro.cli.utils.subprocess import SubProcess + +TEST_TO_TIME = {} + + +def record_speed(func): + def with_record_speed(*args, **kwargs): + start_time = time.time_ns() / (10 ** 9) + func(*args, **kwargs) + end_time = time.time_ns() / (10 ** 9) + print(f"{func.__name__}: {end_time - start_time} s") + TEST_TO_TIME[func.__name__] = end_time - start_time + + return with_record_speed + + +@unittest.skipUnless(os.environ.get("test_with_cli", False), "Require cli prerequisites.") +class TestK8sCopy(unittest.TestCase): + """Tests for K8s Copy. + + Tests should be executed in specific order, + and the order in which the various tests will be run is determined by sorting the test method names with + respect to the built-in ordering for strings. + Therefore, we use test_X (X is a digit) as prefix to specify the order of the tests. + Ref: https://docs.python.org/3.7/library/unittest.html#organizing-test-code + """ + cluster_name = None + test_id = None + + @classmethod + def setUpClass(cls, file_path: str = os.path.abspath(__file__)) -> None: + # Get and set params + GlobalParams.LOG_LEVEL = logging.DEBUG + cls.test_id = uuid.uuid4().hex[:8] + os.makedirs(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}"), exist_ok=True) + os.makedirs(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/tar"), exist_ok=True) + cls.file_path = os.path.abspath(__file__) + cls.dir_path = os.path.dirname(cls.file_path) + cls.deployment_template_path = os.path.normpath( + os.path.join(cls.dir_path, "../templates/test_k8s_azure_create.yml")) + cls.deployment_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/test_k8s_azure_create.yml") + cls.config_path = os.path.normpath(os.path.join(cls.dir_path, "../config.yml")) + + # Load config and save deployment + with open(cls.deployment_template_path) as fr: + deployment_details = yaml.safe_load(fr) + with open(cls.config_path) as fr: + config_details = yaml.safe_load(fr) + if config_details["cloud/subscription"] and config_details["user/admin_public_key"]: + deployment_details["cloud"]["subscription"] = config_details["cloud/subscription"] + deployment_details["user"]["admin_public_key"] = config_details["user/admin_public_key"] + else: + raise Exception("Invalid config") + with open(cls.deployment_path, "w") as fw: + yaml.safe_dump(deployment_details, fw) + + # Get params from deployments + cls.cluster_name = deployment_details["name"] + + # Init test files + cls.local_big_file_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/big_file") + cls.local_small_files_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}/small_files") + command = f"dd if=/dev/zero of={cls.local_big_file_path} bs=1 count=0 seek=1G" + SubProcess.run(command) + command = f"git clone git@github.com:microsoft/maro.git {cls.local_small_files_path}" + SubProcess.run(command) + + # Create cluster + command = f"maro k8s create --debug {cls.deployment_path}" + SubProcess.interactive_run(command) + cls.cluster_details = load_cluster_details(cluster_name=cls.cluster_name) + cls.cluster_id = cls.cluster_details["id"] + cls.executor = K8sAzureExecutor(cluster_name=cls.cluster_name) + time.sleep(15) + cls.pod_name = cls._get_redis_pod_name() + + @classmethod + def tearDownClass(cls) -> None: + # Delete cluster + command = f"maro k8s delete --debug {cls.cluster_name}" + SubProcess.interactive_run(command) + + # Print result + print( + json.dumps( + TEST_TO_TIME, + indent=4, sort_keys=True + ) + ) + + # Delete tmp test folder + shutil.rmtree(os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{cls.test_id}")) + + # Utils + + @classmethod + def _get_redis_pod_name(cls) -> str: + # Get pods details + command = "kubectl get pods -o json" + return_str = SubProcess.run(command) + pods_details = json.loads(return_str)["items"] + + # Export logs + for pod_details in pods_details: + if pod_details["metadata"]["labels"]["app"] == "maro-redis": + return pod_details["metadata"]["name"] + + # Tests + + @record_speed + def test_1_azcopy_big_file_to_remote(self) -> None: + sas = self.executor._check_and_get_account_sas() + command = (f"azcopy copy " + f"'{self.local_big_file_path}' " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/{self.test_id}/test_1_azcopy_big_file_to_remote/?{sas}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + @record_speed + def test_1_azcopy_small_files_to_remote(self) -> None: + sas = self.executor._check_and_get_account_sas() + command = (f"azcopy copy " + f"'{self.local_small_files_path}' " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/{self.test_id}/test_1_azcopy_small_files_to_remote/?{sas}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + @record_speed + def test_2_azcopy_big_file_to_local(self) -> None: + sas = self.executor._check_and_get_account_sas() + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_big_file_to_local" + _ = SubProcess.run(command) + + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_big_file_to_local") + command = (f"azcopy copy " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/{self.test_id}/test_1_azcopy_big_file_to_remote?{sas}' " + f"'{local_path}' " + f"--recursive=True") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_azcopy_big_file_to_local/test_1_azcopy_big_file_to_remote/big_file"))) + + @record_speed + def test_2_azcopy_small_files_to_local(self) -> None: + sas = self.executor._check_and_get_account_sas() + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_small_files_to_local" + _ = SubProcess.run(command) + + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_small_files_to_local") + command = (f"azcopy copy " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/{self.test_id}/test_1_azcopy_small_files_to_remote?{sas}' " + f"'{local_path}' " + f"--recursive=True") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_azcopy_small_files_to_local/test_1_azcopy_small_files_to_remote/small_files"))) + + @record_speed + def test_1_kubectl_exec_big_file_to_remote(self) -> None: + command = (f"kubectl exec -i {self.pod_name} -- " + f"mkdir -p /mnt/maro/{self.test_id}/test_1_kubectl_exec_big_file_to_remote") + SubProcess.interactive_run(command) + + basename = os.path.basename(self.local_big_file_path) + dirname = os.path.dirname(self.local_big_file_path) + command = (f"tar cf - -C {dirname} {basename} | " + f"kubectl exec -i {self.pod_name} -- " + f"tar xf - -C /mnt/maro/{self.test_id}/test_1_kubectl_exec_big_file_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_1_kubectl_exec_small_files_to_remote(self) -> None: + command = (f"kubectl exec -i {self.pod_name} -- " + f"mkdir -p /mnt/maro/{self.test_id}/test_1_kubectl_exec_small_files_to_remote") + SubProcess.interactive_run(command) + + basename = os.path.basename(self.local_small_files_path) + dirname = os.path.dirname(self.local_small_files_path) + command = (f"tar cf - -C {dirname} {basename} | " + f"kubectl exec -i {self.pod_name} -- " + f"tar xf - -C /mnt/maro/{self.test_id}/test_1_kubectl_exec_small_files_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_2_kubectl_exec_big_file_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_kubectl_exec_big_file_to_local" + _ = SubProcess.run(command) + + basename = os.path.basename(f"/mnt/maro/{self.test_id}/test_1_kubectl_exec_big_file_to_remote") + dirname = os.path.dirname(f"/mnt/maro/{self.test_id}/test_1_kubectl_exec_big_file_to_remote") + command = (f"kubectl exec -i {self.pod_name} -- tar cf - -C {dirname} {basename} | " + f"tar xf - -C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_kubectl_exec_big_file_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_kubectl_exec_big_file_to_local/test_1_kubectl_exec_big_file_to_remote/big_file"))) + + @record_speed + def test_2_kubectl_exec_small_files_to_local(self) -> None: + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_kubectl_exec_small_files_to_local" + _ = SubProcess.run(command) + + basename = os.path.basename(f"/mnt/maro/{self.test_id}/test_1_kubectl_exec_small_files_to_remote") + dirname = os.path.dirname(f"/mnt/maro/{self.test_id}/test_1_kubectl_exec_small_files_to_remote") + command = (f"kubectl exec -i {self.pod_name} -- tar cf - -C {dirname} {basename} | " + f"tar xf - -C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_kubectl_exec_small_files_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_kubectl_exec_small_files_to_local/test_1_kubectl_exec_small_files_to_remote/small_files"))) + + @record_speed + def test_1_azcopy_tar_big_file_to_remote(self) -> None: + # create remote folder + command = (f"kubectl exec -i {self.pod_name} -- " + f"mkdir -p /mnt/maro/{self.test_id}/test_1_azcopy_tar_big_file_to_remote") + SubProcess.interactive_run(command) + + # local tar zip + basename = os.path.basename(self.local_big_file_path) + dirname = os.path.dirname(self.local_big_file_path) + tar_file_name = uuid.uuid4().hex[:8] + command = f"tar cf {GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name} -C {dirname} {basename}" + SubProcess.interactive_run(command) + + # azcopy + sas = self.executor._check_and_get_account_sas() + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name}") + command = (f"azcopy copy " + f"'{local_path}' " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/tar/{tar_file_name}?{sas}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + # remote tar unzip + command = (f"kubectl exec -i {self.pod_name} -- " + f"tar xf /mnt/maro/tar/{tar_file_name} " + f"-C /mnt/maro/{self.test_id}/test_1_azcopy_tar_big_file_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_1_azcopy_tar_small_files_to_remote(self) -> None: + # create remote folder + command = (f"kubectl exec -i {self.pod_name} -- " + f"mkdir -p /mnt/maro/{self.test_id}/test_1_azcopy_tar_small_files_to_remote") + SubProcess.interactive_run(command) + + # local tar zip + basename = os.path.basename(self.local_small_files_path) + dirname = os.path.dirname(self.local_small_files_path) + tar_file_name = uuid.uuid4().hex[:8] + command = f"tar cf {GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name} -C {dirname} {basename}" + SubProcess.interactive_run(command) + + # azcopy + sas = self.executor._check_and_get_account_sas() + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name}") + command = (f"azcopy copy " + f"'{local_path}' " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/tar/{tar_file_name}?{sas}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + # remote tar unzip + command = (f"kubectl exec -i {self.pod_name} -- " + f"tar xf /mnt/maro/tar/{tar_file_name} " + f"-C /mnt/maro/{self.test_id}/test_1_azcopy_tar_small_files_to_remote") + SubProcess.interactive_run(command) + + @record_speed + def test_2_azcopy_tar_big_file_to_local(self) -> None: + # create folder + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_tar_big_file_to_local" + SubProcess.interactive_run(command) + + # remote tar zip + basename = os.path.basename(f"/mnt/maro/{self.test_id}/test_1_azcopy_tar_big_file_to_remote") + dirname = os.path.dirname(f"/mnt/maro/{self.test_id}/test_1_azcopy_tar_big_file_to_remote") + tar_file_name = uuid.uuid4().hex[:8] + command = (f"kubectl exec -i {self.pod_name} -- " + f"tar cf /mnt/maro/tar/{tar_file_name} -C {dirname} {basename}") + SubProcess.interactive_run(command) + + # azcopy + sas = self.executor._check_and_get_account_sas() + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name}") + command = (f"azcopy copy " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs" + f"/tar/{tar_file_name}?{sas}' " + f"'{local_path}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + # local tar unzip + command = (f"tar xf {GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name} " + f"-C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_tar_big_file_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_azcopy_tar_big_file_to_local/test_1_azcopy_tar_big_file_to_remote/big_file"))) + + @record_speed + def test_2_azcopy_tar_small_files_to_local(self) -> None: + # create folder + command = f"mkdir -p {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_tar_small_files_to_local" + SubProcess.interactive_run(command) + + # remote tar zip + basename = os.path.basename(f"/mnt/maro/{self.test_id}/test_1_azcopy_tar_small_files_to_remote") + dirname = os.path.dirname(f"/mnt/maro/{self.test_id}/test_1_azcopy_tar_small_files_to_remote") + tar_file_name = uuid.uuid4().hex[:8] + command = (f"kubectl exec -i {self.pod_name} -- " + f"tar cf /mnt/maro/tar/{tar_file_name} -C {dirname} {basename}") + SubProcess.interactive_run(command) + + # azcopy + sas = self.executor._check_and_get_account_sas() + local_path = os.path.expanduser(f"{GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name}") + command = (f"azcopy copy " + f"'https://{self.cluster_id}st.file.core.windows.net/{self.cluster_id}-fs/tar/{tar_file_name}?{sas}' " + f"'{local_path}' " + f"--recursive=True") + SubProcess.interactive_run(command) + + # local tar unzip + command = (f"tar xf {GlobalPaths.MARO_TEST}/{self.test_id}/tar/{tar_file_name} " + f"-C {GlobalPaths.MARO_TEST}/{self.test_id}/test_2_azcopy_tar_small_files_to_local") + SubProcess.interactive_run(command) + self.assertTrue(os.path.exists(os.path.expanduser( + f"{GlobalPaths.MARO_TEST}/{self.test_id}/" + f"test_2_azcopy_tar_small_files_to_local/test_1_azcopy_tar_small_files_to_remote/small_files"))) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/cli/templates/test_grass_azure_create.yml b/tests/cli/templates/test_grass_azure_create.yml new file mode 100644 index 000000000..993ae027b --- /dev/null +++ b/tests/cli/templates/test_grass_azure_create.yml @@ -0,0 +1,15 @@ +name: test_maro_grass_cluster +mode: grass + +cloud: + infra: azure + location: japaneast + resource_group: test_maro_grass + subscription: "" + +user: + admin_public_key: "" + admin_username: marotest + +master: + node_size: Standard_D2s_v3 diff --git a/tests/cli/templates/test_k8s_azure_create.yml b/tests/cli/templates/test_k8s_azure_create.yml new file mode 100644 index 000000000..9e143e785 --- /dev/null +++ b/tests/cli/templates/test_k8s_azure_create.yml @@ -0,0 +1,15 @@ +name: test_maro_k8s_cluster +mode: k8s + +cloud: + infra: azure + location: japaneast + resource_group: test_maro_k8s + subscription: "" + +user: + admin_public_key: "" + admin_username: marotest + +master: + node_size: Standard_D2s_v3