Skip to content

Commit

Permalink
Merge pull request #13 from HannahMarsh/12-synchronize-timestamps-and…
Browse files Browse the repository at this point in the history
…-track-throughput-and-arrival-rate

12 synchronize timestamps and track throughput and arrival rate
  • Loading branch information
HannahMarsh authored Oct 23, 2024
2 parents 80e0def + 285f7cc commit e1ce31f
Show file tree
Hide file tree
Showing 22 changed files with 507 additions and 451 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ out/
out/*


internal/model/bulletin_board/metrics/prometheus.yml
internal/model/bulletin_board/metrics/rules.yml
internal/model/bulletin_board/metrics/prometheus.yml
19 changes: 16 additions & 3 deletions cmd/bulletin-board/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
pl "github.com/HannahMarsh/PrettyLogger"
"github.com/HannahMarsh/pi_t-experiment/config"
"github.com/HannahMarsh/pi_t-experiment/internal/model/bulletin_board"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"net/http"
"os"
"os/signal"
Expand All @@ -16,6 +17,9 @@ import (
"log/slog"
)

var stopNTC func()
var bulletinBoard *bulletin_board.BulletinBoard

func main() {
// Define command-line flags
logLevel := flag.String("log-level", "debug", "Log level")
Expand All @@ -38,6 +42,8 @@ func main() {
os.Exit(1)
}

stopNTC = utils.StartNTP()

// Initialize global configurations by loading them from config/config.yml
if err, _ := config.InitGlobal(); err != nil {
slog.Error("failed to init config", err)
Expand All @@ -50,7 +56,7 @@ func main() {
slog.Info("⚡ init Bulletin board", "url", url)

// Create a new instance of the Bulletin Board with the current configuration.
bulletinBoard := bulletin_board.NewBulletinBoard()
bulletinBoard = bulletin_board.NewBulletinBoard()

// Start the Bulletin Board's main operations in a new goroutine
go func() {
Expand Down Expand Up @@ -98,10 +104,17 @@ func main() {
select {
case v := <-quit: // OS signal is received
config.GlobalCancel()
bulletinBoard.Shutdown()
slog.Info("", "signal.Notify", v)
cleanup()
case done := <-config.GlobalCtx.Done(): // global context is canceled
slog.Info("", "ctx.Done", done)
bulletinBoard.Shutdown()
cleanup()
}
}

func cleanup() {
if err := bulletinBoard.Shutdown(); err != nil {
slog.Error("failed to shutdown bulletin board", err)
}
stopNTC()
}
24 changes: 17 additions & 7 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
pl "github.com/HannahMarsh/PrettyLogger"
"github.com/HannahMarsh/pi_t-experiment/config"
"github.com/HannahMarsh/pi_t-experiment/internal/metrics"
"github.com/HannahMarsh/pi_t-experiment/internal/model/client"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -20,6 +19,11 @@ import (
_ "github.com/lib/pq"
)

var stopNTC func()
var newClient *client.Client

//var shutdownMetrics func()

func main() {
// Define command-line flags
id_ := flag.Int("id", -1, "ID of the newClient (required)")
Expand All @@ -45,6 +49,8 @@ func main() {

pl.SetUpLogrusAndSlog(logLevel)

stopNTC = utils.StartNTP()

if port == 0 {
var err error
port, err = utils.GetAvailablePort()
Expand Down Expand Up @@ -94,7 +100,6 @@ func main() {

slog.Info("⚡ init newClient", "id", id)

var newClient *client.Client
// Attempt to create a new client instance, retrying every 5 seconds upon failure (in case the bulletin board isn't ready yet).
for {
if n, err := client.NewClient(id, ip, port, promPort, config.GetBulletinBoardAddress()); err != nil {
Expand All @@ -116,7 +121,6 @@ func main() {
// Set up HTTP handlers
http.HandleFunc("/receive", newClient.HandleReceive)
http.HandleFunc("/start", newClient.HandleStartRun)
http.HandleFunc("/status", newClient.HandleGetStatus)
http.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
slog.Info("Shutdown signal received")
quit <- os.Signal(syscall.SIGTERM) // signal shutdown
Expand All @@ -128,7 +132,7 @@ func main() {

slog.Info("🌏 serving prometheus metrics..", "address", fmt.Sprintf("http://%s:%d", ip, port))
// Serve Prometheus metrics in a separate goroutine.
shutdownMetrics := metrics.ServeMetrics(promPort, metrics.MSG_SENT, metrics.MSG_RECEIVED, metrics.ONION_SIZE)
//shutdownMetrics = metrics.ServeMetrics(promPort, metrics.END_TO_END_LATENCY, metrics.ONION_SIZE, metrics.LATENCY_BETWEEN_HOPS, metrics.PROCESSING_TIME, metrics.ONIONS_RECEIVED, metrics.ONIONS_SENT)

// Start the HTTP server
go func() {
Expand All @@ -145,12 +149,18 @@ func main() {
// Wait for either an OS signal to quit or the global context to be canceled
select {
case v := <-quit: // OS signal is received
config.GlobalCancel()
shutdownMetrics()
slog.Info("", "signal.Notify", v)
config.GlobalCancel()
cleanup()
case done := <-config.GlobalCtx.Done(): // global context is canceled
slog.Info("", "ctx.Done", done)
shutdownMetrics()
cleanup()
}

}

func cleanup() {
//shutdownMetrics()
newClient.ShutdownMetrics()
stopNTC()
}
21 changes: 15 additions & 6 deletions cmd/relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
pl "github.com/HannahMarsh/PrettyLogger"
"github.com/HannahMarsh/pi_t-experiment/config"
"github.com/HannahMarsh/pi_t-experiment/internal/metrics"
"github.com/HannahMarsh/pi_t-experiment/internal/model/relay"
"github.com/HannahMarsh/pi_t-experiment/pkg/utils"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -20,6 +19,10 @@ import (
_ "github.com/lib/pq"
)

var stopNTC func()
var newRelay *relay.Relay
var shutdownMetrics func()

func main() {
// Define command-line flags
id_ := flag.Int("id", -1, "ID of the newClient (required)")
Expand All @@ -45,6 +48,8 @@ func main() {

pl.SetUpLogrusAndSlog(logLevel)

stopNTC = utils.StartNTP()

// Check if the required flag is provided
if id == -1 {
_, _ = fmt.Fprintf(os.Stderr, "Error: the -id flag is required\n")
Expand Down Expand Up @@ -94,7 +99,6 @@ func main() {

slog.Info("⚡ init newRelay", "id", id)

var newRelay *relay.Relay
// Attempt to create a new relay instance, retrying every 5 seconds upon failure (in case the bulletin board isn't ready yet).
for {
if n, err := relay.NewRelay(id, ip, port, promPort, config.GetBulletinBoardAddress()); err != nil {
Expand All @@ -116,7 +120,6 @@ func main() {
// Set up HTTP handlers
http.HandleFunc("/receive", newRelay.HandleReceiveOnion)
http.HandleFunc("/start", newRelay.HandleStartRun)
http.HandleFunc("/status", newRelay.HandleGetStatus)
http.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
slog.Info("Shutdown signal received")
quit <- os.Signal(syscall.SIGTERM) // signal shutdown
Expand All @@ -127,7 +130,6 @@ func main() {
})

// Serve Prometheus metrics in a separate goroutine.
shutdownMetrics := metrics.ServeMetrics(promPort, metrics.PROCESSING_TIME, metrics.ONION_COUNT, metrics.ONION_SIZE)

// Start the HTTP server
go func() {
Expand All @@ -145,11 +147,18 @@ func main() {
// Wait for either an OS signal to quit or the global context to be canceled
select {
case v := <-quit: // OS signal is received
config.GlobalCancel()
shutdownMetrics()
slog.Info("", "signal.Notify", v)
config.GlobalCancel()
cleanup()
case done := <-config.GlobalCtx.Done(): // global context is canceled
slog.Info("", "ctx.Done", done)
cleanup()
}

}

func cleanup() {
//shutdownMetrics()
newRelay.ShutdownMetrics()
stopNTC()
}
1 change: 0 additions & 1 deletion config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ x: 25 # Server load (x = Ω(polylog λ)) i.e. the expected number of o
tau: 0.8 # (τ < (1 − γ)(1 − X)) Fraction of checkpoints needed to progress local clock
d: 2 # Threshold for number of bruises before an onion is discarded by a gatekeeper
delta: 1e-5 # The probability of differential privacy violation due to the adversary's actions.
tao: 0.8 # Fraction of expected chekpint onions required to advance local clock
chi: 1.0 # Fraction of corrupted relays (which perform no mixing)
dropAllOnionsFromClient: 1 # Client ID to drop all onions from
vis: true # Visualize the network
Expand Down
21 changes: 12 additions & 9 deletions internal/api/api_functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// sendOnion sends an onion to the specified address with compression and timeout
func SendOnion(to, from string, o onion_model.Onion, layer int) error {
func SendOnion(to, from string, firstSent int64, o onion_model.Onion, layer int) error {
slog.Debug("Sending onion...", "from", from, "to", to)
url := fmt.Sprintf("%s/receive", to)

Expand All @@ -29,10 +29,13 @@ func SendOnion(to, from string, o onion_model.Onion, layer int) error {

oStr := base64.StdEncoding.EncodeToString(data)

_, lastSent := utils.GetTimestamp()
onion := structs.OnionApi{
To: to,
From: from,
Onion: oStr,
To: to,
From: from,
Onion: oStr,
OriginallySentTimestamp: firstSent,
LastSentTimestamp: int64(lastSent),
}

payload, err := json.Marshal(onion)
Expand All @@ -49,9 +52,8 @@ func SendOnion(to, from string, o onion_model.Onion, layer int) error {
Timeout: 30 * time.Second, // Set timeout
}

if layer >= 0 {
metrics.Observe(metrics.ONION_SIZE, float64(compressedBuffer.Len()))
}
onionSize := int64(compressedBuffer.Len())
metrics.SetOnionSize(onionSize, layer)

req, err := http.NewRequestWithContext(context.Background(), "POST", url, &compressedBuffer)
if err != nil {
Expand Down Expand Up @@ -79,8 +81,9 @@ func SendOnion(to, from string, o onion_model.Onion, layer int) error {
return nil
}

func HandleReceiveOnion(w http.ResponseWriter, r *http.Request, receiveFunction func(api structs.OnionApi) error) {
func HandleReceiveOnion(w http.ResponseWriter, r *http.Request, receiveFunction func(api structs.OnionApi, receivedTime time.Time) error) {

receivedTimestamp, _ := utils.GetTimestamp()
var body []byte
var err error

Expand Down Expand Up @@ -119,7 +122,7 @@ func HandleReceiveOnion(w http.ResponseWriter, r *http.Request, receiveFunction
return
}

if err = receiveFunction(o); err != nil {
if err = receiveFunction(o, receivedTimestamp); err != nil {
slog.Error("Error receiving onion", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
9 changes: 5 additions & 4 deletions internal/api/api_functions/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"
"sync"
"testing"
"time"
)

var usedPorts sync.Map
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestReceiveOnionMultipleLayers(t *testing.T) {
defer wg.Done()
mux := http.NewServeMux()
mux.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
HandleReceiveOnion(w, r, func(oApi structs.OnionApi) error {
HandleReceiveOnion(w, r, func(oApi structs.OnionApi, timeS time.Time) error {
onionStr := oApi.Onion
_, layer, _, peeled, nextDestination, err2 := pi_t.PeelOnion(onionStr, nodes[i].privateKeyPEM)
if err2 != nil {
Expand All @@ -145,7 +146,7 @@ func TestReceiveOnionMultipleLayers(t *testing.T) {
peeled.Sepal = peeled.Sepal.RemoveBlock()
}

err4 := SendOnion(nextDestination, nodes[i].address, peeled, -1)
err4 := SendOnion(nextDestination, nodes[i].address, 0, peeled, -1)
if err4 != nil {
slog.Error("SendOnion() error", err4)
t.Errorf("SendOnion() error = %v", err4)
Expand All @@ -172,7 +173,7 @@ func TestReceiveOnionMultipleLayers(t *testing.T) {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
HandleReceiveOnion(w, r, func(oApi structs.OnionApi) error {
HandleReceiveOnion(w, r, func(oApi structs.OnionApi, timeS time.Time) error {
onionStr := oApi.Onion
defer wg.Done()
_, layer, _, peeled, _, err2 := pi_t.PeelOnion(onionStr, nodes[l].privateKeyPEM)
Expand Down Expand Up @@ -237,7 +238,7 @@ func TestReceiveOnionMultipleLayers(t *testing.T) {
}
}()

err = SendOnion(nodes[1].address, nodes[0].address, onions[0][0], -1)
err = SendOnion(nodes[1].address, nodes[0].address, 0, onions[0][0], -1)
if err != nil {
slog.Error("SendOnion() error", err)
t.Fatalf("SendOnion() error = %v", err)
Expand Down
Loading

0 comments on commit e1ce31f

Please sign in to comment.