Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[1.0.1] Test: Rewrite P2P sync throttle test #708

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion tests/TestHarness/Node.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,33 @@ def unstartedFile(nodeId):
Utils.errorExit("Cannot find unstarted node since %s file does not exist" % startFile)
return startFile

def launchUnstarted(self):
def launchUnstarted(self, waitForAlive=True):
Utils.Print("launchUnstarted cmd: %s" % (self.cmd))
self.popenProc = self.launchCmd(self.cmd, self.data_dir, self.launch_time)

if not waitForAlive:
return

def isNodeAlive():
"""wait for node to be responsive."""
try:
return True if self.checkPulse() else False
except (TypeError) as _:
pass
return False

isAlive=Utils.waitForBool(isNodeAlive)

if isAlive:
if Utils.Debug: Utils.Print("Node launch was successful.")
else:
Utils.Print("ERROR: Node launch Failed.")
# Ensure the node process is really killed
if self.popenProc:
self.popenProc.send_signal(signal.SIGTERM)
self.popenProc.wait()
self.pid=None

def launchCmd(self, cmd: List[str], data_dir: Path, launch_time: str):
dd = data_dir
out = dd / 'stdout.txt'
Expand Down
4 changes: 3 additions & 1 deletion tests/gelf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def gelfServer(stop):

data_dir = Path(Utils.getNodeDataDir(node_id))
config_dir = Path(Utils.getNodeConfigDir(node_id))
# It is good to have at least one integration test that does not use eosio::chain_api_plugin or eosio::http_plugin
start_nodeos_cmd = shlex.split(f"{Utils.EosServerPath} -e -p eosio --data-dir={data_dir} --config-dir={config_dir}")
if os.path.exists(data_dir):
shutil.rmtree(data_dir)
Expand All @@ -110,7 +111,8 @@ def cleanup():

t1.start()

nodeos.launchUnstarted()
# waitForAlive=False since isNodeAlive depends on get_info of chain_api_plugin
nodeos.launchUnstarted(waitForAlive=False)
time.sleep(nodeos_run_time_in_sec)
finally:
cleanup()
Expand Down
156 changes: 33 additions & 123 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
delay=args.d
debug=args.v
prod_count = 2
total_nodes=4
total_nodes=5
activateIF=args.activate_if
dumpErrorDetails=args.dump_error_details

Expand All @@ -42,13 +42,6 @@
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
walletMgr=WalletMgr(True)

def extractPrometheusMetric(connID: str, metric: str, text: str):
searchStr = f'nodeos_p2p_{metric}{{connid="{connID}"}} '
begin = text.find(searchStr) + len(searchStr)
return int(text[begin:text.find('\n', begin)])

prometheusHostPortPattern = re.compile(r'^nodeos_p2p_port.connid="([a-f0-9]*)". ([0-9]*)', re.MULTILINE)

try:
TestHelper.printSystemInfo("BEGIN")

Expand All @@ -57,12 +50,14 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
Print(f'producing nodes: {pnodes}, delay between nodes launch: {delay} second{"s" if delay != 1 else ""}')

Print("Stand up cluster")
extraNodeosArgs = '--plugin eosio::prometheus_plugin --connection-cleanup-period 3'
# Custom topology is a line of singly connected nodes from the highest node number in sequence to lowest,
# the reverse of the usual TestHarness line topology.
if cluster.launch(pnodes=pnodes, unstartedNodes=2, totalNodes=total_nodes, prodCount=prod_count,
topo='./tests/p2p_sync_throttle_test_shape.json', delay=delay, activateIF=activateIF,
extraNodeosArgs=extraNodeosArgs) is False:
# Custom topology:
# prodNode <-> nonProdNode
# <-> throttlingNode <-> throttledNode
# <-> unThrottledNode
#
# Compare the sync time of throttledNode and unThrottledNode
if cluster.launch(pnodes=pnodes, unstartedNodes=3, totalNodes=total_nodes, prodCount=prod_count,
topo='./tests/p2p_sync_throttle_test_shape.json', delay=delay, activateIF=activateIF) is False:
errorExit("Failed to stand up eos cluster.")

prodNode = cluster.getNode(0)
Expand Down Expand Up @@ -109,128 +104,43 @@ def extractPrometheusMetric(connID: str, metric: str, text: str):
throttleListenAddr = throttlingNode.cmd[i+1]
# Using 40 Kilobytes per second to allow syncing of ~250 transaction blocks at ~175 bytes per transaction
# (250*175=43750 per block or 87500 per second)
# resulting from the trx generators in a reasonable amount of time, while still being able to capture
# throttling state within the Prometheus update window (3 seconds in this test).
# resulting from the trx generators in a reasonable amount of time
throttlingNode.cmd[i+1] = throttlingNode.cmd[i+1] + ':40KB/s'
throttleListenIP, throttleListenPort = throttleListenAddr.split(':')
throttlingNode.cmd.append('--p2p-listen-endpoint')
throttlingNode.cmd.append(f'{throttleListenIP}:{int(throttleListenPort)+100}:1TB/s')

cluster.biosNode.kill(signal.SIGTERM)

Print("Launch throttling node")
cluster.launchUnstarted(1)

assert throttlingNode.verifyAlive(), "throttling node did not launch"

# Throttling node was offline during block generation and once online receives blocks as fast as possible
assert throttlingNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on throttled node timed out'

Print("Launch throttled and un-throttled nodes")
clusterStart = time.time()
cluster.launchUnstarted(2)

errorLimit = 40 # Approximately 20 retries required
throttledNode = cluster.getNode(3)
throttledNodeConnId = None
throttlingNodeConnId = None
while errorLimit > 0:
try:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode()
except urllib.error.URLError:
# catch ConnectionRefusedError waiting for node to finish startup and respond
errorLimit -= 1
time.sleep(0.5)
continue
else:
if len(response) < 100:
# tolerate HTTPError as well (method returns only the exception code)
errorLimit -= 1
time.sleep(0.5)
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 3:
# wait for node to be connected
errorLimit -= 1
time.sleep(0.5)
continue
Print('Throttling Node Start State')
throttlingNodePortMap = {port: id for id, port in connPorts if port != '0' and port != '9877'}
throttlingNodeConnId = next(iter(throttlingNodePortMap.values())) # 9879
startSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
startSyncThrottlingState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
Print(f'Start sync throttling bytes sent: {startSyncThrottlingBytesSent}')
Print(f'Start sync throttling node throttling: {"True" if startSyncThrottlingState else "False"}')
if time.time() > clusterStart + 30: errorExit('Timed out')
break
else:
errorExit('Exceeded error retry limit waiting for throttling node')

errorLimit = 40 # Few if any retries required but for consistency...
while errorLimit > 0:
try:
response = throttledNode.processUrllibRequest('prometheus', 'metrics', returnType=ReturnType.raw, printReturnLimit=16).decode()
except urllib.error.URLError:
# catch ConnectionRefusedError waiting for node to finish startup and respond
errorLimit -= 1
time.sleep(0.5)
continue
else:
if len(response) < 100:
# tolerate HTTPError as well (method returns only the exception code)
errorLimit -= 1
time.sleep(0.5)
continue
connPorts = prometheusHostPortPattern.findall(response)
Print(connPorts)
if len(connPorts) < 2:
# wait for sending node to be connected
errorLimit -= 1
time.sleep(0.5)
continue
Print('Throttled Node Start State')
throttledNodePortMap = {port: id for id, port in connPorts if port != '0'}
throttledNodeConnId = next(iter(throttledNodePortMap.values())) # 9878
Print(throttledNodeConnId)
startSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'Start sync throttled bytes received: {startSyncThrottledBytesReceived}')
break
else:
errorExit('Exceeded error retry limit waiting for throttled node')

# Throttling node was offline during block generation and once online receives blocks as fast as possible while
# transmitting blocks to the next node in line at the above throttle setting.
assert throttlingNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on throttled node timed out'
endThrottlingSync = time.time()
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttling Node End State')
endSyncThrottlingBytesSent = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_bytes_sent',
response)
Print(f'End sync throttling bytes sent: {endSyncThrottlingBytesSent}')
assert throttledNode.waitForBlock(beginLargeBlocksHeadBlock, timeout=120), f'Wait for begin block {beginLargeBlocksHeadBlock} on throttled sync node timed out'
# Throttled node is connecting to a listen port with a block sync throttle applied so it will receive
# blocks more slowly during syncing than an unthrottled node.
wasThrottled = False
while time.time() < endThrottlingSync + 30:
response = throttlingNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True,
returnType=ReturnType.raw, printReturnLimit=16).decode()
throttledState = extractPrometheusMetric(throttlingNodeConnId,
'block_sync_throttling',
response)
if throttledState:
wasThrottled = True
break
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=120), f'Wait for block {endLargeBlocksHeadBlock} on sync node timed out'
unThrottledNode = cluster.getNode(4)
assert throttledNode.verifyAlive(), "throttled node did not launch"
assert unThrottledNode.verifyAlive(), "un-throttled node did not launch"

assert unThrottledNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on un-throttled node timed out'
endUnThrottledSync = time.time()

assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=120), f'Wait for block {endLargeBlocksHeadBlock} on throttled node timed out'
endThrottledSync = time.time()
response = throttledNode.processUrllibRequest('prometheus', 'metrics', exitOnError=True, returnType=ReturnType.raw, printReturnLimit=16).decode()
Print('Throttled Node End State')
endSyncThrottledBytesReceived = extractPrometheusMetric(throttledNodeConnId,
'block_sync_bytes_received',
response)
Print(f'End sync throttled bytes received: {endSyncThrottledBytesReceived}')
throttlingElapsed = endThrottlingSync - clusterStart

throttledElapsed = endThrottledSync - clusterStart
Print(f'Unthrottled sync time: {throttlingElapsed} seconds')
unThrottledElapsed = endUnThrottledSync - clusterStart
Print(f'Un-throttled sync time: {unThrottledElapsed} seconds')
Print(f'Throttled sync time: {throttledElapsed} seconds')
assert wasThrottled, 'Throttling node never reported throttling its transmission rate'

assert throttledElapsed > 2 * unThrottledElapsed, f'Throttled node did not sync slower {throttledElapsed} <= {2 * unThrottledElapsed}'

testSuccessful=True
finally:
Expand Down
Loading