-
Notifications
You must be signed in to change notification settings - Fork 9
/
testTypeIperf.py
182 lines (138 loc) · 5.91 KB
/
testTypeIperf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import logging
import json
import task
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any
import tftbase
from task import ClientTask
from task import ServerTask
from task import TaskOperation
from testSettings import TestSettings
from testType import TestTypeHandler
from tftbase import BaseOutput
from tftbase import Bitrate
from tftbase import FlowTestOutput
from tftbase import TestType
logger = logging.getLogger("tft." + __name__)
IPERF_EXE = "iperf3"
IPERF_UDP_OPT = "-u -b 25G"
IPERF_REV_OPT = "-R"
def _calculate_gbps_tcp(result: Mapping[str, Any]) -> Bitrate:
sum_sent = result["end"]["sum_sent"]
sum_received = result["end"]["sum_received"]
bitrate_sent = sum_sent["bits_per_second"] / 1e9
bitrate_received = sum_received["bits_per_second"] / 1e9
return Bitrate(tx=float(f"{bitrate_sent:.5g}"), rx=float(f"{bitrate_received:.5g}"))
def _calculate_gbps_udp(result: Mapping[str, Any]) -> Bitrate:
sum_data = result["end"]["sum"]
# UDP tests only have sender traffic
bitrate_sent = sum_data["bits_per_second"] / 1e9
return Bitrate(tx=float(f"{bitrate_sent:.5g}"), rx=float(f"{bitrate_sent:.5g}"))
def _calculate_gbps(test_type: TestType, result: Mapping[str, Any]) -> Bitrate:
if test_type == TestType.IPERF_TCP:
return _calculate_gbps_tcp(result)
return _calculate_gbps_udp(result)
@dataclass(frozen=True)
class TestTypeHandlerIperf(TestTypeHandler):
def _create_server_client(self, ts: TestSettings) -> tuple[ServerTask, ClientTask]:
s = IperfServer(ts=ts)
c = IperfClient(ts=ts, server=s)
return (s, c)
def can_run_reverse(self) -> bool:
if self.test_type == TestType.IPERF_TCP:
return True
return False
TestTypeHandler.register_test_type(TestTypeHandlerIperf(TestType.IPERF_TCP))
TestTypeHandler.register_test_type(TestTypeHandlerIperf(TestType.IPERF_UDP))
class IperfServer(task.ServerTask):
def get_template_args(self) -> dict[str, str | list[str]]:
extra_args: dict[str, str | list[str]] = {}
if self.exec_persistent:
extra_args["args"] = [IPERF_EXE, "-s", "-p", f"{self.port}"]
return {
**super().get_template_args(),
**extra_args,
}
def _create_setup_operation_get_thread_action_cmd(self) -> str:
return f"{IPERF_EXE} -s -p {self.port} --one-off --json"
def _create_setup_operation_get_cancel_action_cmd(self) -> str:
return f"killall {IPERF_EXE}"
class IperfClient(task.ClientTask):
def _create_task_operation(self) -> TaskOperation:
server_ip = self.get_target_ip()
cmd = (
f"{IPERF_EXE} -c {server_ip} -p {self.port} --json -t {self.get_duration()}"
)
if self.test_type == TestType.IPERF_UDP:
cmd += f" {IPERF_UDP_OPT}"
if self.reverse:
cmd += f" {IPERF_REV_OPT}"
def _thread_action() -> BaseOutput:
self.ts.clmo_barrier.wait()
r = self.run_oc_exec(cmd)
self.ts.event_client_finished.set()
parsed_data: dict[str, Any] = {}
success_result = False
if r.success:
data = r.out
try:
parsed_data = json.loads(data)
except Exception:
pass
if parsed_data and "error" not in parsed_data:
success_result = True
try:
bitrate_gbps = _calculate_gbps(self.test_type, parsed_data)
except Exception:
success_result = False
bitrate_gbps = Bitrate.NA
return FlowTestOutput(
success=success_result,
tft_metadata=self.ts.get_test_metadata(),
command=cmd,
result=parsed_data,
bitrate_gbps=bitrate_gbps,
)
return TaskOperation(
log_name=self.log_name,
thread_action=_thread_action,
)
def _aggregate_output(
self,
result: tftbase.AggregatableOutput,
tft_result_builder: tftbase.TftResultBuilder,
) -> None:
assert isinstance(result, FlowTestOutput)
if self.test_type == TestType.IPERF_TCP:
self.print_tcp_results(result.result)
if self.test_type == TestType.IPERF_UDP:
self.print_udp_results(result.result)
def print_tcp_results(self, data: Mapping[str, Any]) -> None:
sum_sent = data["end"]["sum_sent"]
sum_received = data["end"]["sum_received"]
transfer_sent = sum_sent["bytes"] / (1024**3)
bitrate_sent = sum_sent["bits_per_second"] / 1e9
transfer_received = sum_received["bytes"] / (1024**3)
bitrate_received = sum_received["bits_per_second"] / 1e9
mss = data["start"]["tcp_mss_default"]
logger.info(
f"\n [ ID] Interval Transfer Bitrate\n"
f" [SENT] 0.00-{sum_sent['seconds']:.2f} sec {transfer_sent:.2f} GBytes {bitrate_sent:.2f} Gbits/sec sender\n"
f" [REC] 0.00-{sum_received['seconds']:.2f} sec {transfer_received:.2f} GBytes {bitrate_received:.2f} Gbits/sec receiver\n"
f" MSS = {mss}"
)
def print_udp_results(self, data: Mapping[str, Any]) -> None:
sum_data = data["end"]["sum"]
total_gigabytes = sum_data["bytes"] / (1024**3)
average_gigabitrate = sum_data["bits_per_second"] / 1e9
average_jitter = sum_data["jitter_ms"]
total_lost_packets = sum_data["lost_packets"]
total_lost_percent = sum_data["lost_percent"]
logger.info(
f"\n Total GBytes: {total_gigabytes:.4f} GBytes\n"
f" Average Bitrate: {average_gigabitrate:.2f} Gbits/s\n"
f" Average Jitter: {average_jitter:.9f} ms\n"
f" Total Lost Packets: {total_lost_packets}\n"
f" Total Lost Percent: {total_lost_percent:.2f}%"
)