Skip to content

Commit

Permalink
Merge pull request #100 from codex-storage/metrics
Browse files Browse the repository at this point in the history
rename and add more dht metrics
  • Loading branch information
cskiraly authored Oct 10, 2024
2 parents 6e180af + 4ccaaee commit ff5391a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 37 deletions.
16 changes: 8 additions & 8 deletions codexdht/private/eth/p2p/discoveryv5/encoding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ from stew/objects import checkedEnumAssign

export crypto

declareCounter discovery_session_lru_cache_hits, "Session LRU cache hits"
declareCounter discovery_session_lru_cache_misses, "Session LRU cache misses"
declareCounter discovery_session_decrypt_failures, "Session decrypt failures"
declareCounter dht_session_lru_cache_hits, "Session LRU cache hits"
declareCounter dht_session_lru_cache_misses, "Session LRU cache misses"
declareCounter dht_session_decrypt_failures, "Session decrypt failures"

logScope:
topics = "discv5"
Expand Down Expand Up @@ -234,7 +234,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
if c.sessions.load(toId, toAddr, recipientKey1, recipientKey2, initiatorKey):
haskey = true
messageEncrypted = encryptGCM(initiatorKey, nonce, message, @iv & header)
discovery_session_lru_cache_hits.inc()
dht_session_lru_cache_hits.inc()
else:
# We might not have the node's keys if the handshake hasn't been performed
# yet. That's fine, we send a random-packet and we will be responded with
Expand All @@ -247,7 +247,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
var randomData: array[gcmTagSize + 4, byte]
hmacDrbgGenerate(rng, randomData)
messageEncrypted.add(randomData)
discovery_session_lru_cache_misses.inc()
dht_session_lru_cache_misses.inc()

let maskedHeader = encryptHeader(toId, iv, header)

Expand Down Expand Up @@ -431,11 +431,11 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# Don't consider this an error, simply haven't done a handshake yet or
# the session got removed.
trace "Decrypting failed (no keys)"
discovery_session_lru_cache_misses.inc()
dht_session_lru_cache_misses.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId))

discovery_session_lru_cache_hits.inc()
dht_session_lru_cache_hits.inc()

var pt = decryptGCM(recipientKey2, nonce, ct, @iv & @header)
if pt.isNone():
Expand All @@ -448,7 +448,7 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# needed later, depending on message order.
trace "Decrypting failed (invalid keys)", address = fromAddr
#c.sessions.del(srcId, fromAddr)
discovery_session_decrypt_failures.inc()
dht_session_decrypt_failures.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId))

Expand Down
48 changes: 24 additions & 24 deletions codexdht/private/eth/p2p/discoveryv5/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ import nimcrypto except toHex

export options, results, node, spr, providers

declareCounter discovery_message_requests_outgoing,
declareCounter dht_message_requests_outgoing,
"Discovery protocol outgoing message requests", labels = ["response"]
declareCounter discovery_message_requests_incoming,
declareCounter dht_message_requests_incoming,
"Discovery protocol incoming message requests", labels = ["response"]
declareCounter discovery_unsolicited_messages,
declareCounter dht_unsolicited_messages,
"Discovery protocol unsolicited or timed-out messages"
declareCounter discovery_enr_auto_update,
declareCounter dht_enr_auto_update,
"Amount of discovery IP:port address SPR auto updates"

logScope:
Expand Down Expand Up @@ -407,35 +407,35 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) =
case message.kind
of ping:
discovery_message_requests_incoming.inc()
dht_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findNode:
discovery_message_requests_incoming.inc()
dht_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of findNodeFast:
discovery_message_requests_incoming.inc()
dht_message_requests_incoming.inc()
d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId)
of talkReq:
discovery_message_requests_incoming.inc()
dht_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
of addProvider:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
dht_message_requests_incoming.inc()
dht_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
of getProviders:
discovery_message_requests_incoming.inc()
dht_message_requests_incoming.inc()
asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of regTopic, topicQuery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
dht_message_requests_incoming.inc()
dht_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr
else:
var waiter: Future[Option[Message]]
if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(some(message))
else:
discovery_unsolicited_messages.inc()
dht_unsolicited_messages.inc()
trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr

Expand Down Expand Up @@ -464,7 +464,7 @@ proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T,

trace "Send message packet", dstId = toNode.id,
address = toNode.address, kind = messageKind(T)
discovery_message_requests_outgoing.inc()
dht_message_requests_outgoing.inc()

d.transport.sendMessage(toNode, message)

Expand Down Expand Up @@ -513,10 +513,10 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
break
return ok(res)
else:
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to find node message")
else:
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Nodes message not received in time")

proc ping*(d: Protocol, toNode: Node):
Expand All @@ -534,11 +534,11 @@ proc ping*(d: Protocol, toNode: Node):
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to ping message")
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time")

proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
Expand Down Expand Up @@ -594,11 +594,11 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
return ok(resp.get().talkResp.response)
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to talk request message")
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time")

proc lookupDistances*(target, dest: NodeId): seq[uint16] =
Expand Down Expand Up @@ -723,12 +723,12 @@ proc sendGetProviders(d: Protocol, toNode: Node,
else:
# TODO: do we need to do something when there is an invalid response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetProviders message")
else:
# TODO: do we need to do something when there is no response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetProviders response message not received in time")

proc getProvidersLocal*(
Expand Down Expand Up @@ -997,7 +997,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
warn "Failed updating SPR with newly discovered external address",
majority, previous, error = res.error
else:
discovery_enr_auto_update.inc()
dht_enr_auto_update.inc()
info "Updated SPR with newly discovered external address",
majority, previous, uri = toURI(d.localNode.record)
else:
Expand Down
14 changes: 9 additions & 5 deletions codexdht/private/eth/p2p/discoveryv5/routing_table.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import

export options

declarePublicGauge routing_table_nodes,
declarePublicGauge dht_routing_table_nodes,
"Discovery routing table nodes", labels = ["state"]
declarePublicGauge dht_routing_table_buckets,
"Discovery routing table: number of buckets"

logScope:
topics = "discv5 routingtable"
Expand Down Expand Up @@ -208,14 +210,14 @@ proc ipLimitDec(r: var RoutingTable, b: KBucket, n: Node) =

proc add(k: KBucket, n: Node) =
k.nodes.add(n)
routing_table_nodes.inc()
dht_routing_table_nodes.inc()

proc remove(k: KBucket, n: Node): bool =
let i = k.nodes.find(n)
if i != -1:
routing_table_nodes.dec()
dht_routing_table_nodes.dec()
if k.nodes[i].seen:
routing_table_nodes.dec(labelValues = ["seen"])
dht_routing_table_nodes.dec(labelValues = ["seen"])
k.nodes.delete(i)
trace "removed node:", node = n
true
Expand Down Expand Up @@ -286,6 +288,7 @@ proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop
distanceCalculator = XorDistanceCalculator): T =
## Initialize the routing table for provided `Node` and bitsPerHop value.
## `bitsPerHop` defaults to 5, as recommended by the original Kademlia paper.
dht_routing_table_buckets.inc()
RoutingTable(
localNode: localNode,
buckets: @[KBucket.new(0.u256, high(UInt256), ipLimits.bucketIpLimit)],
Expand All @@ -299,6 +302,7 @@ proc splitBucket(r: var RoutingTable, index: int) =
let (a, b) = bucket.split()
r.buckets[index] = a
r.buckets.insert(b, index + 1)
dht_routing_table_buckets.inc()

proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
result = binaryGetBucketForNode(r.buckets, id)
Expand Down Expand Up @@ -527,7 +531,7 @@ proc setJustSeen*(r: RoutingTable, n: Node) =

if not n.seen:
b.nodes[0].seen = true
routing_table_nodes.inc(labelValues = ["seen"])
dht_routing_table_nodes.inc(labelValues = ["seen"])

proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random
Expand Down
18 changes: 18 additions & 0 deletions codexdht/private/eth/p2p/discoveryv5/transport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import
bearssl/rand,
chronos,
chronicles,
metrics,
libp2p/crypto/crypto,
stew/shims/net,
"."/[node, encoding, sessions]
Expand All @@ -23,6 +24,15 @@ const
logScope:
topics = "discv5 transport"

declarePublicCounter dht_transport_tx_packets,
"Discovery transport packets sent", labels = ["state"]
declarePublicCounter dht_transport_tx_bytes,
"Discovery transport bytes sent", labels = ["state"]
declarePublicCounter dht_transport_rx_packets,
"Discovery transport packets received", labels = ["state"]
declarePublicCounter dht_transport_rx_bytes,
"Discovery transport bytes received", labels = ["state"]

type
Transport* [Client] = ref object
client: Client
Expand Down Expand Up @@ -56,7 +66,11 @@ proc sendToA(t: Transport, a: Address, msg: seq[byte]) =
# nodes. Else the revalidation might end up clearing the routing table
# because of ping failures due to own network connection failure.
warn "Discovery send failed", msg = f.readError.msg
dht_transport_tx_packets.inc(labelValues = ["failed"])
dht_transport_tx_bytes.inc(msg.len.int64, labelValues = ["failed"])
)
dht_transport_tx_packets.inc()
dht_transport_tx_bytes.inc(msg.len.int64)

proc send(t: Transport, n: Node, data: seq[byte]) =
doAssert(n.address.isSome())
Expand Down Expand Up @@ -144,6 +158,8 @@ proc sendPending(t:Transport, toNode: Node):
t.pendingRequestsByNode.del(toNode.id)

proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
dht_transport_rx_packets.inc()
dht_transport_rx_bytes.inc(packet.len.int64)
let decoded = t.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
Expand Down Expand Up @@ -215,6 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
else:
trace "address mismatch, not adding seen flag", node, address = a, nodeAddress = node.address.get()
else:
dht_transport_rx_packets.inc(labelValues = ["failed_decode"])
dht_transport_rx_bytes.inc(packet.len.int64, labelValues = ["failed_decode"])
trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a

proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
Expand Down

0 comments on commit ff5391a

Please sign in to comment.