Skip to content
This repository was archived by the owner on Sep 1, 2024. It is now read-only.

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
renesweet24 committed Jun 3, 2024
2 parents 53395a4 + a1f0a94 commit 3a3da50
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 24 deletions.
163 changes: 155 additions & 8 deletions insights/api/insight_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,138 @@ class APIServer:

failed_prompt_msg = "Please try again. Can't receive any responses from the miners or due to the poor network connection."

def __init__(self, config, wallet, subtensor, metagraph):
self.app = FastAPI(title="Validator API",
description="API for the Validator service",
version=insights.__version__)
def set_weights(self):
"""
Sets the validator weights to the metagraph hotkeys based on the scores it has received from the miners. The weights determine the trust and incentive level the validator assigns to miner nodes on the network.
"""
try:
# Check if self.scores contains any NaN values and log a warning if it does.
if np.isnan(self.scores).any():
logger.warning(
f"Scores contain NaN values. This may be due to a lack of responses from miners, or a bug in your reward functions."
)

# Calculate the average reward for each uid across non-zero values.
# Replace any NaN values with 0.
raw_weights = np.linalg.norm(self.scores, p=1, dim=0)

# Process the raw weights to final_weights via subtensor limitations.
(
processed_weight_uids,
processed_weights,
) = bt.utils.weight_utils.process_weights_for_netuid(
uids=self.metagraph.uids,
weights=raw_weights,
netuid=self.config.netuid,
subtensor=self.subtensor,
metagraph=self.metagraph,
)

# Convert to uint16 weights and uids.
(
uint_uids,
uint_weights,
) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
uids=processed_weight_uids, weights=processed_weights
)
table = Table(title="All Weights")
table.add_column("uid", justify="right", style="cyan", no_wrap=True)
table.add_column("weight", style="magenta")
table.add_column("score", style="magenta")
uids_and_weights = list(
zip(uint_uids, uint_weights)
)
# Sort by weights descending.
sorted_uids_and_weights = sorted(
uids_and_weights, key=lambda x: x[1], reverse=True
)
for uid, weight in sorted_uids_and_weights:
table.add_row(
str(uid),
str(round(weight, 4)),
str(int(self.scores[uid].item())),
)
console = Console()
console.print(table)

# Set the weights on chain via our subtensor connection.
self.subtensor.set_weights(
wallet=self.wallet,
netuid=self.config.netuid,
uids=processed_weight_uids,
weights=processed_weights,
wait_for_finalization=False,
wait_for_inclusion=False,
version_key=self.spec_version
)

with self.lock:
self.last_weights_set_block = self.block

logger.success("Finished setting weights.")
except Exception as e:
logger.error(
f"Failed to set weights on chain with exception: { e }"
)
def is_response_status_code_valid(self, response):
status_code = response.axon.status_code
status_message = response.axon.status_message
if response.is_failure:
logger.info(f"Discovery response: Failure, miner {response.axon.hotkey} returned {status_code=}: {status_message=}")
elif response.is_blacklist:
logger.info(f"Discovery response: Blacklist, miner {response.axon.hotkey} returned {status_code=}: {status_message=}")
elif response.is_timeout:
logger.info(f"Discovery response: Timeout, miner {response.axon.hotkey}")
return status_code == 200

def get_reward(self, response: Union["bt.Synapse", Any], uid: int):
return 0.5

def update_scores(self, rewards: np.float32, uids: List[int]):
"""Performs exponential moving average on the scores based on the rewards received from the miners."""

# Check if rewards contains NaN values.
if np.isnan(rewards).any():
logger.warning(f"NaN values detected in rewards: {rewards}")
# Replace any NaN values in rewards with 0.
rewards = np.nan_to_num(rewards, 0)

# Check if `uids` is already a tensor and clone it to avoid the warning.
if isinstance(uids, np.array):
uids_tensor = uids.clone().detach()
else:
uids_tensor = np.array(uids)

# Compute forward pass rewards, assumes uids are mutually exclusive.
# shape: [ metagraph.n ]
scattered_rewards: np.float32 = self.scores.scatter(
0, uids_tensor, rewards
)
logger.debug(f"Scattered rewards: {rewards}")

# Update scores with rewards produced by this step.
# shape: [ metagraph.n ]
alpha: float = self.config.user_query_moving_average_alpha
self.scores: np.float32 = alpha * scattered_rewards + (
1 - alpha
) * self.scores
logger.debug(f"Updated moving avg scores: {self.scores}")

def __init__(
self,
config: None,
wallet: None,
subtensor: None,
metagraph: None,
scores: None,
):
"""
API can be invoked while running a validator.
Receive config, wallet, subtensor, metagraph from the validator and share the score of miners with the validator.
subtensor and metagraph of APIs will change as the ones of validators change.
"""
self.app = FastAPI(title="validator-api",
description="The goal of validator-api is to set up how to message between Chat API and validators.")

self.app.add_middleware(
CORSMiddleware,
Expand Down Expand Up @@ -115,10 +243,29 @@ async def get_response(request: Request, query: ChatMessageRequest = Body(..., e
responded_uids = np.setdiff1d(np.array(top_miner_uids), blacklist_uids)

# Add score to miners respond to user query
# uids = responded_uids.tolist()
# TODO: we store the responded UIDs to progres here and that data will be take later in scoring function
# !! Score should go to miners hotkey not its uid !! uid can change but hotkey is unique

uids = responded_uids.tolist()
rewards = [
self.get_reward(response, uid) for response, uid in zip(responses, uids)
]
# Remove None reward as they represent timeout cross validation
filtered_data = [(reward, uid) for reward, uid in zip(rewards, uids) if reward is not None]

if filtered_data:
rewards, uids = zip(*filtered_data)

rewards = np.float32(rewards)
self.update_scores(rewards, uids)
else:
logger.info('Skipping update_scores() as no responses were valid')

# If the number of excluded_uids is bigger than top x percentage of the whole axons, format it.
if len(self.excluded_uids) > int(self.metagraph.n * self.config.top_rate):
logger.info(f"Excluded UID list is too long")
self.excluded_uids = []
logger.info(f"Excluded_uids are {self.excluded_uids}")

logger.info(f"Responses are {responses}")

selected_index = responses.index(random.choice(responses))
response_object = ChatMessageResponse(
miner_hotkey=self.metagraph.axons[responded_uids[selected_index]].hotkey,
Expand Down
31 changes: 15 additions & 16 deletions neurons/validators/utils/uids.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@ def check_uid_availability(
# Available otherwise.
return True


async def get_top_miner_uids(metagraph: "bt.metagraph.Metagraph",
wallet: "bt.wallet.Wallet",
top_rate: float = 1,
vpermit_tao_limit: int = 4096) -> np.int64:

def get_top_miner_uids(
metagraph: "bt.metagraph.Metagraph", top_rate: float = 1, exclude: List[int] = None, vpermit_tao_limit: int = 4096
) -> np.int64:
"""Returns the available top miner UID from the metagraph.
Args:
metagraph (bt.metagraph.Metagraph): Metagraph object
Expand Down Expand Up @@ -80,16 +77,18 @@ async def get_top_miner_uids(metagraph: "bt.metagraph.Metagraph",
# Consider only incentive
# values = [(uid, metagraph.I[uid]) for uid in candidate_uids]

sorted_values = sorted(values, key=lambda x: x[1], reverse=True)
top_rate_num_items = max(1, int(top_rate * len(miner_ip_filtered_uids)))
top_miner_uids = np.array([uid for uid, _ in sorted_values[:top_rate_num_items]])

return top_miner_uids
except Exception as e:
logger.error(message=f"Failed to get top miner uids: {e}")
return None
finally:
dendrite.close_session()
ips = []
filtered_uids = []
for uid, _ in sorted_values:
if metagraph.axons[uid].ip not in ips:
ips.append(metagraph.axons[uid].ip)
filtered_uids.append(uid)

values = [(uid, metagraph.I[uid] * metagraph.trust[uid]) for uid in filtered_uids]
sorted_values = sorted(values, key=lambda x: x[1], reverse=True)
top_rate_num_items = max(1, int(top_rate * len(filtered_uids)))
top_miner_uids = np.array([uid for uid, _ in sorted_values[:top_rate_num_items]])
return top_miner_uids


def get_random_uids(
Expand Down
114 changes: 114 additions & 0 deletions tests/test_template_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# The MIT License (MIT)
# Copyright © 2023 Yuma Rao
# Copyright © 2023 Opentensor Foundation

# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.

# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import sys
import numpy as np
import unittest
import bittensor as bt

from neurons.validator import Neuron as Validator
from neurons.miner import Neuron as Miner

from template.protocol import Dummy
from template.validator.forward import forward
from template.utils.uids import get_random_uids
from template.validator.reward import get_rewards
from template.base.validator import BaseValidatorNeuron


class TemplateValidatorNeuronTestCase(unittest.TestCase):
"""
This class contains unit tests for the RewardEvent classes.
The tests cover different scenarios where completions may or may not be successful and the reward events are checked that they don't contain missing values.
The `reward` attribute of all RewardEvents is expected to be a float, and the `is_filter_model` attribute is expected to be a boolean.
"""

def setUp(self):
sys.argv = sys.argv[0] + ["--config", "tests/configs/validator.json"]

config = BaseValidatorNeuron.config()
config.wallet._mock = True
config.metagraph._mock = True
config.subtensor._mock = True
self.neuron = Validator(config)
self.miner_uids = get_random_uids(self, k=10)

def test_run_single_step(self):
# TODO: Test a single step
pass

def test_sync_error_if_not_registered(self):
# TODO: Test that the validator throws an error if it is not registered on metagraph
pass

def test_forward(self):
# TODO: Test that the forward function returns the correct value
pass

def test_dummy_responses(self):
# TODO: Test that the dummy responses are correctly constructed

responses = self.neuron.dendrite.query(
# Send the query to miners in the network.
axons=[
self.neuron.metagraph.axons[uid] for uid in self.miner_uids
],
# Construct a dummy query.
synapse=Dummy(dummy_input=self.neuron.step),
# All responses have the deserialize function called on them before returning.
deserialize=True,
)

for i, response in enumerate(responses):
self.assertEqual(response, self.neuron.step * 2)

def test_reward(self):
# TODO: Test that the reward function returns the correct value
responses = self.dendrite.query(
# Send the query to miners in the network.
axons=[self.metagraph.axons[uid] for uid in self.miner_uids],
# Construct a dummy query.
synapse=Dummy(dummy_input=self.neuron.step),
# All responses have the deserialize function called on them before returning.
deserialize=True,
)

rewards = get_rewards(self.neuron, responses)
expected_rewards = np.float32([1.0] * len(responses))
self.assertEqual(rewards, expected_rewards)

def test_reward_with_nan(self):
# TODO: Test that NaN rewards are correctly sanitized
# TODO: Test that a bt.logging.warning is thrown when a NaN reward is sanitized
responses = self.dendrite.query(
# Send the query to miners in the network.
axons=[self.metagraph.axons[uid] for uid in self.miner_uids],
# Construct a dummy query.
synapse=Dummy(dummy_input=self.neuron.step),
# All responses have the deserialize function called on them before returning.
deserialize=True,
)

rewards = get_rewards(self.neuron, responses)
expected_rewards = rewards.clone()
# Add NaN values to rewards
rewards[0] = float("nan")

with self.assertLogs(bt.logging, level="WARNING") as cm:
self.neuron.update_scores(rewards, self.miner_uids)

0 comments on commit 3a3da50

Please sign in to comment.