Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit

Permalink
from ADO
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusz-bzowka-chain-insight committed Jul 22, 2024
1 parent 0869a3c commit bd382f5
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 35 deletions.
11 changes: 9 additions & 2 deletions neurons/nodes/bitcoin/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
parser = argparse.ArgumentParser()
bt.logging.add_args(parser)
indexlogger = setup_logger("BitcoinNode")

from neurons import logger

class BitcoinNode(Node):
def __init__(self, node_rpc_url: str = None):
Expand Down Expand Up @@ -168,6 +168,9 @@ def validate_funds_flow_challenge_response_output(self, challenge: Challenge, re
return challenge.in_total_amount == in_total_amount and challenge.out_total_amount == out_total_amount

def create_balance_tracking_challenge(self, block_height):

logger.info(f"Creating balance tracking challenge", block_height=block_height)

block = self.get_block_by_height(block_height)
block_data = parse_block_data(block)
transactions = block_data.transactions
Expand All @@ -191,13 +194,17 @@ def create_balance_tracking_challenge(self, block_height):
balance_changes_by_address[address] += out_amount_by_address[address]

challenge = Challenge(model_type=MODEL_TYPE_BALANCE_TRACKING, block_height=block_height)
return challenge, balance_changes_by_address
total_balance_change = sum(balance_changes_by_address.values())
logger.info(f"Created balance tracking challenge", block_height=block_height)

return challenge, total_balance_change

def get_txn_data_by_id(self, txn_id: str):
try:
rpc_connection = AuthServiceProxy(self.node_rpc_url)
return rpc_connection.getrawtransaction(txn_id, 1)
except Exception as e:
logger.error(f"Failed to get transaction data by id", error={'exception_type': e.__class__.__name__, 'exception_message': str(e), 'exception_args': e.args})
return None

def create_in_memory_txn(self, tx_data):
Expand Down
6 changes: 2 additions & 4 deletions neurons/remote_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import time
import json
import requests
import bittensor as bt
import numpy as np
import threading

import insights
Expand Down Expand Up @@ -97,7 +95,7 @@ def __init__(self):
self.max_requests = 128
self.min_request_period = 60
self.stake_threshold = 5000
self.config_url = os.getenv("MINER_REMOTE_CONFIG_URL", 'https://chaininsightsaipreprod.blob.core.windows.net/minercfg/miner.json')
self.config_url = os.getenv("MINER_REMOTE_CONFIG_URL", 'https://chaininsightsaiprod.blob.core.windows.net/minercfg/miner.json')
self.blockchain_sync_delta = 100
self.is_grace_period = False
self.set_weights = True
Expand Down Expand Up @@ -169,7 +167,7 @@ def __init__(self):
self.version_update = True
self.balance_model_diff = 849008

self.config_url = os.getenv("VALIDATOR_REMOTE_CONFIG_URL", 'https://chaininsightsaipreprod.blob.core.windows.net/validatorcfg/validator.json')
self.config_url = os.getenv("VALIDATOR_REMOTE_CONFIG_URL", 'https://chaininsightsaiprod.blob.core.windows.net/validatorcfg/validator.json')

def load_and_get_config_values(self):
self.load_remote_config()
Expand Down
2 changes: 1 addition & 1 deletion neurons/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_commitment(netuid: int, uid: int, block: Optional[int] = None) -> str:
metadata_str = subtensor.get_commitment(config.netuid, 0)
if metadata_str is None:
continue
logger.info("Got miner metadata, trying to parse..", miner_hotkey=hotkey, metadata_str=metadata_str)

metadata = MinerMetadata.from_compact(metadata_str)
miners_metadata[hotkey] = metadata
except Exception as e:
Expand Down
6 changes: 5 additions & 1 deletion neurons/validators/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import traceback
from collections import Counter
from random import randint
Expand Down Expand Up @@ -45,7 +46,6 @@ def run_benchmarks(self, filtered_responses):
responses = group_info['responses']
self.run_benchmark_type(MODEL_TYPE_BALANCE_TRACKING, self.validator_config.get_benchmark_balance_tracking_script(network).strip(), benchmark_query_script_vars, responses, results)


return results
except Exception as e:
logger.error("Run benchmark failed", error=traceback.format_exc())
Expand All @@ -64,6 +64,10 @@ def run_benchmark_type(self, benchmark_type, benchmark_query_script, benchmark_q
if uid_value not in results:
results[uid_value] = {}
results[uid_value][benchmark_type] = (response_time, result == most_common_result)

if benchmark_type == MODEL_TYPE_BALANCE_TRACKING:
logger.info("DEBUG - run_benchmark_type ", responses=[(r.axon.hotkey, r.output.balance_model_last_block) for r, _ in responses if r.output is not None] , results=results)

except Exception as e:
logger.error(f"Run benchmark failed", benchmark_type = benchmark_type, error=traceback.format_exc())

Expand Down
40 changes: 29 additions & 11 deletions neurons/validators/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self, config=None):

def cross_validate(self, axon, node, start_block_height, last_block_height, balance_model_last_block):
try:
logger.info("Funds flow challenge started", miner_ip = axon.ip, miner_hotkey=axon.hotkey)
challenge, expected_response = node.create_funds_flow_challenge(start_block_height, last_block_height)

response = self.dendrite.query(
Expand All @@ -143,6 +144,8 @@ def cross_validate(self, axon, node, start_block_height, last_block_height, bala
logger.info("Cross validation failed", miner_hotkey=hotkey, reason="expected_response", response_output=response.output, expected_output=expected_response, miner_ip = response.axon.ip)
return False, response_time

""" TEMPORARILY DISABLED
logger.info("Balance tracking challenge started", miner_hotkey=hotkey, miner_ip = response.axon.ip)
random_balance_tracking_block = randint(1, balance_model_last_block)
challenge, expected_response = node.create_balance_tracking_challenge(random_balance_tracking_block)
response = self.dendrite.query(
Expand All @@ -163,8 +166,9 @@ def cross_validate(self, axon, node, start_block_height, last_block_height, bala
response_time += response.dendrite.process_time
if response.output != expected_response:
logger.info("Cross validation failed", miner_hotkey=hotkey, miner_ip = response.axon.ip, reason="expected_response", response_output=response.output, expected_output=expected_response)
logger.info("Cross validation failed", miner_hotkey=hotkey, miner_ip = response.axon.ip, reason="expected_response")
return False, response_time
"""

logger.info("Cross validation passed", miner_hotkey=hotkey, miner_ip = response.axon.ip)

Expand Down Expand Up @@ -254,13 +258,22 @@ def get_reward(self, response: Discovery, uid: int, benchmarks_result):
else:
logger.info("Ping Test passed", miner_uid = uid_value, miner_hotkey=hotkey, miner_ip = response.axon.ip, average_ping_time=average_ping_time)

logger.info("Cross validation started", miner_uid = uid_value, miner_hotkey=hotkey, miner_ip = response.axon.ip)
cross_validation_result, _ = self.cross_validate(response.axon, self.nodes[network], start_block_height, last_block_height, balance_model_last_block)
if cross_validation_result is None or not cross_validation_result:
self.miner_uptime_manager.down(uid_value, hotkey)
logger.info("Reward failed", miner_uid = uid_value, miner_hotkey=hotkey, miner_ip = response.axon.ip, reason="cross_validation_failed", score=0)
return 0

funds_flow_benchmark_result = benchmarks_result.get(uid_value)[MODEL_TYPE_FUNDS_FLOW]
def _try_get(_items, _uid_value, _key):
if isinstance(_items, dict):
uid_items = _items.get(_uid_value, {})
if isinstance(uid_items, dict):
if _key in uid_items:
return uid_items.get(_key)
return None

funds_flow_benchmark_result = _try_get(benchmarks_result, uid_value, MODEL_TYPE_FUNDS_FLOW)
if funds_flow_benchmark_result is None:
score = self.metagraph.T[uid]/4
self.miner_uptime_manager.down(uid_value, hotkey)
Expand All @@ -270,10 +283,10 @@ def get_reward(self, response: Discovery, uid: int, benchmarks_result):
funds_flow_response_time, funds_flow_benchmark_is_valid = funds_flow_benchmark_result
if not funds_flow_benchmark_is_valid:
self.miner_uptime_manager.down(uid_value, hotkey)
logger.info("Reward failed", miner_uid = uid_value, miner_hotkey=hotkey, miner_ip = response.axon.ip, reason="funds_flow_benchmark_failed", score=0)
logger.info("Reward failed", miner_uid=uid_value, miner_hotkey=hotkey, miner_ip = response.axon.ip, reason="funds_flow_benchmark_failed", score=0)
return 0

balance_tracking_benchmark_result = benchmarks_result.get(uid_value)[MODEL_TYPE_BALANCE_TRACKING]
balance_tracking_benchmark_result = _try_get(benchmarks_result, uid_value, MODEL_TYPE_BALANCE_TRACKING)
if balance_tracking_benchmark_result is None:
score = self.metagraph.T[uid]/4
self.miner_uptime_manager.down(uid_value, hotkey)
Expand Down Expand Up @@ -316,14 +329,19 @@ def calculate_min_max_time(self, benchmarks_result, responses):
min_time_response = self.validator_config.benchmark_timeout
for item, response in zip(benchmarks_result.values(), responses):
average_ping_time = ping(response.axon.ip, response.axon.port, attempts=10)[1]
f_max_time_response = max(max_time_response, item[MODEL_TYPE_FUNDS_FLOW][0] - average_ping_time)
f_min_time_response = min(min_time_response, item[MODEL_TYPE_FUNDS_FLOW][0] - average_ping_time)

b_max_time_response = max(max_time_response, item[MODEL_TYPE_BALANCE_TRACKING][0] - average_ping_time)
b_min_time_response = min(min_time_response, item[MODEL_TYPE_BALANCE_TRACKING][0] - average_ping_time)
f_max_time_response , b_max_time_response = 0, 0
f_min_time_response, b_min_time_response = 0, 0

max_time_response = f_max_time_response + b_max_time_response
min_time_response = f_min_time_response + b_min_time_response
if MODEL_TYPE_FUNDS_FLOW in item:
f_max_time_response = max(max_time_response, item[MODEL_TYPE_FUNDS_FLOW][0] - average_ping_time)
f_min_time_response = min(min_time_response, item[MODEL_TYPE_FUNDS_FLOW][0] - average_ping_time)
if MODEL_TYPE_BALANCE_TRACKING in item:
b_max_time_response = max(max_time_response, item[MODEL_TYPE_BALANCE_TRACKING][0] - average_ping_time)
b_min_time_response = min(min_time_response, item[MODEL_TYPE_BALANCE_TRACKING][0] - average_ping_time)
if MODEL_TYPE_BALANCE_TRACKING in item and MODEL_TYPE_FUNDS_FLOW in item:
max_time_response = f_max_time_response + b_max_time_response
min_time_response = f_min_time_response + b_min_time_response

if max_time_response == min_time_response:
max_time_response += 0.1
Expand All @@ -349,8 +367,8 @@ async def forward(self):

responses_to_benchmark = [(response, uid) for response, uid in zip(responses, uids) if self.is_response_valid(response)]
benchmarks_result = self.benchmark_validator.run_benchmarks(responses_to_benchmark)

self.update_scorer_config(benchmarks_result, responses)

self.block_height_cache = {network: self.nodes[network].get_current_block_height() for network in self.networks}

rewards = [
Expand Down
42 changes: 26 additions & 16 deletions template/base/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,39 @@ def run(self):
# Check that validator is registered on the network.
self.sync()

logger.info(f"Validator starting", block = self.block)
previous_block = self.block
logger.info(f"Validator starting")

# This loop maintains the validator's operations until intentionally stopped.
try:
while True:
if self.block - previous_block >= 10:
logger.info('step', step=self.step, block=self.block)

try:
start_block = self.subtensor.get_current_block()
logger.info('running forward loop', start_block=start_block)
self.loop.run_until_complete(self.concurrent_forward())
self.sync()

if self.should_exit:
break

end_block = self.subtensor.get_current_block()

block_elapsed = end_block - start_block
logger.info('running forward loop', block_elapsed = block_elapsed)

blocks_to_wait = 50
if block_elapsed < blocks_to_wait:
sleep_time = bt.__blocktime__ * (blocks_to_wait - block_elapsed)
logger.warning(f"Block elapsed is less than {blocks_to_wait} blocks, going to sleep", block_elapsed=block_elapsed,
sleep_time=sleep_time)
time.sleep(sleep_time)

try:
self.sync()
except Exception as e:
logger.warning(f"Error during sync", error = {'exception_type': e.__class__.__name__,'exception_message': str(e),'exception_args': e.args})
if self.should_exit:
break

previous_block = self.block
if self.should_exit:
break
self.step += 1
time.sleep(bt.__blocktime__)
self.step += 1

# If someone intentionally stops the validator, it'll safely terminate operations.
except Exception as e:
logger.warning(f"Error in validator loop", error={'exception_type': e.__class__.__name__,'exception_message': str(e),'exception_args': e.args})
time.sleep(bt.__blocktime__ * 10)
except KeyboardInterrupt:
self.axon.stop()
logger.success("Validator killed by keyboard interrupt.")
Expand Down

0 comments on commit bd382f5

Please sign in to comment.