Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8 prometheus metrics #9

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func main() {
// Define command-line flags
id_ := flag.Int("id", -1, "ID of the newClient (required)")
ip_ := flag.String("host", "x", "IP address of the client")
port_ := flag.Int("port", 8080, "Port of the client")
promPort_ := flag.Int("promPort", 8200, "Port of the client's Prometheus metrics")
port_ := flag.Int("port", 0, "Port of the client")
promPort_ := flag.Int("promPort", 0, "Port of the client's Prometheus metrics")
logLevel_ := flag.String("log-level", "debug", "Log level")

flag.Usage = func() {
Expand All @@ -45,6 +45,24 @@ func main() {

pl.SetUpLogrusAndSlog(logLevel)

if port == 0 {
var err error
port, err = utils.GetAvailablePort()
if err != nil {
slog.Error("failed to get available port", err)
os.Exit(1)
}
}

if promPort == 0 {
var err error
promPort, err = utils.GetAvailablePort()
if err != nil {
slog.Error("failed to get available port", err)
os.Exit(1)
}
}

// Check if the required flag is provided
if id == -1 {
_, _ = fmt.Fprintf(os.Stderr, "Error: the -id flag is required\n")
Expand Down Expand Up @@ -108,6 +126,7 @@ func main() {
}
})

slog.Info("🌏 serving prometheus metrics..", "address", fmt.Sprintf("http://%s:%d", ip, port))
// Serve Prometheus metrics in a separate goroutine.
shutdownMetrics := metrics.ServeMetrics(promPort, metrics.MSG_SENT, metrics.MSG_RECEIVED, metrics.ONION_SIZE)

Expand Down
22 changes: 20 additions & 2 deletions cmd/relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func main() {
// Define command-line flags
id_ := flag.Int("id", -1, "ID of the newClient (required)")
ip_ := flag.String("host", "x", "IP address of the relay")
port_ := flag.Int("port", 8080, "Port of the client")
promPort_ := flag.Int("promPort", 8200, "Port of the relay's Prometheus metrics")
port_ := flag.Int("port", 0, "Port of the client")
promPort_ := flag.Int("promPort", 0, "Port of the relay's Prometheus metrics")
logLevel_ := flag.String("log-level", "debug", "Log level")

flag.Usage = func() {
Expand All @@ -52,6 +52,24 @@ func main() {
os.Exit(2)
}

if port == 0 {
var err error
port, err = utils.GetAvailablePort()
if err != nil {
slog.Error("failed to get available port", err)
os.Exit(1)
}
}

if promPort == 0 {
var err error
promPort, err = utils.GetAvailablePort()
if err != nil {
slog.Error("failed to get available port", err)
os.Exit(1)
}
}

if ip == "x" {
IP, err := utils.GetPublicIP()
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func Set(id string, value float64, labels ...string) {
}

func ServeMetrics(prometheusPort int, collectorIds ...string) (shutdown func()) {

// Register the default process and Go metrics collectors
//prometheus.MustRegister(cols.NewProcessCollector(cols.ProcessCollectorOpts{}))
//prometheus.MustRegister(cols.NewGoCollector())

// Register the histogram with Prometheus
for _, id := range collectorIds {
if collector, ok := collectors[id]; ok {
Expand Down
5 changes: 5 additions & 0 deletions internal/model/bulletin_board/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
)

Expand All @@ -37,6 +38,7 @@ type StaticConfig struct {
type PromConfig struct {
Global Global `yaml:"global"`
ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs"`
RuleFile []string `yaml:"rule_files"`
}

var PID int
Expand Down Expand Up @@ -71,6 +73,8 @@ func RestartPrometheus(relays, clients []structs.PublicNodeApi) error {
}
}

rules := strings.Replace(path, "prometheus.yml", "rules.yml", 1)

promCfg_ := PromConfig{
Global: Global{
ScrapeInterval: fmt.Sprintf("%ds", config.GetScrapeInterval()),
Expand All @@ -79,6 +83,7 @@ func RestartPrometheus(relays, clients []structs.PublicNodeApi) error {
},
},
ScrapeConfigs: []ScrapeConfig{},
RuleFile: []string{rules},
}

for _, client := range clients {
Expand Down
50 changes: 26 additions & 24 deletions internal/model/bulletin_board/metrics/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,65 @@ global:
external_labels:
monitor: pi_t
scrape_configs:
- job_name: client-3
- job_name: client-6
scrape_interval: 5s
static_configs:
- targets:
- localhost:9103
- job_name: client-4
- localhost:61464
- job_name: client-1
scrape_interval: 5s
static_configs:
- targets:
- localhost:9104
- job_name: client-5
- localhost:61436
- job_name: client-2
scrape_interval: 5s
static_configs:
- targets:
- localhost:9105
- job_name: client-6
- localhost:61441
- job_name: client-3
scrape_interval: 5s
static_configs:
- targets:
- localhost:9106
- job_name: client-1
- localhost:61446
- job_name: client-4
scrape_interval: 5s
static_configs:
- targets:
- localhost:9101
- job_name: client-2
- localhost:61454
- job_name: client-5
scrape_interval: 5s
static_configs:
- targets:
- localhost:9102
- job_name: relay-4
- localhost:61459
- job_name: relay-6
scrape_interval: 5s
static_configs:
- targets:
- localhost:9204
- job_name: relay-5
- localhost:61494
- job_name: relay-1
scrape_interval: 5s
static_configs:
- targets:
- localhost:9205
- job_name: relay-6
- localhost:61469
- job_name: relay-2
scrape_interval: 5s
static_configs:
- targets:
- localhost:9206
- job_name: relay-1
- localhost:61474
- job_name: relay-3
scrape_interval: 5s
static_configs:
- targets:
- localhost:9201
- job_name: relay-2
- localhost:61479
- job_name: relay-4
scrape_interval: 5s
static_configs:
- targets:
- localhost:9202
- job_name: relay-3
- localhost:61484
- job_name: relay-5
scrape_interval: 5s
static_configs:
- targets:
- localhost:9203
- localhost:61489
rule_files:
- /Users/hanma/Documents/GitHub/pi_t-experiment/internal/model/bulletin_board/metrics/rules.yml
5 changes: 5 additions & 0 deletions internal/model/bulletin_board/metrics/rules.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
groups:
- name: latency_rules
rules:
- record: message_latency_seconds
expr: (messageReceivedTimestamp - on(hash) messageSentTimestamp)
55 changes: 28 additions & 27 deletions internal/model/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ import (

// Client represents a user in the network.
type Client struct {
ID int // Unique identifier for the client.
Host string // Host address of the client.
Port int // Port number on which the client listens.
Address string // Full address of the client in the form http://host:port.
PrivateKey string // Client's long term private key for decryption.
PublicKey string // Client's long term public key for encryption.
PrometheusPort int // Port number for Prometheus metrics.
ActiveRelays []structs.PublicNodeApi // List of active relays known to the client.
OtherClients []structs.PublicNodeApi // List of other client known to the client.
Messages []structs.Message // Messages to be sent by the client.
BulletinBoardUrl string // URL of the bulletin board for client registration and communication.
status *structs.ClientStatus // Client status, including sent and received messages.
wg sync.WaitGroup // WaitGroup to ensure the client does not start protocol until all messages are generated
ID int // Unique identifier for the client.
Host string // Host address of the client.
Port int // Port number on which the client listens.
Address string // Full address of the client in the form http://host:port.
PrivateKey string // Client's long term private key for decryption.
PublicKey string // Client's long term public key for encryption.
PrometheusPort int // Port number for Prometheus metrics.
ActiveRelays []structs.PublicNodeApi // List of active relays known to the client.
OtherClients []structs.PublicNodeApi // List of other client known to the client.
//Messages []structs.Message // Messages to be sent by the client.
BulletinBoardUrl string // URL of the bulletin board for client registration and communication.
status *structs.ClientStatus // Client status, including sent and received messages.
wg sync.WaitGroup // WaitGroup to ensure the client does not start protocol until all messages are generated
mu sync.RWMutex
}

Expand All @@ -54,9 +54,9 @@ func NewClient(id int, host string, port int, promPort int, bulletinBoardUrl str
PrometheusPort: promPort,
ActiveRelays: make([]structs.PublicNodeApi, 0),
BulletinBoardUrl: bulletinBoardUrl,
Messages: make([]structs.Message, 0),
OtherClients: make([]structs.PublicNodeApi, 0),
status: structs.NewClientStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey),
//Messages: make([]structs.Message, 0),
OtherClients: make([]structs.PublicNodeApi, 0),
status: structs.NewClientStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey),
}
c.wg.Add(1)

Expand Down Expand Up @@ -128,7 +128,7 @@ func (c *Client) getRecipient(clients []structs.PublicNodeApi) (string, int) {
}

// StartGeneratingMessages generates a single message to be sent to another client.
func (c *Client) generateMessages(start structs.ClientStartRunApi) {
func (c *Client) generateMessages(start structs.ClientStartRunApi) []structs.Message {
defer c.wg.Done() // Mark this operation as done in the WaitGroup when finished.
slog.Info("Client starting to generate messages", "id", c.ID)

Expand All @@ -140,15 +140,17 @@ func (c *Client) generateMessages(start structs.ClientStartRunApi) {
structs.NewMessage(c.Address, recipientAddress, fmt.Sprintf("Msg from client(id=%d)", c.ID)),
}

return messages

// Register the intent to send the message with the bulletin board.
//if err := c.RegisterIntentToSend(messages); err != nil {
// slog.Error(pl.GetFuncName()+": Error registering intent to send", err)
//} else {
// slog.Info(fmt.Sprintf("Client %d sending to client %d", c.ID, recipientId))
c.mu.Lock()
defer c.mu.Unlock()
c.Messages = messages // Store the messages to be sent.
//}
//c.mu.Lock()
//defer c.mu.Unlock()
//c.Messages = messages // Store the messages to be sent.
////}
}

// DetermineRoutingPath determines a random routing path of mixers and gatekeepers.
Expand All @@ -175,14 +177,15 @@ func (c *Client) formOnions(start structs.ClientStartRunApi) ([]queuedOnion, err
var wg sync.WaitGroup // WaitGroup to manage concurrent onion formation.

// Iterate over the client's messages to form onions for each one.
for i := range c.Messages {
messages := c.generateMessages(start)
for i := range messages {
if destination := utils.Find(start.Clients, func(relay structs.PublicNodeApi) bool {
return relay.Address == c.Messages[i].To
return relay.Address == messages[i].To
}); destination != nil {
wg.Add(1)
go func(destination structs.PublicNodeApi) {
defer wg.Done()
if o, err := c.processMessage(c.Messages[i], destination, start.Relays); err != nil {
if o, err := c.processMessage(messages[i], destination, start.Relays); err != nil {
slog.Error("failed to process message", err)
} else {
mu.Lock()
Expand Down Expand Up @@ -335,8 +338,6 @@ func (c *Client) startRun(start structs.ClientStartRunApi) error {

config.UpdateConfig(start.Config) // Update the global configuration based on the start signal.

c.generateMessages(start)

// Ensure that there are relays and client participating in the run.
if len(start.Relays) == 0 {
return pl.NewError("%s: no participating relays", pl.GetFuncName())
Expand Down Expand Up @@ -376,7 +377,7 @@ func (c *Client) startRun(start structs.ClientStartRunApi) error {

wg.Wait() // Wait for all onions to be sent.

c.Messages = make([]structs.Message, 0) // Clear the client's messages after sending.
//c.Messages = make([]structs.Message, 0) // Clear the client's messages after sending.
return nil
}
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/utils/port.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package utils

import (
"log/slog"
"net"
)

// GetAvailablePort finds an available port and returns it
func GetAvailablePort() (int, error) {
// Create a TCP listener on port 0 to have the OS assign a free port
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
return 0, err
}
defer func(listener net.Listener) {
if err := listener.Close(); err != nil {
slog.Error("", err)
}
}(listener)

// Get the actual port that was assigned by the OS
addr := listener.Addr().(*net.TCPAddr)
return addr.Port, nil
}
Loading