diff --git a/cmd/client/main.go b/cmd/client/main.go index a952c0c..6065f89 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,8 +24,8 @@ func main() { // Define command-line flags id_ := flag.Int("id", -1, "ID of the newClient (required)") ip_ := flag.String("host", "x", "IP address of the client") - port_ := flag.Int("port", 8080, "Port of the client") - promPort_ := flag.Int("promPort", 8200, "Port of the client's Prometheus metrics") + port_ := flag.Int("port", 0, "Port of the client") + promPort_ := flag.Int("promPort", 0, "Port of the client's Prometheus metrics") logLevel_ := flag.String("log-level", "debug", "Log level") flag.Usage = func() { @@ -45,6 +45,24 @@ func main() { pl.SetUpLogrusAndSlog(logLevel) + if port == 0 { + var err error + port, err = utils.GetAvailablePort() + if err != nil { + slog.Error("failed to get available port", err) + os.Exit(1) + } + } + + if promPort == 0 { + var err error + promPort, err = utils.GetAvailablePort() + if err != nil { + slog.Error("failed to get available port", err) + os.Exit(1) + } + } + // Check if the required flag is provided if id == -1 { _, _ = fmt.Fprintf(os.Stderr, "Error: the -id flag is required\n") @@ -108,6 +126,7 @@ func main() { } }) + slog.Info("🌏 serving prometheus metrics..", "address", fmt.Sprintf("http://%s:%d", ip, port)) // Serve Prometheus metrics in a separate goroutine. shutdownMetrics := metrics.ServeMetrics(promPort, metrics.MSG_SENT, metrics.MSG_RECEIVED, metrics.ONION_SIZE) diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 094b41e..9fc76cd 100755 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -24,8 +24,8 @@ func main() { // Define command-line flags id_ := flag.Int("id", -1, "ID of the newClient (required)") ip_ := flag.String("host", "x", "IP address of the relay") - port_ := flag.Int("port", 8080, "Port of the client") - promPort_ := flag.Int("promPort", 8200, "Port of the relay's Prometheus metrics") + port_ := flag.Int("port", 0, "Port of the client") + promPort_ := flag.Int("promPort", 0, "Port of the relay's Prometheus metrics") logLevel_ := flag.String("log-level", "debug", "Log level") flag.Usage = func() { @@ -52,6 +52,24 @@ func main() { os.Exit(2) } + if port == 0 { + var err error + port, err = utils.GetAvailablePort() + if err != nil { + slog.Error("failed to get available port", err) + os.Exit(1) + } + } + + if promPort == 0 { + var err error + promPort, err = utils.GetAvailablePort() + if err != nil { + slog.Error("failed to get available port", err) + os.Exit(1) + } + } + if ip == "x" { IP, err := utils.GetPublicIP() if err != nil { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 7b6a8fd..5c7bd90 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -98,6 +98,11 @@ func Set(id string, value float64, labels ...string) { } func ServeMetrics(prometheusPort int, collectorIds ...string) (shutdown func()) { + + // Register the default process and Go metrics collectors + //prometheus.MustRegister(cols.NewProcessCollector(cols.ProcessCollectorOpts{})) + //prometheus.MustRegister(cols.NewGoCollector()) + // Register the histogram with Prometheus for _, id := range collectorIds { if collector, ok := collectors[id]; ok { diff --git a/internal/model/bulletin_board/metrics/prometheus.go b/internal/model/bulletin_board/metrics/prometheus.go index 7542aa5..2018ccd 100644 --- a/internal/model/bulletin_board/metrics/prometheus.go +++ b/internal/model/bulletin_board/metrics/prometheus.go @@ -12,6 +12,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "sync" ) @@ -37,6 +38,7 @@ type StaticConfig struct { type PromConfig struct { Global Global `yaml:"global"` ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs"` + RuleFile []string `yaml:"rule_files"` } var PID int @@ -71,6 +73,8 @@ func RestartPrometheus(relays, clients []structs.PublicNodeApi) error { } } + rules := strings.Replace(path, "prometheus.yml", "rules.yml", 1) + promCfg_ := PromConfig{ Global: Global{ ScrapeInterval: fmt.Sprintf("%ds", config.GetScrapeInterval()), @@ -79,6 +83,7 @@ func RestartPrometheus(relays, clients []structs.PublicNodeApi) error { }, }, ScrapeConfigs: []ScrapeConfig{}, + RuleFile: []string{rules}, } for _, client := range clients { diff --git a/internal/model/bulletin_board/metrics/prometheus.yml b/internal/model/bulletin_board/metrics/prometheus.yml index e9d36a5..d40e9e3 100644 --- a/internal/model/bulletin_board/metrics/prometheus.yml +++ b/internal/model/bulletin_board/metrics/prometheus.yml @@ -3,63 +3,65 @@ global: external_labels: monitor: pi_t scrape_configs: - - job_name: client-3 + - job_name: client-6 scrape_interval: 5s static_configs: - targets: - - localhost:9103 - - job_name: client-4 + - localhost:61464 + - job_name: client-1 scrape_interval: 5s static_configs: - targets: - - localhost:9104 - - job_name: client-5 + - localhost:61436 + - job_name: client-2 scrape_interval: 5s static_configs: - targets: - - localhost:9105 - - job_name: client-6 + - localhost:61441 + - job_name: client-3 scrape_interval: 5s static_configs: - targets: - - localhost:9106 - - job_name: client-1 + - localhost:61446 + - job_name: client-4 scrape_interval: 5s static_configs: - targets: - - localhost:9101 - - job_name: client-2 + - localhost:61454 + - job_name: client-5 scrape_interval: 5s static_configs: - targets: - - localhost:9102 - - job_name: relay-4 + - localhost:61459 + - job_name: relay-6 scrape_interval: 5s static_configs: - targets: - - localhost:9204 - - job_name: relay-5 + - localhost:61494 + - job_name: relay-1 scrape_interval: 5s static_configs: - targets: - - localhost:9205 - - job_name: relay-6 + - localhost:61469 + - job_name: relay-2 scrape_interval: 5s static_configs: - targets: - - localhost:9206 - - job_name: relay-1 + - localhost:61474 + - job_name: relay-3 scrape_interval: 5s static_configs: - targets: - - localhost:9201 - - job_name: relay-2 + - localhost:61479 + - job_name: relay-4 scrape_interval: 5s static_configs: - targets: - - localhost:9202 - - job_name: relay-3 + - localhost:61484 + - job_name: relay-5 scrape_interval: 5s static_configs: - targets: - - localhost:9203 + - localhost:61489 +rule_files: + - /Users/hanma/Documents/GitHub/pi_t-experiment/internal/model/bulletin_board/metrics/rules.yml diff --git a/internal/model/bulletin_board/metrics/rules.yml b/internal/model/bulletin_board/metrics/rules.yml new file mode 100644 index 0000000..e33ebfe --- /dev/null +++ b/internal/model/bulletin_board/metrics/rules.yml @@ -0,0 +1,5 @@ +groups: + - name: latency_rules + rules: + - record: message_latency_seconds + expr: (messageReceivedTimestamp - on(hash) messageSentTimestamp) diff --git a/internal/model/client/client.go b/internal/model/client/client.go index 56e574e..56cc1ff 100644 --- a/internal/model/client/client.go +++ b/internal/model/client/client.go @@ -22,19 +22,19 @@ import ( // Client represents a user in the network. type Client struct { - ID int // Unique identifier for the client. - Host string // Host address of the client. - Port int // Port number on which the client listens. - Address string // Full address of the client in the form http://host:port. - PrivateKey string // Client's long term private key for decryption. - PublicKey string // Client's long term public key for encryption. - PrometheusPort int // Port number for Prometheus metrics. - ActiveRelays []structs.PublicNodeApi // List of active relays known to the client. - OtherClients []structs.PublicNodeApi // List of other client known to the client. - Messages []structs.Message // Messages to be sent by the client. - BulletinBoardUrl string // URL of the bulletin board for client registration and communication. - status *structs.ClientStatus // Client status, including sent and received messages. - wg sync.WaitGroup // WaitGroup to ensure the client does not start protocol until all messages are generated + ID int // Unique identifier for the client. + Host string // Host address of the client. + Port int // Port number on which the client listens. + Address string // Full address of the client in the form http://host:port. + PrivateKey string // Client's long term private key for decryption. + PublicKey string // Client's long term public key for encryption. + PrometheusPort int // Port number for Prometheus metrics. + ActiveRelays []structs.PublicNodeApi // List of active relays known to the client. + OtherClients []structs.PublicNodeApi // List of other client known to the client. + //Messages []structs.Message // Messages to be sent by the client. + BulletinBoardUrl string // URL of the bulletin board for client registration and communication. + status *structs.ClientStatus // Client status, including sent and received messages. + wg sync.WaitGroup // WaitGroup to ensure the client does not start protocol until all messages are generated mu sync.RWMutex } @@ -54,9 +54,9 @@ func NewClient(id int, host string, port int, promPort int, bulletinBoardUrl str PrometheusPort: promPort, ActiveRelays: make([]structs.PublicNodeApi, 0), BulletinBoardUrl: bulletinBoardUrl, - Messages: make([]structs.Message, 0), - OtherClients: make([]structs.PublicNodeApi, 0), - status: structs.NewClientStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey), + //Messages: make([]structs.Message, 0), + OtherClients: make([]structs.PublicNodeApi, 0), + status: structs.NewClientStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey), } c.wg.Add(1) @@ -128,7 +128,7 @@ func (c *Client) getRecipient(clients []structs.PublicNodeApi) (string, int) { } // StartGeneratingMessages generates a single message to be sent to another client. -func (c *Client) generateMessages(start structs.ClientStartRunApi) { +func (c *Client) generateMessages(start structs.ClientStartRunApi) []structs.Message { defer c.wg.Done() // Mark this operation as done in the WaitGroup when finished. slog.Info("Client starting to generate messages", "id", c.ID) @@ -140,15 +140,17 @@ func (c *Client) generateMessages(start structs.ClientStartRunApi) { structs.NewMessage(c.Address, recipientAddress, fmt.Sprintf("Msg from client(id=%d)", c.ID)), } + return messages + // Register the intent to send the message with the bulletin board. //if err := c.RegisterIntentToSend(messages); err != nil { // slog.Error(pl.GetFuncName()+": Error registering intent to send", err) //} else { // slog.Info(fmt.Sprintf("Client %d sending to client %d", c.ID, recipientId)) - c.mu.Lock() - defer c.mu.Unlock() - c.Messages = messages // Store the messages to be sent. - //} + //c.mu.Lock() + //defer c.mu.Unlock() + //c.Messages = messages // Store the messages to be sent. + ////} } // DetermineRoutingPath determines a random routing path of mixers and gatekeepers. @@ -175,14 +177,15 @@ func (c *Client) formOnions(start structs.ClientStartRunApi) ([]queuedOnion, err var wg sync.WaitGroup // WaitGroup to manage concurrent onion formation. // Iterate over the client's messages to form onions for each one. - for i := range c.Messages { + messages := c.generateMessages(start) + for i := range messages { if destination := utils.Find(start.Clients, func(relay structs.PublicNodeApi) bool { - return relay.Address == c.Messages[i].To + return relay.Address == messages[i].To }); destination != nil { wg.Add(1) go func(destination structs.PublicNodeApi) { defer wg.Done() - if o, err := c.processMessage(c.Messages[i], destination, start.Relays); err != nil { + if o, err := c.processMessage(messages[i], destination, start.Relays); err != nil { slog.Error("failed to process message", err) } else { mu.Lock() @@ -335,8 +338,6 @@ func (c *Client) startRun(start structs.ClientStartRunApi) error { config.UpdateConfig(start.Config) // Update the global configuration based on the start signal. - c.generateMessages(start) - // Ensure that there are relays and client participating in the run. if len(start.Relays) == 0 { return pl.NewError("%s: no participating relays", pl.GetFuncName()) @@ -376,7 +377,7 @@ func (c *Client) startRun(start structs.ClientStartRunApi) error { wg.Wait() // Wait for all onions to be sent. - c.Messages = make([]structs.Message, 0) // Clear the client's messages after sending. + //c.Messages = make([]structs.Message, 0) // Clear the client's messages after sending. return nil } } diff --git a/pkg/utils/port.go b/pkg/utils/port.go new file mode 100644 index 0000000..ac5a636 --- /dev/null +++ b/pkg/utils/port.go @@ -0,0 +1,24 @@ +package utils + +import ( + "log/slog" + "net" +) + +// GetAvailablePort finds an available port and returns it +func GetAvailablePort() (int, error) { + // Create a TCP listener on port 0 to have the OS assign a free port + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + return 0, err + } + defer func(listener net.Listener) { + if err := listener.Close(); err != nil { + slog.Error("", err) + } + }(listener) + + // Get the actual port that was assigned by the OS + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +}