Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promo suffix #977

Merged
merged 3 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 32 additions & 36 deletions bittensor/_axon/axon_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,33 @@
import grpc
import wandb
import pandas
import uuid
from loguru import logger
import torch.nn.functional as F
import concurrent

from prometheus_client import Counter, Histogram, Enum, CollectorRegistry

import bittensor
import bittensor.utils.stats as stat_utils
from datetime import datetime

logger = logger.opt(colors=True)

from prometheus_client import Counter, Histogram, Enum, CollectorRegistry
PROM_axon_is_started = Enum('axon_is_started', 'is_started', states=['stopped', 'started'])
PROM_total_forward = Counter('axon_total_forward', 'total_forward', ['wallet', 'identifier'])
PROM_total_backward = Counter('axon_total_backward', 'total_backward', ['wallet', 'identifier'])
PROM_forward_latency = Histogram('axon_forward_latency', 'forward_latency', ['wallet', 'identifier'], buckets=list(range(0,bittensor.__blocktime__,1)))
PROM_backward_latency = Histogram('axon_backward_latency', 'backward_latency', ['wallet', 'identifier'], buckets=list(range(0,bittensor.__blocktime__,1)))
PROM_forward_synapses = Counter('axon_forward_synapses', 'forward_synapses', ['wallet', 'identifier', "synapse"])
PROM_backward_synapses = Counter('axon_backward_synapses', 'backward_synapses', ['wallet', 'identifier', "synapse"])
PROM_forward_codes = Counter('axon_forward_codes', 'forward_codes', ['wallet', 'identifier', "code"])
PROM_backward_codes = Counter('axon_backward_codes', 'backward_codes', ['wallet', 'identifier', "code"])
PROM_forward_hotkeys = Counter('axon_forward_hotkeys', 'forward_hotkeys', ['wallet', 'identifier', "hotkey"])
PROM_backward_hotkeys = Counter('axon_backward_hotkeys', 'backward_hotkeys', ['wallet', 'identifier', "hotkey"])
PROM_forward_bytes = Counter('axon_forward_bytes', 'forward_bytes', ['wallet', 'identifier', "hotkey"])
PROM_backward_bytes = Counter('axon_backward_bytes', 'backward_bytes', ['wallet', 'identifier', "hotkey"])

class Axon( bittensor.grpc.BittensorServicer ):
r""" Services Forward and Backward requests from other neurons.
"""
Expand Down Expand Up @@ -103,27 +118,8 @@ def __init__(

# -- Priority
self.priority = priority
self.priority_threadpool= priority_threadpool

# == Prometheus
# We are running over various suffix values in the event that there are multiple axons in the same process.
# The first axon is created with a null suffix and subsequent values are ordered like so: axon_is_started, axon_is_started_1, axon_is_started_2 etc...

if self.prometheus_level != bittensor.prometheus.level.OFF.name:
registry = CollectorRegistry()
self.is_started = Enum('axon_is_started', 'is_started', states=['stopped', 'started'], registry=registry)
self.total_forward = Counter('axon_total_forward', 'total_forward', registry=registry)
self.total_backward = Counter('axon_total_backward', 'total_backward', registry=registry)
self.forward_latency = Histogram('axon_forward_latency', 'forward_latency', buckets=list(range(0,bittensor.__blocktime__,1)), registry=registry)
self.backward_latency = Histogram('axon_backward_latency', 'backward_latency', buckets=list(range(0,bittensor.__blocktime__,1)), registry=registry)
self.forward_synapses = Counter('axon_forward_synapses', 'forward_synapses', ["synapse"], registry=registry)
self.backward_synapses = Counter('axon_backward_synapses', 'backward_synapses', ["synapse"], registry=registry)
self.forward_codes = Counter('axon_forward_codes', 'forward_codes', ["code"], registry=registry)
self.backward_codes = Counter('axon_backward_codes', 'backward_codes', ["code"], registry=registry)
self.forward_hotkeys = Counter('axon_forward_hotkeys', 'forward_hotkeys', ["hotkey"], registry=registry)
self.backward_hotkeys = Counter('axon_backward_hotkeys', 'backward_hotkeys', ["hotkey"], registry=registry)
self.forward_bytes = Counter('axon_forward_bytes', 'forward_bytes', ["hotkey"], registry=registry)
self.backward_bytes = Counter('axon_backward_bytes', 'backward_bytes', ["hotkey"], registry=registry)
self.priority_threadpool = priority_threadpool
self._prometheus_uuid = uuid.uuid1()

def __str__(self) -> str:
return "Axon({}, {}, {}, {})".format( self.ip, self.port, self.wallet.hotkey.ss58_address, "started" if self.started else "stopped")
Expand Down Expand Up @@ -239,17 +235,17 @@ def check_if_should_return() -> bool:
def finalize_codes_stats_and_logs( message = None):
# === Prometheus
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.total_forward.inc()
self.forward_latency.observe( clock.time() - start_time )
PROM_total_forward.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid ).inc()
PROM_forward_latency.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid ).observe( clock.time() - start_time )
if self.prometheus_level == bittensor.prometheus.level.DEBUG.name:
self.forward_hotkeys.labels( request.hotkey ).inc()
self.forward_bytes.labels( request.hotkey ).inc( sys.getsizeof( request ) )
PROM_forward_hotkeys.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, hotkey = request.hotkey ).inc()
PROM_forward_bytes.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, hotkey = request.hotkey ).inc( sys.getsizeof( request ) )

for index, synapse in enumerate( synapses ):
# === Prometheus
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.forward_synapses.labels( str(synapse) ).inc()
self.forward_codes.labels( str(synapse_codes[ index ]) ).inc()
PROM_forward_synapses.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, synapse = str(synapse) ).inc()
PROM_forward_codes.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, code = str(synapse_codes[ index ]) ).inc()

# === Logging
request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes.
Expand Down Expand Up @@ -471,17 +467,17 @@ def check_if_should_return() -> bool:
def finalize_codes_stats_and_logs():
# === Prometheus
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.total_backward.inc()
self.backward_latency.observe( clock.time() - start_time )
PROM_total_backward.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid ).inc()
PROM_backward_latency.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid ).observe( clock.time() - start_time )
if self.prometheus_level == bittensor.prometheus.level.DEBUG.name:
self.backward_hotkeys.labels( request.hotkey ).inc()
self.backward_bytes.labels( request.hotkey ).inc( sys.getsizeof( request ) )
PROM_backward_hotkeys.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, hotkey = request.hotkey ).inc()
PROM_backward_bytes.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, hotkey = request.hotkey ).inc( sys.getsizeof( request ) )

for index, synapse in enumerate( synapses ):
# === Prometheus
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.backward_synapses.labels( str(synapse) ).inc()
self.backward_codes.labels( str(synapse_codes[ index ]) ).inc()
PROM_backward_synapses.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, synapse = str(synapse) ).inc()
PROM_backward_codes.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, code = str(synapse_codes[ index ]) ).inc()

# === Logging
request.synapses [ index ].return_code = synapse_codes[ index ] # Set synapse wire proto codes.
Expand Down Expand Up @@ -818,7 +814,7 @@ def start(self) -> 'Axon':

# Switch prometheus ENUM.
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.is_started.state('started')
PROM_axon_is_started.state('started')

return self

Expand All @@ -832,7 +828,7 @@ def stop(self) -> 'Axon':

# Switch prometheus ENUM.
if self.prometheus_level != bittensor.prometheus.level.OFF.name:
self.is_started.state('stopped')
PROM_axon_is_started.state('stopped')

return self

Expand Down
51 changes: 24 additions & 27 deletions bittensor/_dendrite/dendrite_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pandas
import random
import time
import uuid

from torch.autograd.function import once_differentiable
from loguru import logger
Expand All @@ -40,13 +41,19 @@

import wandb

from prometheus_client import Summary, Counter, Histogram, CollectorRegistry

logger = logger.opt(colors=True)

# dummy tensor that triggers autograd
DUMMY = torch.empty(0, requires_grad=True)

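# Global prometheus metrics: defined once at module level and shared by every Dendrite in the process; individual instances are distinguished by the 'wallet' and 'identifier' labels.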
from prometheus_client import Summary, Counter, Histogram, CollectorRegistry
PROM_prometheus_counters = Counter('dendrite_counters', 'dendrite_counters', ['wallet', 'identifier', 'name'])
PROM_prometheus_latency = Histogram('dendrite_latency', 'dendrite_latency', ['wallet', 'identifier'], buckets=list(range(0,bittensor.__blocktime__,1)))
PROM_prometheus_latency_per_uid = Summary('dendrite_latency_per_uid', 'dendrite_latency_per_uid', ['wallet', 'identifier', 'uid'])
PROM_prometheus_successes_per_uid = Counter('dendrite_successes_per_uid', 'dendrite_successes_per_uid', ['wallet', 'identifier', 'uid'])
PROM_prometheus_failures_per_uid = Counter('dendrite_failures_per_uid', 'dendrite_failures_per_uid', ['wallet', 'identifier', 'uid'])

class Dendrite(torch.autograd.Function):
r""" This is the implementation class for a bittensor.dendrite(). The dendrite class operates as a normal torch autograd friendly operation
Expand All @@ -57,7 +64,7 @@ class Dendrite(torch.autograd.Function):
Args:
config (:obj:`bittensor.Config`, `optional`, defaults to bittensor.dendrite.config()):
config namespace object created by calling bittensor.dendrite.config()
wallet (:obj:`bittensor.Wallet`, `optional`, defaults to bittensor.wallet( name = 'default', hotkey = 'default')):
wallet (:obj:`bittensor.Wallet`, `optional`, defaults to bittensor.wallet( name = 'default', hotkey = 'default')):
A bittensor wallet object containing a pair of cryptographic keys, the hot and coldkey, used for signing messages
on the wire.
receptor_pool (:obj:`bittensor.ReceptorPool`, `optional`, defaults to bittensor.receptor_pool()):
Expand All @@ -84,17 +91,7 @@ def __init__(
# ---- Dendrite stats
# number of times we have sent a request to a peer, received a successful response, and the response time
self.stats = self._init_stats()

# == Prometheus
# We are running over various suffix values in the event that there are multiple dendrites in the same process.
# The first dendrite is created with a null suffix. Values are ordered like so: dendrite_counters, dendrite_counters_1, dendrite_counters_2 etc...
if self.config.dendrite.prometheus.level != bittensor.prometheus.level.OFF.name:
registry = CollectorRegistry()
self.prometheus_counters = Counter('dendrite_counters', 'dendrite_counters', ['name'], registry=registry)
self.prometheus_latency = Histogram('dendrite_latency', 'dendrite_latency', buckets=list(range(0,bittensor.__blocktime__,1)), registry=registry)
self.prometheus_latency_per_uid = Summary('dendrite_latency_per_uid', 'dendrite_latency_per_uid', ['uid'], registry=registry)
self.prometheus_successes_per_uid = Counter('dendrite_successes_per_uid', 'dendrite_successes_per_uid', ['uid'], registry=registry)
self.prometheus_failures_per_uid = Counter('dendrite_failures_per_uid', 'dendrite_failures_per_uid', ['uid'], registry=registry)
self._prometheus_uuid = uuid.uuid1()

def __str__(self):
return "Dendrite({}, {})".format(self.wallet.hotkey.ss58_address, self.receptor_pool)
Expand Down Expand Up @@ -313,40 +310,40 @@ def _forward(
outputs: List[torch.Tensor] = forward_response[2:]
packed_outputs: List[ List[torch.Tensor] ] = [ outputs[ s : s + len(synapses) ] for s in range (0, len(outputs), len( synapses )) ]

# === Prometheus counters.
# === Prometheus counters.
if self.config.dendrite.prometheus.level != bittensor.prometheus.level.OFF.name:
self.prometheus_counters.labels( 'total_requests' ).inc()
self.prometheus_counters.labels( 'total_endpoint_requests' ).inc( len(endpoints) )
self.prometheus_counters.labels( 'total_request_bytes' ).inc( sum(p.element_size() * p.nelement() for p in inputs) )
self.prometheus_counters.labels( 'total_request_params' ).inc( sum(p.numel() for p in inputs) )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_requests' ).inc()
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_endpoint_requests' ).inc( len(endpoints) )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_request_bytes' ).inc( sum(p.element_size() * p.nelement() for p in inputs) )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_request_params' ).inc( sum(p.numel() for p in inputs) )

# Capture synapses.
for synapse in enumerate( synapses ):
self.prometheus_counters.labels( str(synapse) ).inc()
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = str(synapse) ).inc()

for i in range(len(endpoints)):
n_success = (codes[i] == 1).sum().item()
is_success = (n_success > 0) # One is a success.
response_time = times[i].mean().item()

# Capture outputs.
self.prometheus_counters.labels( 'total_response_bytes' ).inc( sum(p.element_size() * p.nelement() for p in outputs[i]) )
self.prometheus_counters.labels( 'total_response_params' ).inc( sum(p.numel() for p in outputs[i]) )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_response_bytes' ).inc( sum(p.element_size() * p.nelement() for p in outputs[i]) )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_response_params' ).inc( sum(p.numel() for p in outputs[i]) )

# Capture global success rates.
if is_success:
self.prometheus_counters.labels( 'total_success' ).inc()
self.prometheus_latency.observe( response_time )
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_success' ).inc()
PROM_prometheus_latency.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid).observe( response_time )
else:
self.prometheus_counters.labels( 'total_failure' ).inc()
PROM_prometheus_counters.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, name = 'total_failure' ).inc()

# === Prometheus DEBUG (per uid info.)
if self.config.dendrite.prometheus.level == bittensor.prometheus.level.DEBUG.name:
if is_success:
self.prometheus_latency_per_uid.labels(str(endpoints[i].uid)).observe( response_time )
self.prometheus_successes_per_uid.labels(str(endpoints[i].uid)).inc()
PROM_prometheus_latency_per_uid.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, uid = str(endpoints[i].uid) ).observe( response_time )
PROM_prometheus_successes_per_uid.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, uid = str(endpoints[i].uid) ).inc()
else:
self.prometheus_failures_per_uid.labels(str(endpoints[i].uid)).inc()
PROM_prometheus_failures_per_uid.labels( wallet = self.wallet.hotkey.ss58_address, identifier = self._prometheus_uuid, uid = str(endpoints[i].uid) ).inc()

return packed_outputs, packed_codes, packed_times

Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/test_dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def forward_casual_lm_next(inputs_x, synapse, model_output=None):
axon.attach_synapse_callback( forward_hidden_state, synapse_type = bittensor.proto.Synapse.SynapseType.TEXT_LAST_HIDDEN_STATE )
axon.attach_synapse_callback( forward_generate, synapse_type = bittensor.proto.Synapse.SynapseType.TEXT_SEQ_2_SEQ )
axon.attach_synapse_callback( forward_casual_lm, synapse_type = bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM )
axon.attach_synapse_callback(forward_casual_lm_next, synapse_type=bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT)
axon.attach_synapse_callback( forward_casual_lm_next, synapse_type=bittensor.proto.Synapse.SynapseType.TEXT_CAUSAL_LM_NEXT)
axon.start()

endpoint = bittensor.endpoint(
Expand Down