diff --git a/pkg/deploytest/simtimer.go b/pkg/deploytest/simtimer.go index 415bd5966..726f9f80c 100644 --- a/pkg/deploytest/simtimer.go +++ b/pkg/deploytest/simtimer.go @@ -19,7 +19,7 @@ import ( type simTimerModule struct { *SimNode eventsOut chan *events.EventList - processes map[t.TimerRetIndex]*testsim.Process + processes map[t.RetentionIndex]*testsim.Process } // NewSimTimerModule returns a Timer modules to be used in simulation. @@ -27,7 +27,7 @@ func NewSimTimerModule(node *SimNode) modules.ActiveModule { return &simTimerModule{ SimNode: node, eventsOut: make(chan *events.EventList, 1), - processes: map[t.TimerRetIndex]*testsim.Process{}, + processes: map[t.RetentionIndex]*testsim.Process{}, } } @@ -55,10 +55,10 @@ func (m *simTimerModule) applyEvent(ctx context.Context, e *eventpb.Event) error case *eventpb.Event_TimerRepeat: eventsOut := events.EmptyList().PushBackSlice(e.TimerRepeat.Events) d := t.TimeDuration(e.TimerRepeat.Delay) - retIdx := t.TimerRetIndex(e.TimerRepeat.RetentionIndex) + retIdx := t.RetentionIndex(e.TimerRepeat.RetentionIndex) m.repeat(ctx, eventsOut, d, retIdx) case *eventpb.Event_TimerGarbageCollect: - retIdx := t.TimerRetIndex(e.TimerGarbageCollect.RetentionIndex) + retIdx := t.RetentionIndex(e.TimerGarbageCollect.RetentionIndex) m.garbageCollect(retIdx) default: return fmt.Errorf("unexpected type of Timer event: %T", e) @@ -99,7 +99,7 @@ func (m *simTimerModule) delay(ctx context.Context, eventList *events.EventList, }() } -func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList, d t.TimeDuration, retIdx t.TimerRetIndex) { +func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList, d t.TimeDuration, retIdx t.RetentionIndex) { proc := m.Spawn() m.processes[retIdx] = proc @@ -132,7 +132,7 @@ func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList }() } -func (m *simTimerModule) garbageCollect(retIdx t.TimerRetIndex) { +func (m *simTimerModule) garbageCollect(retIdx t.RetentionIndex) { for i, proc := range m.processes { if i < retIdx { proc.Kill() diff --git a/pkg/events/events.go b/pkg/events/events.go index 00f9b5abe..db959398e 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -236,7 +236,7 @@ func NodeSigsVerified( // WALAppend returns an event of appending a new entry to the WAL. // This event is produced by the protocol state machine for persisting its state. -func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.WALRetIndex) *eventpb.Event { +func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.RetentionIndex) *eventpb.Event { return &eventpb.Event{DestModule: destModule.Pb(), Type: &eventpb.Event_WalAppend{WalAppend: &eventpb.WALAppend{ Event: event, RetentionIndex: retentionIndex.Pb(), @@ -246,7 +246,7 @@ func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.WAL // WALTruncate returns and event on removing all entries from the WAL // that have been appended with a retentionIndex smaller than the // specified one. -func WALTruncate(destModule t.ModuleID, retentionIndex t.WALRetIndex) *eventpb.Event { +func WALTruncate(destModule t.ModuleID, retentionIndex t.RetentionIndex) *eventpb.Event { return &eventpb.Event{ DestModule: destModule.Pb(), Type: &eventpb.Event_WalTruncate{WalTruncate: &eventpb.WALTruncate{ @@ -257,7 +257,7 @@ func WALTruncate(destModule t.ModuleID, retentionIndex t.WALRetIndex) *eventpb.E // WALEntry returns an event of reading an entry from the WAL. // Those events are used at system initialization. -func WALEntry(persistedEvent *eventpb.Event, retentionIndex t.WALRetIndex) *eventpb.Event { +func WALEntry(persistedEvent *eventpb.Event, retentionIndex t.RetentionIndex) *eventpb.Event { return &eventpb.Event{Type: &eventpb.Event_WalEntry{WalEntry: &eventpb.WALEntry{ Event: persistedEvent, }}} @@ -339,7 +339,7 @@ func TimerRepeat( destModule t.ModuleID, events []*eventpb.Event, delay t.TimeDuration, - retIndex t.TimerRetIndex, + retIndex t.RetentionIndex, ) *eventpb.Event { return &eventpb.Event{ DestModule: destModule.Pb(), @@ -351,7 +351,7 @@ func TimerRepeat( } } -func TimerGarbageCollect(destModule t.ModuleID, retIndex t.TimerRetIndex) *eventpb.Event { +func TimerGarbageCollect(destModule t.ModuleID, retIndex t.RetentionIndex) *eventpb.Event { return &eventpb.Event{ DestModule: destModule.Pb(), Type: &eventpb.Event_TimerGarbageCollect{TimerGarbageCollect: &eventpb.TimerGarbageCollect{ diff --git a/pkg/iss/checkpoint.go b/pkg/iss/checkpoint.go index fb48a5e5a..ec759c111 100644 --- a/pkg/iss/checkpoint.go +++ b/pkg/iss/checkpoint.go @@ -138,7 +138,7 @@ func (ct *checkpointTracker) ProcessCheckpointSignResult(signature []byte) *even // Write Checkpoint to WAL persistEvent := PersistCheckpointEvent(ct.seqNr, ct.stateSnapshot, ct.stateSnapshotHash, signature) - walEvent := events.WALAppend(walModuleName, persistEvent, t.WALRetIndex(ct.epoch)) + walEvent := events.WALAppend(walModuleName, persistEvent, t.RetentionIndex(ct.epoch)) // Send a checkpoint message to all nodes after persisting checkpoint to the WAL. m := CheckpointMessage(ct.epoch, ct.seqNr, ct.stateSnapshotHash, signature) @@ -146,7 +146,7 @@ func (ct *checkpointTracker) ProcessCheckpointSignResult(signature []byte) *even "timer", []*eventpb.Event{events.SendMessage(netModuleName, m, ct.membership)}, ct.resendPeriod, - t.TimerRetIndex(ct.epoch)), + t.RetentionIndex(ct.epoch)), ) ct.Log(logging.LevelDebug, "Sending checkpoint message", @@ -247,7 +247,7 @@ func (ct *checkpointTracker) announceStable() *events.EventList { persistEvent := events.WALAppend( walModuleName, PersistStableCheckpointEvent(stableCheckpoint), - t.WALRetIndex(ct.epoch), + t.RetentionIndex(ct.epoch), ) persistEvent.FollowUp(StableCheckpointEvent(stableCheckpoint)) return events.ListOf(persistEvent) diff --git a/pkg/iss/iss.go b/pkg/iss/iss.go index cd358b7ab..ac22db4c4 100644 --- a/pkg/iss/iss.go +++ b/pkg/iss/iss.go @@ -641,8 +641,8 @@ func (iss *ISS) applyStableCheckpoint(stableCheckpoint *isspb.StableCheckpoint) if pruneIndex > 0 { // "> 0" and not ">= 0", since only entries strictly smaller than the index are pruned. // Prune WAL and Timer - eventsOut.PushBack(events.WALTruncate(walModuleName, t.WALRetIndex(pruneIndex))) - eventsOut.PushBack(events.TimerGarbageCollect(timerModuleName, t.TimerRetIndex(pruneIndex))) + eventsOut.PushBack(events.WALTruncate(walModuleName, t.RetentionIndex(pruneIndex))) + eventsOut.PushBack(events.TimerGarbageCollect(timerModuleName, t.RetentionIndex(pruneIndex))) // Prune epoch state. for epoch := range iss.epochs { @@ -663,7 +663,7 @@ func (iss *ISS) applyStableCheckpoint(stableCheckpoint *isspb.StableCheckpoint) // Note that we are not using the current epoch number here, because it is not relevant for checkpoints. // Using pruneIndex makes sure that the re-transmission is stopped // on every stable checkpoint (when another one is started). - t.TimerRetIndex(pruneIndex), + t.RetentionIndex(pruneIndex), )) } diff --git a/pkg/iss/sbeventservice.go b/pkg/iss/sbeventservice.go index 6e817712a..d689390c9 100644 --- a/pkg/iss/sbeventservice.go +++ b/pkg/iss/sbeventservice.go @@ -46,7 +46,7 @@ func (ec *sbEventService) SendMessage(message *isspb.SBInstanceMessage, destinat // On recovery, this event will be fed back to the same orderer instance // (which, however, must be created during the recovery process). func (ec *sbEventService) WALAppend(event *isspb.SBInstanceEvent) *eventpb.Event { - return events.WALAppend(walModuleName, SBEvent(ec.epoch, ec.instance, event), t.WALRetIndex(ec.epoch)) + return events.WALAppend(walModuleName, SBEvent(ec.epoch, ec.instance, event), t.RetentionIndex(ec.epoch)) } func (ec *sbEventService) HashRequest(data [][][]byte, origin *isspb.SBInstanceHashOrigin) *eventpb.Event { @@ -77,7 +77,7 @@ func (ec *sbEventService) TimerDelay(delay t.TimeDuration, evts ...*eventpb.Even } func (ec *sbEventService) TimerRepeat(period t.TimeDuration, evts ...*eventpb.Event) *eventpb.Event { - return events.TimerRepeat(timerModuleName, evts, period, t.TimerRetIndex(ec.epoch)) + return events.TimerRepeat(timerModuleName, evts, period, t.RetentionIndex(ec.epoch)) } // SBEvent creates an event to be processed by ISS in association with the orderer that created it (e.g. Deliver). diff --git a/pkg/simplewal/simplewal.go b/pkg/simplewal/simplewal.go index 5b8b7bf31..1eaaee4ce 100644 --- a/pkg/simplewal/simplewal.go +++ b/pkg/simplewal/simplewal.go @@ -42,14 +42,14 @@ type WAL struct { // It is required to skip outdated entries when loading. // Otherwise it could be completely ephemeral. // TODO: Implement persisting and loading the retentionIndex - retentionIndex t.WALRetIndex + retentionIndex t.RetentionIndex } func (w *WAL) LoadAll(ctx context.Context) (*events.EventList, error) { storedEvents := events.EmptyList() // Add all events from the WAL to the new EventList. - if err := w.loadAll(func(retIdx t.WALRetIndex, event *eventpb.Event) { + if err := w.loadAll(func(retIdx t.RetentionIndex, event *eventpb.Event) { storedEvents.PushBack(event) }); err != nil { return nil, fmt.Errorf("could not load WAL events: %w", err) @@ -69,12 +69,12 @@ func (w *WAL) ApplyEvent(event *eventpb.Event) (*events.EventList, error) { case *eventpb.Event_Init: // no actions on init case *eventpb.Event_WalAppend: - if err := w.Append(e.WalAppend.Event, t.WALRetIndex(e.WalAppend.RetentionIndex)); err != nil { + if err := w.Append(e.WalAppend.Event, t.RetentionIndex(e.WalAppend.RetentionIndex)); err != nil { return nil, fmt.Errorf("could not persist event (retention index %d) to WAL: %w", e.WalAppend.RetentionIndex, err) } case *eventpb.Event_WalTruncate: - if err := w.Truncate(t.WALRetIndex(e.WalTruncate.RetentionIndex)); err != nil { + if err := w.Truncate(t.RetentionIndex(e.WalTruncate.RetentionIndex)); err != nil { return nil, fmt.Errorf("could not truncate WAL (retention index %d): %w", e.WalTruncate.RetentionIndex, err) } @@ -130,7 +130,7 @@ func (w *WAL) IsEmpty() (bool, error) { return firstIndex == 0, nil } -func (w *WAL) loadAll(forEach func(index t.WALRetIndex, p *eventpb.Event)) error { +func (w *WAL) loadAll(forEach func(index t.RetentionIndex, p *eventpb.Event)) error { w.mutex.Lock() defer w.mutex.Unlock() firstIndex, err := w.log.FirstIndex() @@ -160,8 +160,8 @@ func (w *WAL) loadAll(forEach func(index t.WALRetIndex, p *eventpb.Event)) error return errors.WithMessage(err, "error decoding to proto, is the WAL corrupt?") } - if t.WALRetIndex(result.RetentionIndex) >= w.retentionIndex { - forEach(t.WALRetIndex(result.RetentionIndex), result.Event) + if t.RetentionIndex(result.RetentionIndex) >= w.retentionIndex { + forEach(t.RetentionIndex(result.RetentionIndex), result.Event) } } @@ -187,14 +187,14 @@ func (w *WAL) write(index uint64, entry *WALEntry) error { return w.log.Write(index+1, data) // The log implementation seems to be indexing starting with 1. } -func (w *WAL) Append(event *eventpb.Event, retentionIndex t.WALRetIndex) error { +func (w *WAL) Append(event *eventpb.Event, retentionIndex t.RetentionIndex) error { return w.write(w.idx, &WALEntry{ RetentionIndex: retentionIndex.Pb(), Event: event, }) } -func (w *WAL) Truncate(retentionIndex t.WALRetIndex) error { +func (w *WAL) Truncate(retentionIndex t.RetentionIndex) error { w.mutex.Lock() defer w.mutex.Unlock() diff --git a/pkg/timer/timer.go b/pkg/timer/timer.go index d2da68448..f3f29c92e 100644 --- a/pkg/timer/timer.go +++ b/pkg/timer/timer.go @@ -17,7 +17,7 @@ type Timer struct { eventsOut chan *events.EventList retIndexMutex sync.RWMutex - retIndex t.TimerRetIndex + retIndex t.RetentionIndex } func New() *Timer { @@ -55,10 +55,10 @@ func (tm *Timer) ApplyEvents(ctx context.Context, eventList *events.EventList) e ctx, events.EmptyList().PushBackSlice(e.TimerRepeat.Events), t.TimeDuration(e.TimerRepeat.Delay), - t.TimerRetIndex(e.TimerRepeat.RetentionIndex), + t.RetentionIndex(e.TimerRepeat.RetentionIndex), ) case *eventpb.Event_TimerGarbageCollect: - tm.GarbageCollect(t.TimerRetIndex(e.TimerGarbageCollect.RetentionIndex)) + tm.GarbageCollect(t.RetentionIndex(e.TimerGarbageCollect.RetentionIndex)) default: return fmt.Errorf("unexpected type of Timer event: %T", event.Type) } @@ -103,7 +103,7 @@ func (tm *Timer) Repeat( ctx context.Context, events *events.EventList, period t.TimeDuration, - retIndex t.TimerRetIndex, + retIndex t.RetentionIndex, ) { go func() { @@ -139,7 +139,7 @@ func (tm *Timer) Repeat( // When GarbageCollect is called, the Timer stops repeatedly producing events associated with a retention index // smaller than retIndex. // If GarbageCollect already has been invoked with the same or higher retention index, the call has no effect. -func (tm *Timer) GarbageCollect(retIndex t.TimerRetIndex) { +func (tm *Timer) GarbageCollect(retIndex t.RetentionIndex) { tm.retIndexMutex.Lock() defer tm.retIndexMutex.Unlock() @@ -149,7 +149,7 @@ func (tm *Timer) GarbageCollect(retIndex t.TimerRetIndex) { } } -func (tm *Timer) getRetIndex() t.TimerRetIndex { +func (tm *Timer) getRetIndex() t.RetentionIndex { tm.retIndexMutex.RLock() defer tm.retIndexMutex.RUnlock() diff --git a/pkg/types/types.go b/pkg/types/types.go index 803ee4537..3962fd64c 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -226,22 +226,12 @@ func (rn ReqNo) Pb() uint64 { // ================================================================================ -// WALRetIndex represents the WAL (Write-Ahead Log) retention index assigned to every entry (and used for truncating). -type WALRetIndex uint64 +// RetentionIndex represents the WAL (Write-Ahead Log) retention index assigned to every entry (and used for truncating). +type RetentionIndex uint64 -// Pb converts a WALRetIndex to its underlying native type. -func (wri WALRetIndex) Pb() uint64 { - return uint64(wri) -} - -// ================================================================================ - -// TimerRetIndex represents the Timer retention index used for garbage-collecting "Repeat" invocations. -type TimerRetIndex uint64 - -// Pb converts a TimerRetIndex to its underlying native type. -func (tri TimerRetIndex) Pb() uint64 { - return uint64(tri) +// Pb converts a RetentionIndex to its underlying native type. +func (ri RetentionIndex) Pb() uint64 { + return uint64(ri) } // ================================================================================