From 656c7f54812d635c4b0b82088101cc752d5bc356 Mon Sep 17 00:00:00 2001
From: nikurt <86772482+nikurt@users.noreply.github.com>
Date: Thu, 19 Oct 2023 11:12:03 +0200
Subject: [PATCH] no stress (#9968)

As we concluded that nobody understands how to interpret stress tests, I
suggest deleting them.

Recreated #9701 with better history.
---
 nightly/pytest-stress.txt                     |  28 -
 nightly/pytest.txt                            |   1 -
 .../100_node_block_production.py              |  39 -
 pytest/tests/stress/hundred_nodes/README.md   |  64 --
 .../stress/hundred_nodes/block_chunks.py      |  42 -
 .../stress/hundred_nodes/collect_logs.py      |  41 -
 .../hundred_nodes/create_gcloud_image.py      |  70 --
 .../stress/hundred_nodes/node_rotation.py     | 168 ----
 .../stress/hundred_nodes/start_100_nodes.py   | 271 -------
 .../tests/stress/hundred_nodes/watch_fork.py  |  33 -
 pytest/tests/stress/network_stress.py         |  69 --
 pytest/tests/stress/saturate_routing_table.py | 122 ---
 pytest/tests/stress/stress.py                 | 754 ------------------
 13 files changed, 1702 deletions(-)
 delete mode 100644 nightly/pytest-stress.txt
 delete mode 100755 pytest/tests/stress/hundred_nodes/100_node_block_production.py
 delete mode 100644 pytest/tests/stress/hundred_nodes/README.md
 delete mode 100755 pytest/tests/stress/hundred_nodes/block_chunks.py
 delete mode 100755 pytest/tests/stress/hundred_nodes/collect_logs.py
 delete mode 100755 pytest/tests/stress/hundred_nodes/create_gcloud_image.py
 delete mode 100755 pytest/tests/stress/hundred_nodes/node_rotation.py
 delete mode 100755 pytest/tests/stress/hundred_nodes/start_100_nodes.py
 delete mode 100755 pytest/tests/stress/hundred_nodes/watch_fork.py
 delete mode 100755 pytest/tests/stress/network_stress.py
 delete mode 100755 pytest/tests/stress/saturate_routing_table.py
 delete mode 100755 pytest/tests/stress/stress.py

diff --git a/nightly/pytest-stress.txt b/nightly/pytest-stress.txt
deleted file mode 100644
index 4c5ca243307..00000000000
--- a/nightly/pytest-stress.txt
+++ /dev/null
@@ -1,28 +0,0 @@
-# python stress tests
-pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions
-pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions --features nightly
-pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network
-pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network --features nightly
-# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network packets_drop
-# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network packets_drop --features nightly
-
-# TODO(#9090): Node restarts often fail due to the DB having the LOCK file.
-# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart --features nightly -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart packets_drop -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart packets_drop --features nightly - -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart wipe_data -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart wipe_data --features nightly -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart packets_drop wipe_data -# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart packets_drop wipe_data --features nightly -# pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set -# pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set --features nightly - -pytest --timeout=300 stress/saturate_routing_table.py -pytest --timeout=300 stress/saturate_routing_table.py --features nightly - -# TODO(#4618): Those tests are currently broken. Comment out while we’re -# working on a fix / deciding whether to remove them. -# pytest stress/network_stress.py -# pytest stress/network_stress.py --features nightly diff --git a/nightly/pytest.txt b/nightly/pytest.txt index 052071b643e..254455786fe 100644 --- a/nightly/pytest.txt +++ b/nightly/pytest.txt @@ -2,4 +2,3 @@ ./pytest-contracts.txt ./pytest-sanity.txt ./pytest-spec.txt -./pytest-stress.txt diff --git a/pytest/tests/stress/hundred_nodes/100_node_block_production.py b/pytest/tests/stress/hundred_nodes/100_node_block_production.py deleted file mode 100755 index 49eafa80ea7..00000000000 --- a/pytest/tests/stress/hundred_nodes/100_node_block_production.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 -import sys, time -import subprocess -from rc import pmap -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -from cluster import GCloudNode, RpcNode -from configured_logger import new_logger -from utils import chain_query - - -def print_chain_data(block, logger): - all_height = list(map(lambda b: b['height_included'], block['chunks'])) - all_catch_up = False - if all(map(lambda h: h == block['header']['height'], all_height)): - all_catch_up = True - logger.info( - f"{block['header']['hash']} {block['header']['height']} {block['header']['approvals']} {all_catch_up} {all_height}" - ) - - -subprocess.run('mkdir -p /tmp/100_node/', shell=True) - -f = [] -for node in range(100): - f.append(new_logger(outfile=f'/tmp/100_node/pytest-node-{node}.txt')) - - -def query_node(i): - node = GCloudNode(f'pytest-node-{i}') - chain_query(node, lambda b: print_chain_data(b, f[i]), max_blocks=20) - - -pmap(query_node, range(100)) - -# node = RpcNode('localhost', 3030) -# chain_query(node, print_chain_data, max_blocks=20) diff --git a/pytest/tests/stress/hundred_nodes/README.md b/pytest/tests/stress/hundred_nodes/README.md deleted file mode 100644 index e5b66717769..00000000000 --- a/pytest/tests/stress/hundred_nodes/README.md +++ /dev/null @@ -1,64 +0,0 @@ -# Steps to run hundred node tests - -## Prerequisites - -- Have a [gcloud cli installed](https://cloud.google.com/sdk/install), [being logged in](https://cloud.google.com/sdk/gcloud/reference/auth/login), default project set. 
-(If you can `gcloud compute instances list` and your account can create gcloud instances then it's good) -- python3, virtualenv, pip3 -- Locally compile these packages (used for create config, keys and genesis) - -```bash -cargo build -p neard --release -cargo build -p genesis-csv-to-json --release -cargo build -p keypair-generator --release -``` - -## Steps - -1. Install python dependencies: - - ```bash - sudo apt install python3-dev - cd pytest - virtualenv venv -p `which python3` # First time only - . venv/bin/activate - pip install -r requirements.txt - ``` - - Note: You need python3.6 or greater. - -2. Create a gcloud (vm disk) image that has compiled near binary - - ```bash - # This will build from current branch, image name as near--YYYYMMDD-, cargo build -p near --release - python tests/stress/hundred_nodes/create_gcloud_image.py - # If you want different branch, image name or additional flags passed to cargo - python tests/stress/hundred_nodes/create_gcloud_image image_name branch 'additional flags' - ``` - -3. Start hundred nodes - - ```bash - # will use the near--YYYYMMDD- image, instance name will be pytest-node--0 to 99 - python tests/stress/hundred_nodes/start_100_nodes.py - # If you have a different image name, or want different instance name - ... start_100_nodes.py image_name instance_name_prefix - ``` - - Nodes are running after this step - -4. Access every node - - ```bash - gcloud compute ssh pytest-node- - tmux a - ``` - -## Clean up - -- Logs are stored in each instance in `/tmp/python-rc.log`. You can collect them all by `tests/stress/hundred_nodes/collect_logs.py`. -- Delete all instances quickly with `tests/delete_remote_nodes.py [prefix]` - -## Some notes - -If you have volatile or slow ssh access to gcloud instances, these scripts can fail at any step. I recommend create an instance on digitalocean, mosh to digitalocean instance (mosh is reliable), running all pytest script there (access gcloud from digital ocean is fast). In the reliable office network or mosh-digitalocean over an unreliable network, scripts never failed. 
diff --git a/pytest/tests/stress/hundred_nodes/block_chunks.py b/pytest/tests/stress/hundred_nodes/block_chunks.py deleted file mode 100755 index b23ef8e5298..00000000000 --- a/pytest/tests/stress/hundred_nodes/block_chunks.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 -import sys, time -import subprocess -from rc import pmap -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -from cluster import GCloudNode, RpcNode -from configured_logger import new_logger -from utils import chain_query - - -def print_chain_data(block, logger): - chunks = [] - for c in block['chunks']: - chunks.append( - f'{c["chunk_hash"]} {c["shard_id"]} {c["height_created"]} {c["height_included"]}' - ) - logger.info( - f"{block['header']['height']} {block['header']['hash']} {','.join(chunks)}" - ) - - -subprocess.run('mkdir -p /tmp/100_node/', shell=True) - -f = [] -for node in range(100): - f.append(new_logger(outfile=f'/tmp/100_node/pytest-node-{node}.txt')) - - -def query_node(i): - node = GCloudNode(f'pytest-node-{i}') - chain_query(node, lambda b: print_chain_data(b, f[i]), max_blocks=20) - - -# pmap(query_node, range(100)) - -node = GCloudNode('pytest-node-0') -chain_query(node, - print_chain_data, - block_hash='9rnC5G6qDpXgT4gTG4znowmdSUavC1etuV99F18ByxxK') diff --git a/pytest/tests/stress/hundred_nodes/collect_logs.py b/pytest/tests/stress/hundred_nodes/collect_logs.py deleted file mode 100755 index 74755cd7432..00000000000 --- a/pytest/tests/stress/hundred_nodes/collect_logs.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -from rc import gcloud, pmap -from distutils.util import strtobool -import sys -import datetime -import pathlib -import tempfile -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) -from configured_logger import logger -from utils import user_name - -if len(sys.argv) >= 2: - node_prefix = sys.argv[1] -else: - node_prefix = f'pytest-node-{user_name()}' -nodes = [ - machine for machine in gcloud.list() if machine.name.startswith(node_prefix) -] - -if len(sys.argv) >= 3: - log_file = sys.argv[2] -else: - log_file = pathlib.Path(tempfile.gettempdir()) / 'python-rc.log' - -collected_place = ( - pathlib.Path(tempfile.gettempdir()) / 'near' / - f'collected_logs_{datetime.datetime.strftime(datetime.datetime.now(),"%Y%m%d")}' -) -collected_place.mkdir(parents=True, exist_ok=True) - - -def collect_file(node): - logger.info(f'Download file from {node.name}') - node.download(str(log_file), str(collected_place / f'{node.name}.txt')) - logger.info(f'Download file from {node.name} finished') - - -pmap(collect_file, nodes) -logger.info(f'All download finish, log collected at {collected_place}') diff --git a/pytest/tests/stress/hundred_nodes/create_gcloud_image.py b/pytest/tests/stress/hundred_nodes/create_gcloud_image.py deleted file mode 100755 index 9ab64c4ef21..00000000000 --- a/pytest/tests/stress/hundred_nodes/create_gcloud_image.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -from utils import user_name -import sys -import os -import datetime -from rc import gcloud, run -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) -from configured_logger import logger - -additional_flags = '' - -try: - image_name = sys.argv[1] - branch = sys.argv[2] -except: - branch = run( - 'git rev-parse --symbolic-full-name --abbrev-ref HEAD').stdout.strip() - username = user_name() - image_name = 
f'near-{branch}-{datetime.datetime.strftime(datetime.datetime.now(),"%Y%m%d")}-{username}' - -machine_name = f'{image_name}-image-builder' - -logger.info(f"Creating machine: {machine_name}") - -m = gcloud.create(name=machine_name, - machine_type='n1-standard-64', - disk_size='50G', - image_project='ubuntu-os-cloud', - image_family='ubuntu-1804-lts', - zone='us-west2-c', - firewall_allows=['tcp:3030', 'tcp:24567'], - min_cpu_platform='Intel Skylake') - -logger.info(f'machine created: {image_name}') - -p = m.run('bash', - input=f''' -for i in `seq 1 3`; do - sudo apt update -done - -sudo apt install -y python pkg-config libssl-dev build-essential cmake clang llvm docker.io -sudo groupadd docker -sudo usermod -aG docker $USER - -curl -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain none -source ~/.cargo/env - -git clone --single-branch --branch {branch} https://github.com/nearprotocol/nearcore.git nearcore -cd nearcore -cargo build -p neard --release {additional_flags} -''') - -assert p.returncode == 0 - -logger.info('near built') - -m.shutdown() - -logger.info('machine stopped') - -m.save_image(image=image_name) - -logger.info('image saved') - -m.delete() - -logger.info('machine deleted') diff --git a/pytest/tests/stress/hundred_nodes/node_rotation.py b/pytest/tests/stress/hundred_nodes/node_rotation.py deleted file mode 100755 index a9060466dac..00000000000 --- a/pytest/tests/stress/hundred_nodes/node_rotation.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python3 -import base58 -from enum import Enum -import os -import pathlib -import random -import sys -import tempfile -import time - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) -from cluster import GCloudNode, Key -from configured_logger import logger -from transaction import sign_staking_tx -from utils import user_name, collect_gcloud_config - - -def stop_node(machine): - machine.run('tmux send-keys -t python-rc C-c') - machine.kill_detach_tmux() - logger.info(f'{machine} killed') - - -def start_node(machine): - machine.run_detach_tmux( - 'cd nearcore && export RUST_LOG=diagnostic=trace && export RUST_BACKTRACE=1 && target/release/near run' - ) - logger.info(f'{machine} started') - - -class NodeState(Enum): - NOTRUNNING = 1 - SYNCING = 2 - # running but have not staked - NONVALIDATING = 3 - # staked but not validating - STAKED = 4 - # validating - VALIDATING = 5 - # unstaked but still validating - UNSTAKED = 6 - - def __repr__(self): - return str(self.name).split('.')[-1].lower() - - -class RemoteNode(GCloudNode): - - def __init__(self, instance_name, node_dir): - super().__init__(instance_name) - self.validator_key = Key.from_json_file(node_dir / 'validator_key.json') - self.node_key = Key.from_json_file(node_dir / 'node_key.json') - self.signer_key = Key.from_json_file(node_dir / 'signer0_key.json') - self.last_synced_height = 0 - - try: - status = super().get_status() - validators = set( - map(lambda x: x['account_id'], status['validators'])) - if self.signer_key.account_id in validators: - self.state = NodeState.VALIDATING - elif status['sync_info']['syncing']: - self.state = NodeState.SYNCING - else: - self.state = NodeState.NONVALIDATING - self.account_key_nonce = super().get_nonce_for_pk( - self.signer_key.account_id, self.signer_key.pk) - except Exception: - start_node(self.machine) - time.sleep(20) - self.state = NodeState.SYNCING - self.account_key_nonce = None - - def send_staking_tx(self, stake): - hash_ = self.get_latest_block().hash_bytes - if self.account_key_nonce is None: - 
self.account_key_nonce = self.get_nonce_for_pk( - self.signer_key.account_id, self.signer_key.pk) - self.account_key_nonce += 1 - tx = sign_staking_tx(nodes[index].signer_key, - nodes[index].validator_key, stake, - self.account_key_nonce, hash_) - logger.info(f'{self.signer_key.account_id} stakes {stake}') - res = self.send_tx_and_wait(tx, timeout=15) - if 'error' in res or 'Failure' in res['result']['status']: - logger.info(res) - - def send_unstaking_tx(self): - self.send_staking_tx(0) - - def change_state(self, cur_validators): - if self.state is NodeState.NOTRUNNING: - if bool(random.getrandbits(1)): - start_node(self.machine) - self.state = NodeState.SYNCING - elif self.state is NodeState.SYNCING: - node_status = self.get_status() - if not node_status['sync_info']['syncing']: - self.state = NodeState.NONVALIDATING - else: - cur_height = node_status['sync_info']['latest_block_height'] - assert cur_height > self.last_synced_height + 10, f'current height {cur_height} did not change much from last synced height: {self.last_synced_height}' - self.last_synced_height = cur_height - elif self.state is NodeState.NONVALIDATING: - if bool(random.getrandbits(1)): - stop_node(self.machine) - self.state = NodeState.NOTRUNNING - else: - stake = int(list(cur_validators.values())[0]) - self.send_staking_tx(stake) - self.state = NodeState.STAKED - elif self.state is NodeState.STAKED: - if self.signer_key.account_id in cur_validators: - self.state = NodeState.VALIDATING - elif self.state is NodeState.VALIDATING: - assert self.signer_key.account_id in cur_validators, f'invariant failed: {self.signer_key.account_id} not in {cur_validators}' - if bool(random.getrandbits(1)): - self.send_unstaking_tx() - self.state = NodeState.UNSTAKED - elif self.state is NodeState.UNSTAKED: - if self.signer_key.account_id not in cur_validators: - self.state = NodeState.NONVALIDATING - else: - assert False, "unexpected state" - - -num_nodes = 100 -collect_gcloud_config(num_nodes) -nodes = [ - RemoteNode(f'pytest-node-{user_name()}-{i}', - pathlib.Path(tempfile.gettempdir()) / 'near' / f'node{i}') - for i in range(num_nodes) -] - -while True: - # find a node that is not syncing and get validator information - validator_info = None - for i in range(num_nodes): - status = nodes[i].get_status() - if not status['sync_info']['syncing']: - validator_info = nodes[i].get_validators() - break - if validator_info is None: - assert False, "all nodes are syncing" - assert 'error' not in validator_info, validator_info - cur_validators = dict( - map(lambda x: (x['account_id'], x['stake']), - validator_info['result']['current_validators'])) - prev_epoch_kickout = validator_info['result']['prev_epoch_kickout'] - logger.info( - f'validators kicked out in the previous epoch: {prev_epoch_kickout}') - for validator_kickout in prev_epoch_kickout: - assert validator_kickout['reason'] != 'Unstaked' or validator_kickout[ - 'reason'] != 'DidNotGetASeat' or not validator_kickout.startswith( - 'NotEnoughStake'), validator_kickout - logger.info(f'current validators: {cur_validators}') - - # choose 5 nodes and change their state - node_indices = random.sample(range(100), 5) - for index in node_indices: - cur_state = nodes[index].state - nodes[index].change_state(cur_validators) - logger.info( - f'node {index} changed its state from {cur_state} to {nodes[index].state}' - ) - - logger.info(dict(enumerate(map(lambda x: x.state, nodes)))) - time.sleep(600) diff --git a/pytest/tests/stress/hundred_nodes/start_100_nodes.py 
b/pytest/tests/stress/hundred_nodes/start_100_nodes.py deleted file mode 100755 index 1c9a3d9008c..00000000000 --- a/pytest/tests/stress/hundred_nodes/start_100_nodes.py +++ /dev/null @@ -1,271 +0,0 @@ -#!/usr/bin/env python3 -from rc import run, gcloud, pmap -import json -import datetime -import csv -from concurrent.futures import ThreadPoolExecutor, as_completed -import pathlib -import time -import tempfile -from tqdm import tqdm -import shutil -import sys - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) -from cluster import apply_config_changes, apply_genesis_changes -from utils import user_name - -try: - image_name = sys.argv[1] -except: - branch = run( - 'git rev-parse --symbolic-full-name --abbrev-ref HEAD').stdout.strip() - username = user_name() - image_name = f'near-{branch}-{datetime.datetime.strftime(datetime.datetime.now(),"%Y%m%d")}-{username}' - -try: - machine_name_prefix = sys.argv[2] -except: - machine_name_prefix = f'pytest-node-{username}-' - -genesis_time = (datetime.datetime.utcnow() - - datetime.timedelta(hours=2)).isoformat() + 'Z' - -# binary search this to observe if network forks, default is 1 -block_production_time = 1 - -client_config_changes = { - "consensus": { - "min_block_production_delay": { - "secs": block_production_time, - "nanos": 0, - }, - "max_block_production_delay": { - "secs": 2 * block_production_time, - "nanos": 0, - }, - "max_block_wait_delay": { - "secs": 6 * block_production_time, - "nanos": 0, - }, - }, - "telemetry": { - "endpoints": [], - } -} - -# default is 50; 7,7,6,6,6,6,6,6 -genesis_config_changes = [ - ["num_block_producer_seats", 100], - ["num_block_producer_seats_per_shard", [13, 13, 13, 13, 12, 12, 12, 12]], -] - -num_machines = 100 - -# machine 0-(k-1) run docker, machine k-100 run binary -num_docker_machines = 50 - -# docker image to use -docker_image = 'nearprotocol/nearcore:master' - -# 25 zones, each zone 4 instances -# 5 asia, 1 australia, 5 europe, 1 canada, 13 us -zones = [ - 'asia-east1-a', - # 'asia-east1-b', - # 'asia-east1-c', - # 'asia-east2-a', - # 'asia-east2-b', - 'asia-east2-c', - # 'asia-northeast1-a', - # 'asia-northeast1-b', - # 'asia-northeast1-c', - # 'asia-northeast2-a', - # 'asia-northeast2-b', - 'asia-northeast2-c', - 'asia-south1-a', - # 'asia-south1-b', - # 'asia-south1-c', - # 'asia-southeast1-a', - # 'asia-southeast1-b', - 'asia-southeast1-c', - 'australia-southeast1-a', - # 'australia-southeast1-b', - # 'australia-southeast1-c', - # 'europe-north1-a', - # 'europe-north1-b', - 'europe-north1-c', - # 'europe-west1-b', - # 'europe-west1-c', - # 'europe-west1-d', - # 'europe-west2-a', - # 'europe-west2-b', - 'europe-west2-c', - 'europe-west3-a', - # 'europe-west3-c', - # 'europe-west4-a', - # 'europe-west4-b', - 'europe-west4-c', - 'europe-west6-a', - # 'europe-west6-b', - # 'europe-west6-c', - # 'northamerica-northeast1-a', - # 'northamerica-northeast1-b', - 'northamerica-northeast1-c', - # 'southamerica-east1-a', - # 'southamerica-east1-b', - # 'southamerica-east1-c', - 'us-central1-a', - 'us-central1-b', - # 'us-central1-c', - 'us-central1-f', - 'us-east1-b', - 'us-east1-c', - # 'us-east1-d', - 'us-east4-a', - 'us-east4-b', - # 'us-east4-c', - 'us-west1-a', - 'us-west1-b', - 'us-west1-c', - 'us-west2-a', - 'us-west2-b', - 'us-west2-c', -] -# Unless you want to shutdown gcloud instance and restart, keep this `False' -reserve_ip = False - -pbar = tqdm(total=num_machines, desc=' create machines') - - -def create_machine(i): - m = gcloud.create(name=machine_name_prefix + str(i), 
- machine_type='n1-standard-2', - disk_size='200G', - image_project='near-core', - image=image_name, - zone=zones[i % len(zones)], - min_cpu_platform='Intel Skylake', - reserve_ip=reserve_ip) - pbar.update(1) - return m - - -machines = pmap(create_machine, range(num_machines)) -pbar.close() -# machines = pmap(lambda name: gcloud.get(name), [ -# f'{machine_name_prefix}{i}' for i in range(num_machines)]) - -tempdir = pathlib.Path(tempfile.gettempdir()) / 'near' - - -def get_node_dir(i): - node_dir = tempdir / f'node{i}' - node_dir.mkdir(parents=True, exist_ok=True) - return node_dir - - -for i in range(num_machines): - node_dir = get_node_dir(i) - p = run('bash', - input=f''' -# deactivate virtualenv doesn't work in non interactive shell, explicitly run with python2 -cd .. -python2 scripts/start_stakewars.py --local --home {node_dir} --init --signer-keys --account-id=node{i} -''') - assert p.returncode == 0 - - -# Generate csv from jsons and ips -def pk_from_file(path): - with open(path) as rd: - return json.load(rd)['public_key'] - - -def get_validator_key(i): - return pk_from_file(get_node_dir(i) / 'validator_key.json') - - -def get_full_pks(i): - pks = [] - for j in range(3): - pks.append(pk_from_file(get_node_dir(i) / f'signer{j}_key.json')) - return ','.join(pks) - - -def get_pubkey(i): - return pk_from_file(get_node_dir(i) / 'node_key.json') - - -with open(tempdir / 'accounts.csv', 'w', newline='') as f: - fieldnames = 'genesis_time,account_id,regular_pks,privileged_pks,foundation_pks,full_pks,amount,is_treasury,validator_stake,validator_key,peer_info,smart_contract,lockup,vesting_start,vesting_end,vesting_cliff'.split( - ',') - - writer = csv.DictWriter(f, fieldnames=fieldnames) - writer.writeheader() - amount = 1000 * 10**24 - staked_amount = 10 * 10**24 - - for i in range(num_machines): - writer.writerow({ - 'genesis_time': genesis_time, - 'account_id': f'node{i}', - 'full_pks': get_full_pks(i), - 'amount': amount, - 'is_treasury': 'true' if i == 0 else 'false', - 'validator_stake': staked_amount, - 'validator_key': get_validator_key(i), - 'peer_info': f'{get_pubkey(i)}@{machines[i].ip}:24567' - }) - -# Generate config and genesis locally, apply changes to config/genesis locally -for i in range(num_machines): - node_dir = get_node_dir(i) - shutil.copy(tempdir / 'accounts.csv', node_dir / 'accounts.csv') - p = run('bash', - input=f''' -cd .. 
-target/release/genesis-csv-to-json --home {node_dir} --chain-id pytest -''') - apply_config_changes(node_dir, client_config_changes) - apply_genesis_changes(node_dir, genesis_config_changes) - -pbar = tqdm(total=num_machines, desc=' upload nodedir') - - -# Upload json and accounts.csv -def upload_genesis_files(i): - # stop if already start - machines[i].run('tmux send-keys -t python-rc C-c') - time.sleep(2) - machines[i].kill_detach_tmux() - machines[i].run('rm -rf ~/.near') - # upload keys, config, genesis - machines[i].upload(str(get_node_dir(i)), - f'/home/{machines[i].username}/.near') - pbar.update(1) - - -pmap(upload_genesis_files, range(num_machines)) -pbar.close() - -pbar = tqdm(total=num_machines, desc=' start near') - - -def start_nearcore(i): - m = machines[i] - if i < num_docker_machines: - m.run('bash', - input=f''' -docker run -d -u $UID:$UID -v /home/{m.username}/.near:/srv/near \ - -p 3030:3030 -p 24567:24567 --name nearcore {docker_image} near --home=/srv/near run -''') - else: - m.run_detach_tmux( - 'cd nearcore && export RUST_LOG=diagnostic=trace && target/release/near run --archive' - ) - pbar.update(1) - - -pmap(start_nearcore, range(len(machines))) -pbar.close() diff --git a/pytest/tests/stress/hundred_nodes/watch_fork.py b/pytest/tests/stress/hundred_nodes/watch_fork.py deleted file mode 100755 index bcedc96aff3..00000000000 --- a/pytest/tests/stress/hundred_nodes/watch_fork.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -from cluster import GCloudNode, RpcNode -from configured_logger import logger -from utils import user_name -from concurrent.futures import ThreadPoolExecutor, as_completed -import datetime - -validators = [None] * 100 - -while True: - futures = {} - with ThreadPoolExecutor(max_workers=20) as pool: - for i in range(100): - node = GCloudNode(f'pytest-node-{user_name()}-{i}') - futures[pool.submit(lambda: node.validators())] = i - - for f in as_completed(futures): - i = futures[f] - validators[i] = f.result() - - for v in validators[1:]: - assert v == validators[0], f'{v} not equal to {validators[0]}' - - v0 = sorted(list(validators[0])) - logger.info( - f'{datetime.datetime.now(datetime.timezone.utc).isoformat()}, {len(v0)}, {v0}' - ) diff --git a/pytest/tests/stress/network_stress.py b/pytest/tests/stress/network_stress.py deleted file mode 100755 index d35d9535dd3..00000000000 --- a/pytest/tests/stress/network_stress.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -import sys, random, time, base58, requests -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -from configured_logger import logger -from stress import stress_process, doit, monkey_staking, get_validator_ids, get_the_guy_to_mess_up_with, get_recent_hash, sign_payment_tx, expect_network_issues -from network import stop_network, resume_network, init_network_pillager - -TIMEOUT = 300 - - -@stress_process -def monkey_transactions_noval(stopped, error, nodes, nonces): - while stopped.value == 0: - validator_ids = get_validator_ids(nodes) - - from_ = random.randint(0, len(nodes) - 1) - to = random.randint(0, len(nodes) - 1) - while from_ == to: - to = random.randint(0, len(nodes) - 1) - amt = random.randint(0, 100) - nonce_val, nonce_lock = nonces[from_] - - hash_, _ = get_recent_hash(nodes[-1]) - - with nonce_lock: - tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, amt, - nonce_val.value, - 
base58.b58decode(hash_.encode('utf8'))) - for validator_id in validator_ids: - try: - tx_hash = nodes[validator_id].send_tx(tx)['result'] - except requests.exceptions.ReadTimeout: - pass - - nonce_val.value = nonce_val.value + 1 - - time.sleep(0.1) - - -@stress_process -def monkey_network_hammering(stopped, error, nodes, nonces): - s = [False for x in nodes] - while stopped.value == 0: - node_idx = random.randint(0, len(nodes) - 2) - pid = nodes[node_idx].pid.value - if s[node_idx]: - logger.info(f"Resuming network for process {pid}") - resume_network(pid) - else: - logger.info(f"Stopping network for process {pid}") - stop_network(pid) - s[node_idx] = not s[node_idx] - - time.sleep(0.5) - for i, x in enumerate(s): - if x: - pid = nodes[i].pid.value - logger.info(f"Resuming network for process {pid}") - resume_network(pid) - - -expect_network_issues() -init_network_pillager() -doit(3, 3, 3, 0, - [monkey_network_hammering, monkey_transactions_noval, monkey_staking], - TIMEOUT) diff --git a/pytest/tests/stress/saturate_routing_table.py b/pytest/tests/stress/saturate_routing_table.py deleted file mode 100755 index cfe751b16fa..00000000000 --- a/pytest/tests/stress/saturate_routing_table.py +++ /dev/null @@ -1,122 +0,0 @@ -#!/usr/bin/env python3 -""" -Saturate routing table with edges. - -Spin a node and connect to it with the python client. Fake several identities in the network and send -edges to the node. Measure the impact of these messages in the node with respect to the time the PeerManagerActor -is blocked. -""" -import asyncio -import socket -import sys -import time -import struct -import hashlib -import pathlib - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -import nacl.signing -from cluster import start_cluster -from configured_logger import logger -from peer import connect, run_handshake, Connection -from utils import LogTracker -from messages.network import Edge, SyncData, PeerMessage -from messages.crypto import PublicKey, Signature -from random import randint, seed -import base58 - -seed(0) - - -def key_seed(): - return bytes([randint(0, 255) for _ in range(32)]) - - -async def consume(conn: Connection): - while True: - message = await conn.recv() - - -def create_sync_data(accounts=[], edges=[]): - sync_data = SyncData() - sync_data.accounts = accounts - sync_data.edges = edges - - peer_message = PeerMessage() - peer_message.enum = 'Sync' - peer_message.Sync = sync_data - return peer_message - - -def create_edge(key0, key1, nonce): - if bytes(key1.verify_key) < bytes(key0.verify_key): - key0, key1 = key1, key0 - - edge = Edge() - edge.peer0 = PublicKey() - edge.peer0.keyType = 0 - edge.peer0.data = bytes(key0.verify_key) - - edge.peer1 = PublicKey() - edge.peer1.keyType = 0 - edge.peer1.data = bytes(key1.verify_key) - - edge.nonce = nonce - - val = bytes([0]) + bytes(edge.peer0.data) + bytes([0]) + bytes( - edge.peer1.data) + struct.pack('Q', nonce) - hsh = hashlib.sha256(val).digest() - enc58 = base58.b58encode(hsh) - - edge.signature0 = Signature() - edge.signature0.keyType = 0 - edge.signature0.data = key0.sign(hashlib.sha256(val).digest()).signature - - edge.signature1 = Signature() - edge.signature1.keyType = 0 - edge.signature1.data = key1.sign(hashlib.sha256(val).digest()).signature - - edge.removal_info = None - - return edge - - -async def main(): - key_pair_0 = nacl.signing.SigningKey(key_seed()) - tracker = LogTracker(nodes[0]) - conn = await connect(nodes[0].addr()) - await run_handshake(conn, - nodes[0].node_key.pk, - key_pair_0, - 
listen_port=12345) - - num_nodes = 300 - - def create_update(): - key_pairs = [key_pair_0] + [ - nacl.signing.SigningKey(key_seed()) for _ in range(num_nodes - 1) - ] - nonces = [[1] * num_nodes for _ in range(num_nodes)] - - edges = [] - for i in range(num_nodes): - for j in range(i): - edge = create_edge(key_pairs[i], key_pairs[j], nonces[i][j]) - edges.append(edge) - return create_sync_data(edges=edges) - - asyncio.get_event_loop().create_task(consume(conn)) - - for i in range(3): - update = create_update() - logger.info("Sending update...") - await conn.send(update) - logger.info("Sent...") - await asyncio.sleep(1) - - assert tracker.check("delay_detector: LONG DELAY!") is False - - -nodes = start_cluster(1, 0, 4, None, [], {}) -asyncio.run(main()) diff --git a/pytest/tests/stress/stress.py b/pytest/tests/stress/stress.py deleted file mode 100755 index 5b3545cd62d..00000000000 --- a/pytest/tests/stress/stress.py +++ /dev/null @@ -1,754 +0,0 @@ -#!/usr/bin/env python3 -# Chaos Monkey test. Simulates random events and failures and makes sure the blockchain continues operating as expected -# -# _.-._ ..-.. _.-._ -# (_-.-_) /|'.'|\ (_'.'_) -# mrf.\-/. \)\-/(/ ,-.-. -# __/ /-. \__ __/ ' ' \__ __/'-'-'\__ -# ( (___/___) ) ( (_/-._\_) ) ( (_/ \_) ) -# '.Oo___oO.' '.Oo___oO.' '.Oo___oO.' -# -# Parameterized as: -# `s`: number of shards -# `n`: initial (and minimum) number of validators -# `N`: max number of validators -# `k`: number of observers (technically `k+1`, one more observer is used by the test) -# `monkeys`: enabled monkeys (see below) -# Supports the following monkeys: -# [v] `node_set`: ocasionally spins up new nodes or kills existing ones, as long as the number of nodes doesn't exceed `N` and doesn't go below `n`. Also makes sure that for each shard there's at least one node that has been live sufficiently long -# [v] `node_restart`: ocasionally restarts nodes -# [v] `local_network`: ocasionally briefly shuts down the network connection for a specific node -# [v] `packets_drop`: drop 10% of all the network packets -# [ ] `global_network`: ocasionally shots down the network globally for several seconds -# [v] `transactions`: sends random transactions keeping track of expected balances -# [v] `staking`: runs staking transactions for validators. Presently the test doesn't track staking invariants, relying on asserts in the nearcore. -# `staking2.py` tests some basic stake invariants -# [v] `wipe_data`: only used in conjunction with `node_set` and `node_restart`. If present, nodes data folders will be periodically cleaned on restart -# This test also completely disables rewards, which simplifies ensuring total supply invariance and balance invariances - -import sys, time, base58, random, inspect, traceback, requests, logging -import pathlib -from multiprocessing import Process, Value, Lock - -sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) - -from cluster import init_cluster, spin_up_node, load_config -from configured_logger import logger -from transaction import sign_payment_tx, sign_staking_tx -from proxy_instances import RejectListProxy - -logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) - -TIMEOUT = 1500 # after how much time to shut down the test -TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before failing the test due to process stalling -MAX_STAKE = int(1e32) - -# How many times to try to send transactions to each validator. -# Is only applicable in the scenarios where we expect failures in tx sends. 
-SEND_TX_ATTEMPTS = 10 - -# Block_header_fetch_horizon need to be shorter than the epoch length. -# otherwise say epoch boundaries are H and H'. If the current height is H' + eps, and a node finished header sync at -# H' + eps - block_header_fetch_horizon, and then rolled state_fetch_horizon back, it will end up before H, and will -# try to state sync at the beginning of the epoch *two* epochs ago. No node will respond to such state requests. -BLOCK_HEADER_FETCH_HORIZON = 15 - -epoch_length = 50 -block_timeout = 25 # if two blocks are not produced within that many seconds, the test will fail. The timeout is increased if nodes are restarted or network is being messed up with -balances_timeout = 20 # how long to tolerate for balances to update after txs are sent -restart_sync_timeout = 45 # for how long to wait for nodes to sync in `node_restart` -tx_tolerance = 0.1 -wait_if_restart = False # whether to wait between `kill` and `start`, is needed when nodes are proxied -wipe_data = False - -assert balances_timeout * 2 <= TIMEOUT_SHUTDOWN -assert block_timeout * 2 <= TIMEOUT_SHUTDOWN - -network_issues_expected = False - - -def expect_network_issues(): - global network_issues_expected - network_issues_expected = True - - -def stress_process(func): - - def wrapper(stopped, error, *args): - try: - func(stopped, error, *args) - except Exception as ex: - logger.info(f'Process {func.__name__} failed', exc_info=ex) - error.value = 1 - - wrapper.__name__ = func.__name__ - return wrapper - - -def get_recent_hash(node, sync_timeout): - # return the parent of the last block known to the observer - # don't return the last block itself, since some validators might have not seen it yet - # also returns the height of the actual last block (so the height doesn't match the hash!) 
- - for attempt in range(sync_timeout): - # use timeout=10 throughout, because during header sync processing headers takes up to 3-10s - sync_info = node.get_status(timeout=10)['sync_info'] - block_hash = sync_info['latest_block_hash'] - info = node.json_rpc('block', [block_hash], timeout=10) - sync_error = ('error' in info and - 'unavailable on the node' in info['error']['data']) - if not sync_info['syncing'] and not sync_error: - break - time.sleep(1) - else: - assert False, "Node hasn't synced in %s seconds" % sync_timeout - - assert 'result' in info, info - hash_ = info['result']['header']['last_final_block'] - return hash_, sync_info['latest_block_height'] - - -def get_validator_ids(nodes): - # the [4:] part is a hack to convert test7 => 7 - return set([ - int(x['account_id'][4:]) for x in nodes[-1].get_status()['validators'] - ]) - - -@stress_process -def monkey_node_set(stopped, error, nodes, nonces): - - def get_future_time(): - if random.choice([True, False]): - return time.time() + random.randint(1, 5) - else: - return time.time() + random.randint(10, 30) - - nodes_stopped = [x.mess_with for x in nodes] - change_status_at = [get_future_time() for x in nodes] - while stopped.value == 0: - for i, node in enumerate(nodes): - if not node.mess_with: - continue - if time.time() < change_status_at[i]: - continue - if nodes_stopped[i]: - logging.info("Node set: starting node %s" % i) - # figuring out a live node with `node_restart` monkey is not trivial - # for simplicity just boot from the observer node - # `node_restart` doesn't boot from the observer, increasing coverage - boot_node = nodes[-1] - node.start(boot_node=boot_node) - else: - node.kill() - wipe = False - if wipe_data and random.choice([True, False]): - wipe = True - node.reset_data() - logging.info("Node set: stopping%s node %s" % - (" and wiping" if wipe else "", i)) - nodes_stopped[i] = not nodes_stopped[i] - change_status_at[i] = get_future_time() - - -@stress_process -def monkey_node_restart(stopped, error, nodes, nonces): - heights_after_restart = [0 for _ in nodes] - while stopped.value == 0: - node_idx = get_the_guy_to_mess_up_with(nodes) - boot_node_idx = random.randint(0, len(nodes) - 2) - while boot_node_idx == node_idx: - boot_node_idx = random.randint(0, len(nodes) - 2) - boot_node = nodes[boot_node_idx] - - node = nodes[node_idx] - # don't kill the same node too frequently, give it time to reboot and produce something - while True: - _, h = get_recent_hash(node, restart_sync_timeout) - assert h >= heights_after_restart[node_idx], "%s > %s" % ( - h, heights_after_restart[node_idx]) - if h > heights_after_restart[node_idx] + (5 - if not wipe_data else 10): - break - time.sleep(1) - - reset_data = wipe_data and random.choice([True, False, False]) - logging.info( - "NUKING NODE %s%s" % - (node_idx, " AND WIPING ITS STORAGE" if reset_data else "")) - - node.kill() - if reset_data: - node.reset_data() - - if wait_if_restart: - time.sleep(7) - node.start(boot_node=boot_node) - logging.info("NODE %s IS BACK UP" % node_idx) - - _, new_height = get_recent_hash(node, restart_sync_timeout) - assert new_height >= heights_after_restart[node_idx] - heights_after_restart[node_idx] = new_height - - time.sleep(5) - - -@stress_process -def monkey_packets_drop(stopped, error, nodes, nonces): - # no implementation needed, packet drop is configured on start - pass - - -@stress_process -def monkey_local_network(stopped, error, nodes, nonces): - last_height = 0 - last_time_height_updated = time.time() - - while stopped.value == 0: - 
_, cur_height = get_recent_hash(nodes[-1], 15) - if cur_height == last_height and time.time( - ) - last_time_height_updated > 10: - time.sleep(25) - else: - last_height = cur_height - last_time_height_updated = time.time() - time.sleep(5) - - # "- 2" below is because we don't want to kill the node we use to check stats - node_idx = random.randint(0, len(nodes) - 2) - node = nodes[node_idx] - logging.info(f'ISOLATING NODE {node_idx}') - node.proxy.reject_list[0] = node_idx - if node_idx == get_the_guy_to_mess_up_with(nodes): - time.sleep(5) - else: - time.sleep(1) - logging.info(f'RESTORING NODE {node_idx} NETWORK') - node.proxy.reject_list[0] = -1 - - -@stress_process -def monkey_wipe_data(stopped, error, nodes, nonces): - # no implementation needed, wipe_data is implemented inside node_set and node_restart - pass - - -@stress_process -def monkey_transactions(stopped, error, nodes, nonces): - - def get_balances(): - acts = [ - nodes[-1].get_account("test%s" % i, timeout=10)['result'] - for i in range(len(nodes)) - ] - return [int(x['amount']) + int(x['locked']) for x in acts] - - expected_balances = get_balances() - min_balances = [x - MAX_STAKE for x in expected_balances] - total_supply = (sum(expected_balances)) - logging.info("TOTAL SUPPLY: %s" % total_supply) - - last_iter_switch = time.time() - mode = 0 # 0 = send more tx, 1 = wait for balances - tx_count = 0 - last_tx_set = [] - - rolling_tolerance = tx_tolerance - - # do not stop when waiting for balances - while stopped.value == 0 or mode == 1: - validator_ids = get_validator_ids(nodes) - if time.time() - last_iter_switch > balances_timeout: - if mode == 0: - logging.info("%s TRANSACTIONS SENT. WAITING FOR BALANCES" % - tx_count) - mode = 1 - else: - logging.info( - "BALANCES NEVER CAUGHT UP, CHECKING UNFINISHED TRANSACTIONS" - ) - - def revert_txs(): - nonlocal expected_balances - good = 0 - bad = 0 - for tx in last_tx_set: - tx_happened = True - - response = nodes[-1].json_rpc( - 'tx', [tx[3], "test%s" % tx[1]], timeout=10) - - if 'error' in response and 'data' in response[ - 'error'] and "doesn't exist" in response[ - 'error']['data']: - tx_happened = False - elif 'result' in response and 'receipts_outcome' in response[ - 'result']: - tx_happened = len( - response['result']['receipts_outcome']) > 0 - else: - assert False, response - - if not tx_happened: - bad += 1 - expected_balances[tx[1]] += tx[4] - expected_balances[tx[2]] -= tx[4] - else: - good += 1 - return (good, bad) - - good, bad = revert_txs() - if expected_balances == get_balances(): - # reverting helped - logging.info( - "REVERTING HELPED, TX EXECUTED: %s, TX LOST: %s" % - (good, bad)) - bad_ratio = bad / (good + bad) - if bad_ratio > rolling_tolerance: - rolling_tolerance -= bad_ratio - rolling_tolerance - if rolling_tolerance < 0: - assert False - else: - rolling_tolerance = tx_tolerance - - min_balances = [x - MAX_STAKE for x in expected_balances] - tx_count = 0 - mode = 0 - last_tx_set = [] - else: - # still no match, fail - logging.info( - "REVERTING DIDN'T HELP, TX EXECUTED: %s, TX LOST: %s" % - (good, bad)) - - assert False, "Balances didn't update in time. 
Expected: %s, received: %s" % ( - expected_balances, get_balances()) - last_iter_switch = time.time() - - if mode == 0: - from_ = random.randint(0, len(nodes) - 1) - while min_balances[from_] < 0: - from_ = random.randint(0, len(nodes) - 1) - to = random.randint(0, len(nodes) - 1) - while from_ == to: - to = random.randint(0, len(nodes) - 1) - amt = random.randint(0, min_balances[from_]) - nonce_val, nonce_lock = nonces[from_] - - hash_, _ = get_recent_hash(nodes[-1], 15) - - with nonce_lock: - tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, - amt, nonce_val.value, - base58.b58decode(hash_.encode('utf8'))) - - # Loop trying to send the tx to all the validators, until at least one receives it - tx_hash = None - for send_attempt in range(SEND_TX_ATTEMPTS): - shuffled_validator_ids = [x for x in validator_ids] - random.shuffle(shuffled_validator_ids) - for validator_id in shuffled_validator_ids: - try: - info = nodes[validator_id].send_tx(tx) - if 'error' in info: - pass - - elif 'result' in info: - tx_hash = info['result'] - break - - else: - assert False, info - - except (requests.exceptions.ReadTimeout, - requests.exceptions.ConnectionError): - if not network_issues_expected and not nodes[ - validator_id].mess_with: - raise - - if tx_hash is not None: - break - - time.sleep(1) - - else: - assert False, "Failed to send the transation after %s attempts" % SEND_TX_ATTEMPTS - - last_tx_set.append((tx, from_, to, tx_hash, amt)) - nonce_val.value = nonce_val.value + 1 - - expected_balances[from_] -= amt - expected_balances[to] += amt - min_balances[from_] -= amt - - tx_count += 1 - - else: - if get_balances() == expected_balances: - logging.info("BALANCES CAUGHT UP, BACK TO SPAMMING TXS") - min_balances = [x - MAX_STAKE for x in expected_balances] - tx_count = 0 - mode = 0 - rolling_tolerance = tx_tolerance - last_tx_set = [] - - if mode == 1: - time.sleep(1) - elif mode == 0: - time.sleep(0.1) - - -def get_the_guy_to_mess_up_with(nodes): - global epoch_length - _, height = get_recent_hash(nodes[-1], 15) - return (height // epoch_length) % (len(nodes) - 1) - - -@stress_process -def monkey_staking(stopped, error, nodes, nonces): - while stopped.value == 0: - validator_ids = get_validator_ids(nodes) - whom = random.randint(0, len(nonces) - 2) - - status = nodes[-1].get_status() - hash_, _ = get_recent_hash(nodes[-1], 15) - - who_can_unstake = get_the_guy_to_mess_up_with(nodes) - - nonce_val, nonce_lock = nonces[whom] - with nonce_lock: - stake = random.randint(0.7 * MAX_STAKE // 1000000, - MAX_STAKE // 1000000) * 1000000 - - if whom == who_can_unstake: - stake = 0 - - tx = sign_staking_tx(nodes[whom].signer_key, - nodes[whom].validator_key, stake, - nonce_val.value, - base58.b58decode(hash_.encode('utf8'))) - for validator_id in validator_ids: - try: - nodes[validator_id].send_tx(tx) - except (requests.exceptions.ReadTimeout, - requests.exceptions.ConnectionError): - if not network_issues_expected and not nodes[ - validator_id].mess_with: - raise - nonce_val.value = nonce_val.value + 1 - - time.sleep(1) - - -@stress_process -def blocks_tracker(stopped, error, nodes, nonces): - # note that we do not do `white stopped.value == 0`. 
When the test finishes, we want - # to wait for at least one more block to be produced - mapping = {} - height_to_hash = {} - largest_height = 0 - largest_per_node = [0 for _ in nodes] - largest_divergence = 0 - last_updated = time.time() - done = False - every_ten = False - last_validators = None - while not done: - # always query the last validator, and a random one - for val_id in [-1, random.randint(0, len(nodes) - 2)]: - try: - status = nodes[val_id].get_status() - if status['validators'] != last_validators and val_id == -1: - last_validators = status['validators'] - logging.info( - "VALIDATORS TRACKER: validators set changed, new set: %s" - % [x['account_id'] for x in last_validators]) - hash_ = status['sync_info']['latest_block_hash'] - height = status['sync_info']['latest_block_height'] - largest_per_node[val_id] = height - if height > largest_height: - if stopped.value != 0: - done = True - if not every_ten or largest_height % 10 == 0: - logging.info("BLOCK TRACKER: new height %s" % - largest_height) - if largest_height >= 20: - if not every_ten: - every_ten = True - logging.info( - "BLOCK TRACKER: switching to tracing every ten blocks to reduce spam" - ) - largest_height = height - last_updated = time.time() - - elif time.time() - last_updated > block_timeout: - assert False, "Block production took more than %s seconds" % block_timeout - - if hash_ not in mapping: - block_info = nodes[val_id].json_rpc('block', [hash_], - timeout=10) - confirm_height = block_info['result']['header']['height'] - assert height == confirm_height - prev_hash = block_info['result']['header']['prev_hash'] - if height in height_to_hash: - assert False, "Two blocks for the same height: %s and %s" % ( - height_to_hash[height], hash_) - - height_to_hash[height] = hash_ - mapping[hash_] = (prev_hash, height) - - except: - # other monkeys can tamper with all the nodes but the last one, so exceptions are possible - # the last node must always respond, so we rethrow - if val_id == -1: - raise - time.sleep(0.2) - - for B1, (P1, H1) in [x for x in mapping.items()]: - for b2, (p2, h2) in [x for x in mapping.items()]: - b1, p1, h1 = B1, P1, H1 - if abs(h1 - h2) < 8: - initial_smaller_height = min(h1, h2) - try: - while b1 != b2: - while h1 > h2: - (b1, (p1, h1)) = (p1, mapping[p1]) - while h2 > h1: - (b2, (p2, h2)) = (p2, mapping[p2]) - while h1 == h2 and b1 != b2: - (b1, (p1, h1)) = (p1, mapping[p1]) - (b2, (p2, h2)) = (p2, mapping[p2]) - assert h1 == h2 - assert b1 == b2 - assert p1 == p2 - divergence = initial_smaller_height - h1 - except KeyError as e: - # some blocks were missing in the mapping, so do our best estimate - divergence = initial_smaller_height - min(h1, h2) - - if divergence > largest_divergence: - largest_divergence = divergence - - logging.info("=== BLOCK TRACKER SUMMARY ===") - logging.info("Largest height: %s" % largest_height) - logging.info("Largest divergence: %s" % largest_divergence) - logging.info("Per node: %s" % largest_per_node) - - if not network_issues_expected: - assert largest_divergence < len(nodes) - else: - assert largest_divergence < 2 * len(nodes) - - -def doit(s, n, N, k, monkeys, timeout): - global block_timeout, balances_timeout, tx_tolerance, epoch_length, wait_if_restart, wipe_data, restart_sync_timeout - - assert 2 <= n <= N - - config = load_config() - local_config_changes = {} - - monkey_names = [x.__name__ for x in monkeys] - proxy = None - logging.info(monkey_names) - - for i in range(N + k + 1): - local_config_changes[i] = { - "consensus": { - 
"block_header_fetch_horizon": BLOCK_HEADER_FETCH_HORIZON, - "state_sync_timeout": { - "secs": 0, - "nanos": 500000000 - } - }, - "state_sync_enabled": True, - "view_client_throttle_period": { - "secs": 0, - "nanos": 0 - }, - "store.state_snapshot_enabled": True, - } - for i in range(N, N + k + 1): - # make all the observers track all the shards - local_config_changes[i]["tracked_shards"] = list(range(s)) - if 'monkey_wipe_data' in monkey_names: - # When data can be deleted, with the short epoch length while the node with deleted data folder is syncing, - # other nodes can run sufficiently far ahead to GC the old data. Have one archival node to address it. - # It is also needed, because the balances timeout is longer, and the txs can get GCed on the observer node - # by the time it gets to checking their status. - local_config_changes[N + k]['archive'] = True - - if 'monkey_local_network' in monkey_names or 'monkey_packets_drop' in monkey_names or 'monkey_node_restart' in monkey_names: - expect_network_issues() - block_timeout += 40 - - if 'monkey_local_network' in monkey_names or 'monkey_packets_drop' in monkey_names: - assert config[ - 'local'], 'Network stress operations only work on local nodes' - drop_probability = 0.05 if 'monkey_packets_drop' in monkey_names else 0 - - reject_list = RejectListProxy.create_reject_list(1) - proxy = RejectListProxy(reject_list, drop_probability) - tx_tolerance += 0.3 - - if 'monkey_local_network' in monkey_names or 'monkey_packets_drop' in monkey_names: - # add 15 seconds + 10 seconds for each unique network-related monkey - balances_timeout += 15 - - if 'monkey_local_network' in monkey_names: - balances_timeout += 10 - - if 'monkey_packets_drop' in monkey_names: - wait_if_restart = True - balances_timeout += 10 - - if 'monkey_node_restart' in monkey_names or 'monkey_node_set' in monkey_names: - balances_timeout += 10 - tx_tolerance += 0.5 - - if 'monkey_wipe_data' in monkey_names: - assert 'monkey_node_restart' in monkey_names or 'monkey_node_set' in monkey_names - wipe_data = True - balances_timeout += 25 - - # if nodes can restart, we should give them way more time to sync. - # if packets can also be dropped, each state-sync-related request or response lost adds 10 seconds - # to the sync process. - restart_sync_timeout = 45 if 'monkey_packets_drop' not in monkey_names else 90 - block_timeout += (10 - if 'monkey_packets_drop' not in monkey_names else 40) - - # We need to make sure that the blocks that include txs are not garbage collected. From the first tx sent until - # we check balances time equal to `balances_timeout * 2` passes, and the block production is capped at 1.7/s. - # The GC keeps five epochs of blocks. 
- min_epoch_length = (int((balances_timeout * 2) * 1.7) + 4) // 5 - epoch_length = max(epoch_length, min_epoch_length) - - logger.info( - f"block_timeout: {block_timeout}, balances_timeout: {balances_timeout}, tx_tolerance: {tx_tolerance}, epoch_length: {epoch_length}, wait_if_restart: {wait_if_restart}, wipe_data: {wipe_data}, restart_sync_timeout: {restart_sync_timeout}" - ) - - near_root, node_dirs = init_cluster( - N, k + 1, s, config, - [["min_gas_price", 0], ["max_inflation_rate", [0, 1]], - ["epoch_length", epoch_length], [ - "block_producer_kickout_threshold", 10 - ], ["chunk_producer_kickout_threshold", 10]], local_config_changes) - - started = time.time() - - boot_node = spin_up_node(config, near_root, node_dirs[0], 0, proxy=proxy) - boot_node.stop_checking_store() - boot_node.mess_with = False - nodes = [boot_node] - - for i in range(1, N + k + 1): - node = spin_up_node(config, - near_root, - node_dirs[i], - i, - boot_node=boot_node, - proxy=proxy) - node.stop_checking_store() - nodes.append(node) - if i >= n and i < N: - node.kill() - node.mess_with = True - else: - node.mess_with = False - - stopped = Value('i', 0) - error = Value('i', 0) - ps = [] - nonces = [(Value('i', 1), Lock()) for _ in range(N + k + 1)] - - def launch_process(func): - nonlocal stopped, error, ps - - p = Process(target=func, args=(stopped, error, nodes, nonces)) - p.start() - ps.append((p, func.__name__)) - - def check_errors(): - nonlocal error, ps - if error.value != 0: - for (p, _) in ps: - p.terminate() - assert False, "At least one process failed, check error messages above" - - for monkey in monkeys: - launch_process(monkey) - - launch_process(blocks_tracker) - - started = time.time() - while time.time() - started < timeout: - check_errors() - time.sleep(1) - - logging.info("") - logging.info("==========================================") - logging.info("# TIMEOUT IS HIT, SHUTTING DOWN THE TEST #") - logging.info("==========================================") - stopped.value = 1 - started_shutdown = time.time() - proxies_stopped = False - - while True: - check_errors() - still_running = [name for (p, name) in ps if p.is_alive()] - - if len(still_running) == 0: - break - - # If the test is running with proxies, `node_restart` and `node_set` can get - # stuck because the proxies now are their child processes. We can't kill the - # proxies rigth away, because that would interfere with block production, and - # might prevent other workers (e.g. block_tracker) from completing in a timely - # manner. Thus, kill the proxies some time into the shut down process. 
- if time.time( - ) - started_shutdown > TIMEOUT_SHUTDOWN / 2 and not proxies_stopped: - logging.info( - "Shutdown is %s seconds in, shutting down proxies if any" % - (TIMEOUT_SHUTDOWN / 2)) - if boot_node.proxy is not None: - boot_node.proxy.global_stopped.value = 1 - for p in boot_node.proxy.ps: - p.terminate() - proxies_stopped = True - - if time.time() - started_shutdown > TIMEOUT_SHUTDOWN: - for (p, _) in ps: - p.terminate() - assert False, "The test didn't gracefully shut down in time\nStill running: %s" % ( - still_running) - - check_errors() - - logging.info("Shut down complete, executing store validity checks") - for node in nodes: - node.is_check_store = True - node.check_store() - - -MONKEYS = dict([(name[7:], obj) - for name, obj in inspect.getmembers(sys.modules[__name__]) - if inspect.isfunction(obj) and name.startswith("monkey_")]) - -if __name__ == "__main__": - if len(sys.argv) < 5: - logger.info( - "Usage:\npython tests/stress/stress.py s n N k monkey1 monkey2 ...") - sys.exit(1) - - s = int(sys.argv[1]) - n = int(sys.argv[2]) - N = int(sys.argv[3]) - k = int(sys.argv[4]) - monkeys = sys.argv[5:] - - assert len(monkeys) == len(set(monkeys)), "Monkeys should be unique" - for monkey in monkeys: - assert monkey in MONKEYS, "Unknown monkey \"%s\"" % monkey - - doit(s, n, N, k, [globals()["monkey_%s" % x] for x in monkeys], TIMEOUT)
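For reviewers who want a compact record of what is being removed: the core of the deleted `stress.py` was a set of `monkey_*` worker functions wrapped by a `stress_process` decorator, run as `multiprocessing` processes that share `stopped`/`error` flags and are selected by name on the command line. The sketch below reproduces that orchestration pattern in a self-contained form with no nearcore dependencies; `monkey_noop`, the `sketch.py` file name, and the 5-second timeout are illustrative placeholders rather than code from this repository.

```python
#!/usr/bin/env python3
# sketch.py -- simplified, dependency-free sketch of the orchestration
# pattern used by the removed stress.py. Like the original, it relies on
# the default 'fork' start method on Linux.
import inspect
import sys
import time
from multiprocessing import Process, Value


def stress_process(func):
    # Mirror of the removed wrapper: record a worker failure in a shared
    # flag instead of letting the child process die silently.
    def wrapper(stopped, error, *args):
        try:
            func(stopped, error, *args)
        except Exception:
            error.value = 1

    wrapper.__name__ = func.__name__
    return wrapper


@stress_process
def monkey_noop(stopped, error):
    # Placeholder for workers such as monkey_transactions or monkey_staking.
    while stopped.value == 0:
        time.sleep(0.1)


# Discover workers by the monkey_ prefix, as the removed test did.
MONKEYS = {
    name[len('monkey_'):]: obj
    for name, obj in inspect.getmembers(sys.modules[__name__])
    if inspect.isfunction(obj) and name.startswith('monkey_')
}


def doit(monkeys, timeout):
    stopped = Value('i', 0)
    error = Value('i', 0)
    procs = [Process(target=m, args=(stopped, error)) for m in monkeys]
    for p in procs:
        p.start()

    started = time.time()
    while time.time() - started < timeout:
        assert error.value == 0, 'a stress process failed'
        time.sleep(1)

    stopped.value = 1  # ask the workers to wind down, then join them
    for p in procs:
        p.join()
    assert error.value == 0, 'a stress process failed during shutdown'


if __name__ == '__main__':
    names = sys.argv[1:] or ['noop']
    doit([MONKEYS[n] for n in names], timeout=5)
```

Before this patch, the real harness was driven from `nightly/pytest-stress.txt` with invocations such as `pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions`, where the four integers are the `s n N k` parameters described in the header of the deleted `stress.py`.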