Skip to content

Commit

Permalink
Unify retention index types
Browse files Browse the repository at this point in the history
Having a separate type for the retention index for each module
seems like over-engineering, given that they are all used
exactly the same way.

Signed-off-by: Matej Pavlovic <matopavlovic@gmail.com>
  • Loading branch information
matejpavlovic committed Aug 19, 2022
1 parent 907ef76 commit a0fd2e1
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 49 deletions.
12 changes: 6 additions & 6 deletions pkg/deploytest/simtimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
type simTimerModule struct {
*SimNode
eventsOut chan *events.EventList
processes map[t.TimerRetIndex]*testsim.Process
processes map[t.RetentionIndex]*testsim.Process
}

// NewSimTimerModule returns a Timer modules to be used in simulation.
func NewSimTimerModule(node *SimNode) modules.ActiveModule {
return &simTimerModule{
SimNode: node,
eventsOut: make(chan *events.EventList, 1),
processes: map[t.TimerRetIndex]*testsim.Process{},
processes: map[t.RetentionIndex]*testsim.Process{},
}
}

Expand Down Expand Up @@ -55,10 +55,10 @@ func (m *simTimerModule) applyEvent(ctx context.Context, e *eventpb.Event) error
case *eventpb.Event_TimerRepeat:
eventsOut := events.EmptyList().PushBackSlice(e.TimerRepeat.Events)
d := t.TimeDuration(e.TimerRepeat.Delay)
retIdx := t.TimerRetIndex(e.TimerRepeat.RetentionIndex)
retIdx := t.RetentionIndex(e.TimerRepeat.RetentionIndex)
m.repeat(ctx, eventsOut, d, retIdx)
case *eventpb.Event_TimerGarbageCollect:
retIdx := t.TimerRetIndex(e.TimerGarbageCollect.RetentionIndex)
retIdx := t.RetentionIndex(e.TimerGarbageCollect.RetentionIndex)
m.garbageCollect(retIdx)
default:
return fmt.Errorf("unexpected type of Timer event: %T", e)
Expand Down Expand Up @@ -99,7 +99,7 @@ func (m *simTimerModule) delay(ctx context.Context, eventList *events.EventList,
}()
}

func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList, d t.TimeDuration, retIdx t.TimerRetIndex) {
func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList, d t.TimeDuration, retIdx t.RetentionIndex) {
proc := m.Spawn()
m.processes[retIdx] = proc

Expand Down Expand Up @@ -132,7 +132,7 @@ func (m *simTimerModule) repeat(ctx context.Context, eventList *events.EventList
}()
}

func (m *simTimerModule) garbageCollect(retIdx t.TimerRetIndex) {
func (m *simTimerModule) garbageCollect(retIdx t.RetentionIndex) {
for i, proc := range m.processes {
if i < retIdx {
proc.Kill()
Expand Down
10 changes: 5 additions & 5 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func NodeSigsVerified(

// WALAppend returns an event of appending a new entry to the WAL.
// This event is produced by the protocol state machine for persisting its state.
func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.WALRetIndex) *eventpb.Event {
func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.RetentionIndex) *eventpb.Event {
return &eventpb.Event{DestModule: destModule.Pb(), Type: &eventpb.Event_WalAppend{WalAppend: &eventpb.WALAppend{
Event: event,
RetentionIndex: retentionIndex.Pb(),
Expand All @@ -246,7 +246,7 @@ func WALAppend(destModule t.ModuleID, event *eventpb.Event, retentionIndex t.WAL
// WALTruncate returns and event on removing all entries from the WAL
// that have been appended with a retentionIndex smaller than the
// specified one.
func WALTruncate(destModule t.ModuleID, retentionIndex t.WALRetIndex) *eventpb.Event {
func WALTruncate(destModule t.ModuleID, retentionIndex t.RetentionIndex) *eventpb.Event {
return &eventpb.Event{
DestModule: destModule.Pb(),
Type: &eventpb.Event_WalTruncate{WalTruncate: &eventpb.WALTruncate{
Expand All @@ -257,7 +257,7 @@ func WALTruncate(destModule t.ModuleID, retentionIndex t.WALRetIndex) *eventpb.E

// WALEntry returns an event of reading an entry from the WAL.
// Those events are used at system initialization.
func WALEntry(persistedEvent *eventpb.Event, retentionIndex t.WALRetIndex) *eventpb.Event {
func WALEntry(persistedEvent *eventpb.Event, retentionIndex t.RetentionIndex) *eventpb.Event {
return &eventpb.Event{Type: &eventpb.Event_WalEntry{WalEntry: &eventpb.WALEntry{
Event: persistedEvent,
}}}
Expand Down Expand Up @@ -339,7 +339,7 @@ func TimerRepeat(
destModule t.ModuleID,
events []*eventpb.Event,
delay t.TimeDuration,
retIndex t.TimerRetIndex,
retIndex t.RetentionIndex,
) *eventpb.Event {
return &eventpb.Event{
DestModule: destModule.Pb(),
Expand All @@ -351,7 +351,7 @@ func TimerRepeat(
}
}

func TimerGarbageCollect(destModule t.ModuleID, retIndex t.TimerRetIndex) *eventpb.Event {
func TimerGarbageCollect(destModule t.ModuleID, retIndex t.RetentionIndex) *eventpb.Event {
return &eventpb.Event{
DestModule: destModule.Pb(),
Type: &eventpb.Event_TimerGarbageCollect{TimerGarbageCollect: &eventpb.TimerGarbageCollect{
Expand Down
6 changes: 3 additions & 3 deletions pkg/iss/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ func (ct *checkpointTracker) ProcessCheckpointSignResult(signature []byte) *even

// Write Checkpoint to WAL
persistEvent := PersistCheckpointEvent(ct.seqNr, ct.stateSnapshot, ct.stateSnapshotHash, signature)
walEvent := events.WALAppend(walModuleName, persistEvent, t.WALRetIndex(ct.epoch))
walEvent := events.WALAppend(walModuleName, persistEvent, t.RetentionIndex(ct.epoch))

// Send a checkpoint message to all nodes after persisting checkpoint to the WAL.
m := CheckpointMessage(ct.epoch, ct.seqNr, ct.stateSnapshotHash, signature)
walEvent.FollowUp(events.TimerRepeat(
"timer",
[]*eventpb.Event{events.SendMessage(netModuleName, m, ct.membership)},
ct.resendPeriod,
t.TimerRetIndex(ct.epoch)),
t.RetentionIndex(ct.epoch)),
)

ct.Log(logging.LevelDebug, "Sending checkpoint message",
Expand Down Expand Up @@ -247,7 +247,7 @@ func (ct *checkpointTracker) announceStable() *events.EventList {
persistEvent := events.WALAppend(
walModuleName,
PersistStableCheckpointEvent(stableCheckpoint),
t.WALRetIndex(ct.epoch),
t.RetentionIndex(ct.epoch),
)
persistEvent.FollowUp(StableCheckpointEvent(stableCheckpoint))
return events.ListOf(persistEvent)
Expand Down
6 changes: 3 additions & 3 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ func (iss *ISS) applyStableCheckpoint(stableCheckpoint *isspb.StableCheckpoint)
if pruneIndex > 0 { // "> 0" and not ">= 0", since only entries strictly smaller than the index are pruned.

// Prune WAL and Timer
eventsOut.PushBack(events.WALTruncate(walModuleName, t.WALRetIndex(pruneIndex)))
eventsOut.PushBack(events.TimerGarbageCollect(timerModuleName, t.TimerRetIndex(pruneIndex)))
eventsOut.PushBack(events.WALTruncate(walModuleName, t.RetentionIndex(pruneIndex)))
eventsOut.PushBack(events.TimerGarbageCollect(timerModuleName, t.RetentionIndex(pruneIndex)))

// Prune epoch state.
for epoch := range iss.epochs {
Expand All @@ -663,7 +663,7 @@ func (iss *ISS) applyStableCheckpoint(stableCheckpoint *isspb.StableCheckpoint)
// Note that we are not using the current epoch number here, because it is not relevant for checkpoints.
// Using pruneIndex makes sure that the re-transmission is stopped
// on every stable checkpoint (when another one is started).
t.TimerRetIndex(pruneIndex),
t.RetentionIndex(pruneIndex),
))

}
Expand Down
4 changes: 2 additions & 2 deletions pkg/iss/sbeventservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (ec *sbEventService) SendMessage(message *isspb.SBInstanceMessage, destinat
// On recovery, this event will be fed back to the same orderer instance
// (which, however, must be created during the recovery process).
func (ec *sbEventService) WALAppend(event *isspb.SBInstanceEvent) *eventpb.Event {
return events.WALAppend(walModuleName, SBEvent(ec.epoch, ec.instance, event), t.WALRetIndex(ec.epoch))
return events.WALAppend(walModuleName, SBEvent(ec.epoch, ec.instance, event), t.RetentionIndex(ec.epoch))
}

func (ec *sbEventService) HashRequest(data [][][]byte, origin *isspb.SBInstanceHashOrigin) *eventpb.Event {
Expand Down Expand Up @@ -77,7 +77,7 @@ func (ec *sbEventService) TimerDelay(delay t.TimeDuration, evts ...*eventpb.Even
}

func (ec *sbEventService) TimerRepeat(period t.TimeDuration, evts ...*eventpb.Event) *eventpb.Event {
return events.TimerRepeat(timerModuleName, evts, period, t.TimerRetIndex(ec.epoch))
return events.TimerRepeat(timerModuleName, evts, period, t.RetentionIndex(ec.epoch))
}

// SBEvent creates an event to be processed by ISS in association with the orderer that created it (e.g. Deliver).
Expand Down
18 changes: 9 additions & 9 deletions pkg/simplewal/simplewal.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ type WAL struct {
// It is required to skip outdated entries when loading.
// Otherwise it could be completely ephemeral.
// TODO: Implement persisting and loading the retentionIndex
retentionIndex t.WALRetIndex
retentionIndex t.RetentionIndex
}

func (w *WAL) LoadAll(ctx context.Context) (*events.EventList, error) {
storedEvents := events.EmptyList()

// Add all events from the WAL to the new EventList.
if err := w.loadAll(func(retIdx t.WALRetIndex, event *eventpb.Event) {
if err := w.loadAll(func(retIdx t.RetentionIndex, event *eventpb.Event) {
storedEvents.PushBack(event)
}); err != nil {
return nil, fmt.Errorf("could not load WAL events: %w", err)
Expand All @@ -69,12 +69,12 @@ func (w *WAL) ApplyEvent(event *eventpb.Event) (*events.EventList, error) {
case *eventpb.Event_Init:
// no actions on init
case *eventpb.Event_WalAppend:
if err := w.Append(e.WalAppend.Event, t.WALRetIndex(e.WalAppend.RetentionIndex)); err != nil {
if err := w.Append(e.WalAppend.Event, t.RetentionIndex(e.WalAppend.RetentionIndex)); err != nil {
return nil, fmt.Errorf("could not persist event (retention index %d) to WAL: %w",
e.WalAppend.RetentionIndex, err)
}
case *eventpb.Event_WalTruncate:
if err := w.Truncate(t.WALRetIndex(e.WalTruncate.RetentionIndex)); err != nil {
if err := w.Truncate(t.RetentionIndex(e.WalTruncate.RetentionIndex)); err != nil {
return nil, fmt.Errorf("could not truncate WAL (retention index %d): %w",
e.WalTruncate.RetentionIndex, err)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func (w *WAL) IsEmpty() (bool, error) {
return firstIndex == 0, nil
}

func (w *WAL) loadAll(forEach func(index t.WALRetIndex, p *eventpb.Event)) error {
func (w *WAL) loadAll(forEach func(index t.RetentionIndex, p *eventpb.Event)) error {
w.mutex.Lock()
defer w.mutex.Unlock()
firstIndex, err := w.log.FirstIndex()
Expand Down Expand Up @@ -160,8 +160,8 @@ func (w *WAL) loadAll(forEach func(index t.WALRetIndex, p *eventpb.Event)) error
return errors.WithMessage(err, "error decoding to proto, is the WAL corrupt?")
}

if t.WALRetIndex(result.RetentionIndex) >= w.retentionIndex {
forEach(t.WALRetIndex(result.RetentionIndex), result.Event)
if t.RetentionIndex(result.RetentionIndex) >= w.retentionIndex {
forEach(t.RetentionIndex(result.RetentionIndex), result.Event)
}
}

Expand All @@ -187,14 +187,14 @@ func (w *WAL) write(index uint64, entry *WALEntry) error {
return w.log.Write(index+1, data) // The log implementation seems to be indexing starting with 1.
}

func (w *WAL) Append(event *eventpb.Event, retentionIndex t.WALRetIndex) error {
func (w *WAL) Append(event *eventpb.Event, retentionIndex t.RetentionIndex) error {
return w.write(w.idx, &WALEntry{
RetentionIndex: retentionIndex.Pb(),
Event: event,
})
}

func (w *WAL) Truncate(retentionIndex t.WALRetIndex) error {
func (w *WAL) Truncate(retentionIndex t.RetentionIndex) error {
w.mutex.Lock()
defer w.mutex.Unlock()

Expand Down
12 changes: 6 additions & 6 deletions pkg/timer/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Timer struct {
eventsOut chan *events.EventList

retIndexMutex sync.RWMutex
retIndex t.TimerRetIndex
retIndex t.RetentionIndex
}

func New() *Timer {
Expand Down Expand Up @@ -55,10 +55,10 @@ func (tm *Timer) ApplyEvents(ctx context.Context, eventList *events.EventList) e
ctx,
events.EmptyList().PushBackSlice(e.TimerRepeat.Events),
t.TimeDuration(e.TimerRepeat.Delay),
t.TimerRetIndex(e.TimerRepeat.RetentionIndex),
t.RetentionIndex(e.TimerRepeat.RetentionIndex),
)
case *eventpb.Event_TimerGarbageCollect:
tm.GarbageCollect(t.TimerRetIndex(e.TimerGarbageCollect.RetentionIndex))
tm.GarbageCollect(t.RetentionIndex(e.TimerGarbageCollect.RetentionIndex))
default:
return fmt.Errorf("unexpected type of Timer event: %T", event.Type)
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (tm *Timer) Repeat(
ctx context.Context,
events *events.EventList,
period t.TimeDuration,
retIndex t.TimerRetIndex,
retIndex t.RetentionIndex,
) {
go func() {

Expand Down Expand Up @@ -139,7 +139,7 @@ func (tm *Timer) Repeat(
// When GarbageCollect is called, the Timer stops repeatedly producing events associated with a retention index
// smaller than retIndex.
// If GarbageCollect already has been invoked with the same or higher retention index, the call has no effect.
func (tm *Timer) GarbageCollect(retIndex t.TimerRetIndex) {
func (tm *Timer) GarbageCollect(retIndex t.RetentionIndex) {
tm.retIndexMutex.Lock()
defer tm.retIndexMutex.Unlock()

Expand All @@ -149,7 +149,7 @@ func (tm *Timer) GarbageCollect(retIndex t.TimerRetIndex) {
}
}

func (tm *Timer) getRetIndex() t.TimerRetIndex {
func (tm *Timer) getRetIndex() t.RetentionIndex {
tm.retIndexMutex.RLock()
defer tm.retIndexMutex.RUnlock()

Expand Down
20 changes: 5 additions & 15 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,22 +226,12 @@ func (rn ReqNo) Pb() uint64 {

// ================================================================================

// WALRetIndex represents the WAL (Write-Ahead Log) retention index assigned to every entry (and used for truncating).
type WALRetIndex uint64
// RetentionIndex represents the WAL (Write-Ahead Log) retention index assigned to every entry (and used for truncating).
type RetentionIndex uint64

// Pb converts a WALRetIndex to its underlying native type.
func (wri WALRetIndex) Pb() uint64 {
return uint64(wri)
}

// ================================================================================

// TimerRetIndex represents the Timer retention index used for garbage-collecting "Repeat" invocations.
type TimerRetIndex uint64

// Pb converts a TimerRetIndex to its underlying native type.
func (tri TimerRetIndex) Pb() uint64 {
return uint64(tri)
// Pb converts a RetentionIndex to its underlying native type.
func (ri RetentionIndex) Pb() uint64 {
return uint64(ri)
}

// ================================================================================
Expand Down

0 comments on commit a0fd2e1

Please sign in to comment.