Skip to content

Commit

Permalink
V0.1: CLI pull/push data refactoring with integration test (#142)
Browse files Browse the repository at this point in the history
* refine readme

* feat: refine data push/pull (#138)

* feat: refine data push/pull

* test: add cli provision testing

* fix: style fix

* fix: add necessary comments

* fix: from code review

Co-authored-by: Arthur Jiang <sjian@microsoft.com>
Co-authored-by: Romic Huang <romic.kid@gmail.com>
  • Loading branch information
3 people authored Oct 8, 2020
1 parent c250791 commit 4ad668b
Show file tree
Hide file tree
Showing 16 changed files with 1,156 additions and 48 deletions.
16 changes: 8 additions & 8 deletions maro/cli/grass/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
# Licensed under the MIT license.


from maro.cli.grass.utils.copy import copy_files_from_node, copy_files_to_node, sync_mkdir
from maro.cli.grass.utils.copy import copy_files_from_node, copy_files_to_node
from maro.cli.utils.checkers import check_details_validity
from maro.cli.utils.details import load_cluster_details
from maro.cli.utils.lock import lock
from maro.cli.utils.params import GlobalPaths
from maro.utils.exception.cli_exception import CliException


@check_details_validity(mode='grass')
Expand All @@ -17,14 +18,11 @@ def push_data(cluster_name: str, local_path: str, remote_path: str, **kwargs):
admin_username = cluster_details['user']['admin_username']
master_public_ip_address = cluster_details['master']['public_ip_address']

sync_mkdir(
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}",
admin_username=admin_username, node_ip_address=master_public_ip_address
)

if not remote_path.startswith("/"):
raise CliException("Invalid remote path")
copy_files_to_node(
local_path=local_path,
remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}",
remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data{remote_path}",
admin_username=admin_username, node_ip_address=master_public_ip_address
)

Expand All @@ -37,8 +35,10 @@ def pull_data(cluster_name: str, local_path: str, remote_path: str, **kwargs):
admin_username = cluster_details['user']['admin_username']
master_public_ip_address = cluster_details['master']['public_ip_address']

if not remote_path.startswith("/"):
raise CliException("Invalid remote path")
copy_files_from_node(
local_dir=local_path,
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data/{remote_path}",
remote_path=f"{GlobalPaths.MARO_CLUSTERS}/{cluster_name}/data{remote_path}",
admin_username=admin_username, node_ip_address=master_public_ip_address
)
18 changes: 9 additions & 9 deletions maro/cli/grass/executors/grass_azure_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from maro.cli.grass.executors.grass_executor import GrassExecutor
from maro.cli.grass.utils.copy import copy_files_to_node, copy_files_from_node, sync_mkdir, copy_and_rename
from maro.cli.grass.utils.hash import get_checksum
from maro.cli.utils.details import load_cluster_details, save_cluster_details, load_job_details, save_job_details, \
load_schedule_details, save_schedule_details
from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details,
load_schedule_details, save_schedule_details)
from maro.cli.utils.executors.azure_executor import AzureExecutor
from maro.cli.utils.naming import generate_cluster_id, generate_job_id, generate_component_id, generate_node_name, \
get_valid_file_name
from maro.cli.utils.naming import (generate_cluster_id, generate_job_id, generate_component_id, generate_node_name,
get_valid_file_name)
from maro.cli.utils.params import GlobalParams, GlobalPaths
from maro.cli.utils.subprocess import SubProcess
from maro.cli.utils.validation import validate_and_fill_dict
Expand Down Expand Up @@ -209,13 +209,13 @@ def _init_master(self):

# Copy required files
copy_files_to_node(
local_path=f"{GlobalPaths.MARO_GRASS_LIB}/*",
remote_dir=GlobalPaths.MARO_GRASS_LIB,
local_path=GlobalPaths.MARO_GRASS_LIB,
remote_dir=GlobalPaths.MARO_LIB,
admin_username=admin_username, node_ip_address=master_public_ip_address
)
copy_files_to_node(
local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/*",
remote_dir=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}",
local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}",
remote_dir=GlobalPaths.MARO_CLUSTERS,
admin_username=admin_username, node_ip_address=master_public_ip_address
)

Expand Down Expand Up @@ -505,7 +505,7 @@ def _init_node(self, node_name: str):
admin_username=admin_username, node_ip_address=node_public_ip_address)
copy_files_to_node(
local_path=f"{GlobalPaths.MARO_CLUSTERS}/{self.cluster_name}/details.yml",
remote_dir="~/details.yml",
remote_dir="~/",
admin_username=admin_username, node_ip_address=node_public_ip_address)

# Remote init node
Expand Down
42 changes: 27 additions & 15 deletions maro/cli/grass/utils/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,58 @@
import os
import shutil

from maro.cli.utils.copy import get_reformatted_source_path, get_reformatted_target_dir
from maro.cli.utils.subprocess import SubProcess
from maro.utils.exception.cli_exception import CliException
from maro.utils.logger import CliLogger

logger = CliLogger(name=__name__)


def copy_files_to_node(local_path: str, remote_dir: str, admin_username: str, node_ip_address: str):
"""Copy local files to node, automatically create folder if not exist
def copy_files_to_node(local_path: str, remote_dir: str, admin_username: str, node_ip_address: str) -> None:
"""Copy local files to node, automatically create folder if not exist.
Args:
local_path (str): path of the local file
remote_dir (str): dir for remote files
admin_username (str)
node_ip_address (str)
"""
copy_scripts = f"rsync -e 'ssh -o StrictHostKeyChecking=no' " \
f"-az {local_path} {admin_username}@{node_ip_address}:{remote_dir}"
_ = SubProcess.run(copy_scripts)
source_path = get_reformatted_source_path(local_path)
basename = os.path.basename(source_path)
folder_name = os.path.dirname(source_path)
target_dir = get_reformatted_target_dir(remote_dir)

mkdir_script = f"ssh -o StrictHostKeyChecking=no {admin_username}@{node_ip_address} 'mkdir -p {target_dir}'"
_ = SubProcess.run(mkdir_script)
copy_script = (f"tar czf - -C {folder_name} {basename} | "
f"ssh {admin_username}@{node_ip_address} 'tar xzf - -C {target_dir}'")
_ = SubProcess.run(copy_script)

def copy_files_from_node(local_dir: str, remote_path: str, admin_username: str, node_ip_address: str):
"""Copy node files to local, automatically create folder if not exist

def copy_files_from_node(local_dir: str, remote_path: str, admin_username: str, node_ip_address: str) -> None:
"""Copy node files to local, automatically create folder if not exist.
Args:
local_dir (str): dir for local files
remote_path (str): path of the remote file
admin_username (str)
node_ip_address (str)
"""
copy_scripts = f"rsync -e 'ssh -o StrictHostKeyChecking=no' " \
f"-az {admin_username}@{node_ip_address}:{remote_path} {local_dir}"
_ = SubProcess.run(copy_scripts)
source_path = get_reformatted_source_path(remote_path)
basename = os.path.basename(source_path)
folder_name = os.path.dirname(source_path)
target_dir = get_reformatted_target_dir(local_dir)

mkdir_script = f"mkdir -p {target_dir}"
_ = SubProcess.run(mkdir_script)
copy_script = (f"ssh {admin_username}@{node_ip_address} 'tar czf - -C {folder_name} {basename}' | "
f"tar xzf - -C {target_dir}")
_ = SubProcess.run(copy_script)


def sync_mkdir(remote_path: str, admin_username: str, node_ip_address: str):
"""mkdir synchronously at local and remote
"""Mkdir synchronously at local and remote.
Args:
remote_path (str): path of the remote file
Expand All @@ -56,15 +71,12 @@ def sync_mkdir(remote_path: str, admin_username: str, node_ip_address: str):


def copy_and_rename(source_path: str, target_dir: str, new_name: str = None):
"""Copy and rename a file
"""Copy and rename a file.
Args:
source_path (str): path of the source
target_dir (str): dir of the target
new_name (str): name of the new file, if None, will not do rename
Raises:
CliException:
"""
source_path = os.path.expanduser(source_path)
target_dir = os.path.expanduser(target_dir)
Expand Down
38 changes: 26 additions & 12 deletions maro/cli/k8s/executors/k8s_azure_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import yaml

from maro.cli.utils.details import load_cluster_details, save_cluster_details, load_job_details, save_job_details, \
load_schedule_details, save_schedule_details
from maro.cli.utils.copy import get_reformatted_source_path, get_reformatted_target_dir
from maro.cli.utils.details import (load_cluster_details, save_cluster_details, load_job_details, save_job_details,
load_schedule_details, save_schedule_details)
from maro.cli.utils.executors.azure_executor import AzureExecutor
from maro.cli.utils.naming import generate_cluster_id, generate_job_id, generate_component_id, generate_name_with_md5
from maro.cli.utils.params import GlobalPaths
Expand Down Expand Up @@ -435,9 +436,13 @@ def push_data(self, local_path: str, remote_dir: str):
sas = self._check_and_get_account_sas()

# Push data
source_path = get_reformatted_source_path(local_path)
target_dir = get_reformatted_target_dir(remote_dir)
if not target_dir.startswith("/"):
raise CliException("Invalid remote path")
copy_command = f'azcopy copy ' \
f'"{local_path}" ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_dir}?{sas}" ' \
f'"{source_path}" ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{target_dir}?{sas}" ' \
f'--recursive=True'
_ = SubProcess.run(copy_command)

Expand All @@ -450,9 +455,15 @@ def pull_data(self, local_dir: str, remote_path: str):
sas = self._check_and_get_account_sas()

# Pull data
source_path = get_reformatted_source_path(remote_path)
target_dir = get_reformatted_target_dir(local_dir)
mkdir_script = f"mkdir -p {target_dir}"
_ = SubProcess.run(mkdir_script)
if not source_path.startswith("/"):
raise CliException("Invalid remote path")
copy_command = f'azcopy copy ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{remote_path}?{sas}" ' \
f'"{local_dir}" ' \
f'"https://{cluster_id}st.file.core.windows.net/{cluster_id}-fs{source_path}?{sas}" ' \
f'"{os.path.expanduser(target_dir)}" ' \
f'--recursive=True'
_ = SubProcess.run(copy_command)

Expand Down Expand Up @@ -707,9 +718,7 @@ def get_job_logs(self, job_name: str, export_dir: str = './'):
job_id = job_details['id']

# Get pods details
command = "kubectl get pods -o json"
return_str = SubProcess.run(command)
pods_details = json.loads(return_str)['items']
pods_details = self.get_pods_details()

# Export logs
for pod_details in pods_details:
Expand Down Expand Up @@ -815,9 +824,7 @@ def status(self):
return_status = {}

# Get pods details
command = "kubectl get pods -o json"
return_str = SubProcess.run(command)
pods_details = json.loads(return_str)['items']
pods_details = self.get_pods_details()

for pod_details in pods_details:
if pod_details['metadata']['labels']['app'] == 'maro-redis':
Expand Down Expand Up @@ -866,3 +873,10 @@ def _check_and_load_k8s_context(self):
config_dict = yaml.safe_load(config_str)
if config_dict['current-context'] != f"{cluster_id}-aks":
self._load_k8s_context()

@staticmethod
def get_pods_details():
    """List the pods reported by kubectl.

    Returns:
        The value stored under the 'items' key of the parsed JSON output
        of ``kubectl get pods -o json``.
    """
    raw_output = SubProcess.run("kubectl get pods -o json")
    parsed_output = json.loads(raw_output)
    return parsed_output['items']
8 changes: 4 additions & 4 deletions maro/cli/maro.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def load_parser_grass(prev_parser: ArgumentParser, global_parser: ArgumentParser
from maro.cli.grass.data import push_data
parser_data_push = parser_data_subparsers.add_parser(
'push',
help='Push local data to the cluster data storage',
help='Push the local data to the remote directory',
examples=CliExamples.MARO_GRASS_DATA_PUSH,
parents=[global_parser]
)
Expand All @@ -264,7 +264,7 @@ def load_parser_grass(prev_parser: ArgumentParser, global_parser: ArgumentParser
from maro.cli.grass.data import pull_data
parser_data_pull = parser_data_subparsers.add_parser(
'pull',
help='Pull data in the cluster data storage to local',
help='Pull the remote data to the local directory',
examples=CliExamples.MARO_GRASS_DATA_PULL,
parents=[global_parser]
)
Expand Down Expand Up @@ -598,7 +598,7 @@ def load_parser_k8s(prev_parser: ArgumentParser, global_parser: ArgumentParser)
from maro.cli.k8s.data import push_data
parser_data_push = parser_data_subparsers.add_parser(
'push',
help='Push local data to the cluster data storage',
help='Push the local data to the remote directory',
examples=CliExamples.MARO_K8S_DATA_PUSH,
parents=[global_parser]
)
Expand All @@ -614,7 +614,7 @@ def load_parser_k8s(prev_parser: ArgumentParser, global_parser: ArgumentParser)
from maro.cli.k8s.data import pull_data
parser_data_pull = parser_data_subparsers.add_parser(
'pull',
help='Pull data in the cluster data storage to local',
help='Pull the remote data to the local directory',
examples=CliExamples.MARO_K8S_DATA_PULL,
parents=[global_parser]
)
Expand Down
30 changes: 30 additions & 0 deletions maro/cli/utils/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.


def get_reformatted_source_path(path: str) -> str:
    """Build source path without trailing '/'.

    Every trailing slash is removed, not just the last one, so the result
    never ends with '/'. This matters for callers that split the result with
    ``os.path.basename`` / ``os.path.dirname``: "data//" becomes "data"
    instead of "data/", whose basename would be empty.

    Args:
        path (str): Original path.

    Returns:
        str: Reformatted path. A path consisting only of '/' characters
            (for example the root "/") is reduced to an empty string.
    """
    return path.rstrip("/")


def get_reformatted_target_dir(path: str):
    """Return the target directory path, guaranteed to end with '/'.

    Args:
        path (str): Original path.

    Returns:
        str: The path unchanged when it already ends with '/', otherwise
            the path with a single '/' appended.
    """
    return path if path.endswith("/") else f"{path}/"
2 changes: 2 additions & 0 deletions maro/cli/utils/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ class GlobalParams:


class GlobalPaths:
MARO_LIB = '~/.maro/lib'
MARO_GRASS_LIB = '~/.maro/lib/grass'
MARO_K8S_LIB = '~/.maro/lib/k8s'
MARO_CLUSTERS = '~/.maro/clusters'
MARO_DATA = '~/.maro/data'
MARO_TEST = '~/.maro/test'
2 changes: 2 additions & 0 deletions tests/cli/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cloud/subscription: ""
user/admin_public_key: ""
1 change: 1 addition & 0 deletions tests/cli/grass/test_data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Just a file for testing data push/pull.
Loading

0 comments on commit 4ad668b

Please sign in to comment.