Dock nhc dev (#88)
* Productionizing code for AML

Update docs and entry point

Refactor

Remove IB write test from ncv3

Fix bug in nc6 conf and add GPU count arg check

* Refactor unit tests

Update docs and unit tests

Fix bug

* Add bug fix and Kusto update

Update unit tests

Modify entry point

Modify run scripts

Remove IB test from ncv3
---------

Co-authored-by: Ubuntu <rafsalas@nhc-A100vm.dav1onuyw5fejl0wtzkr0deqkf.bx.internal.cloudapp.net>
rafsalas19 and Ubuntu authored Feb 26, 2024
1 parent 593b611 commit 48aba67
Showing 14 changed files with 429 additions and 22 deletions.
2 changes: 0 additions & 2 deletions conf/nc24rs_v3.conf
@@ -24,8 +24,6 @@
 * || check_hw_swap 0kB 0kB 3%
 * || check_hw_eth eth0
 * || check_hw_eth lo
-* || check_hw_ib 40 mlx4_0:1
-* || check_hw_eth ib0
 
 
 ########################################################
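Note: NHC conf lines follow the pattern "<host match> || <check> [args]" - the check runs on every host whose name matches the glob. A minimal illustration of the format (the example values below are illustrative, not from this file):

* || check_hw_eth eth0              # every host must report an eth0 interface
gpu-* || check_hw_ib 40 mlx4_0:1    # only hosts matching gpu-* must expose this IB port at 40 Gbps

This change simply drops the two InfiniBand-related checks from the NC24rs_v3 profile.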
8 changes: 7 additions & 1 deletion customTests/azure_gpu_bandwidth.nhc
@@ -86,11 +86,17 @@ function evaluate_nvBW_result(){
             continue
         fi
         if (( $(echo "$gpubw >= $EXP_BW" | bc -l) )); then
-            dbg "${tests_map[$test]} test on GPU $gpu_device passed. Bandwidth $gpubw is greater than $EXP_BW"
+            if [ "$test" = "$P2P" ]; then
+                dbg "${tests_map[$test]}_GPU_${gpu_device}_${peer_device}: $gpubw GB/s"
+            else
+                dbg "${tests_map[$test]}_GPU_${gpu_device}: $gpubw GB/s"
+            fi
         else
             if [ "$test" = "$P2P" ]; then
+                dbg "${tests_map[$test]}_GPU_${gpu_device}_${peer_device}: $gpubw GB/s"
                 die 1 "check_gpu_bw: ${tests_map[$test]} test on GPU $gpu_device to GPU $peer_device failed. Bandwidth $gpubw is less than $EXP_BW"
             else
+                dbg "${tests_map[$test]}_GPU_${gpu_device}: $gpubw GB/s"
                 die 1 "check_gpu_bw: ${tests_map[$test]} test on GPU $gpu_device failed. Bandwidth $gpubw is less than $EXP_BW"
             fi
         fi
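Note: the net effect of this hunk is that every bandwidth measurement is now emitted in a grep-friendly "key: value" form (H2D_GPU_<n>, D2H_GPU_<n>, P2P_GPU_<n>_<m>) on both the pass and fail paths; the Kusto exporter below greps for exactly these keys. A quick sketch of pulling them back out of a log (the sample values are illustrative):

# hypothetical log excerpt in the new format:
#   H2D_GPU_0: 25.1 GB/s
#   P2P_GPU_0_1: 48.3 GB/s
grep -o 'H2D_GPU_[0-7]: .*' aznhc.log        # host-to-device bandwidth per GPU
grep -o 'P2P_GPU_[0-7]_[0-7]: .*' aznhc.log  # peer-to-peer bandwidth per GPU pair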
3 changes: 2 additions & 1 deletion customTests/azure_ib_write_bw_gdr.nhc
@@ -57,7 +57,8 @@ function run_ib_bw_gdr(){
             break
         fi
     done
-    if [[ $ib_bandwidth < $EXP_IB_BW ]]; then
+    dbg "ib_write_lb_${device}: $ib_bandwidth Gbps"
+    if (( $(echo "$ib_bandwidth < $EXP_IB_BW" | bc -l) )); then
         log "$IB_WRITE_BW_OUT2"
         die 1 "$FUNCNAME: $IB_WRITE_BW, IB=$device, $device_peer, IB BW (expected > $EXP_IB_BW Gbps, but measured $ib_bandwidth Gbps"
         return 1
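Note: besides adding the dbg line, this hunk (and the matching ones in the NCCL checks below) fixes a real comparison bug: [[ $a < $b ]] compares lexicographically as strings, so "9.5" sorts after "100" and a fast link could be reported as slow. Routing the comparison through bc -l makes it numeric. A minimal sketch of the difference:

a=9.5; b=100
[[ $a < $b ]] && echo "string: less" || echo "string: not less"   # prints "string: not less" (wrong for numbers)
(( $(echo "$a < $b" | bc -l) )) && echo "numeric: less"           # prints "numeric: less" (correct)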
2 changes: 1 addition & 1 deletion customTests/azure_ib_write_bw_non_gdr.nhc
@@ -76,7 +76,7 @@ function ib_write(){
         die 1 "check_ib_bw_non_gdr: IB=$device, IB BW (expected > $EXP_IB_BW Gbps, but measured $ib_bandwidth Gbps)"
         return 1
     fi
-    dbg "$FUNCNAME: IB device=$device: Measured IB BW $ib_bandwidth Gbps"
+    dbg "ib_write_lb_${device}: $ib_bandwidth Gbps"
     return 0
 }
 
5 changes: 2 additions & 3 deletions customTests/azure_nccl_allreduce.nhc
@@ -56,9 +56,8 @@ function check_nccl_allreduce() {
             break
         fi
     done
-
-    if [[ $avg_bus_bw < $EXP_NCCL_ALLREDUCE_BW ]]
-    then
+    dbg "nccl_all_red: $avg_bus_bw GB/s"
+    if (( $(echo "$avg_bus_bw < $EXP_NCCL_ALLREDUCE_BW" | bc -l) )); then
         dbg "$nccl_allreduce_out"
         dbg "Iteration ${iter} of ${REPEATS} failed: NCCL allreduce bandwidth $avg_bus_bw GB/s < $EXP_NCCL_ALLREDUCE_BW GB/s"
     else
4 changes: 2 additions & 2 deletions customTests/azure_nccl_allreduce_ib_loopback.nhc
@@ -50,8 +50,8 @@ function check_nccl_allreduce_ib_loopback() {
             break
         fi
     done
-
-    if [[ $avg_bus_bw < $EXP_NCCL_ALLREDUCE_IB_LOOPBACK_BW ]]
+    dbg "nccl_all_red_lb: $avg_bus_bw GB/s"
+    if (( $(echo "$avg_bus_bw < $EXP_NCCL_ALLREDUCE_IB_LOOPBACK_BW" | bc -l) ));
     then
         dbg "$nccl_allreduce_ib_loopback_out"
         dbg "Iteration ${iter} of ${REPEATS} failed: NCCL allreduce IB loopback bandwidth $avg_bus_bw GB/s < $EXP_NCCL_ALLREDUCE_IB_LOOPBACK_BW GB/s"
171 changes: 167 additions & 4 deletions distributed_nhc/export_nhc_result_to_kusto.py
@@ -1,6 +1,9 @@
 #!/usr/bin/python3
 import sys
 import os
+import json
+import re
+import subprocess
 from datetime import datetime
 from csv import DictReader
 from argparse import ArgumentParser
@@ -45,7 +48,7 @@ def ingest_debug_log(debug_file, creds, ingest_url, database, debug_table_name):
     if job_name == "pssh":
         job_name = f"{job_name}-{ts_str}"
 
-    with open(health_file, 'r') as f:
+    with open(debug_file, 'r') as f:
         lines = f.readlines()
         reader = DictReader(lines, fieldnames = ["Hostname", "DebugLog"], delimiter='|', restkey="extra")
 
@@ -60,13 +63,171 @@ def ingest_debug_log(debug_file, creds, ingest_url, database, debug_table_name):
     print(f"Ingesting health results from {os.path.basename(debug_file)} into {ingest_url} at {database}/{debug_table_name}")
     ingest_client.ingest_from_dataframe(df, IngestionProperties(database, debug_table_name))
 
+def run_command(cmd):
+    result = subprocess.run(cmd, capture_output=True, shell=True, text=True)
+    return result.stdout.strip()
+
+def get_nhc_json_formatted_result(results_file):
+    def natural_sort_key(s):
+        return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', s)]
+
+    # check if GPU or CPU
+    processor_cmd = f"lspci | grep -iq NVIDIA" # if not empty, then GPU
+    processor_str = run_command(processor_cmd)
+
+    processor = "GPU" if processor_str else "CPU"
+
+    if processor == "GPU":
+        ib_write_lb_mlx5_ib_cmd = f"cat {results_file} | grep -o 'ib_write_lb_mlx5_ib[0-7]: .*'"
+        ib_write_lb_mlx5_ib_str = run_command(ib_write_lb_mlx5_ib_cmd)
+        ib_write_lb_mlx5_ib_str = sorted(ib_write_lb_mlx5_ib_str.strip().split("\n"), key=natural_sort_key)
+        ib_write_lb_mlx5_ib_str = '\n'.join(ib_write_lb_mlx5_ib_str) # convert to string
+
+        H2D_GPU_cmd = f"cat {results_file} | grep -o 'H2D_GPU_[0-7]: .*'"
+        H2D_GPU_str = run_command(H2D_GPU_cmd)
+
+        D2H_GPU_cmd = f"cat {results_file} | grep -o 'D2H_GPU_[0-7]: .*'"
+        D2H_GPU_str = run_command(D2H_GPU_cmd)
+
+        P2P_GPU_cmd = f"cat {results_file} | grep -o 'P2P_GPU_[0-7]_[0-7]: .*'"
+        P2P_GPU_str = run_command(P2P_GPU_cmd)
+
+        nccl_all_red_cmd = f"cat {results_file} | grep -o 'nccl_all_red: .*'"
+        nccl_all_red_str = run_command(nccl_all_red_cmd)
+
+        nccl_all_red_lb_cmd = f"cat {results_file} | grep -o 'nccl_all_red_lb: .*'"
+        nccl_all_red_lb_str = run_command(nccl_all_red_lb_cmd)
+
+        data_string = "\n".join([ib_write_lb_mlx5_ib_str, H2D_GPU_str, D2H_GPU_str, P2P_GPU_str, nccl_all_red_str, nccl_all_red_lb_str])
+        data_string = os.linesep.join([s for s in data_string.splitlines() if s]) # remove empty lines
+        result = {"IB_WRITE_GDR": {}, "GPU_BW_HTD": {}, "GPU_BW_DTH": {}, "GPU_BW_P2P": {}, "NCCL_ALL_REDUCE": {}, "NCCL_ALL_REDUCE_LOOP_BACK": {}}
+
+        # Split the string by lines and create key-value pairs
+        for line in data_string.strip().split("\n"):
+            if not line or line.isspace():
+                continue
+            key, value = line.split(":")
+            if key.startswith("ib_write_lb_mlx5_ib"):
+                result["IB_WRITE_GDR"][key] = str(value.strip())
+            elif key.startswith("H2D"):
+                result["GPU_BW_HTD"][key] = str(value.strip())
+            elif key.startswith("D2H"):
+                result["GPU_BW_DTH"][key] = str(value.strip())
+            elif key.startswith("P2P"):
+                result["GPU_BW_P2P"][key] = str(value.strip())
+            elif key.startswith("nccl_all_red_lb"):
+                result["NCCL_ALL_REDUCE_LOOP_BACK"] = str(value.strip())
+            elif key.startswith("nccl_all_red"):
+                result["NCCL_ALL_REDUCE"] = str(value.strip())
+
+    else: # processor == "CPU"
+        ib_write_lb_mlx5_ib_cmd = f"cat {results_file} | grep -o 'ib_write_lb_mlx5_ib[0-7]: .*'"
+        ib_write_lb_mlx5_ib_str = run_command(ib_write_lb_mlx5_ib_cmd)
+        ib_write_lb_mlx5_ib_str = sorted(ib_write_lb_mlx5_ib_str.strip().split("\n"), key=natural_sort_key)
+        ib_write_lb_mlx5_ib_str = '\n'.join(ib_write_lb_mlx5_ib_str) # convert to string
+
+        stream_Copy_cmd = f"cat {results_file} | grep -o 'stream_Copy: .*'"
+        stream_Copy_str = run_command(stream_Copy_cmd)
+
+        stream_Add_cmd = f"cat {results_file} | grep -o 'stream_Add: .*'"
+        stream_Add_str = run_command(stream_Add_cmd)
+
+        stream_Scale_cmd = f"cat {results_file} | grep -o 'stream_Scale: .*'"
+        stream_Scale_str = run_command(stream_Scale_cmd)
+
+        stream_Triad_cmd = f"cat {results_file} | grep -o 'stream_Triad: .*'"
+        stream_Triad_str = run_command(stream_Triad_cmd)
+
+        data_string = "\n".join([ib_write_lb_mlx5_ib_str, stream_Copy_str, stream_Add_str, stream_Scale_str, stream_Triad_str])
+        data_string = os.linesep.join([s for s in data_string.splitlines() if s]) # remove empty lines
+        result = {"IB_WRITE_NON_GDR": {}, "stream_Copy": {}, "stream_Add": {}, "stream_Scale": {}, "stream_Triad": {}}
+
+        # Split the string by lines and create key-value pairs
+        for line in data_string.strip().split("\n"):
+            if not line or line.isspace():
+                continue
+            key, value = line.split(":")
+            if key.startswith("ib_write_lb_mlx5_ib"):
+                result["IB_WRITE_NON_GDR"][key] = str(value.strip())
+            elif key.startswith("stream_Copy"):
+                result["stream_Copy"] = str(value.strip())
+            elif key.startswith("stream_Add"):
+                result["stream_Add"] = str(value.strip())
+            elif key.startswith("stream_Scale"):
+                result["stream_Scale"] = str(value.strip())
+            elif key.startswith("stream_Triad"):
+                result["stream_Triad"] = str(value.strip())
+
+    return result

def ingest_results(results_file, creds, ingest_url, database, results_table_name, nhc_run_uuid="None"):
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

job_name = results_file.replace("\\", "/").split(".")[0].split("/")[-1] # account for \ or / in path
uuid = job_name if nhc_run_uuid == "None" else f"{nhc_run_uuid}-{job_name}"
if uuid == "health":
uuid = ""
else :
uuid = "-" + uuid # add the dash here instead of below; this way if 'uuid' is empty, we don't have a trailing dash
full_uuid = f"nhc-{ts}{uuid}"

vmSize_bash_cmd = "echo $( curl -H Metadata:true --max-time 10 -s \"http://169.254.169.254/metadata/instance/compute/vmSize?api-version=2021-01-01&format=text\") | tr '[:upper:]' '[:lower:]' "
vmSize = run_command(vmSize_bash_cmd)

vmId_bash_cmd = "curl -H Metadata:true --max-time 10 -s \"http://169.254.169.254/metadata/instance/compute/vmId?api-version=2021-02-01&format=text\""
vmId = run_command(vmId_bash_cmd)

vmName_bash_cmd = "hostname"
vmName = run_command(vmName_bash_cmd)

physhost = run_command("echo $(hostname) \"$(/opt/azurehpc/tools/kvp_client | grep Fully)\" | cut -d ':' -f 3 | cut -d ' ' -f 2 | sed 's/\"//g'")
if not physhost:
physhost = "not Mapped"

with open(results_file, 'r') as f:
full_results = f.read()
jsonResultDict = get_nhc_json_formatted_result(results_file)
jsonResult = json.dumps(jsonResultDict)

record = {
'vmSize': vmSize,
'vmId': vmId,
'vmHostname': vmName,
'physHostname': physhost,
'workflowType': "main",
'time': ts,
'pass': False, # keep as default false
'errors': '',
'logOutput': full_results, # the entire file
'jsonResult': jsonResult,
'uuid': full_uuid
}

if "ERROR" in full_results:
record['pass'] = False
record['errors'] = full_results
elif "Node Health Check completed successfully" in full_results:
record['pass'] = True
else:
record['pass'] = False
record['errors'] = "No Node Health Check completed successfully or ERROR"

df = pd.DataFrame(record, index=[0])

ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_azure_token_credential(ingest_url, creds))
print(f"Ingesting results from {os.path.basename(results_file)} into {ingest_url} at {database}/{results_table_name}")
ingest_client.ingest_from_dataframe(df, IngestionProperties(database, results_table_name))


 def parse_args():
     parser = ArgumentParser(description="Ingest NHC results into Kusto")
     parser.add_argument("health_files", nargs="+", help="List of .health.log or .debug.log files to ingest")
     parser.add_argument("--ingest_url", help="Kusto ingest URL", required=True)
     parser.add_argument("--database", help="Kusto database", required=True)
     parser.add_argument("--health_table_name", default="NodeHealthCheck", help="Kusto table name for health results")
     parser.add_argument("--debug_table_name", default="NodeHealthCheck_Debug", help="Kusto table name for debug results")
+    parser.add_argument("--results_table_name", default="AzNhcRunEvents", help="Kusto table name for results")
+    parser.add_argument("--uuid", default="None", help="UUID to help identify results in Kusto table")
     parser.add_argument("--identity", nargs="?", const=True, default=False, help="Managed Identity to use for authentication, if a client ID is provided it will be used, otherwise the system-assigned identity will be used. If --identity is not provided DefaultAzureCredentials will be used.")
     return parser.parse_args()
 
@@ -91,11 +252,13 @@ def get_creds(identity):
             ingest_health_log(health_file, creds, args.ingest_url, args.database, args.health_table_name)
         elif health_file.endswith(".debug.log"):
             ingest_debug_log(health_file, creds, args.ingest_url, args.database, args.debug_table_name)
+        elif health_file.endswith(".log"):
+            ingest_results(health_file, creds, args.ingest_url, args.database, args.results_table_name, args.uuid)
         else:
-            raise Exception("Unsuported file, must be .health.log or .debug.log produced by ./distributed_nhc.sb.sh")
+            raise Exception("Unsupported file, must be .health.log or .debug.log produced by ./distributed_nhc.sb.sh, or .log produced by run-health-checks.sh")
 
     except FileNotFoundError:
-        if len(health_files) == 1:
+        if len(args.health_files) == 1:
            print(f"Cannot find file '{health_file}'")
            raise
-        print(f"Cannot find file '{health_file}', skipping...")
+        print(f"Cannot find file '{health_file}', skipping...")
6 changes: 6 additions & 0 deletions dockerfile/aznhc-entrypoint.sh
@@ -1,5 +1,9 @@
 #! /bin/bash
 
+if [ -z "$AZ_NHC_ROOT" ]; then
+    AZ_NHC_ROOT="/azure-nhc"
+fi
+
 CONF_FILE=${AZ_NHC_ROOT}/conf/aznhc.conf
 OUTPUT_PATH=${AZ_NHC_ROOT}/output/aznhc.log
 DEFAULT_NHC_FILE_PATH=${AZ_NHC_ROOT}/default
@@ -49,4 +53,6 @@ if [ "$output_mounted" = false ]; then
     cat $OUTPUT_PATH
 fi
 
+echo "Health checks completed." | tee -a $OUTPUT_PATH
+
 exit 0
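Note: the guard added at the top gives AZ_NHC_ROOT a default when the container is launched without it. If brevity were preferred, bash's default-assignment expansion is an equivalent one-liner (a sketch, not what the commit uses):

: "${AZ_NHC_ROOT:=/azure-nhc}"   # assigns the default when AZ_NHC_ROOT is unset or empty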
16 changes: 10 additions & 6 deletions dockerfile/azure-nvrt-nhc.dockerfile
@@ -51,12 +51,6 @@ RUN cd /tmp && \
     MLNX_OFED_LINUX-${OFED_VERSION}-ubuntu22.04-x86_64/mlnxofedinstall --user-space-only --without-fw-update --without-ucx-cuda --force --all && \
     rm -rf /tmp/MLNX_OFED_LINUX*
 
-# install clang dependency
-RUN cd /tmp && \
-    wget https://download.amd.com/developer/eula/aocc-compiler/aocc-compiler-${AOCC_VERSION}_amd64.deb && \
-    apt install -y ./aocc-compiler-${AOCC_VERSION}_amd64.deb && \
-    rm aocc-compiler-${AOCC_VERSION}_amd64.deb
-
 # Install HPCx
 RUN cd /tmp && \
     TARBALL="hpcx-${HPCX_VERSION}-gcc-mlnx_ofed-ubuntu22.04-cuda12-gdrcopy2-nccl2.18-x86_64.tbz" && \
@@ -104,6 +98,12 @@ COPY customTests/*.nhc /etc/nhc/scripts/
 COPY customTests/topofiles ${AZ_NHC_ROOT}/topofiles
 COPY conf ${AZ_NHC_ROOT}/default/conf
 
+# install clang dependency needed for stream
+RUN cd /tmp && \
+    wget https://download.amd.com/developer/eula/aocc-compiler/aocc-compiler-${AOCC_VERSION}_amd64.deb && \
+    apt install -y ./aocc-compiler-${AOCC_VERSION}_amd64.deb && \
+    rm aocc-compiler-${AOCC_VERSION}_amd64.deb
+
 # Install stream
 RUN mkdir -p /tmp/stream
 COPY customTests/stream/Makefile /tmp/stream/
@@ -112,6 +112,10 @@ wget https://www.cs.virginia.edu/stream/FTP/Code/stream.c && \
     make all CC=/opt/AMD/aocc-compiler-4.0.0/bin/clang EXEC_DIR=${AZ_NHC_ROOT}/bin && \
     rm -rf /tmp/stream
 
+# Remove AOCC after STREAM build
+RUN version=$(echo "$AOCC_VERSION" | sed 's/_1$//') && \
+    apt remove aocc-compiler-"${version}" -y
+
 # Copy necessary files
 COPY customTests/*.nhc /etc/nhc/scripts/
 COPY customTests/topofiles ${AZ_NHC_ROOT}/topofiles
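Note: installing AOCC right before the STREAM build and removing it afterwards keeps the compiler out of the final toolchain, but because the install, build, and removal happen in separate RUN instructions, the image still carries the install layer's size; an apt remove in a later layer cannot shrink an earlier one. If image size mattered, the three steps could be collapsed into one layer, roughly like this (a sketch assuming the STREAM sources are already staged in /tmp/stream, not the repo's actual Dockerfile):

RUN cd /tmp && \
    wget https://download.amd.com/developer/eula/aocc-compiler/aocc-compiler-${AOCC_VERSION}_amd64.deb && \
    apt install -y ./aocc-compiler-${AOCC_VERSION}_amd64.deb && \
    make -C /tmp/stream all CC=/opt/AMD/aocc-compiler-4.0.0/bin/clang EXEC_DIR=${AZ_NHC_ROOT}/bin && \
    apt remove -y aocc-compiler-"${AOCC_VERSION%_1}" && \
    rm -rf /tmp/stream aocc-compiler-${AOCC_VERSION}_amd64.deb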
2 changes: 1 addition & 1 deletion run-health-checks.sh
@@ -169,7 +169,7 @@ DOCKER_RUN_ARGS="--name=$DOCK_CONT_NAME --net=host -e TIMEOUT=$TIMEOUT \
     -v /sys:/hostsys/ \
     -v $CONF_FILE:"$DOCK_CONF_PATH/aznhc.conf" \
     -v $OUTPUT_PATH:$WORKING_DIR/output/aznhc.log \
-    -v ${kernel_log}:$WORKING_DIR/syslog
+    -v ${kernel_log}:$WORKING_DIR/syslog \
     -v ${AZ_NHC_ROOT}/customTests:$WORKING_DIR/customTests"
 
 sudo docker run ${DOCKER_RUN_ARGS} -e NHC_ARGS="${NHC_ARGS}" "${DOCK_IMG_NAME}" bash -c "$WORKING_DIR/aznhc-entrypoint.sh"
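Note: the one-character fix restores the line continuation inside the DOCKER_RUN_ARGS string. Within double quotes, backslash-newline is removed while a bare newline is preserved, so the missing backslash left a literal newline embedded before the customTests mount. A minimal sketch of the behavior (variable name hypothetical):

args="-a \
-b"
printf '%s\n' "$args"   # prints: -a -b (backslash-newline removed, single line)

args="-a
-b"
printf '%s\n' "$args"   # prints -a and -b on two lines (bare newline kept in the string)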