Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging and error handling audit #272

Merged
merged 12 commits into from
Oct 17, 2022
4 changes: 2 additions & 2 deletions cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func runNode() error {
logger = logging.ConsoleWarnLogger
}

ctx := context.Background()

nodeAddrs, err := membership.FromFileName(membershipFile)
if err != nil {
return fmt.Errorf("could not load membership: %w", err)
Expand Down Expand Up @@ -103,8 +105,6 @@ func runNode() error {
return fmt.Errorf("could not create node: %w", err)
}

ctx := context.Background()

reqReceiver := requestreceiver.NewRequestReceiver(node, "mempool", logger)
if err := reqReceiver.Start(ReqReceiverBasePort + ownNumericID); err != nil {
return fmt.Errorf("could not start request receiver: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/availability/multisigcollector/multisigcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func NewModule(mc *ModuleConfig, params *ModuleParams, nodeID t.NodeID) (modules
}

func NewReconfigurableModule(mc *ModuleConfig, nodeID t.NodeID, logger logging.Logger) modules.PassiveModule {
if logger == nil {
logger = logging.ConsoleErrorLogger
}
return factorymodule.New(
mc.Self,
factorymodule.DefaultParams(
Expand Down
3 changes: 3 additions & 0 deletions pkg/checkpoint/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
)

func Factory(mc *ModuleConfig, ownID t.NodeID, logger logging.Logger) modules.PassiveModule {
if logger == nil {
logger = logging.ConsoleErrorLogger
}
return factorymodule.New(
mc.Self,
factorymodule.DefaultParams(
Expand Down
19 changes: 9 additions & 10 deletions pkg/deploytest/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/net"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/messagepb"
Expand Down Expand Up @@ -56,8 +57,8 @@ func (fl *FakeLink) ApplyEvents(
}()
} else {
// Send message to another node.
if err := fl.Send(ctx, t.NodeID(destID), e.SendMessage.Msg); err != nil { // nolint
// TODO: Handle sending errors (and remove "nolint" comment above).
if err := fl.Send(t.NodeID(destID), e.SendMessage.Msg); err != nil {
fl.FakeTransport.logger.Log(logging.LevelWarn, "failed to send a message", "err", err)
}
}
}
Expand All @@ -72,8 +73,8 @@ func (fl *FakeLink) ApplyEvents(
// The ImplementsModule method only serves the purpose of indicating that this is a Module and must not be called.
func (fl *FakeLink) ImplementsModule() {}

func (fl *FakeLink) Send(ctx context.Context, dest t.NodeID, msg *messagepb.Message) error {
fl.FakeTransport.Send(ctx, fl.Source, dest, msg)
func (fl *FakeLink) Send(dest t.NodeID, msg *messagepb.Message) error {
fl.FakeTransport.Send(fl.Source, dest, msg)
return nil
}

Expand All @@ -86,6 +87,7 @@ type FakeTransport struct {
Buffers map[t.NodeID]map[t.NodeID]chan *events.EventList
NodeSinks map[t.NodeID]chan *events.EventList
WaitGroup sync.WaitGroup
logger logging.Logger
}

func NewFakeTransport(nodeIDs []t.NodeID) *FakeTransport {
Expand All @@ -105,10 +107,11 @@ func NewFakeTransport(nodeIDs []t.NodeID) *FakeTransport {
return &FakeTransport{
Buffers: buffers,
NodeSinks: nodeSinks,
logger: logging.ConsoleErrorLogger,
}
}

func (ft *FakeTransport) Send(ctx context.Context, source, dest t.NodeID, msg *messagepb.Message) {
func (ft *FakeTransport) Send(source, dest t.NodeID, msg *messagepb.Message) {
select {
case ft.Buffers[source][dest] <- events.ListOf(
events.MessageReceived(t.ModuleID(msg.DestModule), source, msg),
Expand Down Expand Up @@ -149,7 +152,7 @@ func (fl *FakeLink) Start() error {
return nil
}

func (fl *FakeLink) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddress) {
func (fl *FakeLink) Connect(nodes map[t.NodeID]t.NodeAddress) {
sourceBuffers := fl.FakeTransport.Buffers[fl.Source]

for destID, buffer := range sourceBuffers {
Expand All @@ -163,13 +166,9 @@ func (fl *FakeLink) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddres
case msg := <-buffer:
select {
case fl.FakeTransport.NodeSinks[destID] <- msg:
case <-ctx.Done():
return
case <-fl.DoneC:
return
}
case <-ctx.Done():
return
case <-fl.DoneC:
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/deploytest/localgrpctransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type LocalGrpcTransport struct {
}

func NewLocalGrpcTransport(nodeIDs []t.NodeID, logger logging.Logger) *LocalGrpcTransport {
if logger == nil {
logger = logging.ConsoleErrorLogger
}
// Compute network addresses and ports for all test replicas.
// Each test replica is on the local machine - 127.0.0.1
membership := make(map[t.NodeID]t.NodeAddress)
Expand Down
20 changes: 8 additions & 12 deletions pkg/deploytest/simtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ func (m *simTransportModule) Stop() {
close(m.stopChan)
}

func (m *simTransportModule) Send(ctx context.Context, dest t.NodeID, msg *messagepb.Message) error {
m.sendMessage(ctx, msg, dest)
func (m *simTransportModule) Send(dest t.NodeID, msg *messagepb.Message) error {
m.sendMessage(msg, dest)
return nil
}

func (m *simTransportModule) CloseOldConnections(newNodes map[t.NodeID]t.NodeAddress) {
}

func (m *simTransportModule) Connect(ctx context.Context, nodes map[t.NodeID]t.NodeAddress) {
go m.handleOutChan(ctx, m.SimTransport.Simulation.Spawn())
func (m *simTransportModule) Connect(nodes map[t.NodeID]t.NodeAddress) {
go m.handleOutChan(m.SimTransport.Simulation.Spawn())
}

// WaitFor returns immediately, since the simulated transport does not need to wait for anything.
Expand Down Expand Up @@ -127,17 +127,16 @@ func (m *simTransportModule) applyEvent(ctx context.Context, e *eventpb.Event) e

func (m *simTransportModule) multicastMessage(ctx context.Context, msg *messagepb.Message, targets []t.NodeID) {
for _, target := range targets {
m.sendMessage(ctx, msg, target)
m.sendMessage(msg, target)
}
}

func (m *simTransportModule) sendMessage(ctx context.Context, msg *messagepb.Message, target t.NodeID) {
func (m *simTransportModule) sendMessage(msg *messagepb.Message, target t.NodeID) {
proc := m.SimTransport.Simulation.Spawn()

done := make(chan struct{})
go func() {
select {
case <-ctx.Done():
case <-m.stopChan:
case <-done:
return
Expand Down Expand Up @@ -167,12 +166,9 @@ func (m *simTransportModule) EventsOut() <-chan *events.EventList {
return m.outChan
}

func (m *simTransportModule) handleOutChan(ctx context.Context, proc *testsim.Process) {
func (m *simTransportModule) handleOutChan(proc *testsim.Process) {
go func() {
select {
case <-m.stopChan:
case <-ctx.Done():
}
<-m.stopChan
proc.Kill()
}()

Expand Down
6 changes: 2 additions & 4 deletions pkg/deploytest/testreplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (tr *TestReplica) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error starting the network link: %w", err)
}
transport.Connect(ctx, tr.Nodes)
transport.Connect(tr.Nodes)
defer transport.Stop()
}

Expand Down Expand Up @@ -202,9 +202,7 @@ func (tr *TestReplica) submitFakeRequests(ctx context.Context, node *mir.Node, d
))

if err := node.InjectEvents(ctx, eventList); err != nil {

// TODO (Jason), failing on err causes flakes in the teardown,
// so just returning for now, we should address later
tr.Config.Logger.Log(logging.LevelError, "failed to inject events", "err", err)
break
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/eventlog/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ type Recorder struct {
}

func NewRecorder(nodeID t.NodeID, path string, logger logging.Logger, opts ...RecorderOpt) (*Recorder, error) {
if logger == nil {
logger = logging.ConsoleErrorLogger
}

startTime := time.Now()

if err := os.MkdirAll(path, 0700); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/factorymodule/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type FactoryModule struct {
}

func New(id t.ModuleID, params ModuleParams, logger logging.Logger) *FactoryModule {
if logger == nil {
logger = logging.ConsoleErrorLogger
}

// Zero value of the t.NodeID type.
// This is used as a dummy value, as for now the node ID is ignored by the message buffer.
Expand Down
3 changes: 3 additions & 0 deletions pkg/iss/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type RequestBucket struct {

// newRequestBucket returns a new initialized request bucket with ID `id`.
func newRequestBucket(id int, logger logging.Logger) *RequestBucket {
if logger == nil {
logger = logging.ConsoleErrorLogger
}
return &RequestBucket{
ID: id,
reqMap: make(map[string]*list.Element),
Expand Down
Loading