From ba343109984a99fe8ff568ec720c3aa0effd7239 Mon Sep 17 00:00:00 2001 From: Hannah Marsh Date: Mon, 24 Jun 2024 17:52:29 -0400 Subject: [PATCH] Update code to handle new node management and fix some bugs. This commit fixes a few bugs and updates the code to handle new node management. It fixes the logic for determining the number of messages a client should send and updates the logic for handling onions in the client. It also includes some logging to help with debugging. The `StartRuns` function now correctly signals nodes to start and returns an error if an error occurs. The `formOnions` function now correctly determines the routing path for each message and logs the routing path. The `startRun` function now correctly sends onions to the first node and logs the message if an error occurs. The `Receive` function now correctly removes the outermost layer of an onion and logs an error if the onion is invalid. The `HandleStartRun` function now logs the result of starting a run and the `start` function now correctly sends onions to all nodes. --- cmd/node/main.go | 2 +- internal/api/startRun.go | 1 + internal/bulletin_board/bulletin_board.go | 2 + .../bulletin_board/bulletin_board_handler.go | 5 ++ internal/client/client.go | 54 +++++++++++++++++-- internal/client/clientHandler.go | 2 +- pkg/utils/stream.go | 10 ++++ 7 files changed, 69 insertions(+), 7 deletions(-) diff --git a/cmd/node/main.go b/cmd/node/main.go index 1aee32e..fc9a519 100755 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -84,7 +84,7 @@ func main() { } } - http.HandleFunc("/receiveOnion", newNode.HandleReceiveOnion) + http.HandleFunc("/receive", newNode.HandleReceiveOnion) http.HandleFunc("/start", newNode.HandleStartRun) go func() { diff --git a/internal/api/startRun.go b/internal/api/startRun.go index db41b63..c425b1b 100644 --- a/internal/api/startRun.go +++ b/internal/api/startRun.go @@ -3,4 +3,5 @@ package api type StartRunApi struct { ParticipatingClients []PublicNodeApi ActiveNodes []PublicNodeApi + NumMessagesPerClient int } diff --git a/internal/bulletin_board/bulletin_board.go b/internal/bulletin_board/bulletin_board.go index 734423e..6b30439 100644 --- a/internal/bulletin_board/bulletin_board.go +++ b/internal/bulletin_board/bulletin_board.go @@ -107,6 +107,8 @@ func (bb *BulletinBoard) StartRuns() error { if bb.allNodesReady() { if err := bb.signalNodesToStart(); err != nil { return PrettyLogger.WrapError(err, "error signaling nodes to start") + } else { + return nil } } } diff --git a/internal/bulletin_board/bulletin_board_handler.go b/internal/bulletin_board/bulletin_board_handler.go index 939d0df..6fdae32 100644 --- a/internal/bulletin_board/bulletin_board_handler.go +++ b/internal/bulletin_board/bulletin_board_handler.go @@ -124,9 +124,14 @@ func (bb *BulletinBoard) signalNodesToStart() error { } }) + numMessages := utils.Max(utils.MapEntries(bb.Clients, func(_ int, client *ClientView) int { + return len(client.MessageQueue) + })) + 2 + vs := api.StartRunApi{ ParticipatingClients: activeClients, ActiveNodes: activeNodes, + NumMessagesPerClient: numMessages, } if data, err := json.Marshal(vs); err != nil { diff --git a/internal/client/client.go b/internal/client/client.go index 9efaa37..5b4be00 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -88,6 +88,7 @@ func (c *Client) RegisterWithBulletinBoard() error { func (c *Client) StartGeneratingMessages(client_addresses []string) { slog.Info("Client starting to generate messages", "id", c.ID) + var msgNum int = 0 for { select { case <-config.GlobalCtx.Done(): @@ -100,8 +101,9 @@ func (c *Client) StartGeneratingMessages(client_addresses []string) { messages = append(messages, api.Message{ From: c.Adddress, To: addr, - Msg: fmt.Sprintf("msg from client(id=%d)", c.ID), + Msg: fmt.Sprintf("Msg#%d from client(id=%d)", msgNum, c.ID), }) + msgNum++ } } var wg sync.WaitGroup @@ -120,7 +122,7 @@ func (c *Client) StartGeneratingMessages(client_addresses []string) { }() wg.Wait() } - time.Sleep(15 * time.Second) + time.Sleep(5 * time.Second) } } @@ -146,7 +148,31 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e onions := make(map[string][]api.OnionApi) - slog.Info("formOnions", "id", c.ID, "num_messages", len(c.Messages), "num_participants", len(start.ParticipatingClients), "num_active_nodes", len(start.ActiveNodes)) + nodes := utils.Filter(start.ActiveNodes, func(node api.PublicNodeApi) bool { + return node.Address != c.Adddress && node.Address != "" + }) + + numMessagesToSend := make(map[string]int) + + for _, msg := range c.Messages { + if _, found := numMessagesToSend[msg.To]; !found { + numMessagesToSend[msg.To] = 0 + } + numMessagesToSend[msg.To]++ + } + + for addr, numMessages := range numMessagesToSend { + if numMessages < start.NumMessagesPerClient { + numDummyNeeded := start.NumMessagesPerClient - numMessages + for i := 0; i < numDummyNeeded; i++ { + c.Messages = append(c.Messages, api.Message{ + From: c.Adddress, + To: addr, + Msg: "dummy", + }) + } + } + } for _, msg := range c.Messages { if destination, found := utils.Find(start.ParticipatingClients, api.PublicNodeApi{}, func(client api.PublicNodeApi) bool { @@ -157,7 +183,7 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e if msgString, err := json.Marshal(msg); err != nil { return nil, pl.WrapError(err, "failed to marshal message") - } else if routingPath, err2 := DetermineRoutingPath(3, start.ActiveNodes); err2 != nil { + } else if routingPath, err2 := DetermineRoutingPath(3, nodes); err2 != nil { return nil, pl.WrapError(err2, "failed to determine routing path") } else { routingPath = append(routingPath, destination) @@ -167,6 +193,7 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e addresses := utils.Map(routingPath, func(node api.PublicNodeApi) string { return node.Address }) + slog.Info("routing path", "path", addresses) if addr, onion, err3 := pi_t.FormOnion(msgString, publicKeys, addresses); err3 != nil { return nil, pl.WrapError(err3, "failed to create onion") } else { @@ -212,7 +239,7 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) { } else { for addr, onions := range toSend { for _, onion := range onions { - url := fmt.Sprintf("%s/receiveOnion", addr) + url := fmt.Sprintf("%s/receive", addr) if data, err2 := json.Marshal(onion); err2 != nil { slog.Error("failed to marshal msgs", err2) @@ -226,11 +253,14 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) { }(resp.Body) if resp.StatusCode != http.StatusOK { return true, pl.NewError("%s: Failed to send to first node(url=%s), status code: %d, status: %s", pl.GetFuncName(), url, resp.StatusCode, resp.Status) + } else { + slog.Info("Client sent onion to first mixer", "mixer_address", addr) } } } } } + c.Messages = make([]api.Message, 0) return true, nil } @@ -246,6 +276,20 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) { //} func (c *Client) Receive(o string) error { + if destination, payload, err := pi_t.PeelOnion(o, c.PrivateKey); err != nil { + return pl.WrapError(err, "node.Receive(): failed to remove layer") + } else { + if destination == "" { + var msg api.Message + if err2 := json.Unmarshal([]byte(payload), &msg); err2 != nil { + return pl.WrapError(err2, "node.Receive(): failed to unmarshal message") + } + slog.Info("Received message", "from", msg.From, "to", msg.To, "msg", msg.Msg) + + } else { + return pl.NewError("Received onion", "destination", destination) + } + } return nil } diff --git a/internal/client/clientHandler.go b/internal/client/clientHandler.go index 8802404..9a8066c 100644 --- a/internal/client/clientHandler.go +++ b/internal/client/clientHandler.go @@ -38,7 +38,7 @@ func (c *Client) HandleStartRun(w http.ResponseWriter, r *http.Request) { if didParticipate, err := c.startRun(start); err != nil { slog.Error("Error starting run", err) } else { - slog.Info("Run complete", "did_participate", didParticipate) + slog.Info("Done sending onions", "did_participate", didParticipate) } }() w.WriteHeader(http.StatusOK) diff --git a/pkg/utils/stream.go b/pkg/utils/stream.go index b3f4110..4864956 100644 --- a/pkg/utils/stream.go +++ b/pkg/utils/stream.go @@ -193,6 +193,16 @@ func Sum(values []int) int { return sum } +func Max(values []int) int { + m := values[0] + for _, v := range values { + if v > m { + m = v + } + } + return m +} + func GetValues[K comparable, V any](m map[K]V) []V { values := make([]V, 0, len(m)) for _, v := range m {