From a50ba253dcbef34f5774e408b2fae989f3385cd6 Mon Sep 17 00:00:00 2001 From: vikumarks Date: Tue, 27 Aug 2024 15:41:05 -0700 Subject: [PATCH] feat: add CyPerf ipsec demo with nvidia Signed-off-by: vikumarks --- .github/workflows/opi-lab.yml | 6 +- demos/security/nvidia/README.md | 24 +- demos/security/nvidia/RESTasV3.py | 2174 +++++++++++++++++ demos/security/nvidia/Statistics.py | 140 ++ demos/security/nvidia/cyperf-ipsec-config.zip | Bin 0 -> 7112 bytes demos/security/nvidia/deployment/.env | 11 + .../nvidia/deployment/conf/k.swanctl.conf | 8 + .../security/nvidia/deployment/conf/vici.conf | 10 + .../nvidia/deployment/cyperf_with_ipsec.yml | 49 + demos/security/nvidia/ipsec-config.py | 131 - demos/security/nvidia/ipsec_pb2.py | 552 ----- demos/security/nvidia/ipsec_pb2_grpc.py | 363 --- demos/security/nvidia/k.swanctl.conf | 1552 ------------ demos/security/nvidia/requirements.txt | 3 + demos/security/nvidia/requirments.txt | 3 - demos/security/nvidia/test_ipsec.py | 249 ++ .../nvidia/test_tgen_with_ipsec_demo.py | 235 -- demos/testbed.py | 4 + 18 files changed, 2658 insertions(+), 2856 deletions(-) create mode 100644 demos/security/nvidia/RESTasV3.py create mode 100644 demos/security/nvidia/Statistics.py create mode 100644 demos/security/nvidia/cyperf-ipsec-config.zip create mode 100644 demos/security/nvidia/deployment/.env create mode 100644 demos/security/nvidia/deployment/conf/k.swanctl.conf create mode 100644 demos/security/nvidia/deployment/conf/vici.conf create mode 100644 demos/security/nvidia/deployment/cyperf_with_ipsec.yml delete mode 100644 demos/security/nvidia/ipsec-config.py delete mode 100644 demos/security/nvidia/ipsec_pb2.py delete mode 100644 demos/security/nvidia/ipsec_pb2_grpc.py delete mode 100644 demos/security/nvidia/k.swanctl.conf create mode 100644 demos/security/nvidia/requirements.txt delete mode 100644 demos/security/nvidia/requirments.txt create mode 100644 demos/security/nvidia/test_ipsec.py delete mode 100644 demos/security/nvidia/test_tgen_with_ipsec_demo.py diff --git a/.github/workflows/opi-lab.yml b/.github/workflows/opi-lab.yml index 5ee3cd0d..c5c56933 100644 --- a/.github/workflows/opi-lab.yml +++ b/.github/workflows/opi-lab.yml @@ -21,9 +21,9 @@ jobs: steps: - uses: actions/checkout@v1 - name: Install python requirements - run: pip install -r ./demos/tgen/requirements.txt - - name: Run ipsec test automation - run: pytest -s ./demos/security/nvidia/ipsec-config.py + run: pip install -r ./demos/security/nvidia/requirements.txt + - name: Run CyPerf ipsec test automation + run: pytest -s ./demos/security/nvidia/test_ipsec.py opi-ansible: runs-on: self-hosted diff --git a/demos/security/nvidia/README.md b/demos/security/nvidia/README.md index faaa2abb..d109ec56 100644 --- a/demos/security/nvidia/README.md +++ b/demos/security/nvidia/README.md @@ -3,7 +3,7 @@ ## hardware - server with Ubuntu 22.04 -- Nvidia BlueField2 +- Nvidia BlueField2 or BlueField3 - Keysight CloudStorm 100G ## configuration @@ -11,8 +11,8 @@ ### host (the server holding the DPU) - install Ubuntu 22.04 server -- install `MLNX_OFED_LINUX-5.8-1.1.2.1-ubuntu22.04-x86_64.iso` -- install `DOCA_1.5.1_BSP_3.9.3_Ubuntu_20.04-4.2211-LTS.signed.bfb` on the BlueField2 +- install `MLNX_OFED_LINUX-24.04-0.6.6.0-ubuntu22.04-x86_64.iso` +- install `bf-bundle-2.7.0-33_24.04_ubuntu-22.04_prod.bfb` on the BlueField2 - set BlueField2 in SEPARATED_HOST mode to make things easier ```Shell @@ -78,18 +78,8 @@ see and 0: + raise Exception('Not all sessions could be deleted!') + else: + if DEBUG:print('No sessions opened!') + + def get_test_details(self, session_id): + apiPath = '/api/v2/sessions/{}/test'.format(session_id) + response = self.__sendGet(apiPath, 200).json() + return response + + def set_license_server(self, licenseServerIP): + apiPath = '/api/v2/license-servers' + self.__sendPost(apiPath, payload={"hostName": licenseServerIP}) + + def get_license_servers(self): + apiPath = '/api/v2/license-servers' + return self.__sendGet(apiPath, 200).json() + + def wait_event_success(self, apiPath, timeout): + counter = 1 + while timeout > 0: + response = self.__sendGet(apiPath, 200).json() + if response['state'] == "SUCCESS": + return response + else: + timeout -= counter + time.sleep(counter) + + def activate_license(self, activation_code, quantity=1, timeout=60): + apiPath = '/api/v2/licensing/operations/activate' + response = self.__sendPost(apiPath, payload=[{"activationCode": activation_code, "quantity": quantity}]).json() + apiPath = '/api/v2/licensing/operations/activate/{}'.format(response["id"]) + if not self.wait_event_success(apiPath, timeout): + raise TimeoutError("Failed to activate license. Timeout reached = {} seconds".format(timeout)) + + def deactivate_license(self, activation_code, quantity=1, timeout=60): + apiPath = '/api/v2/licensing/operations/deactivate' + response = self.__sendPost(apiPath, payload=[{"activationCode": activation_code, "quantity": quantity}]).json() + apiPath = '/api/v2/licensing/operations/deactivate/{}'.format(response["id"]) + if "The Activation Code : \'{}\' is not installed.".format(activation_code) == \ + self.__sendGet(apiPath, 200).json()['message']: + if DEBUG:print('License code {} is not installed'.format(activation_code)) + elif not self.wait_event_success(apiPath, timeout): + raise TimeoutError("Failed to deactivate license. Timeout reached = {} seconds".format(timeout)) + + def get_license_statistics(self, timeout=30): + apiPath = '/api/v2/licensing/operations/retrieve-counted-feature-stats' + response = self.__sendPost(apiPath, payload={}).json() + apiPath = '/api/v2/licensing/operations/retrieve-counted-feature-stats/{}'.format(response["id"]) + if not self.wait_event_success(apiPath, timeout): + raise TimeoutError("Failed to obtain license stats. Timeout reached = {} seconds".format(timeout)) + apiPath = '/api/v2/licensing/operations/retrieve-counted-feature-stats/{}/result'.format(response["id"]) + response = self.__sendGet(apiPath, 200).json() + return response + + def nats_update_route(self, nats_address): + apiPath = '/api/v2/brokers' + self.__sendPost(apiPath, payload={"host": nats_address}) + + def import_config(self, config): + apiPath = '/api/v2/configs' + if config.endswith('.json'): + config = json.loads(open(config, "r").read()) + response = self.__sendPost(apiPath, config) + elif config.endswith('.zip'): + zip_file_path = {"archive": (config, open(config, "rb"), "application/zip")} + response = self.__sendPost(apiPath, None, customHeaders=self.headers, files=zip_file_path) + else: + raise Exception("Config type not supported. Requires zip or json.") + if response: + if DEBUG:print('Config successfully imported, config ID: {}'.format(response.json()[0]['id'])) + return response.json()[0]['id'] + else: + raise Exception('Failed to import test config') + + def export_config(self, export_path=None): + config_id = self.configID + apiPath = '/api/v2/configs/{}?include=all&resolveDependencies=true'.format(config_id) + customHeaders = self.headers + customHeaders['Accept'] = 'application/zip' + response = self.__sendGet(apiPath, 200, customHeaders=customHeaders) + + file_name = response.headers.get('content-disposition').split("=")[1].strip('"') + + if export_path: + file_name = os.path.join(export_path, file_name) + if DEBUG:print("Export path/file: {}".format(file_name)) + with open(file_name, 'wb') as archive: + archive.write(response.content) + return file_name + + def export_config_by_name(self, export_path=None, config_name=None): + config_id = self.get_config_id(config_name) + apiPath = '/api/v2/configs/{}?include=all&resolveDependencies=true'.format(config_id) + customHeaders = self.headers + customHeaders['Accept'] = 'application/zip' + response = self.__sendGet(apiPath, 200, customHeaders=customHeaders) + + file_name = response.headers.get('content-disposition').split("=")[1].strip('"') + + if export_path: + file_name = os.path.join(export_path, file_name) + if DEBUG:print("Export path/file: {}".format(file_name)) + with open(file_name, 'wb') as archive: + archive.write(response.content) + return file_name + + def open_config(self): + apiPath = '/api/v2/sessions' + response = self.__sendPost(apiPath, payload={"configUrl": self.configID}) + if response: + if DEBUG:print('Config successfully opened, session ID: {}'.format(response.json()[0]['id'])) + return response.json()[0]['id'] + + def get_session_config(self, sessionId=None): + sessionId = sessionId if sessionId else self.sessionID + apiPath = '/api/v2/sessions/{}/config?include=all'.format(sessionId) + return self.__sendGet(apiPath, 200, debug=False).json() + + def delete_config(self, config_id): + """ + Delete a config after you've specified its id + :param config_id: The id of the config + :return: None + """ + apiPath = '/api/v2/configs/{}'.format(config_id) + self.__sendDelete(apiPath, self.headers) + + def delete_config_by_name(self, config_name=None): + config_id = self.get_config_id(config_name) + apiPath = '/api/v2/configs/{}'.format(config_id) + self.__sendDelete(apiPath, self.headers) + + def add_network_segment(self): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment'.format(self.sessionID) + self.__sendPost(apiPath, {}) + + def wait_agents_connect(self, agents_nr=3, timeout=300): + response = [] + init_timeout = timeout + print('Waiting for agents to connect to the CyPerf controller...') + while timeout > 0: + response = self.get_agents() + if len(response) >= agents_nr: + print('There are {} agents connected to the CyPerf controller'.format(len(response))) + return True + else: + time.sleep(10) + timeout -= 10 + else: + raise Exception( + "Expected {} agents connected after {}s. Got only {}.".format(agents_nr, init_timeout, len(response))) + + def get_agents(self): + apiPath = '/api/v2/agents' + return self.__sendGet(apiPath, 200).json() + + def check_agents_status(self, timeout=60): + waiting_time = 0 + if DEBUG:print('Waiting for agents to be available') + while True: + agents_status = [agent['Status'] for agent in self.get_agents()] + if all(status == 'STOPPED' for status in agents_status): + if waiting_time <= timeout: + print('Agents are available') + return True + else: + raise Exception( + "The agents were not available for the next test in {}s, but after {}s".format(timeout, + waiting_time)) + time.sleep(10) + waiting_time += 10 + + def get_agents_ids(self, agentIPs=None, wait=None): + if wait: + self.wait_agents_connect() + agentsIDs = list() + response = self.get_agents() + if DEBUG:print('Found {} agents'.format(len(response))) + if type(agentIPs) is str: + agentIPs = [agentIPs] + for agentIP in agentIPs: + for agent in response: + if agent['IP'] == agentIP: + if DEBUG:print("agent_IP: {}, agent_ID: {}".format(agent['IP'], agent['id'])) + agentsIDs.append(agent['id']) + break + return agentsIDs + + def get_agents_ips(self, wait=None): + if wait: + self.wait_agents_connect() + agentsIPs = list() + response = self.get_agents() + if DEBUG:print('Found {} agents'.format(len(response))) + # fixme B2B only - ClientAgent is excluded in AWS scenario + for agent in response: + agentsIPs.append(agent['IP']) + if DEBUG:print('Agents IP List: {}'.format(agentsIPs)) + return agentsIPs + + def assign_agents(self): + agents_ips = self.get_agents_ips() + self.assign_agents_by_ip(agents_ips=agents_ips[0], network_segment=1) + self.assign_agents_by_ip(agents_ips=agents_ips[1], network_segment=2) + + def assign_agents_by_ip(self, agents_ips, network_segment): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/agentAssignments'.format( + self.sessionID, network_segment) + payload = {"ByID": [], "ByTag": []} + agents_ids = self.get_agents_ids(agentIPs=agents_ips) + for agent_id in agents_ids: + payload["ByID"].append({"agentId": agent_id}) + self.__sendPatch(apiPath, payload) + + def test_warmup_value(self, value): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/ObjectivesAndTimeline/AdvancedSettings'.format( + self.sessionID) + payload = {"WarmUpPeriod": int(value)} + self.__sendPatch(apiPath, payload) + + def assign_agents_by_tag(self, agents_tags, network_segment): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/agentAssignments'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"ByID": [], "ByTag": [agents_tags]}) + + def set_traffic_capture(self, agents_ips, network_segment, is_enabled=True, capture_latest_packets=False, + max_capture_size=104857600): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/agentAssignments'.format( + self.sessionID, network_segment) + payload = {"ByID": []} + capture_settings = {"captureEnabled": is_enabled, "captureLatestPackets": capture_latest_packets, + "maxCaptureSize": max_capture_size} + agents_ids = self.get_agents_ids(agentIPs=agents_ips) + for agent_id in agents_ids: + payload["ByID"].append({"agentId": agent_id, "captureSettings": capture_settings}) + self.__sendPatch(apiPath, payload) + + def get_capture_files(self, captureLocation, exportTimeout=180): + self.get_result_ended() + test_id = self.get_test_id() + apiPath = '/api/v2/results/{}/operations/generate-results'.format(test_id) + response = self.__sendPost(apiPath, None).json() + apiPath = response['url'][len(self.host):] + response = self.wait_event_success(apiPath, timeout=exportTimeout) + if not response: + raise TimeoutError("Failed to download Captures. Timeout reached = {} seconds".format(exportTimeout)) + apiPath = response['resultUrl'] + response = self.__sendGet(apiPath, 200, debug=False) + zf = ZipFile(io.BytesIO(response.content), 'r') + zf.extractall(captureLocation) + for arh in glob.iglob(os.path.join(captureLocation, "*.zip")): + files = os.path.splitext(os.path.basename(arh))[0] + zf = ZipFile(arh) + zf.extractall(path=os.path.join(captureLocation, "pcaps", files)) + return response + + def add_dut(self): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + return response + + def delete_dut(self, network_segment): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendDelete(apiPath, self.headers) + + def set_dut(self, active=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"active": active}) + + def check_if_dut_is_active(self, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + response = self.__sendGet(apiPath, 200).json() + return response["active"] + + def set_dut_host(self, host, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"host": host}) + + def set_active_dut_configure_host(self, hostname, active=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"active": active, "ServerDUTHost": hostname}) + + def set_client_http_proxy(self, host, client_port, connect_mode, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"active": True}) + self.__sendPatch(apiPath, payload={"ConfigSettings": "ADVANCED_MODE"}) + self.__sendPatch(apiPath, payload={"ClientDUTActive": True}) + self.__sendPatch(apiPath, payload={"ClientDUTHost": host}) + self.__sendPatch(apiPath, payload={"ClientDUTPort": int(client_port)}) + self.__sendPatch(apiPath, payload={"HttpForwardProxyMode": connect_mode}) + self.__sendPatch(apiPath, payload={"ServerDUTActive": False}) + + def set_http_health_check(self, enabled=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Enabled": enabled}) + + def set_http_health_check_port(self, port, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Port": port}) + + def set_http_health_check_url(self, target_url, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath + '/Params/1', payload={"Value": target_url}) + + def set_http_health_check_payload(self, payload_file, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + if isinstance(payload_file, float): + self.__sendPatch(apiPath + '/Params/2', payload={"Value": payload_file}) + else: + self.set_custom_payload(apiPath + '/Params/2', payload_file) + + def set_http_health_check_version(self, http_version, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath + '/Params/3', payload={"Value": http_version}) + + def set_https_health_check(self, enabled=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPSHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Enabled": enabled}) + + def set_https_health_check_port(self, port, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPSHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Port": port}) + + def set_https_health_check_url(self, target_url, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPSHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath + '/Params/1', payload={"Value": target_url}) + + def set_https_health_check_payload(self, payload_file, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPSHealthCheck'.format( + self.sessionID, network_segment) + if isinstance(payload_file, float): + self.__sendPatch(apiPath + '/Params/2', payload={"Value": payload_file}) + else: + self.set_custom_payload(apiPath + '/Params/2', payload_file) + + def set_https_health_check_version(self, https_version, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/HTTPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath + '/Params/3', payload={"Value": https_version}) + + def set_tcp_health_check(self, enabled=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/TCPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Enabled": enabled}) + + def set_tcp_health_check_port(self, port, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/{}/TCPHealthCheck'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Port": port}) + + def set_client_recieve_buffer_size_attack_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/1/TrafficSettings/DefaultTransportProfile/ClientTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"RxBuffer": value}) + + def set_client_transmit_buffer_size_attack_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/1/TrafficSettings/DefaultTransportProfile/ClientTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"TxBuffer": value}) + + def set_client_recieve_buffer_size_traffic_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/TrafficSettings/DefaultTransportProfile/ClientTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"RxBuffer": value}) + + def set_client_transmit_buffer_size_traffic_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/TrafficSettings/DefaultTransportProfile/ClientTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"TxBuffer": value}) + + def set_server_recieve_buffer_size_attack_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/1/TrafficSettings/DefaultTransportProfile/ServerTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"RxBuffer": value}) + + def set_server_transmit_buffer_size_attack_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/1/TrafficSettings/DefaultTransportProfile/ServerTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"TxBuffer": value}) + + def set_server_recieve_buffer_size_traffic_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/TrafficSettings/DefaultTransportProfile/ServerTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"RxBuffer": value}) + + def set_server_transmit_buffer_size_traffic_profile(self, value): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/TrafficSettings/DefaultTransportProfile/ServerTcpProfile'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"TxBuffer": value}) + + def set_ip_network_segment(self, active=True, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"active": active}) + + def set_network_tags(self, tags="Client", network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"networkTags": [tags]}) + + def set_application_client_network_tags(self, tags, app_nr): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/Applications/{}/operations/modify-tags-recursively'.format( + self.sessionID, app_nr) + self.__sendPost(apiPath, payload={"SelectTags": True, "ClientNetworkTags": [tags]}) + + def remove_application_client_network_tags(self, tags, app_nr): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/Applications/{}/operations/modify-tags-recursively'.format( + self.sessionID, app_nr) + self.__sendPost(apiPath, payload={"SelectTags": False, "ClientNetworkTags": [tags]}) + + def set_network_min_agents(self, min_agents=1, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"minAgents": min_agents}) + + def add_ip_range(self, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges'.format( + self.sessionID, network_segment) + response = self.__sendPost(apiPath, payload={}).json() + return response[-1]['id'] + + def delete_ip_range(self, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendDelete(apiPath, self.headers) + + def set_ip_range_automatic_ip(self, ip_auto=True, network_segment=1, ip_range=1, ): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"IpAuto": ip_auto}) + + def set_ip_range_ip_start(self, ip_start, network_segment=1, ip_range=1, ): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"IpStart": ip_start}) + self.__sendPatch(apiPath, payload={"IpAuto": False}) + + def set_ip_range_ip_start_if_enabled(self, ip_start, network_segment=1, ip_range=1, ): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"IpStart": ip_start}) + + def set_ip_range_ip_increment(self, ip_increment="0.0.0.1", network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"IpIncr": ip_increment}) + + def set_ip_range_ip_count(self, count=1, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"Count": count}) + + def set_ip_range_max_count_per_agent(self, max_count_per_agent=1, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"maxCountPerAgent": max_count_per_agent}) + + def set_ip_range_automatic_netmask(self, netmask_auto=True, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"NetMaskAuto": netmask_auto}) + + def set_ip_range_netmask(self, netmask=16, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"NetMask": netmask}) + + def set_ip_range_automatic_gateway(self, gateway_auto=True, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"GwAuto": gateway_auto}) + + def set_ip_range_gateway(self, gateway="10.0.0.1", network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"GwStart": gateway}) + self.__sendPatch(apiPath, payload={"GwAuto": False}) + + def set_ip_range_gateway_if_enabled(self, gateway="10.0.0.1", network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"GwStart": gateway}) + + def set_ip_range_network_tags(self, tags, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"networkTags": [tags]}) + + def set_ip_range_mss(self, mss=1460, network_segment=1, ip_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment, ip_range) + self.__sendPatch(apiPath, payload={"Mss": mss}) + + def set_eth_range_mac_auto_false(self, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"MacAuto": False}) + + def set_eth_range_mac_start(self, mac_start, network_segment=1): + self.set_eth_range_mac_auto_false(network_segment) + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"MacStart": mac_start}) + + def set_eth_range_mac_increment(self, mac_increment, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"MacIncr": mac_increment}) + + def set_eth_range_one_mac_per_ip_false(self, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"OneMacPerIP": False}) + + def set_eth_range_max_mac_count(self, count, network_segment=1): + self.set_eth_range_one_mac_per_ip_false(network_segment) + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"Count": count}) + + def set_eth_range_max_mac_count_per_agent(self, max_count_per_agent, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EthRange'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"maxCountPerAgent": max_count_per_agent}) + + def set_dns_resolver(self, name_server, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/DNSResolver'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"nameServers": [{"name": name_server}]}) + + def set_dns_resolver_cache_timeout(self, timeout=0, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/DNSResolver'.format( + self.sessionID, network_segment) + self.__sendPatch(apiPath, payload={"cacheTimeout": timeout}) + + def set_dut_connections(self, connections, network_segment=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}'.format(self.sessionID, + network_segment) + self.__sendPatch(apiPath, payload={"DUTConnections": connections}) + + def set_profile_duration(self, profile_type, value): + apiPath = '/api/v2/sessions/{}/config/config/{}/1/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, profile_type) + self.__sendPatch(apiPath, payload={"Duration": value}) + + def get_iteration_count_info(self, ap_id=1): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, ap_id) + config_type = self.get_config_type() + if config_type["traffic"]: + if DEBUG:print('Parameter not available in traffic profile') + if config_type["attack"]: + response = self.__sendGet(apiPath, 200).json() + return response["IterationCount"] + + def get_profile_duration(self, profile_type): + apiPath = '/api/v2/sessions/{}/config/config/{}/1/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, profile_type) + response = self.__sendGet(apiPath, 200).json() + return response["Duration"] + + def set_test_duration(self, value): + config_type = self.get_config_type() + if config_type["traffic"]: + self.set_profile_duration(profile_type='TrafficProfiles', value=int(value)) + if config_type["attack"]: + self.set_profile_duration(profile_type='AttackProfiles', value=int(value)) + self.testDuration = int(value) + + def read_test_duration(self): + TrafficProfilesDuration = 0 + AttackProfilesDuration = 0 + config_type = self.get_config_type() + if config_type["traffic"]: + TrafficProfilesDuration = self.get_profile_duration(profile_type='TrafficProfiles') + if config_type["attack"]: + AttackProfilesDuration = self.get_profile_duration(profile_type='AttackProfiles') + self.testDuration = max(TrafficProfilesDuration, AttackProfilesDuration) + + def send_modified_config(self): + apiPath = '/api/v2/sessions/{}/config'.format(self.sessionID) + self.__sendPut(apiPath, self.config) + + def get_CPUCoresNR(self): + agent_info = self.get_agents() + l = [i["cpuInfo"] for i in agent_info] + count = list(map(len, l)) + return count[0] + + def get_CC_min_value(self, configured_cc_value, agent_cpu_cores): + favoured_sessions_no = 1 # static value + global_cc_min_value = 0 + app_info = {} + config = self.get_session_config() + traffic_profiles = config['Config']['TrafficProfiles'][0]['Applications'] + total_weight = sum(app['ObjectiveWeight'] for app in traffic_profiles) + for i in range(len(config['Config']['TrafficProfiles'][0]['Applications'])): + app_info[i] = {"weight": config['Config']['TrafficProfiles'][0]['Applications'][i]['ObjectiveWeight'], + "connections_no": len( + config['Config']['TrafficProfiles'][0]['Applications'][i]['Connections'])} + for app_no, app_details in app_info.items(): + individual_app_cc_value = configured_cc_value * (app_details['weight'] / total_weight) + individual_app_cc_min_value = min( + individual_app_cc_value - 2 * app_details['connections_no'] * agent_cpu_cores, + individual_app_cc_value - min(max(0.1 * individual_app_cc_value, agent_cpu_cores) + , 40 * agent_cpu_cores)) + min_alowed_cc_value = (app_details['connections_no'] * favoured_sessions_no + app_details[ + 'connections_no']) * agent_cpu_cores + global_cc_min_value += int(max(individual_app_cc_min_value, min_alowed_cc_value)) + return global_cc_min_value + + def get_config_type(self): + self.config = self.get_session_config() + config_type = {"traffic": False, + "traffic_profiles": [], + "tls_applications": [], + "tp_primary_obj": None, + "primary_obj_adv_time_params": [], + "tp_secondary_obj": None, + "tp_ssl": False, + "attack": False, + "attack_profiles": [], + "tls_attacks": [], + "att_obj": False, + "at_ssl": False, + "dut": False, + "tunnel_count_per_outer_ip": None, + "redirect_info": {}, + "ipsec": None, + "CPUCoresNr": self.get_CPUCoresNR(), + "test_duration": self.testDuration, + } + if len(self.config['Config']['TrafficProfiles']) > 0: + config_type['traffic'] = True if self.config['Config']['TrafficProfiles'][0]['Active'] else False + tp_profiles = self.config['Config']['TrafficProfiles'][0] + redirect_info = {} + for application in tp_profiles['Applications']: + if application['ProtocolID'] not in config_type['traffic_profiles']: + ###TODO: THIS IS A PROVISORY WORKAROUND BECAUSE CONFIG DOES NOT HOLD INFORMATION OF ENABLED APPS + try: + if application['Active'] == True: + config_type['traffic_profiles'].append(application['ProtocolID']) + if application['ClientTLSProfile']['tls12Enabled'] \ + or application['ClientTLSProfile']['tls13Enabled']: + config_type['tls_applications'].append(application['ProtocolID']) + except KeyError: + config_type['traffic_profiles'].append(application['ProtocolID']) + if application['ClientTLSProfile']['tls12Enabled'] \ + or application['ClientTLSProfile']['tls13Enabled']: + config_type['tls_applications'].append(application['ProtocolID']) + if application['ProtocolID'] == 'HTTP': + for param in application["Params"]: + if param["Name"] == 'Follow HTTP Redirects': + redirect_info[application['Name']] = param['Value'] + config_type['redirect_info'] = redirect_info + objectives = tp_profiles['ObjectivesAndTimeline'] + objective_dm = { + "type": objectives['PrimaryObjective']['Type'], + "unit": objectives['TimelineSegments'][0]['PrimaryObjectiveUnit'], + "value": objectives['TimelineSegments'][0]['PrimaryObjectiveValue'], + "steady_step_duration": objectives['TimelineSegments'][0]['Duration'] + } + if objectives['PrimaryObjective']['Type'] == "Concurrent connections": + objective_dm["concurrent_connections_min"] = self.get_CC_min_value( + objectives['TimelineSegments'][0]['PrimaryObjectiveValue'] + , config_type["CPUCoresNr"]) + config_type['tp_primary_obj'] = objective_dm + advance_timeline_params = [None, None] + if objectives['PrimaryObjective']['Timeline'][0]['Enabled']: + step_ramp_up = { + "Duration": objectives['PrimaryObjective']['Timeline'][0]["Duration"], + "NumberOfSteps": objectives['PrimaryObjective']['Timeline'][0]["NumberOfSteps"], + } + advance_timeline_params[0] = step_ramp_up + if objectives['PrimaryObjective']['Timeline'][2]['Enabled']: + step_ramp_down = { + "Duration": objectives['PrimaryObjective']['Timeline'][2]["Duration"], + "NumberOfSteps": objectives['PrimaryObjective']['Timeline'][2]["NumberOfSteps"], + } + advance_timeline_params[1] = step_ramp_down + config_type['primary_objective_adv_time_params'] = advance_timeline_params + if len(objectives['TimelineSegments'][0]['SecondaryObjectiveValues']) > 0 and len( + objectives['SecondaryObjectives']) > 0: + objective_dm = { + "type": objectives['SecondaryObjectives'][0]['Type'], + "unit": objectives['TimelineSegments'][0]['SecondaryObjectiveValues'][0]['Unit'], + "value": objectives['TimelineSegments'][0]['SecondaryObjectiveValues'][0]['Value'] + } + if objectives['SecondaryObjectives'][0]['Type'] == "Concurrent connections": + objective_dm["concurrent_connections_min"] = self.get_CC_min_value( + objectives['TimelineSegments'][0]['SecondaryObjectiveValues'][0]['Value'] + , config_type["CPUCoresNr"]) + config_type['tp_secondary_obj'] = objective_dm + if tp_profiles['TrafficSettings']['DefaultTransportProfile']['ClientTLSProfile']['version']: + config_type['tp_ssl'] = True + + if len(self.config['Config']['AttackProfiles']) > 0: + config_type['attack'] = True if self.config['Config']['AttackProfiles'][0]['Active'] else False + at_profiles = self.config['Config']['AttackProfiles'][0] + for attack in at_profiles['Attacks']: + if attack['ProtocolID'] not in config_type['attack_profiles']: + ###TODO: THIS IS A PROVISORY WORKAROUND BECAUSE CONFIG DOES NOT HOLD INFORMATION OF ENABLED ATTACKS + try: + if attack['Active'] == True: + config_type['attack_profiles'].append(attack['ProtocolID']) + if attack['ClientTLSProfile']['tls12Enabled'] \ + or attack['ClientTLSProfile']['tls13Enabled']: + config_type['tls_attacks'].append(attack['ProtocolID']) + except KeyError: + config_type['attack_profiles'].append(attack['ProtocolID']) + if attack['ClientTLSProfile']['tls12Enabled'] \ + or attack['ClientTLSProfile']['tls13Enabled']: + config_type['tls_attacks'].append(attack['ProtocolID']) + objective_dm = { + "attack_rate": at_profiles['ObjectivesAndTimeline']['TimelineSegments'][0]['AttackRate'], + "max_concurrent_attack": at_profiles['ObjectivesAndTimeline']['TimelineSegments'][0][ + 'MaxConcurrentAttack'] + } + config_type['att_obj'] = objective_dm + if at_profiles['TrafficSettings']['DefaultTransportProfile']['ClientTLSProfile']['version']: + config_type['tp_ssl'] = True + + if self.config['Config']['NetworkProfiles'][0]['DUTNetworkSegment'][0]['active']: + config_type['dut'] = True + + tunnel_stacks = self.config['Config']['NetworkProfiles'][0]['IPNetworkSegment'][0]['TunnelStacks'] + if len(tunnel_stacks) > 0: + config_type['tunnel_count_per_outer_ip'] = tunnel_stacks[0]['TunnelRange']['TunnelCountPerOuterIP'] + + ipsec_stacks = self.config['Config']['NetworkProfiles'][0]['IPNetworkSegment'][0]['IPSecStacks'] + if len(ipsec_stacks) > 0: + config_type['ipsec'] = {"host_count_per_tunnel": ipsec_stacks[0]['EmulatedSubConfig']['HostCountPerTunnel'], + "outer_ip_count": ipsec_stacks[0]['OuterIPRange']['Count'], + "rekey_margin": ipsec_stacks[0]["RekeyMargin"], + "lifetime_phase_1": ipsec_stacks[0]["IPSecRange"]["IKEPhase1Config"]["Lifetime"], + "lifetime_phase_2": ipsec_stacks[0]["IPSecRange"]["IKEPhase2Config"]["Lifetime"]} + return config_type + + def start_test(self, initializationTimeout=60): + apiPath = '/api/v2/sessions/{}/test-run/operations/start'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + self.startTime = self.__getEpochTime() + self.read_test_duration() + print('Waiting for the test to start...') + response = self.get_test_status() + actual_duration = 0 + counter = 1 + while actual_duration < initializationTimeout: + response = self.get_test_status() + if response['status'] == 'STARTING' and not self.startingStartTime: + self.startingStartTime = self.__getEpochTime() + if response['status'] == 'CONFIGURING' and not self.configuringStartTime: + self.configuringStartTime = self.__getEpochTime() + if response['status'] == 'STARTED': + if not self.startTrafficTime: + self.startTrafficTime = self.__getEpochTime() + return self.startTrafficTime + if response['status'] == 'ERROR': + raise Exception('Error when starting the test! {}'.format(self.get_test_details(self.sessionID))) + actual_duration += counter + time.sleep(counter) + else: + raise Exception( + 'ERROR! Test could not start in {} seconds, test state: {}'.format(initializationTimeout, response)) + + def stop_test(self, stopTimeout=60): + apiPath = '/api/v2/sessions/{}/test-run/operations/stop'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + stopID = response['id'] + if DEBUG:print('Stop ID : {}'.format(stopID)) + progressPath = '/api/v2/sessions/{}/test-run/operations/stop/{}'.format(self.sessionID, stopID) + self.__sendGet(progressPath, 200).json() + counter = 2 + iteration = 0 + while iteration < stopTimeout: + response = self.__sendGet(progressPath, 200).json() + if DEBUG:print('Stop Test Progress: {}'.format(response['progress'])) + if response['state'] == 'SUCCESS': + break + if response['state'] == 'ERROR': + raise Exception('Error when stopping the test! {}'.format(self.get_test_details(self.sessionID))) + iteration += counter + time.sleep(counter) + + def abort_test(self, abortTimeout=60): + apiPath = '/api/v2/sessions/{}/test-run/operations/abort'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + stopID = response['id'] + if DEBUG:print('Abort ID : {}'.format(stopID)) + progressPath = '/api/v2/sessions/{}/test-run/operations/abort/{}'.format(self.sessionID, stopID) + self.__sendGet(progressPath, 200).json() + counter = 2 + iteration = 0 + while iteration < abortTimeout: + response = self.__sendGet(progressPath, 200).json() + if DEBUG:print('Abort Test Progress: {}'.format(response['progress'])) + if response['state'] == 'SUCCESS': + break + if response['state'] == 'ERROR': + raise Exception('Error when aborting the test! {}'.format(self.get_test_details(self.sessionID))) + iteration += counter + time.sleep(counter) + + def get_error_notifications_for_session(self, notif_type="error"): + apiPath = '/api/v2/notifications?exclude=links&includeSeen=true&severity={}&sessionId={}'.format(notif_type, + self.sessionID) + return self.__sendGet(apiPath, 200).json() + + def get_error_notifications_for_test(self, notif_type="error"): + apiPath = '/api/v2/notifications?exclude=links&includeSeen=true&severity={}&sessionId={}'.format(notif_type, + self.sessionID) + error_notifs = self.__sendGet(apiPath, 200).json() + test_id = self.get_test_id() + return [notif for notif in error_notifs if "TestId" in notif["tags"] and notif["tags"]["TestId"] == test_id] + + def get_test_status(self): + apiPath = '/api/v2/sessions/{}/test'.format(self.sessionID) + return self.__sendGet(apiPath, 200).json() + + def wait_test_finished(self, timeout=300): + if DEBUG:print('Waiting for the test to finish...') + response = self.get_test_status() + actual_duration = 0 + counter = 1 + while actual_duration < self.testDuration + timeout: + response = self.get_test_status() + if response['status'] == 'STOPPING' and not self.stopTrafficTime: + self.stopTrafficTime = self.__getEpochTime() + if response['status'] == 'STOPPED': + if response['testElapsed'] >= response['testDuration']: + if DEBUG:print('Test gracefully finished') + self.stopTime = self.__getEpochTime() + return self.stopTime + else: + raise Exception("Error! Test stopped before reaching the configured duration = {}; Elapsed = {}" + .format(response['testDuration'], response['testElapsed'])) + else: + print('Test duration = {}; Elapsed = {}'.format(response['testDuration'], response['testElapsed'])) + actual_duration += counter + time.sleep(counter) + else: + if DEBUG:print("Test did not stop after timeout {}s. Test status= {}. Force stopping the test!".format(timeout, + response[ + 'status'])) + self.stop_test() + raise Exception("Error! Test failed to stop after timeout {}s.".format(timeout)) + + @staticmethod + def __getEpochTime(): + pattern = "%d.%m.%Y %H:%M:%S" + timeH = datetime.now().strftime(pattern) + epoch = int(time.mktime(time.strptime(timeH, pattern))) + return epoch + + def get_test_id(self): + apiPath = '/api/v2/sessions/{}/test'.format(self.sessionID) + response = self.__sendGet(apiPath, 200).json() + return response['testId'] + + def get_available_stats_name(self): + apiPath = '/api/v2/results/{}/stats'.format(self.get_test_id()) + response = self.__sendGet(apiPath, 200).json() + available_stats = [] + for stat in response: + available_stats.append(stat['name']) + return available_stats + + def get_stats_values(self, statName): + if DEBUG:print('Get the values for {}'.format(statName)) + apiPath = '/api/v2/results/{}/stats/{}'.format(self.get_test_id(), statName) + response = self.__sendGet(apiPath, 200).json() + return response + + def get_all_stats(self, csvLocation, exportTimeout=180): + test_id = self.get_test_id() + apiPath = '/api/v2/results/{}/operations/generate-csv'.format(test_id) + response = self.__sendPost(apiPath, None).json() + apiPath = response['url'][len(self.host):] + response = self.wait_event_success(apiPath, timeout=exportTimeout) + if not response: + raise TimeoutError("Failed to download CSVs. Timeout reached = {} seconds".format(exportTimeout)) + apiPath = response['resultUrl'] + response = self.__sendGet(apiPath, 200, debug=False) + zf = ZipFile(io.BytesIO(response.content), 'r') + zf.extractall(csvLocation) + return response + + def get_result_ended(self, timeout=5): + apiPath = '/api/v2/results/{}'.format(self.get_test_id()) + while timeout > 0: + if DEBUG:print('Pending result availability...') + response = self.__sendGet(apiPath, 200).json() + result_end_time = response['endTime'] + result_availability = result_end_time > 0 + if result_availability: + if DEBUG:print('Result may now be downloaded...') + return result_availability + else: + time.sleep(1) + timeout -= 1 + raise Exception('Result are not available for {}'.format(self.get_test_id())) + + def get_applications(self): + apiPath = '/api/v2/resources/apps?include=all' + response = self.__sendGet(apiPath, 200, debug=False).json() + return response + + def get_applications_by_pages(self, take=50, skip=0): + apiPath = '/api/v2/resources/apps?take={}&skip={}&exclude=links'.format(take, skip) + response = self.__sendGet(apiPath, 200, debug=False).json() + return response + + def get_configured_applications(self): + """ + return: returns a list of tuples containing application name(index 0) / configured app position in UI(index 1) + """ + configured = self.get_session_config(self.sessionID) + applications = configured["Config"]["TrafficProfiles"][0]["Applications"] + configured_applications = [app['Name'] for app in applications] + apps_in_test = [] + for item in configured_applications: + application_position = item.split()[-1] + application_name = " ".join([word for word in item.split() if not word.isdigit()]) + apps_in_test.append((application_name, application_position)) + return apps_in_test + + def switch_application_order(self, new_apps_order): + """ + Gets current order of the apps and reorders them the same order as the given list. + + :parameter new_apps_order: a list with new indexes representing the new order of the apps. + :return: list + """ + configured = self.get_session_config(self.sessionID) + applications = configured["Config"]["TrafficProfiles"][0]["Applications"] + reordered_applications = [applications[index] for index in new_apps_order] + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1'.format(self.sessionID) + self.__sendPatch(apiPath, payload={"Applications": reordered_applications}) + + def get_applications_with_filter_applied(self, *args, filter_mode="and", take=50, skip=0): + filter_by = ",".join(args) + apiPath = f"/api/v2/resources/apps?take={take}&skip={skip}&exclude=links&searchCol=Name,Description&searchVal={filter_by}&filterMode={filter_mode}" + result = self.__sendGet(apiPath, 200).json() + filtered_applications = [(item["Name"], item["Description"]) for item in result["data"]] + return filtered_applications + + def get_attacks_with_filter_applied(self, *args, filter_mode="and", take=50, skip=0): + filter_by = ",".join(args) + apiPath = f"/api/v2/resources/attacks?take={take}&skip={skip}&exclude=links&include=Metadata&searchCol=Name,Description,Direction,Severity,Keywords,References&searchVal={filter_by}&filterMode={filter_mode}" + result = self.__sendGet(apiPath, 200).json() + filtered_applications = [ + (item["Name"], item["Description"], item["Metadata"]["Direction"], item["Metadata"]["Severity"]) for item in + result["data"]] + return filtered_applications + + def get_attacks(self, filter_by, take=50, skip=0): + apiPath = f'/api/v2/resources/attacks?take={take}&skip={skip}&exclude=links&include=Metadata&searchCol=Name&searchVal={urllib.parse.quote(filter_by)}&filterMode=and' + response = self.__sendGet(apiPath, 200, debug=False).json() + return response + + def get_strikes(self): + apiPath = '/api/v2/resources/strikes?include=all' + response = self.__sendGet(apiPath, 200).json() + return response + + def get_application_id(self, app_name): + if not self.app_list: + self.app_list = self.get_applications() + if DEBUG:print('Getting application {} ID...'.format(app_name)) + for app in self.app_list: + if app['Name'] == app_name: + if DEBUG:print('Application ID = {}'.format(app['id'])) + return app['id'] + + def get_strike_id(self, strike_name): + if not self.strike_list: + self.strike_list = self.get_strikes() + for strike in self.strike_list: + if strike['Name'] == strike_name: + if DEBUG:print('Strike ID = {}'.format(strike['id'])) + return strike['id'] + + def get_attack_id(self, attack_name): + self.attack_list = self.get_attacks(attack_name) + for attack in self.attack_list["data"]: + if attack_name == attack['Name']: + return attack['id'] + + def set_agent_optimization_mode(self, mode: str, tp_id=1): + """ + Configures the agent optimization mode. + Currently the 2 modes supported: + RATE_MODE, BALANCED_MODE + """ + if mode in ["RATE_MODE", "BALANCED_MODE"]: + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/AdvancedSettings'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"AgentOptimizationMode": mode}) + else: + raise ValueError("{} is not supported".format(mode)) + + def set_attack_warmup_period(self, warmup_period, ap_id=1): + """ + Sets the warmup period for the Attack profile + """ + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, ap_id) + self.__sendPatch(apiPath, payload={"WarmUpPeriod": int(warmup_period)}) + + def set_traffic_warmup_period(self, warmup_period, tp_id=1): + """ + Sets the warmup period for the Traffic profile + """ + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/AdvancedSettings'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"WarmUpPeriod": int(warmup_period)}) + + def add_attack(self, attack_name, ap_id=1): + app_id = self.get_attack_id(attack_name=attack_name) + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/Attacks'.format(self.sessionID, ap_id) + response = self.__sendPost(apiPath, payload={"ExternalResourceURL": app_id}).json() + return response[-1]['id'] + + def add_strike_as_attack(self, strike_name, ap_id=1): + app_id = self.get_strike_id(strike_name=strike_name) + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/Attacks'.format(self.sessionID, ap_id) + response = self.__sendPost(apiPath, payload={"ProtocolID": app_id}).json() + return response[-1]['id'] + + def create_customized_attack(self, application_name, strike_name, insert_at_position): + app_id = self.get_application_id(app_name=application_name) + api_path = '/api/v2/sessions/{}/config/config/AttackProfiles/1/Attacks/operations/create'.format(self.sessionID) + payload = {"Actions": [{"ProtocolID": strike_name, "InsertAtIndex": insert_at_position}], + "ResourceURL": f'api/v2/resources/apps/{app_id}'} + response = self.__sendPost(api_path, payload=payload).json() + status_url = response["url"].split("/")[-1] + api_path = '/api/v2/sessions/{}/config/config/AttackProfiles/1/Attacks/operations/create/{}'.format( + self.sessionID, status_url) + for index in range(10): + response = self.__sendGet(api_path, 200).json() + time.sleep(1) + if response["state"] == "SUCCESS": + return + else: + continue + raise Exception("Application action was not added after 10 seconds") + + def insert_attack_action_at_exact_position(self, attack_id, action_id, insert_at_position): + api_path = f'/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack_id}/Tracks/1/operations/add-actions' + response = self.__sendPost(api_path, payload={ + "Actions": [{"ActionID": action_id, "InsertAtIndex": insert_at_position}]}).json() + status_url = response["url"].split("/")[-1] + api_path = f'/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack_id}/Tracks/1/operations/add-actions/{status_url}' + for index in range(10): + response = self.__sendGet(api_path, 200).json() + time.sleep(1) + if response["state"] == "SUCCESS": + return response["result"][insert_at_position]["id"] + else: + continue + + def add_application(self, app_name, tp_id=1): + app_id = self.get_application_id(app_name=app_name) + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/Applications'.format(self.sessionID, tp_id) + response = self.__sendPost(apiPath, payload={"ExternalResourceURL": app_id}).json() + return response[-1]['id'] + + def insert_application_at_action_exact_position(self, app_id, action_id, position): + api_path = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}/Tracks/1/operations/add-actions' + response = self.__sendPost(api_path, + payload={"Actions": [{"ActionID": action_id, "InsertAtIndex": position}]}).json() + status_url = response["url"].split("/")[-1] + api_path = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}/Tracks/1/operations/add-actions/{status_url}' + for index in range(10): + response = self.__sendGet(api_path, 200).json() + time.sleep(1) + if response["state"] == "SUCCESS": + return response["result"][position]["id"] + else: + continue + raise Exception("Application action was not added after 10 seconds") + + def add_application_action(self, app_id, action_name, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/Applications/{}/Tracks/1/Actions'.format( + self.sessionID, tp_id, app_id) + self.__sendPost(apiPath, payload={"Name": action_name}).json() + + def set_application_action_value(self, app_id, action_id, param_id, value, file_value=None, source=None, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/Applications/{}/Tracks/1/Actions/{}/Params/{}'.format( + self.sessionID, tp_id, app_id, action_id, param_id) + payload = {"Value": value, "FileValue": file_value, "Source": source} + self.__sendPatch(apiPath, payload) + + def get_application_actions(self, app_id): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}/Tracks/1/Actions' + response = self.__sendGet(apiPath, 200, debug=False).json() + return response + + def delete_application_action(self, app_id, action_id, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/Applications/{}/Tracks/1/Actions/{}'.format( + self.sessionID, tp_id, app_id, action_id) + self.__sendDelete(apiPath, self.headers) + + def add_attack_action(self, att_id, action_name, ap_id=1): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/Attacks/{}/Tracks/1/Actions'.format( + self.sessionID, ap_id, att_id) + self.__sendPost(apiPath, payload={"Name": action_name}).json() + + def add_attack_profile(self): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + return response[-1]['id'] + + def add_traffic_profile(self): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={}).json() + return response[-1]['id'] + + def set_traffic_profile_timeline(self, duration, objective_value, objective_unit=None, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, pr_id) + payload = {"Duration": duration, "PrimaryObjectiveValue": objective_value, + "PrimaryObjectiveUnit": objective_unit} + self.__sendPatch(apiPath, payload) + + def set_application_simulated_users_timeline(self, max_su_per_second, max_pending_su, objective_unit=None, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/1/ObjectivesAndTimeline/PrimaryObjective'.format( + self.sessionID) + payload = {"MaxSimulatedUsersPerInterval": max_su_per_second, "MaxPendingSimulatedUsers": max_pending_su} + self.__sendPatch(apiPath, payload) + + def set_primary_objective(self, objective, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/PrimaryObjective'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"Type": objective, "Unit": ""}) + + def add_primary_objective(self, objective, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/PrimaryObjective'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"Type": objective, "Unit": ""}) + + def add_secondary_objective(self, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/SecondaryObjectives'.format( + self.sessionID, tp_id) + self.__sendPost(apiPath, payload={}).json() + + def add_secondary_objective_value(self, objective, objective_value, objective_unit=None, tp_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/SecondaryObjectives/1'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"Type": objective}) + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/ObjectivesAndTimeline/TimelineSegments/1/SecondaryObjectiveValues/1'.format( + self.sessionID, tp_id) + self.__sendPatch(apiPath, payload={"Value": objective_value}) + if objective_unit: + self.__sendPatch(apiPath, payload={"Unit": objective_unit}) + + def set_traffic_profile_client_tls(self, version, status, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/TrafficSettings/DefaultTransportProfile/ClientTLSProfile'.format( + self.sessionID, pr_id) + self.__sendPatch(apiPath, payload={version: status}) + + def set_traffic_profile_server_tls(self, version, status, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/TrafficProfiles/{}/TrafficSettings/DefaultTransportProfile/ServerTLSProfile'.format( + self.sessionID, pr_id) + self.__sendPatch(apiPath, payload={version: status}) + + def set_attack_profile_timeline(self, duration, objective_value, max_concurrent_attacks=None, iteration_count=0, + ap_id=1): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/ObjectivesAndTimeline/TimelineSegments/1'.format( + self.sessionID, ap_id) + payload = {"Duration": duration, "AttackRate": objective_value, "MaxConcurrentAttack": max_concurrent_attacks, + "IterationCount": iteration_count} + self.__sendPatch(apiPath, payload) + + def set_attack_profile_client_tls(self, version, status, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/TrafficSettings/DefaultTransportProfile/ClientTLSProfile'.format( + self.sessionID, pr_id) + self.__sendPatch(apiPath, payload={version: status}) + + def set_attack_profile_server_tls(self, version, status, pr_id=1): + apiPath = '/api/v2/sessions/{}/config/config/AttackProfiles/{}/TrafficSettings/DefaultTransportProfile/ServerTLSProfile'.format( + self.sessionID, pr_id) + self.__sendPatch(apiPath, payload={version: status}) + + def set_custom_payload(self, apiPath, fileName): + resp = self.__sendPatch(apiPath, payload={"Source": "PayloadProfile"}) + if resp.status_code != 204: + if DEBUG:print("Error patching payload type: {}".format(resp.json())) + uploadUrl = "/api/v2/resources/payloads" + payload = self.get_resource(uploadUrl, name=os.path.basename(fileName)) + if not payload: + payloadFile = open(fileName, 'rb') + resp = self.__sendPost(uploadUrl, payload=None, customHeaders=self.headers, + files={'file': payloadFile}).json() + payload = {"FileValue": {"fileName": resp["fileName"], "resourceURL": resp["resourceURL"]}} + else: + payload = {"FileValue": {"fileName": payload["name"], "resourceURL": payload["links"][0]["href"]}} + self.__sendPatch(apiPath, payload=payload) + + def set_application_custom_payload(self, appName, actionName, paramName, fileName): + config = self.get_session_config()['Config'] + applicationsByName = {app['Name']: app for app in config['TrafficProfiles'][0]['Applications']} + httpApp = applicationsByName[appName] + actionsByName = {action['Name']: action for action in httpApp['Tracks'][0]['Actions']} + postAction = actionsByName[actionName] + actionParametersByName = {param['Name']: param for param in postAction['Params']} + bodyParam = actionParametersByName[paramName] + apiPath = "/api/v2/sessions/{}/config/config/TrafficProfiles/1/Applications/{}/Tracks/1/Actions/{}/Params/{}".format( + self.sessionID, httpApp['id'], postAction['id'], bodyParam['id'] + ) + self.set_custom_payload(apiPath, fileName) + + def set_custom_playlist(self, apiPath, fileName, value=None): + resp = self.__sendPatch(apiPath, payload={"Source": "Playlist"}) + if resp.status_code != 204: + if DEBUG:print("Error patching payload type: {}".format(resp.json())) + uploadUrl = "/api/v2/resources/playlists" + playlist = self.get_resource(uploadUrl, name=os.path.basename(fileName)) + if not playlist: + playlistFile = open(fileName, 'rb') + resp = self.__sendPost(uploadUrl, payload=None, customHeaders=self.headers, + files={'file': playlistFile}).json() + payload = {"FileValue": {"fileName": resp["fileName"], "resourceURL": resp["resourceURL"], "Value": value}} + else: + payload = {"FileValue": {"fileName": playlist["name"], "resourceURL": playlist["links"][0]["href"], + "Value": value}} + self.__sendPatch(apiPath, payload=payload) + + def set_attack_custom_playlist(self, attackName, actionName, paramName, fileName, value="Query"): + config = self.get_session_config()['Config'] + attacksByName = {app['Name']: app for app in config['AttackProfiles'][0]['Attacks']} + attack = attacksByName[attackName] + actionsByName = {action['Name']: action for action in attack['Tracks'][0]['Actions']} + postAction = actionsByName[actionName] + actionParametersByName = {param['Name']: param for param in postAction['Params']} + bodyParam = actionParametersByName[paramName] + apiPath = "/api/v2/sessions/{}/config/config/AttackProfiles/1/Attacks/{}/Tracks/1/Actions/{}/Params/{}".format( + self.sessionID, attack['id'], postAction['id'], bodyParam['id'] + ) + self.set_custom_playlist(apiPath, fileName, value) + + def get_all_configs(self): + apiPath = '/api/v2/configs' + response = self.__sendGet(apiPath, 200).json() + return response + + def get_config_id(self, test_name): + configs = self.get_all_configs() + for config in configs: + if config['displayName'] == test_name: + if DEBUG:print('Config ID = {}'.format(config['id'])) + return config['id'] + + def get_resource(self, apiPath, name): + resp = self.__sendGet(apiPath, 200).json() + for resource in resp: + if resource["name"] == name: + return resource + + def save_config(self, test_name, timeout=10): + apiPath = '/api/v2/sessions/{}/config/operations/save'.format(self.sessionID) + response = self.__sendPost(apiPath, payload={"Name": test_name}).json() + apiPath = '/api/v2/sessions/{}/config/operations/save/{}'.format(self.sessionID, response['id']) + if not self.wait_event_success(apiPath, timeout): + raise TimeoutError( + "Could not save copy for test= {}. Timeout reached = {} seconds".format(test_name, timeout)) + + def load_config(self, test_name): + configID = self.get_config_id(test_name=test_name) + apiPath = '/api/v2/sessions' + response = self.__sendPost(apiPath, payload={"configUrl": configID}).json() + if response: + if DEBUG:print('Test= {} was loaded with ID= {}'.format(test_name, response[-1]['id'])) + self.sessionID = response[-1]['id'] + return + else: + raise Exception('Failed to load test= {}'.format(test_name)) + + def collect_diagnostics(self, timeout=600): + apiPath = '/api/v2/diagnostics/operations/export' + response = self.__sendPost(apiPath, payload={"componentList": [], "sessionId": self.sessionID}).json() + apiPath = '/api/v2/diagnostics/operations/export/{}'.format(response["id"]) + response = self.wait_event_success(apiPath, timeout) + + return response['id'] + + def set_diagnostics_level(self, log_level): + apiPath = '/api/v2/log-config' + response = self.__sendPut(apiPath, payload={"level": log_level}) + + return response + + def add_tunnel_stack(self, network_segment_number=1): + """ + Add a tunnel stack + """ + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/NetworkProfiles/1/IPNetworkSegment/{network_segment_number}/TunnelStacks' + self.__sendPost(apiPath, payload={}) + + def get_tunnel_stack(self): + """ + Obtain information about tunnel stack configuration on client. + """ + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/NetworkProfiles/1/IPNetworkSegment/1/TunnelStacks' + return self.__sendGet(apiPath, 200).json() + + def get_tunnel_outer_ip(self): + """ + Obtain information about tunnel stack configuration on client. + """ + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/NetworkProfiles/1/IPNetworkSegment/1/TunnelStacks/1/OuterIPRange' + return self.__sendGet(apiPath, 200).json() + + def set_tunnel_stack_gateway_vpn_ip(self, tunnel_type, gw_vpn_ip, network_segment_number=1): + """ + Set the value for VPN gateway IP. + + :params tunnel_type: (str) CiscoAnyConnectSettings, FortinetSettings, PANGPSettings + """ + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/NetworkProfiles/1/IPNetworkSegment/{network_segment_number}/TunnelStacks/1/TunnelRange/{tunnel_type}' + response = self.get_tunnel_stack() + if response: + self.__sendPatch(apiPath, payload={"VPNGateway": gw_vpn_ip}) + else: + self.add_tunnel_stack() + self.__sendPatch(apiPath, payload={"VPNGateway": gw_vpn_ip}) + + def set_tunnel_stack_type(self, tunnel_type, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"VendorType": tunnel_type}) + + def set_tunnel_count(self, number_of_tunnels, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange'.format( + self.sessionID, network_segment_number) + resp = self.__sendPatch(apiPath, payload={"TunnelCountPerOuterIP": int(number_of_tunnels)}) + if resp.status_code != 204: + if DEBUG:print("Error setting VPN tunnel count per outer IP: {}".format(resp.json())) + + def set_tunnel_establisment_timeout(self, tunnel_timeout, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"TunnelEstablishmentTimeout": tunnel_timeout}) + + def set_pan_tunnel_portal_hostname(self, portal_hostname, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/PANGPSettings'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"PortalHostname": portal_hostname}) + + def set_tunnel_pan_vpn_gateways(self, tunnel_type, vpn_gateways, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/{}'.format( + self.sessionID, network_segment_number, tunnel_type) + self.__sendPatch(apiPath, payload={"VPNGateways": list(vpn_gateways.split(" "))}) + + def set_tunnel_cisco_vpn_gateway(self, tunnel_type, vpn_gateways, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/{}'.format( + self.sessionID, network_segment_number, tunnel_type) + self.__sendPatch(apiPath, payload={"VPNGateway": vpn_gateways}) + + def set_tunnel_cisco_connection_profiles(self, connection_profiles, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/CiscoAnyConnectSettings'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"ConnectionProfiles": list(connection_profiles.split(" "))}) + + def set_tunnel_auth_settings(self, tunnel_type, field, value, source, network_segment_number=1): + # Field can be UsernamesParam or PasswordsParam + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/{}/AuthSettings/{}'.format( + self.sessionID, network_segment_number, tunnel_type, field) + self.__sendPatch(apiPath, payload={"Value": value, "Source": source}) + + def set_tunnel_outer_ip_gateway(self, gateway_ip, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/OuterIPRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"GwAuto": False}) + resp = self.__sendPatch(apiPath, payload={"GwStart": gateway_ip}) + if resp.status_code != 204: + if DEBUG:print("Error network tags in Inncer IP range: {}".format(resp.json())) + + def set_tunnel_automatic_gateway(self, gateway_auto=True, network_segment=1, tunnel_stack=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment, tunnel_stack) + self.__sendPatch(apiPath, payload={"GwAuto": gateway_auto}) + + def set_tunnel_outer_ip_range(self, ip_start, count, max_count, network_segment_number=1, ip_incr="0.0.0.1"): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/OuterIPRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"IpAuto": False}) + self.__sendPatch(apiPath, payload={"IpStart": ip_start}) + self.__sendPatch(apiPath, payload={"IpIncr": ip_incr}) + self.__sendPatch(apiPath, payload={"Count": count}) + self.__sendPatch(apiPath, payload={"maxCountPerAgent": max_count}) + + def set_tunnel_outer_ip_start(self, ip_start, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/OuterIPRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"IpStart": ip_start}) + + def set_tunnel_stack_dns_servers(self, servers, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/DNSResolver'.format( + self.sessionID, network_segment_number) + serverList = list(map(lambda x: {"name": x}, list(servers.split(",")))) + self.__sendPatch(apiPath, payload={"nameServers": serverList}) + + def delete_ip_stack(self, network_segment_number=1, ip_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment_number, ip_stack_number) + self.__sendDelete(apiPath, self.headers) + + def set_tunnel_inner_ip_network_tags(self, network_tags, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/InnerIPRange'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"networkTags": network_tags}) + + def set_tunnel_udp_port(self, tunnel_type, encapsulation_type, udp_port, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/1/TunnelRange/{}/{}'.format( + self.sessionID, network_segment_number, tunnel_type, encapsulation_type) + self.__sendPatch(apiPath, payload={"UdpPort": udp_port}) + + def __get_status_for_advance_timeline(self): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/ObjectivesAndTimeline/PrimaryObjective/Timeline' + result = self.__sendGet(apiPath, 200) + return result.json() + + def get_specific_value_from_given_ramp_segment(self, segment_type, given_key): + acceptable_keys = ["Duration", "SegmentType", "Enabled", "AutomaticObjectiveValue", "NumberOfSteps", + "PrimaryObjectiveValue", "SecondaryObjectiveValues", "ObjectiveValue", "ObjectiveUnit"] + if given_key not in acceptable_keys: + raise Exception(f"Cannot find key {given_key}, make sure the key exists in this list {acceptable_keys}") + response = self.__get_status_for_advance_timeline() + return response[segment_type - 1][given_key] + + def enable_step_ramp_down_or_up(self, segment_type=None): + """ + A method to activate the step ramp-up feature + + :params segment_type: default set to enable for both ramp up or down, could receive a string "up"/"down" if + ... only one segment is to be enabled. + """ + if segment_type: + self.__setStepRamp(True, segment_type) + else: + for segment in [1, 3]: + self.__setStepRamp(True, segment) + + def disable_step_ramp_down_or_up(self, segment_type=None): + """ + A method to activate the step ramp-up feature + + :params segment_type: default set to enable for both ramp up or down, could receive a string "up"/"down" if + ... only one segment is to be enabled. + """ + if segment_type: + self.__setStepRamp(False, segment_type) + else: + for segment in [1, 3]: + self.__setStepRamp(False, segment) + + def __setStepRamp(self, action, segment_type): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/ObjectivesAndTimeline/PrimaryObjective/Timeline/{segment_type}' + self.__sendPatch(apiPath, payload={"Enabled": action}) + + def set_specific_value_for_given_ramp_segment(self, segment_type, key, value): + """ + A method to set different parameters at the timeline segment + + :params key: one of the endpoint keys, must fit in the acceptable_keys list + :params value: any value + :params segment_type: specify which one of the RampUpDown segment to use must be "UP" / "DOWN" + """ + acceptable_keys = ["Duration", "NumberOfSteps", "ObjectiveValue", "ObjectiveUnit"] + if key not in acceptable_keys: + raise Exception(f"Cannot find key {key}, make sure the key exists in this list {acceptable_keys}") + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/ObjectivesAndTimeline/PrimaryObjective/Timeline/{segment_type}' + self.__sendPatch(apiPath, payload={key: value}) + if self.get_specific_value_from_given_ramp_segment(segment_type, key) != value: + raise Exception(f'An error has occured, setting {key} to {value} did not work!') + return True + + def delete_added_application(self, app_id): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}' + self.__sendDelete(apiPath, self.headers) + + def get_disk_usage_info(self): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/ExpectedDiskSpace' + result = self.__sendGet(apiPath, 200) + return result.json() + + def __set_traffic_profile(self, option): + apiPath = f"/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1" + self.__sendPatch(apiPath, payload={"Active": option}) + + def __set_attack_profile(self, option): + apiPath = f"/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1" + self.__sendPatch(apiPath, payload={"Active": option}) + + def enable_traffic_profile(self): + self.__set_traffic_profile(True) + + def disable_traffic_profile(self): + self.__set_traffic_profile(False) + + def enable_attack_profile(self): + self.__set_attack_profile(True) + + def disable_attack_profile(self): + self.__set_attack_profile(False) + + def enable_application_inherit_tls(self, app_id): + """ + enable inherit tls for any added application + """ + pass + self.__set_application_inherit_tls_status(True, app_id) + + def disable_application_inherit_tls(self, app_id): + """ + disable inherit tls for any added application + + :param app_id: application index in the added order + """ + self.__set_application_inherit_tls_status(False, app_id) + + def enable_attack_inherit_tls(self, attack_id): + """ + enable inherit tls for any added attack + + :param attack_id: attack index in the added order + """ + self.__set_attack_inherit_tls_status(True, attack_id) + + def disable_attack_inherit_tls(self, attack_id): + """ + enable inherit tls for any added attack + + :param attack_id: attack index in the added order + """ + self.__set_attack_inherit_tls_status(False, attack_id) + + def __set_application_inherit_tls_status(self, status, app_id): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}' + self.__sendPatch(apiPath, payload={"InheritTLS": status}) + + def __set_attack_inherit_tls_status(self, status, attack_id): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack_id}' + self.__sendPatch(apiPath, payload={"InheritTLS": status}) + + def clear_agent_ownership(self, agents_ips=None): + agents_ids = [] + result_id = self.get_current_session_result() + if agents_ips: + for agent_ip in agents_ips: + agents_ids.append(self.get_agents_ids(agent_ip)[0]) + apiPath = "/api/v2/agents/operations/release" + agents_to_release = [] + for agent_id in agents_ids: + agents_to_release.append({"agentId": agent_id}) + payload = { + "sessionId": result_id, + "agentsData": agents_to_release + } + response = self.__sendPost(apiPath, payload=payload).json() + status_url = response["id"] + api_path = f'/api/v2/agents/operations/release/{status_url}' + for index in range(10): + response = self.__sendGet(api_path, 200).json() + time.sleep(1) + if response["state"] == "SUCCESS": + return True + else: + continue + + def __get_results_list(self): + apiPath = "/api/v2/results?exclude=links" + result = self.__sendGet(apiPath, 200) + return result.json() + + def get_current_session_result(self): + sessions_results = self.__get_results_list() + for index in range(0, len(sessions_results)): + if sessions_results[index]['activeSession'] == self.sessionID: + return sessions_results[index]['testName'] + else: + continue + + def configure_attack_client_tls_settings(self, attack_id, config_endpoint, config_change): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack_id}/ClientTLSProfile' + available_endpoints = ["tls12Enabled", "tls13Enabled", "ciphers12", "ciphers13", "immediateClose", + "middleBoxEnabled"] + if config_endpoint not in available_endpoints: + raise ("The endpoint you are trying to configure doesn't exist!") + self.__sendPatch(apiPath, payload={config_endpoint: config_change}) + + def configure_attack_server_tls_settings(self, attack_id, config_endpoint, config_change): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack_id}/ServerTLSProfile' + available_endpoints = ["tls12Enabled", "tls13Enabled", "ciphers12", "ciphers13", "immediateClose", + "middleBoxEnabled"] + if config_endpoint not in available_endpoints: + raise ("The endpoint you are trying to configure doesn't exist!") + self.__sendPatch(apiPath, payload={config_endpoint: config_change}) + + def configure_application_client_tls_settings(self, app_id, config_endpoint, config_change): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}/ClientTLSProfile' + available_endpoints = ["tls12Enabled", "tls13Enabled", "ciphers12", "ciphers13", "immediateClose", + "middleBoxEnabled"] + if config_endpoint not in available_endpoints: + raise ("The endpoint you are trying to configure doesn't exist!") + self.__sendPatch(apiPath, payload={config_endpoint: config_change}) + + def configure_application_server_tls_settings(self, app_id, config_endpoint, config_change): + apiPath = f'/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{app_id}/ServerTLSProfile' + available_endpoints = ["tls12Enabled", "tls13Enabled", "ciphers12", "ciphers13", "immediateClose", + "middleBoxEnabled"] + if config_endpoint not in available_endpoints: + raise ("The endpoint you are trying to configure doesn't exist!") + self.__sendPatch(apiPath, payload={config_endpoint: config_change}) + + def enable_configured_application(self, app_id): + self.__set_configured_applications(True, app_id) + + def disable_configured_application(self, app_id): + self.__set_configured_applications(False, app_id) + + def enable_configured_attack(self, app_id): + self.__set_configured_attacks(True, app_id) + + def disable_configured_attack(self, attack_id): + self.__set_configured_attacks(False, attack_id) + + def __set_configured_applications(self, option, application): + apiPath = f"/api/v2/sessions/{self.sessionID}/config/config/TrafficProfiles/1/Applications/{application}" + self.__sendPatch(apiPath, payload={"Active": option}) + + def __set_configured_attacks(self, option, attack): + apiPath = f"/api/v2/sessions/{self.sessionID}/config/config/AttackProfiles/1/Attacks/{attack}" + self.__sendPatch(apiPath, payload={"Active": option}) + + def set_sack(self, profile, tcp_profile, value=True): + apiPath = '/api/v2/sessions/{}/config/config/{}/1/TrafficSettings/DefaultTransportProfile/{}'.format( + self.sessionID, profile, tcp_profile) + self.__sendPatch(apiPath, payload={"SackEnabled": value}) + + def add_ipsec_stack(self, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks'.format( + self.sessionID, network_segment_number) + self.__sendPost(apiPath, {}) + + def set_ipsec_stack_role(self, network_segment_number, ipstack_role): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/1'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"StackRole": ipstack_role}) + + def set_ipsec_tunnel_reattempt_count(self, tunnel_reattempt_count, network_segment_number): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/1'.format( + self.sessionID, network_segment_number) + self.__sendPatch(apiPath, payload={"RetryCount": tunnel_reattempt_count}) + + def set_ipsec_emulated_subnet_settings(self, startIP, increment, prefix=24, host_count_per_tunnel=1, + network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/1/EmulatedSubConfig'.format( + self.sessionID, network_segment_number) + payload = {"Start": startIP, + "Increment": increment, + "Prefix": prefix, + "HostCountPerTunnel": host_count_per_tunnel} + self.__sendPatch(apiPath, payload) + + def set_ph1_ipsec_algorithms(self, ph1_algorithms, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange/IKEPhase1Config'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + algorithms = list(ph1_algorithms.split(" ")) + payload = {"EncAlgorithm": algorithms[0], + "HashAlgorithm": algorithms[1], + "DHGroup": algorithms[2], + "PrfAlgorithm": algorithms[3]} + self.__sendPatch(apiPath, payload) + + def set_ph2_ipsec_algorithms(self, ph2_algorithms, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange/IKEPhase2Config'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + algorithms = list(ph2_algorithms.split(" ")) + payload = {"EncAlgorithm": algorithms[0], + "HashAlgorithm": algorithms[1], + "PfsGroup": algorithms[2]} + self.__sendPatch(apiPath, payload) + + def set_ipsec_public_peer(self, public_peer_ip, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"PublicPeer": public_peer_ip}) + + def set_ipsec_public_peer_increment(self, public_peer_increment, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"PublicPeerIncrement": public_peer_increment}) + + def set_ipsec_protected_subnet_start(self, protected_subnet_start, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange/ProtectedSubConfig'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"Start": protected_subnet_start}) + + def set_ipsec_protected_subnet_increment(self, increment, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange/ProtectedSubConfig'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"Increment": increment}) + + def set_ipsec_preshared_key(self, sharedkey, network_segment_number, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/IPSecRange/AuthSettings'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"SharedKey": sharedkey}) + + def set_ipsec_outer_ip_range_start(self, outer_ip_range_start, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"IpStart": outer_ip_range_start}) + + def set_ipsec_outer_ip_range_gateway(self, outer_ip_range_gateway, network_segment_number=1, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + self.__sendPatch(apiPath, {"GwStart": outer_ip_range_gateway}) + + def set_ipsec_outer_ip_range_params(self, startIP, increment, count, maxcountperagent, netmask, gateway, + network_segment_number, ipsec_stack_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment_number, ipsec_stack_number) + payload = {"IpAuto": False, + "IpStart": startIP, + "IpIncr": increment, + "Count": count, + "maxCountPerAgent": maxcountperagent, + "NetMask": netmask, + "GwStart": gateway} + self.__sendPatch(apiPath, payload) + + def check_ER_status(self, network_segment_number=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/EmulatedRouter'.format( + self.sessionID, network_segment_number) + response = self.__sendGet(apiPath, 200).json() + return response['Enabled'] + + def get_IP_stack_IP_start(self, network_segment_number=1, IP_range=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPRanges/{}'.format( + self.sessionID, network_segment_number, IP_range) + response = self.__sendGet(apiPath, 200).json() + return response['IpStart'] + + def get_TLS_VPN_IP_start(self, network_segment_number=1, tunnel_stack=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/TunnelStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment_number, tunnel_stack) + response = self.__sendGet(apiPath, 200).json() + return response['IpStart'] + + def get_IPsec_IP_start(self, network_segment_number=1, IPsec_stack=1): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/IPNetworkSegment/{}/IPSecStacks/{}/OuterIPRange'.format( + self.sessionID, network_segment_number, IPsec_stack) + response = self.__sendGet(apiPath, 200).json() + return response['IpStart'] + + def create_session_precanned_config(self, config_name): + config_lookup = {config["displayName"]: config["id"] for config in self.get_all_configs()} + config_id = config_lookup.get(config_name) + if config_id is None: + raise ValueError(f"Pre-canned config with name '{config_name}' was not found") + self.configID = config_id + self.sessionID = self.open_config() + + def set_base_path_url(self, server_test_ip): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/1/PepDUT/AuthProfileParams/3'.format( + self.sessionID) + concated_ip = "/http/" + server_test_ip + self.__sendPatch(apiPath, payload={"Value": concated_ip}) + + def set_okta_credentials_ep(self, okta_username, okta_password): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/1/PepDUT/AuthProfileParams/4'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"Value": okta_username}) + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/1/PepDUT/AuthProfileParams/5'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"Value": okta_password}) + + def set_okta_credentials_gp(self, okta_username, okta_password): + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/1/PepDUT/AuthProfileParams/1'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"Value": okta_username}) + apiPath = '/api/v2/sessions/{}/config/config/NetworkProfiles/1/DUTNetworkSegment/1/PepDUT/AuthProfileParams/2'.format( + self.sessionID) + self.__sendPatch(apiPath, payload={"Value": okta_password}) + + def export_controller(self, export_path=None, file_name="mycontroller.zip"): + apiPath = '/api/v2/controller-migration/operations/export' + payload = { + "keycloak": True, + "config": True, + "licenseServers": True, + "externalNatsBrokers": True, + "results": True + } + result = self.__sendPost(apiPath, payload=payload).json() + apiPath = apiPath + "/" + str(result["id"]) + for index in range(36): + time.sleep(10) + result = self.__sendGet(apiPath, 200).json() + if result["state"] == "SUCCESS": + download_url = result["resultUrl"] + break + elif result["state"] == "FAILURE": + raise "Export controller operation did not succeeded after 5 minutes" + customHeaders = self.headers + customHeaders['Accept'] = 'application/zip' + response = self.session.get('{}{}'.format(self.host, "/" + download_url), headers=customHeaders, stream=True) + if export_path: + file_name = os.path.join(export_path, file_name) + with open(file_name, 'wb') as fd: + for chunk in response.iter_content(chunk_size=256): + fd.write(chunk) + + def import_controller(self, import_path=None, file_name="fakenews.zip"): + apiPath = '/api/v2/controller-migration/operations/import' + customHeaders = self.headers + customHeaders['Accept'] = 'application/json' + if import_path: + file_to_open = import_path + file_name + else: + file_to_open = file_name + mp_encoder = MultipartEncoder( + fields={ + "request": json.dumps( + { + "keycloak": True, + "config": True, + "licenseServers": True, + "externalNatsBrokers": True, + "results": True + } + ), + "file": (file_name, open(file_to_open, "rb"), 'application/zip') + } + ) + + customHeaders['content-type'] = mp_encoder.content_type + result = self.__sendPost(apiPath, payload=mp_encoder, customHeaders=self.headers).json() + if DEBUG:print(f"This is post {result}") + + apiPath = apiPath + "/" + str(result["id"]) + for index in range(360): + time.sleep(3) + result = self.__sendGet(apiPath, 200).json() + if result["state"] == "SUCCESS": + self.session.close() + elif result["state"] == "ERROR": + self.session.close() + raise "Import was not succesful" diff --git a/demos/security/nvidia/Statistics.py b/demos/security/nvidia/Statistics.py new file mode 100644 index 00000000..b19b4951 --- /dev/null +++ b/demos/security/nvidia/Statistics.py @@ -0,0 +1,140 @@ +import json +import os + +import pandas as pd +from tabulate import tabulate + + +class JSONObject: + def __init__(self, dict): + vars(self).update(dict) + + +def json_to_class(path): + """ + Converts a json into a class + """ + json_file = open(path) + s = json_file.read() + return json.loads(s, object_hook=JSONObject) + + +class Statistics: + criteria_message = '' + + def __init__(self, csvs_path): + """ + Takes in the path to csv folder + """ + self.csvs_path = csvs_path + self.headers = ['Condition', 'Status'] + self.table = [] + self.stats_failures = [] + self.config_type = None + self.stats = {} + self.include_baseline_file = True + for csv in os.listdir(csvs_path): + if csv.endswith(".csv"): + self.stats[csv[:-4]] = self.make_dataframe(os.path.join(csvs_path, csv)) + + def make_dataframe(self, csv_file_path): + ''' + Creates a data frame from a csv found at that path + :csv_file_path + ''' + with open(csv_file_path, encoding='utf-8') as csvf: + try: + csv = pd.read_csv(csvf) + except pd.errors.EmptyDataError: + raise Exception("{} is empty".format(csv_file_path)) + except pd.errors.ParserError: + raise Exception("{} is corupt".format(csv_file_path)) + return csv + + @staticmethod + def last(df): + df = df[df['Timestamp epoch ms'] == max(df['Timestamp epoch ms'])] + return df + + def preform_validation(self, validation_entry): + stats = self.stats + last = Statistics.last + try: + validation_ok = eval(validation_entry.condition) + except : + raise Exception("This validation is not written correctly: {}".format(validation_entry.condition)) + if validation_ok: + self.table.append([validation_entry.description, 'Pass']) + else: + self.table.append([validation_entry.description, 'Fail']) + self.stats_failures.append(validation_entry.description) + + def validate_criteria_file(self, criteria_path): + """ + Preforms specific validation for a config and decides if the baseline validation needs to be added + criteria path: path to the json criteria + """ + validator = json_to_class(criteria_path) + self.include_baseline_file = validator.include_baseline + if self.config_type['dut']: + validator = validator.DUT + else: + validator = validator.B2B + for validation_entry in validator: + self.preform_validation(validation_entry) + + def validate_baseline_file(self, criteria_path): + """ + Checks what type of profiles are present in the test and preforms general validation + criteria path: path to the json criteria + config_type: A dictionary that flags the types of profiles present inside the test + """ + validator = json_to_class(criteria_path) + if self.config_type['dut']: + validator = validator.DUT + else: + validator = validator.B2B + if self.config_type['traffic'] or self.config_type['attack']: + for validation_entry in validator.general: + self.preform_validation(validation_entry) + else: + raise Exception('The config does not have an attack or traffic profile') + if self.config_type['traffic']: + for validation_entry in validator.traffic: + self.preform_validation(validation_entry) + if self.config_type['attack']: + for validation_entry in validator.attack: + self.preform_validation(validation_entry) + + def validate_mdw_stats(self, config_type, config_path=""): + """ + Using a the criteria json and the baseline json, validates the resources returned after the run. + config_type: A dictionary that flags the types of profiles present inside the test + config_name: same name as the test that ran. + """ + + self.config_type = config_type + if os.path.exists(config_path): + config_name = os.path.basename(config_path) + print("Config: {}\n\n".format(config_name)) + criteria_path = os.path.join(config_path, 'validation.json') + print(criteria_path) + print('Running Validations for {}'.format(config_name)) + + if os.path.exists(criteria_path): + + try: + self.validate_criteria_file(criteria_path) + except AttributeError as e: + print('Criteria {} could not be applied due to: {}'.format(config_name, e)) + else: + self.include_baseline_file = True + if self.include_baseline_file: + print('Baseline validation applied') + criteria_path = os.path.join("./resources", "baseline_validation.json") + self.validate_baseline_file(criteria_path) + else: + print('Baseline validation skipped') + print(tabulate(self.table, self.headers, tablefmt="grid")) + return "; ".join(self.stats_failures) + diff --git a/demos/security/nvidia/cyperf-ipsec-config.zip b/demos/security/nvidia/cyperf-ipsec-config.zip new file mode 100644 index 0000000000000000000000000000000000000000..d0e8da8922140a8514ec5c05d8f04a8e3619a489 GIT binary patch literal 7112 zcmZ`;bxa&iw?&IfVWH4Map@L!r?^wx-CdT&-AZw%6n9wMDN>5NyVC_0C~ix;=$kchdPp5*1g?$kE zb*5ZQ4$%yHoBw56P>j!%@KPNI%DnSa+u@$AQQhZbDBS9jtY78yCz7tipP3pS7l-@w zVp&R{+-~um1SHg22XEPE?j2Io97jHT+p}lSAqT zEBjKtn7IX{m4Yp@gZ5e*{mK3d4iwD}a?i4SR}DS3g#^7!W;qdnmLx;ZN|%~hJ1sJ_ z5J8`7%$09vBcqZ+jKm9fVN((tQ75g_eVkY!#Y@2xAQ;d;P*XHw6t{wTDNE+&F2mk> zRHO3wEi{?}ml8wTtE*96R~aKy^~01O#Sn;RxnY+4cC7(ZU8$aI;XuudQ4L4XWgOCy z<2P>Kob#A5FJ0LoCmJn3Y)|s^#luuhpx=Z_udZ&`Kh06CNztslEcrtcX2n*bc;%H(sg|0Fp+n3m*0sP> zZn9L2g-Tv0FcGLXHOy}kz;f$RNkph$%CUw~_Uf}iZMDUF(0j z+F5nV`4d~ZTNgz%v6$H^%zGa&{n>UYY1TkDyiOOwGubCp=ad$@VCyE=msu{KT#1I3 z;h-t?M-21aUrY@v!pfd|mXnkNYt_hGq1$gE3S$AZ2iUNrCo+I3BJMD+Pwl;NuKDG; zQQ_I;+EgAvR9xF>w_;xRniIW~&6DrQ!>Lo;TKc7ilkKk7dg-mCm2E2`*ES0cg_{$$ zDV5U05Im;7pe3=^zmIiij=EamaTOZr9$|2JQZ(PT@#v@duJZDTO;i!Bx7S@B{T77H zgxsw2!tOi5NWl15K9pzQIL0-M!sBMR3$shpkvW5OFe3)pwlJji09kiT?KRWzu0p#7 zZ+gFvQ!~6cUe@Y`vghfIaBK+pE;oR4v&1!sSGwh}uRk(YbWUEQ(Cb1JP|nlrB-9w&h<7&+|kNyv*3k%^vr3A9P^;;{8<3QIu-iuyB?mwPY=wvG&rQ3 ztm7p|pvioPc$c?lFY`uNIdA$t@KrBU3L59X9M$t+@U^v8GGs}{og~h7AXJ>QJ}IfL zl^|mlKI!5=YLR<|YE2+!_bW+5vxarYt6n)5wPlH=>1nhhsjhlODzwCwE21(j8D z8)C^8d%xGJVu~bR9JmPG(;D?Qla0~()21=M$o|&Q=JL5(Ek;?P#J{aTB5!p&Re8=%R$|Jr%aseEw5 zwR&J<-%#6PjhL6dCOx?su}JSsDlc?PQN9Q=kjZK_I6j0p@-taqBTAgQS0Ck*ua0JA zbH``h<=^>eY^Be+m*(|0dSrWW6>1~{?_rw(5lliW7cN`JfKo$GwXuhvNLd&w;*|@I zaQ9O6b@56H#F@GE319vEH}_%qBb^l;Eq@%RPL}HMD-nw5Oo=Txk4)@HDm`!f=sC4_ z;SlsGlnbDes0jhfQ2Dn}H^#GLGL^ewgsuf_o62wA+7*)*V(S_e68ML0NMJRwM@2h~ zn-|g43!90{X4=J{En3*On_6;pOmmZOx>h-FcZ`0MsCI1=|G_+ba(l4(^kA8E<#}t{ zc1Cg5Ir)e^bU&%MOSn5W35_=tte`SGAMtj$=RL1EnbyUYctddpW|NDY&rO}J2u2Yg zoZo%t(}o@o`C37x=pqV^%kL;);oj!uAA=QqnH1=ALg`Dvw>X;LO2S&kFcY-BO+h`L zWhNCA_d564!r(KB0Hu3pv-1i|Mn@U$Ox1c@JQwAh5&hsHXmq!2Dyn*QB_iB|UI?V7 zZLo`%vaz?)!+t{=zHRX{tXS({QCFQ&P(>L+$LVNYx;{6oy*`UW+&@4%bIC)Wx0>tm z^XylD3vCLUpq_$t3sQj)fVP`}hm+xC5W3;n-@aH1Cyna#6JJi%N8D#4wQMHs`wT;gVF53^d;a`A*}* ziTdov;dI|~yxe%zvCo5F^>QL9eve+3d6}P8Tz}mlVJzdRSwunSyRoXlDxF%=Cfr+bUZ>O_ZZrQeapNaKJzcSnpf93ru*h+dgsdD-yDb8g9~iTT>1D%GW_3n_!SjqSSgDp}N*m0kMUbz(=n2nP3TVcwG- zGQIU{YNB7r1cj+V@=T+@-3dW|ln1~_BH!`Ni)-7!d;+xGKT)QOi1S%NxFuU@U3syFDKLk@bNvdjdF~gO8%(0)H{z{rJxg#;eVM`;96F8`}+3R^8us|8$ zkWcIBnLfD)t%f}R9vsxSG0l&|<|exp$ZJ$s;NB|AqZi*{MhNKX!l?yO2Jo`9dBCW1wg+bcM&6MXfqYB zSjt{vr}`*sTi!~NWLPg2;{E2y!9M4Va;ZwS1u6tTnhoV0u$=h(t5kyumwE^>8A5nL zef!oXG`^TQDJCHTjPM8A^ahV>EE)wpKDGr5EYdwvqGzgUyV<3Z=k=IKGu>|AK9!B# zemSViu@H)G7<*DhLN--p4tKAm`11 zMZ&Hzf5NTiB6?a!nM=yAX2`JkC~3g!D6ou_!l6-rFe&4YI7ZxiwO{JBT5i@!^yPqp zKyI3zs|(W<+hW~jpwP-~AgwPB#<4ZOqb+N+yZz>|;>|~iY@?-T@*uTYog5<+w^JC- zIE34%yRez=M8(uzN@4hEln>ys%0B<*^-KR2hvK8R20x6p2DAo#WH4)bJOht?xxWlK z*Fqa8ARr2j-ahJOw@*|+ACzZ1p?E3V@^UY~BmepR6e-~*jW}$@iW28zE@RI%4k2KB zLDP1Rf)aPT=s%cMM?{qkEVJl^c$&E zu8b8ZcYR>+f=~;|Z06)DF;d$wC@{M20L5s)u-g4L5|%fm2{j#Q<%vOD(`>mM$nQPU zUjZ}h9P4Sk7d9Hfko_5AnxX*JTz5ME1Ex(0SMX*#MLJY!E)Yu6wpS=9>cloI-g%tS zYEa2LY!$<;x@clNSCw@4<+#--=#Zti+T^u<@UGNZlI zh7m*}&g{L^*RBzUos@o*|p5B|BtKBJ8ayUR7^FW$$^eGZZ^Y0#Ln*$KMbO08J zQH_X2!$IT(6PR`#Vjz#1bowS9ZnG5bf>@8=hXAsd8JS_8sG0$Ck~W*foGoM*m?AXM`rjrlV+}p4lk{B{A6pD! zX0<=i&$i$#<ayojjK@b20&wX@7;q2O?o3N zg%OTr6NHpw2-PzOCK9f9L`M!T_VgO5<7*XffabngaLvb@f?nd? z{lmbsD=m-tmBe`eLI?8uk|gSPu+5D0`g~{dxAhDd3@GX<$2Izg;hrDEdT33DGe@P{ zUqTU18DSkk-zH1=79*!*JJi_!UfUIG&*+tLb<`BFO*`CU@fbqsoXguH3neG?A9X3} zZN|-J!-8U!J@v(~_JT+NkDvXkg!j`O;b! zpf?}@~_MEMG`YlYO4je=dnNn@o|79VwS|JUqMxJcZ&QrlUTbXI6&*{xuN%~ zEfMZqF2<1~J6)wUttp9AG1`gcg&R!g<ke1b!|Ge#_N5MxFMy3zXRcGHkRt@go_aZ~qtUdfqAL0<9>DaMs(nYK8x z)LJjsw>h1}#HZrh!^xO4-wfO0j&=I_JdVN8Q*i!#a^Hwk_wJ}usCUb$f~jZvla_b= zm!O5Z@0oZS%I>2~^#O+oYQ9l)x!_ctWvyyeD&+(*Cw}$8&hzvSux)atDaT`YTr2#5{g-lLD||Wl z3BG*raz6@_v|9jPKXj`p-8`bFDSc2mvPBW$EYZ`85`CfF&P0U3)(5U>P zaz!z=jR8J5d`=m9FDq@~E6t!qq{(vg5`lNvRp?$p{FZ0)MoIaw;bHZoO2&4EpV6DNZ87XdzE%&x2Pi7z^nT4Nj({J__Puu46f6_qwFGD#TzDu5Yx=4p&Yrg| zn5y;^TV&K@G^pRx;eOKQQP{6T_=OVriSJRP{*)i%_rp8Jn*)Xi+ZbZ*z-x@uHwn7} zbsf1d6L}N3$7l3y&}JZP@OP@Fj=|I7z^fvsR&qsvp$(^@Cj6&;r3p7#T|xDr8Lm^f z)(S)K*lM|Hmm6RILS4qx(>G*-uFHu72HG;c&8!?a0|*o_ zx*ZEc?5aiD`uNqZi2&6il3bT<$eho~Mx_n&_@%r!-*+^$}Cn0?@<<^_zp0)7H33 z<_=a@9~W{@FZXm~Bba~Zcg9+a6)&24jrRv@+FLa?o4FqM&bjMw`f72NJ{es;y4`WV z#?sX1jl2`THOjQv1i37`uVyugY|dRZrm|VN7SpIx!|sH7!FTid-E)i&#Ys-bct6_1 zygCbRjvIQYT)l(JT2}RJx^<&{YE(`B)COEg0(@--9MM&Btg{AKMmFtyw_FPy08b$X zcf5y5+l-5e!!BgxTrX2ysa|V{Z_`XlDZC7Sj?I2Xg?G=|Ql~p zKIpGN2vfveo)bvw9~`4Oj504UzYr)ILX=&Ou^#OIIK1Am49G~nvI8rS88e>axN#t$sHbMDAZ%-dE#y z-T$M10rkg4tD?rp`vhYL%m%?8xqxkP;cNZ`!$Hq@8s zNXPkyRqkrdF9KYDj)d?+%;KBHmu>sIHN|gtX(zOt0`zjjp6k)XmnEUjSOx?CPD0DvZ3(9AAFJ* zA6&TKDaw9mdE6Inx6R~kd%y%XGKOReFwUE^;xuuTb(apc^)7XpFLvtQ)4#BdMD3(|TOw-?dn>0MbfEp6dLKoP7gLl%Yx#Ah$!#}>L!mweCXqSg z?GR12_Cjy$!c#WuN2cQ%z0D%q>#_0mjqx>ZW}iCBO>l!Y zH-C_+=+p8u1`^WqvzihHCON~m@hUPTq`^8QB(?w7=K%c6=Ww1v6Jp1+#V2@3LA5<1}b>q@5_5||wqkO>WN4V}W& zR|0fS`Do#ecYBNr`URihUQ0+&*SAMGptdamNA;6rrO=AYk zK?#s}zo{0u9@1Zt!(Kw)9i|Si??jST}Pcn25t%q`*@0w)`-Z}R8duIFVsQRzn7hS)g>7Hf2kC#I?{rL zf`o*4@eh@dQ2= response['testDuration']: + print('Test gracefully finished') + rest.stopTime = rest._RESTasV3__getEpochTime() + return rest.stopTime + else: + raise Exception("Error! Test stopped before reaching the configured duration = {}; Elapsed = {}" + .format(response['testDuration'], response['testElapsed'])) + else: + print('Test duration = {}; Elapsed = {}'.format(response['testDuration'], response['testElapsed'])) + actual_duration += counter + collect_stats([('client-action-statistics',False),('ipsec-tunnels-total',True)]) + time.sleep(counter) + else: + print("Test did not stop after timeout {}s. Test status= {}. Force stopping the test!".format(timeout,response['status'])) + rest.stop_test() + raise Exception("Error! Test failed to stop after timeout {}s.".format(timeout)) + + def verify_stats(): + istat = rest._RESTasV3__sendGet('/api/v2/results/{}/stats/{}'.format(result_id, 'ipsec-tunnels-total'), 200, debug=False).json() + indx_f, indx_s, indx_ss = istat['columns'].index('Sessions Failed'), istat['columns'].index('Sessions Initiated'), istat['columns'].index('Sessions Succeeded') + rows = istat['snapshots'][-1]['values'] + if not (rows[0][indx_f]=='0' and rows[0][indx_s] == rows[0][indx_ss]): + rest.delete_current_session() + raise Exception("Error! Please check Ipsec Tunnels Total Statistics") + + cstat = rest._RESTasV3__sendGet('/api/v2/results/{}/stats/{}'.format(result_id, 'client-action-statistics'), 200, debug=False).json() + indx_f, indx_s, indx_ss = cstat['columns'].index('Action Failed'), cstat['columns'].index('Action Started'), cstat['columns'].index('Action Succeeded') + rows = cstat['snapshots'][-1]['values'] + + passed = True + for row in rows: + if (row[indx_f]=='0' and row[indx_s] == row[indx_ss]): + continue + else: + passed = False + if not passed: + rest.delete_current_session() + raise Exception("Error! Please check Client Action Statistics") + + + # CyPerf API test with 1 imported test config. This is where we keep all the test configs. + rest.setup(os.path.join(os.getcwd(), "demos/security/nvidia","cyperf-ipsec-config.zip")) + rest.assign_agents_by_ip(agents_ips=agents_IPs[0], network_segment=1) + rest.assign_agents_by_ip(agents_ips=agents_IPs[1], network_segment=2) + rest.set_test_duration(30) + #print("SLEEP FOR 30 MINS") + #time.sleep(180) + rest.start_test() + result_id = rest.get_test_id() + wait_test_finished() + verify_stats() + rest.delete_current_session() diff --git a/demos/security/nvidia/test_tgen_with_ipsec_demo.py b/demos/security/nvidia/test_tgen_with_ipsec_demo.py deleted file mode 100644 index 4b1d3917..00000000 --- a/demos/security/nvidia/test_tgen_with_ipsec_demo.py +++ /dev/null @@ -1,235 +0,0 @@ -import grpc -import netmiko -import pydpu -import pytest -import snappi -from passwords import * -from testbed import * - -# replace with pydpu code -from pydpu.proto.v1 import ipsec_pb2 -from pydpu.proto.v1 import ipsec_pb2_grpc - - -@pytest.fixture(scope='session') -def dpu(): - dpu_info = { - 'device_type': 'linux', - 'host': DPU_IP, - 'username': 'root', - 'password': DPU_PASSWORD - } - dpu_connect = netmiko.ConnectHandler(**dpu_info) - #prompt = dpu_connect.find_prompt() - dpu_connect.send_command('ip addr add 200.0.0.1/24 dev enp3s0f0s0') - dpu_connect.send_command('ip addr add 201.0.0.1/24 dev enp3s0f1s0') - dpu_connect.send_command('sysctl net.ipv4.ip_forward=1') - dpu_connect.send_command('iptables -P FORWARD ACCEPT') - #dpu_connect.send_command('arp -s 200.0.0.2 80:09:02:02:00:01') - dpu_connect.send_command('arp -s 201.0.0.2 80:09:02:02:00:02') - #dpu_connect.send_command('arp -s 40.0.0.1 80:09:02:02:00:40') - dpu_connect.send_command('systemctl start docker') - - command = 'docker run --rm --network host --mount src=/var/run,target=/var/run,type=bind --name opi-strongswan-bridge -d ghcr.io/opiproject/opi-strongswan-bridge:main /opi-vici-bridge -port=50151' - print(command) - output = dpu_connect.send_command(command) - print(output) - - # idealy replace as many of those ssh commands with OPI API as they become available - - yield dpu_connect - - dpu_connect.send_command('ip addr del 200.0.0.1/24 dev enp3s0f0s0') - dpu_connect.send_command('ip addr del 201.0.0.1/24 dev enp3s0f1s0') - dpu_connect.send_command('docker stop opi-strongswan-bridge') - dpu_connect.send_command('ip neigh flush 200.0.0.0/8') - dpu_connect.send_command('ip neigh flush 201.0.0.0/8') - - -@pytest.fixture(scope='session') -def server(): - server_info = { - 'device_type': 'linux', - 'host': SERVER_IP, - 'username': 'root', - 'password': SERVER_PASSWORD - } - server_connect = netmiko.ConnectHandler(**server_info) - - server_connect.send_command('ip addr add 200.0.0.2/24 dev enp5s0f0np0') - - - command = 'docker compose -f /root/opi/tgen/deployment/ipsec2.yml up -d' - print(command) - output = server_connect.send_command(command, read_timeout=30) - print(output) - - command = 'docker exec -i vpn-client-root bash -c "swanctl --load-all"' - print(command) - output = server_connect.send_command(command) - print(output) - - yield server_connect - - command = 'docker compose -f /root/opi/tgen/deployment/ipsec2.yml down' - print(command) - output = server_connect.send_command(command) - print(output) - - server_connect.send_command('ip addr del 200.0.0.2/24 dev enp5s0f0np0') - - -def test_server_to_server_via_ipsec_and_dpu(dpu, server): - - print('connecting to opi') - channel = grpc.insecure_channel('10.36.78.168:50151') - stub = ipsec_pb2_grpc.IPsecStub(channel) - - stub.IPsecVersion(ipsec_pb2.IPsecVersionReq()) - - stub.IPsecStats(ipsec_pb2.IPsecStatsReq()) - - print('configuring the tunnel') - tun1_0_0 = ipsec_pb2.IPsecLoadConnReq( - connection=ipsec_pb2.Connection( - name='tun1_0_0', - version='2', - local_addrs=[ipsec_pb2.Addrs(addr='200.0.0.1')], - remote_addrs=[ipsec_pb2.Addrs(addr='200.0.0.2')], - local_auth=ipsec_pb2.LocalAuth( - auth=ipsec_pb2.PSK, - id='200.0.0.1' - ), - remote_auth=ipsec_pb2.RemoteAuth( - auth=ipsec_pb2.PSK, - id='200.0.0.2' - ), - children=[ipsec_pb2.Child( - name='tun1_0_0', - esp_proposals=ipsec_pb2.Proposals( - crypto_alg=[ipsec_pb2.AES128], - integ_alg=[ipsec_pb2.SHA1] - ), - remote_ts=ipsec_pb2.TrafficSelectors( - ts=[ipsec_pb2.TrafficSelectors.TrafficSelector( - cidr='40.0.0.0/24' - )] - ), - local_ts=ipsec_pb2.TrafficSelectors( - ts=[ipsec_pb2.TrafficSelectors.TrafficSelector( - cidr='201.0.0.0/24' - )] - ), - )], - proposals=ipsec_pb2.Proposals( - crypto_alg=[ipsec_pb2.AES128], - integ_alg=[ipsec_pb2.SHA384], - dhgroups=[ipsec_pb2.MODP1536] - ) - ) - ) - - connection_1 = stub.IPsecLoadConn(tun1_0_0) - - command = 'docker exec -i vpn-client-root bash -c "swanctl --initiate --child tun1_0_0"' - print(command) - output = server.send_command(command) - print(output) - - tgen = snappi.api(location=f'https://{SERVER_IP}', verify=False) - cfg = tgen.config() - p1 = cfg.ports.port(name='server:p1', location=f'{SERVER_IP}:5555')[-1] - p2 = cfg.ports.port(name='server:p2', location=f'{SERVER_IP}:5556')[-1] - - # add layer 1 property to configure same speed on both ports - ly = cfg.layer1.layer1(name='ly')[-1] - ly.port_names = [p1.name, p2.name] - ly.speed = ly.SPEED_1_GBPS - - # enable packet capture on both ports - cp = cfg.captures.capture(name='cp')[-1] - cp.port_names = [p1.name, p2.name] - - # flow1 = cfg.flows.flow(name='flow server:p1 -> server:p2')[-1] - # flow1.tx_rx.port.tx_name = p1.name - # flow1.tx_rx.port.rx_name = p2.name - # flow1.size.fixed = 256 - # flow1.duration.fixed_packets.packets = 1000 - # flow1.rate.pps = 1000 - # eth2, ip2, udp2 = flow1.packet.ethernet().ipv4().udp() - # eth2.src.value = '00:1B:6E:00:00:01' - # eth2.dst.value = '02:6a:82:5d:da:2c' - # ip2.src.value = '40.0.0.1' - # ip2.dst.value = '201.0.0.2' - # inc2 = udp2.src_port.increment - # inc2.start, inc2.step, inc2.count = 6000, 4, 10 - # udp2.dst_port.values = [8000, 8044, 8060, 8074, 8082, 8084] - - flow2 = cfg.flows.flow(name='flow server:p2 -> server:p1')[-1] - flow2.tx_rx.port.tx_name = p2.name - flow2.tx_rx.port.rx_name = p1.name - flow2.size.fixed = 256 - flow2.duration.fixed_packets.packets = 1000 - flow2.rate.pps = 1000 - eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp() - eth2.src.value = '00:1B:6E:00:00:02' - eth2.dst.value = '02:6a:82:5d:da:2c' - ip2.src.value = '201.0.0.2' - ip2.dst.value = '40.0.0.1' - inc2 = udp2.src_port.increment - inc2.start, inc2.step, inc2.count = 6000, 4, 10 - udp2.dst_port.values = [8000, 8044, 8060, 8074, 8082, 8084] - - print('Pushing traffic configuration ...') - tgen.set_config(cfg) - - print('Starting transmit on all configured flows ...') - ts = tgen.transmit_state() - ts.state = ts.START - tgen.set_transmit_state(ts) - - # import pdb - # pdb.set_trace() - - print('Checking metrics on all configured ports ...') - print('Expected\tTotal Tx\tTotal Rx') - assert wait_for(lambda: metrics_ok(tgen)), 'Metrics validation failed!' - - -def metrics_ok(api): - # create a port metrics request and filter based on port names - cfg = api.get_config() - - req = api.metrics_request() - req.port.port_names = [p.name for p in cfg.ports] - #import pdb;pdb.set_trace() - # include only sent and received packet counts - req.port.column_names = [req.port.FRAMES_TX, req.port.FRAMES_RX] - # fetch port metrics - res = api.get_metrics(req) - # calculate total frames sent and received across all configured ports - total_tx = sum([m.frames_tx for m in res.port_metrics]) - total_rx = sum([m.frames_rx for m in res.port_metrics]) - expected = sum([f.duration.fixed_packets.packets for f in cfg.flows]) - - print('%d\t\t%d\t\t%d' % (expected, total_tx, total_rx)) - - return expected == total_tx and total_rx >= expected - - -def wait_for(func, timeout=10, interval=0.2): - ''' - Keeps calling the `func` until it returns true or `timeout` occurs - every `interval` seconds. - ''' - import time - - start = time.time() - - while time.time() - start <= timeout: - if func(): - return True - time.sleep(interval) - - print('Timeout occurred !') - return False diff --git a/demos/testbed.py b/demos/testbed.py index f35d0864..c547c4f6 100644 --- a/demos/testbed.py +++ b/demos/testbed.py @@ -22,5 +22,9 @@ # TGEN SERVER 1 TGEN1_IP = '172.22.1.100' + TGEN1_INTERFACES = ['ens1f0np0', 'ens1f1np1'] #TGEN1_INTERFACES = ['enp138s0f0np0', 'enp138s0f1np1'] + +# CYPERF SERVER +CYPERF_IP = '172.22.222.9'