Skip to content

Commit

Permalink
Fix bugs, account for reader index reset with previous results
Browse files Browse the repository at this point in the history
  • Loading branch information
spacez320 committed Feb 9, 2024
1 parent cede0f1 commit 7873bfd
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 17 deletions.
13 changes: 8 additions & 5 deletions internal/lib/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func RawDisplay(query string) {
reader = readerIndexes[query] // Reader index for the query.
)

// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Dec()

// Load existing results.
for _, result := range store.GetToIndex(query, reader) {
fmt.Println(result)
Expand All @@ -114,10 +118,9 @@ func StreamDisplay(query string) {
// Initialize the display.
resultsView, _, _ := initDisplayTviewText(helpText())

// Wait for the first result to appear to extrapolate display information about it. Afterwards,
// rewind the reader index.
// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Set(0)
reader.Dec()

// Start the display.
display(
Expand Down Expand Up @@ -163,9 +166,9 @@ func TableDisplay(query string, filters []string) {
// Initialize the display.
resultsView, _, _ := initDisplayTviewTable(helpText())

// Wait for the first result to appear to extrapolate display information about it.
// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Set(0)
reader.Dec()

// Start the display.
display(
Expand Down
2 changes: 1 addition & 1 deletion internal/lib/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func GetResult(query string) storage.Result {
return store.Next(query, readerIndexes[query])
}

// Retrieves a next result, waiting for a non-empty return.
// Retrieves a next result, waiting for a non-empty return in a non-blocking manner.
func GetResultWait(query string) (result storage.Result) {
for {
if result = store.NextOrEmpty(query, readerIndexes[query]); result.IsEmpty() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"golang.org/x/exp/slices"
_ "golang.org/x/exp/slog"
)

// Individual result.
Expand Down Expand Up @@ -46,8 +47,7 @@ func (r *Results) getRange(startTime time.Time, endTime time.Time) (found []Resu
for _, result := range (*r).Results {
if result.Time.Compare(startTime) >= 0 {
if result.Time.Compare(endTime) > 0 {
// Break out of the loop if we've exhausted the upper bounds of the
// range.
// Break out of the loop if we've exhausted the upper bounds of the range.
break
} else {
found = append(found, result)
Expand Down
23 changes: 14 additions & 9 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ const (
)

var (
storageFile *os.File // File for storage writes.
storageMutex *sync.Mutex // Lock for storage file writes.
storageFile *os.File // File for storage writes.
storageMutex sync.Mutex // Lock for storage file writes.

PutEvents = make(map[string](chan Result)) // Channels for broadcasting put calls.
)
Expand All @@ -62,11 +62,6 @@ var (
//
///////////////////////////////////////////////////////////////////////////////////////////////////

// Incremement a reader index, likely after a read.
func (i *ReaderIndex) inc() {
(*i)++
}

// initializes a new results series in storage. Must be called when a new results series is created.
// This function is idempotent in that it will check if results for a query have already been
// initialized and pass silently if so.
Expand Down Expand Up @@ -176,6 +171,16 @@ func NewStorage(persistence bool) (storage Storage, err error) {
return
}

// Decrement a reader index, to re-read the last read.
func (i *ReaderIndex) Dec() {
(*i)--
}

// Incremement a reader index, likely after a read.
func (i *ReaderIndex) Inc() {
(*i)++
}

// Sets a redaer index to a specified value.
func (i *ReaderIndex) Set(newI int) {
*i = ReaderIndex(newI)
Expand Down Expand Up @@ -237,7 +242,7 @@ func (s *Storage) NewReaderIndex(query string) *ReaderIndex {
// Retrieve the next result from a put event channel, blocking if none exists.
func (s *Storage) Next(query string, index *ReaderIndex) (next Result) {
next = <-PutEvents[query]
index.inc()
index.Inc()

return
}
Expand All @@ -247,7 +252,7 @@ func (s *Storage) NextOrEmpty(query string, index *ReaderIndex) (next Result) {
select {
case next = <-PutEvents[query]:
// Only increment the read counter if something consumed the event.
index.inc()
index.Inc()
default:
}

Expand Down

0 comments on commit 7873bfd

Please sign in to comment.