From e72f04bf184071680d9bde24d4b990f7f371a30b Mon Sep 17 00:00:00 2001 From: Matthew Coleman Date: Fri, 9 Feb 2024 16:04:06 -0500 Subject: [PATCH] Storage persistence (#33) * Restructuring some of storage * Clean-up, disable writes * New storage initialization * Avoid busy loop for raw results * POC results saving and loading * Clean-ups * Make history usage a flag * Clean-up * Initialize labels with indexes * Fix bugs, account for reader index reset with previous results * Fix channel closing and index range get * Clean-up * Bug-fixing * Update readme --- README.md | 13 +- internal/lib/display.go | 104 +++++++------ internal/lib/query.go | 38 +++-- internal/lib/results.go | 47 +++--- internal/lib/results_test.go | 2 +- main.go | 31 ++-- pkg/storage/results.go | 101 +++++++++++++ pkg/storage/storage.go | 280 +++++++++++++++++++++-------------- pkg/storage/storage_test.go | 4 +- 9 files changed, 419 insertions(+), 201 deletions(-) create mode 100644 pkg/storage/results.go diff --git a/README.md b/README.md index 70ab996..4889c47 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,13 @@ Cryptarch's interactive window. The examples above use stream displays. ![Demo of graph display](https://raw.githubusercontent.com/spacez320/cryptarch/master/media/graph-display.gif) +### Persistence + +Cryptarch, by default, will store results and load them when re-executing the same query. Storage is +located in the user's cache directory. + +See: + ### More Examples > The examples below have been tested on `GNU bash, version 5.2.15(1)-release`. @@ -56,11 +63,11 @@ Cryptarch's interactive window. The examples above use stream displays. # See help. cryptarch -h -# Execute `whoami` once, printing results to the console. +# Execute `whoami` once, printing results to the console and waiting for a user to `^C`. cryptarch -q 'whoami' -# Execute `uptime` continuously, printing results to the console. -cryptarch -q 'uptime' -t -1 +# Execute `uptime` continuously, printing results to the console, without using persistence. +cryptarch -q 'uptime' -t -1 -e=false # Get the size of an NVME used space and output it to a table. cryptarch -q 'df -h | grep nvme0n1p2 | awk '\''{print $3}'\''' -r 3 -v "NVME Used Space" -t -1 diff --git a/internal/lib/display.go b/internal/lib/display.go index 7deedb7..c558d0b 100644 --- a/internal/lib/display.go +++ b/internal/lib/display.go @@ -57,9 +57,8 @@ var ( interruptChan = make(chan bool) // Channel for interrupting displays. ) -// Starts the display. Applies contextual logic depending on the provided -// display driver. Expects a function to execute within a goroutine to update -// the display. +// Starts the display. Applies contextual logic depending on the provided display driver. Expects a +// function to execute within a goroutine to update the display. func display(driver DisplayDriver, displayUpdateFunc func()) { // Execute the update function. go displayUpdateFunc() @@ -85,21 +84,41 @@ func helpText() string { return HELP_TEXT + fmt.Sprintf( "\nQuery: %v | Labels: %v | Filters: %v", currentCtx.Value("query"), - currentCtx.Value("labels"), + store.GetLabels(currentCtx.Value("query").(string)), currentCtx.Value("filters")) } // Presents raw output. func RawDisplay(query string) { - go func() { - for { - fmt.Println(GetResult(query)) - } - }() + var ( + reader = readerIndexes[query] // Reader index for the query. + ) + + // Wait for the first result to appear to synchronize storage. + GetResultWait(query) + reader.Dec() + + // Load existing results. + for _, result := range store.GetToIndex(query, reader) { + fmt.Println(result) + } + + // Load new results. + for { + fmt.Println(GetResult(query)) + } } // Update the results pane with new results as they are generated. func StreamDisplay(query string) { + var ( + reader = readerIndexes[query] // Reader index for the query. + ) + + // Wait for the first result to appear to synchronize storage. + GetResultWait(query) + reader.Dec() + // Initialize the display. resultsView, _, _ := initDisplayTviewText(helpText()) @@ -107,22 +126,19 @@ func StreamDisplay(query string) { display( DISPLAY_TVIEW, func() { - // Print labels as the first line, if they are present. - if labels := store.GetLabels(query); len(labels) > 0 { - appTview.QueueUpdateDraw(func() { - fmt.Fprintln(resultsView, labels) - }) - } + // Print labels as the first line. + appTview.QueueUpdateDraw(func() { + fmt.Fprintln(resultsView, store.GetLabels(query)) + }) // Print all previous results. - for _, result := range store.GetToIndex(query, readerIndexes[query]) { + for _, result := range store.GetToIndex(query, reader) { fmt.Fprintln(resultsView, result.Value) } // Print results. for { - // Listen for an interrupt event to stop result consumption in - // preparation for some display change. + // Listen for an interrupt to stop result consumption for some display change. select { case <-interruptChan: // We've received an interrupt. @@ -142,10 +158,15 @@ func StreamDisplay(query string) { // Creates a table of results for the results pane. func TableDisplay(query string, filters []string) { var ( + reader = readerIndexes[query] // Reader index for the query. tableCellPadding = strings.Repeat(" ", TABLE_PADDING) // Padding to add to table cell content. valueIndexes = []int{} // Indexes of the result values to add to the table. ) + // Wait for the first result to appear to synchronize storage. + GetResultWait(query) + reader.Dec() + // Initialize the display. resultsView, _, _ := initDisplayTviewTable(helpText()) @@ -158,29 +179,26 @@ func TableDisplay(query string, filters []string) { i = 0 // Used to determine the next row index. ) - // Determine the value indexes to populate into the graph. If no filter is - // provided, the index is assumed to be zero. + // Determine the value indexes to populate into the graph. If no filter is provided, the index + // is assumed to be zero. if len(filters) > 0 { for _, filter := range filters { valueIndexes = append(valueIndexes, store.GetValueIndex(query, filter)) } } - // Create the table header. - if labels := store.GetLabels(query); len(labels) > 0 { - appTview.QueueUpdateDraw(func() { - // Row to contain the labels. - headerRow := resultsView.InsertRow(i) + appTview.QueueUpdateDraw(func() { + // Row to contain the labels. + headerRow := resultsView.InsertRow(i) - for j, label := range FilterSlice(labels, valueIndexes) { - headerRow.SetCellSimple(i, j, tableCellPadding+label+tableCellPadding) - } - }) - i += 1 - } + for j, label := range FilterSlice(store.GetLabels(query), valueIndexes) { + headerRow.SetCellSimple(i, j, tableCellPadding+label+tableCellPadding) + } + }) + i += 1 // Print all previous results. - for _, result := range store.GetToIndex(query, readerIndexes[query]) { + for _, result := range store.GetToIndex(query, reader) { appTview.QueueUpdateDraw(func() { var ( row = resultsView.InsertRow(i) // Row to contain the result. @@ -204,8 +222,7 @@ func TableDisplay(query string, filters []string) { // Print results. for { - // Listen for an interrupt event to stop result consumption in - // preparation for some display change. + // Listen for an interrupt to stop result consumption for some display change. select { case <-interruptChan: // We've received an interrupt. @@ -243,11 +260,15 @@ func TableDisplay(query string, filters []string) { // Creates a graph of results for the results pane. func GraphDisplay(query string, filters []string) { var ( - valueIndex = 0 // Index of the result value to graph. + reader = readerIndexes[query] // Reader index for the query. + valueIndex = 0 // Index of the result value to graph. ) - // Determine the values to populate into the graph. If no filter is provided, - // the first value is taken. + // Wait for the first result to appear to synchronize storage. + GetResultWait(query) + reader.Dec() + + // Determine the values to populate into the graph. If none is provided, the first value is taken. if len(filters) > 0 { valueIndex = store.GetValueIndex(query, filters[0]) } @@ -278,7 +299,7 @@ func GraphDisplay(query string, filters []string) { DISPLAY_TERMDASH, func() { // Print all previous results. - for _, result := range store.GetToIndex(query, readerIndexes[query]) { + for _, result := range store.GetToIndex(query, reader) { // We can display the next result. value := result.Values[valueIndex] @@ -291,8 +312,7 @@ func GraphDisplay(query string, filters []string) { } for { - // Listen for an interrupt event to stop result consumption in - // preparation for some display change. + // Listen for an interrupt to stop result consumption for some display change. select { case <-interruptChan: // We've received an interrupt. @@ -315,7 +335,7 @@ func GraphDisplay(query string, filters []string) { }, ) - // Initialize the display. This must happen after the display function is - // invoked, otherwise data will never appear. + // Initialize the display. This must happen after the display function is invoked, otherwise data + // will never appear. initDisplayTermdash(resultWidget, helpWidget, &logsWidgetWriter.text) } diff --git a/internal/lib/query.go b/internal/lib/query.go index 48f9f22..d565d21 100644 --- a/internal/lib/query.go +++ b/internal/lib/query.go @@ -22,8 +22,9 @@ const ( func runQuery( query string, attempts, delay int, + history bool, doneChan, pauseChan chan bool, - queryFunc func(string), + queryFunc func(string, bool), ) { // This loop executes as long as attempts has not been reached, or // indefinitely if attempts is less than zero. @@ -34,7 +35,7 @@ func runQuery( // message from the pause channel. <-pauseChan default: - queryFunc(query) + queryFunc(query, history) // This is not the last execution--add a delay. if i != attempts { @@ -47,16 +48,16 @@ func runQuery( } // Executes a query as a process to profile. -func runQueryProfile(pid string) { +func runQueryProfile(pid string, history bool) { slog.Debug(fmt.Sprintf("Executing profile for PID: '%s' ...", pid)) pidInt, err := strconv.Atoi(pid) e(err) - AddResult(pid, runProfile(pidInt)) + AddResult(pid, runProfile(pidInt), history) } // Executes a query as a command to exec. -func runQueryExec(query string) { +func runQueryExec(query string, history bool) { slog.Debug(fmt.Sprintf("Executing query: '%s' ...", query)) // Prepare query execution. @@ -82,7 +83,7 @@ func runQueryExec(query string) { // Interpret results. cmd_output, cmd_output_err := io.ReadAll(stdout) e(cmd_output_err) - AddResult(query, string(cmd_output)) + AddResult(query, string(cmd_output), history) slog.Debug(fmt.Sprintf("Query '%s' result is: %s", query, cmd_output)) // Clean-up. @@ -91,11 +92,10 @@ func runQueryExec(query string) { // Entrypoint for 'query' mode. func Query( - queryMode int, + queryMode, attempts, delay int, queries []string, - attempts int, - delay int, port string, + history bool, ) (chan bool, map[string]chan bool) { var ( doneQueriesChan = make(chan bool) // Signals overall completion. @@ -114,10 +114,26 @@ func Query( switch queryMode { case QUERY_MODE_COMMAND: slog.Debug("Executing in query mode command.") - go runQuery(query, attempts, delay, doneQueryChan, pauseQueryChans[query], runQueryExec) + go runQuery( + query, + attempts, + delay, + history, + doneQueryChan, + pauseQueryChans[query], + runQueryExec, + ) case QUERY_MODE_PROFILE: slog.Debug("Executing in query mode profile.") - go runQuery(query, attempts, delay, doneQueryChan, pauseQueryChans[query], runQueryProfile) + go runQuery( + query, + attempts, + delay, + history, + doneQueryChan, + pauseQueryChans[query], + runQueryProfile, + ) } } diff --git a/internal/lib/results.go b/internal/lib/results.go index 44b65af..ffc0c20 100644 --- a/internal/lib/results.go +++ b/internal/lib/results.go @@ -31,14 +31,14 @@ var ( driver DisplayDriver // Display driver, dictated by the results. pauseQueryChans map[string]chan bool // Channels for dealing with 'pause' events for results. readerIndexes map[string]*storage.ReaderIndex // Reader indexes for queries. + store storage.Storage // Stored results. ctxDefaults = map[string]interface{}{ "advanceDisplayMode": false, "advanceQuery": false, "quit": false, } // Defaults applied to context. - pauseDisplayChan = make(chan bool) // Channel for dealing with 'pause' events for the display. - store = storage.NewStorage() // Stored results. + pauseDisplayChan = make(chan bool) // Channel for dealing with 'pause' events for the display. ) // Resets the current context to its default values. @@ -50,9 +50,10 @@ func resetContext(query string) { } // Adds a result to the result store based on a string. -func AddResult(query, result string) { +func AddResult(query, result string, history bool) { result = strings.TrimSpace(result) - store.Put(query, result, TokenizeResult(result)...) + _, err := store.Put(query, result, history, TokenizeResult(result)...) + e(err) } // Retrieves a next result. @@ -60,13 +61,13 @@ func GetResult(query string) storage.Result { return store.Next(query, readerIndexes[query]) } -// Retrieves a next result, waiting for a non-empty return. +// Retrieves a next result, waiting for a non-empty return in a non-blocking manner. func GetResultWait(query string) (result storage.Result) { for { if result = store.NextOrEmpty(query, readerIndexes[query]); result.IsEmpty() { - // Wait a tiny bit if we receive an empty result to avoid an excessive - // amount of busy waiting. This wait time should be less than the query - // delay, otherwise displays will show a release of buffered results. + // Wait a tiny bit if we receive an empty result to avoid an excessive amount of busy waiting. + // This wait time should be less than the query delay, otherwise displays will show a release + // of buffered results. time.Sleep(time.Duration(10) * time.Millisecond) } else { // We found a result. @@ -141,30 +142,34 @@ func Results( ctx context.Context, displayMode DisplayMode, query string, + history bool, inputConfig Config, inputPauseQueryChans map[string]chan bool, ) { var ( + err error // General error holder. + filters = ctx.Value("filters").([]string) // Capture filters from context. labels = ctx.Value("labels").([]string) // Capture labels from context. queries = ctx.Value("queries").([]string) // Capture queries from context. ) - // Assign global config and global control channels. - config = inputConfig - pauseQueryChans = inputPauseQueryChans + // Initialize storage. + store, err = storage.NewStorage(history) + e(err) + defer store.Close() - // There is currently no reason why we may arrive at deffers, but in case we - // do someday, perform some clean-up. + // Assign global config and global control channels. + config, pauseQueryChans = inputConfig, inputPauseQueryChans defer close(pauseDisplayChan) for _, pauseQueryChan := range pauseQueryChans { defer close(pauseQueryChan) } - // Iniitialize reader indexes. + // Initialize reader indexes. readerIndexes = make(map[string]*storage.ReaderIndex, len(queries)) for _, query := range queries { - readerIndexes[query] = store.NewReaderIndex() + readerIndexes[query] = store.NewReaderIndex(query) } for { @@ -172,8 +177,10 @@ func Results( currentCtx = ctx resetContext(query) - // Set up labelling or any schema for the results store. - store.PutLabels(query, labels) + // Set up labelling or any schema for the results store, if any were explicitly provided. + if len(labels) > 0 { + store.PutLabels(query, labels) + } switch displayMode { case DISPLAY_MODE_RAW: @@ -193,9 +200,9 @@ func Results( os.Exit(1) } - // If we get here, it's because the display functions have returned, probably - // because of an interrupt. Assuming we haven't reached some other terminal - // situation, restart the results display, adjusting for context. + // If we get here, it's because the display functions have returned, probably because of an + // interrupt. Assuming we haven't reached some other terminal situation, restart the results + // display, adjusting for context. if currentCtx.Value("quit").(bool) { // Guess I'll die. displayQuit() diff --git a/internal/lib/results_test.go b/internal/lib/results_test.go index 5786192..cc5466f 100644 --- a/internal/lib/results_test.go +++ b/internal/lib/results_test.go @@ -21,7 +21,7 @@ func TestFilterResult(t *testing.T) { testResults := storage.Results{ Labels: []string{"fizz"}, Results: []storage.Result{ - storage.Result{ + { Time: testTime, Value: "foo bar", Values: testResultValues, diff --git a/main.go b/main.go index 8fab8da..1d5a6eb 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,13 @@ +// +// Entrypoint for cryptarch execution. + package main import ( "context" "flag" "fmt" - "io/ioutil" + "io" "log" "os" "strings" @@ -49,6 +52,7 @@ var ( delay int // Delay between queries. displayMode int // Result mode to display. filters string // Result filters. + history bool // Whether or not to preserve or use historical results. logLevel string // Log level. mode int // Mode to execute in. port string // Port for RPC. @@ -66,8 +70,8 @@ var ( } // Log levels acceptable as a flag. ) -// Parses a comma delimited argument string, returning a slice of strings if -// any are found, or an empty slice if not. +// Parses a comma delimited argument string, returning a slice of strings if any are found, or an +// empty slice if not. func parseCommaDelimitedArg(arg string) []string { if parsed := strings.Split(arg, ","); parsed[0] == "" { return []string{} @@ -82,28 +86,24 @@ func main() { pauseQueryChans map[string]chan bool // Channels for pausing queries. ) - defer close(doneQueriesChan) - for _, pauseChan := range pauseQueryChans { - defer close(pauseChan) - } - // Define arguments. + flag.BoolVar(&history, "e", true, "Whether or not to use or preserve history.") flag.BoolVar(&silent, "s", false, "Don't output anything to a console.") flag.IntVar(&attempts, "t", 1, "Number of query executions. -1 for continuous.") flag.IntVar(&delay, "d", 3, "Delay between queries (seconds).") flag.IntVar(&displayMode, "r", int(lib.DISPLAY_MODE_RAW), "Result mode to display.") flag.IntVar(&mode, "m", int(MODE_QUERY), "Mode to execute in.") flag.StringVar(&filters, "f", "", "Results filters.") + flag.StringVar(&labels, "v", "", "Labels to apply to query values, separated by commas.") flag.StringVar(&logLevel, "l", "error", "Log level.") flag.StringVar(&port, "p", "12345", "Port for RPC.") - flag.StringVar(&labels, "v", "", "Labels to apply to query values, separated by commas.") flag.Var(&queries, "q", "Query to execute. When in query mode, this is expected to be some command. When in profile mode it is expected to be PID.") flag.Parse() // Set-up logging. if silent || displayMode == int(lib.DISPLAY_MODE_GRAPH) { // Silence all output. - logger.SetOutput(ioutil.Discard) + logger.SetOutput(io.Discard) } else { // Set the default to be standard error--result modes may change this. slog.SetDefault(slog.New(slog.NewTextHandler( @@ -119,10 +119,11 @@ func main() { doneQueriesChan, pauseQueryChans = lib.Query( lib.QUERY_MODE_PROFILE, - queries, attempts, delay, + queries, port, + history, ) // Process mode has specific labels--ignore user provided ones. @@ -132,10 +133,11 @@ func main() { doneQueriesChan, pauseQueryChans = lib.Query( lib.QUERY_MODE_COMMAND, - queries, attempts, delay, + queries, port, + history, ) // Rely on user-defined labels. @@ -160,6 +162,7 @@ func main() { ctx, lib.DisplayMode(displayMode), ctx.Value("queries").([]string)[0], // Always start with the first query. + history, lib.Config{ LogLevel: logLevel, }, @@ -167,5 +170,9 @@ func main() { ) } + // XXX This isn't strictly necessary, mainly because getting here shouldn't be possible + // (`lib.Results` does not have any intentional return condition), but it's being left here in + // case in the future we do want to control for query completion. <-doneQueriesChan + close(doneQueriesChan) } diff --git a/pkg/storage/results.go b/pkg/storage/results.go new file mode 100644 index 0000000..5b4b277 --- /dev/null +++ b/pkg/storage/results.go @@ -0,0 +1,101 @@ +// +// Manages individual or series of results for storage. + +package storage + +import ( + "fmt" + "reflect" + "strconv" + "time" + + "golang.org/x/exp/slices" + _ "golang.org/x/exp/slog" +) + +// Individual result. +type Result struct { + Time time.Time // Time the result was created. + Value interface{} // Raw value of the result. + Values []interface{} // Tokenized value of the result. +} + +// Collection of results. +type Results struct { + // Meta field for result values acting as a name, corresponding by index. In the event that no + // explicit labels are defined, the indexes are the labels. + Labels []string + // Stored results. + Results []Result +} + +// Get a result based on a timestamp. +func (r *Results) get(time time.Time) Result { + for _, result := range (*r).Results { + if result.Time.Compare(time) == 0 { + // We found a result to return. + return result + } + } + + // Return an empty result if nothing was discovered. + return Result{} +} + +// Gets results based on a start and end timestamp. +func (r *Results) getRange(startTime time.Time, endTime time.Time) (found []Result) { + for _, result := range (*r).Results { + if result.Time.Compare(startTime) >= 0 { + if result.Time.Compare(endTime) > 0 { + // Break out of the loop if we've exhausted the upper bounds of the range. + break + } else { + found = append(found, result) + } + } + } + + return +} + +// Given a filter, return the corresponding value index. +func (r *Results) getValueIndex(filter string) int { + return slices.Index((*r).Labels, filter) +} + +// Put a new compound result. +func (r *Results) put(value string, values ...interface{}) Result { + next := Result{ + Time: time.Now(), + Value: value, + Values: values, + } + + (*r).Results = append((*r).Results, next) + + return next +} + +// Show all currently stored results. +func (r *Results) show() { + for _, result := range (*r).Results { + fmt.Printf("Label: %v, Time: %v, Value: %v, Values: %v\n", + (*r).Labels, result.Time, result.Value, result.Values) + } +} + +// Determines whether this is an empty result. +func (r *Result) IsEmpty() bool { + return reflect.DeepEqual(*r, Result{}) +} + +// Creates new results. +func newResults(size int) (results Results) { + // Iniitialize labels. + results.Labels = make([]string, size) + for i := range results.Labels { + results.Labels[i] = strconv.Itoa(i) + } + + return +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index ddccedf..c9da743 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,24 +1,28 @@ // // Storage management for query results. // -// The storage engine is a simple, time-series indexed, multi-type data store. -// It interfaces as a Go library in a few ways, namely: +// The storage engine is a simple, time-series indexed, multi-type data store. It interfaces as a Go +// library in a few ways, namely: // // - It can be used as a library. // - It can broadcast events into public Go channels. // - It can broadcast events via RPC. // -// Results are stored simply in an ordered sequence, and querying time is -// linear. +// Results are stored simply in an ordered sequence, and querying time is linear. package storage import ( - "fmt" - "reflect" + "encoding/json" + _ "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "sync" "time" - "golang.org/x/exp/slices" + _ "golang.org/x/exp/slog" ) /////////////////////////////////////////////////////////////////////////////////////////////////// @@ -27,24 +31,7 @@ import ( // /////////////////////////////////////////////////////////////////////////////////////////////////// -// Individual result. -type Result struct { - // Time the result was created. - Time time.Time - // Raw value of the result. - Value interface{} - // Tokenized value of the result. - Values []interface{} -} - -// Collection of results. -type Results struct { - // Meta field for result values acting as a name, corresponding by index. - Labels []string - // Stored results. - Results []Result -} - +// Reader indexes control where a consumer has last read a result. type ReaderIndex int // Collection of results mapped to their queries. @@ -57,13 +44,17 @@ type Storage map[string]*Results /////////////////////////////////////////////////////////////////////////////////////////////////// const ( - // Size of Put channels. This is the amount of results that may accumulate if - // not being actively consumed. - PUT_EVENT_CHANNEL_SIZE = 128 + PUT_EVENT_CHANNEL_SIZE = 128 // Size of put channels, controlling the amount of waiting results. + STORAGE_FILE_DIR = "cryptarch" // Directory in user cache to use for storage. + STORAGE_FILE_NAME = "storage.json" // Filename to use for actual storage. ) -// Channels for broadcasting Put calls. -var PutEvents = make(map[string](chan Result)) +var ( + storageFile *os.File // File for storage writes. + storageMutex sync.Mutex // Lock for storage file writes. + + PutEvents = make(map[string](chan Result)) // Channels for broadcasting put calls. +) /////////////////////////////////////////////////////////////////////////////////////////////////// // @@ -71,81 +62,133 @@ var PutEvents = make(map[string](chan Result)) // /////////////////////////////////////////////////////////////////////////////////////////////////// -// Get a result based on a timestamp. -func (r *Results) get(time time.Time) Result { - for _, result := range (*r).Results { - if result.Time.Compare(time) == 0 { - // We found a result to return. - return result - } +// initializes a new results series in storage. Must be called when a new results series is created. +// This function is idempotent in that it will check if results for a query have already been +// initialized and pass silently if so. +func (s *Storage) newResults(query string, size int) { + var ( + results Results // Results to add. + ) + + if _, ok := (*s)[query]; !ok { + // Initialize results. + results = newResults(size) + (*s)[query] = &results + + // Initialize the query's put event channel. + PutEvents[query] = make(chan Result, PUT_EVENT_CHANNEL_SIZE) } +} + +// Saves current storage to disk. Currently this replaces the entire storage file with all the data +// in storage (a full write-over). +func (s *Storage) save() error { + var ( + err error // General error holder. + storageJson []byte // Bytes as json. + ) - // Return an empty result if nothing was discovered. - return Result{} + // Lock storage to prevent dirty writes. + storageMutex.Lock() + defer storageMutex.Unlock() + + // Translate current storage into binary json and save it. + storageJson, err = json.MarshalIndent(&s, "", "\t") + _, err = storageFile.WriteAt(storageJson, 0) + + return err } -// Gets results based on a start and end timestamp. -func (r *Results) getRange(startTime time.Time, endTime time.Time) (found []Result) { - for _, result := range (*r).Results { - if result.Time.Compare(startTime) >= 0 { - if result.Time.Compare(endTime) > 0 { - // Break out of the loop if we've exhausted the upper bounds of the - // range. - break - } else { - found = append(found, result) - } - } +/////////////////////////////////////////////////////////////////////////////////////////////////// +// +// Public +// +/////////////////////////////////////////////////////////////////////////////////////////////////// + +// Initializes a new storage, loading in any saved storage data. +func NewStorage(persistence bool) (storage Storage, err error) { + var ( + cryptarchUserCacheDir string // Cryptarch specific user cache data. + storageData []byte // Raw read storage data. + storageFP string // Filepath for storage. + storagePre map[string]*Results // Pre-built storage with existing data. + storageStat fs.FileInfo // Stat for the storage file. + userCacheDir string // User cache directory, contextual to OS. + ) + + // Initialize storage. + storage = Storage{} + + // If we have disabled persistence, simply return the new storage instance. + if !persistence { + return } - return -} + // Retrieve the user cache directory. + userCacheDir, err = os.UserCacheDir() + if err != nil { + return + } -// Given a filter, return the corresponding value index. -func (r *Results) getValueIndex(filter string) int { - return slices.Index((*r).Labels, filter) -} + // Create the user cache directory for data. + cryptarchUserCacheDir = filepath.Join(userCacheDir, STORAGE_FILE_DIR) + err = os.MkdirAll(cryptarchUserCacheDir, fs.FileMode(0770)) + if err != nil { + return + } -// Put a new compound result. -func (r *Results) put(value string, values ...interface{}) Result { - next := Result{ - Time: time.Now(), - Value: value, - Values: values, + // Instantiate the storage file. + storageFP = filepath.Join(cryptarchUserCacheDir, STORAGE_FILE_NAME) + storageFile, err = os.OpenFile( + storageFP, + os.O_CREATE|os.O_RDWR, + fs.FileMode(0770), + ) + if err != nil { + return } - (*r).Results = append((*r).Results, next) + if storageStat, err = os.Stat(storageFP); storageStat.Size() > 0 { + // There is pre-existing storage data. + if err != nil { + return + } + + // Read in storage data and supply it to storage. We must initialize any results series before + // populating data. + storageData, err = io.ReadAll(storageFile) + if err != nil { + return + } + json.Unmarshal(storageData, &storagePre) + for query := range storagePre { + // TODO Results loading should also preserve and restore actual labels. + storage.newResults(query, len(storagePre[query].Results[0].Values)) + } + storage = storagePre + } - return next + return } -// Show all currently stored results. -func (r *Results) show() { - for _, result := range (*r).Results { - fmt.Printf("Label: %v, Time: %v, Value: %v, Values: %v\n", - (*r).Labels, result.Time, result.Value, result.Values) - } +// Decrement a reader index, to re-read the last read. +func (i *ReaderIndex) Dec() { + (*i)-- } // Incremement a reader index, likely after a read. -func (i *ReaderIndex) inc() { +func (i *ReaderIndex) Inc() { (*i)++ } -/////////////////////////////////////////////////////////////////////////////////////////////////// -// -// Public -// -/////////////////////////////////////////////////////////////////////////////////////////////////// - -// Initializes a new storage. -func NewStorage() Storage { - return Storage{} +// Sets a redaer index to a specified value. +func (i *ReaderIndex) Set(newI int) { + *i = ReaderIndex(newI) } -// Determines whether this is an empty result. -func (r *Result) IsEmpty() bool { - return reflect.DeepEqual(*r, Result{}) +// Closes a storage. Should be called after all storage operations cease. +func (s *Storage) Close() { + storageFile.Close() } // Get a result based on a timestamp. @@ -153,6 +196,7 @@ func (s *Storage) Get(query string, time time.Time) Result { return (*s)[query].get(time) } +// Get all results. func (s *Storage) GetAll(query string) []Result { return (*s)[query].Results } @@ -167,8 +211,9 @@ func (s *Storage) GetRange(query string, startTime, endTime time.Time) []Result return (*s)[query].getRange(startTime, endTime) } +// Given results up to a reader index (a.k.a. "playback"). func (s *Storage) GetToIndex(query string, index *ReaderIndex) []Result { - return (*s)[query].Results[:*index] + return (*s)[query].Results[:(*index)+1] } // Given a filter, return the corresponding value index. @@ -176,57 +221,72 @@ func (s *Storage) GetValueIndex(query, filter string) int { return (*s)[query].getValueIndex(filter) } -// Initialize a new reader index. -func (s *Storage) NewReaderIndex() *ReaderIndex { - r := ReaderIndex(0) - return &r -} +// Initialize a new reader index. Will attempt to set the initial value to the end of existing +// results, if results already exist. +func (s *Storage) NewReaderIndex(query string) *ReaderIndex { + var ( + r ReaderIndex // Reader index to return the address of. + ) -// Initializes a new results series in a storage. -func (s *Storage) NewResults(query string) { if _, ok := (*s)[query]; !ok { - // This is a new query, initialize an empty results. - (*s)[query] = &Results{} - PutEvents[query] = make(chan Result, PUT_EVENT_CHANNEL_SIZE) + // There is no data. + r = ReaderIndex(0) + } else { + // There is existing data to account for. + r = ReaderIndex(len((*s)[query].Results)) } + + return &r } -// Retrieve the next result from a put event channel. +// Retrieve the next result from a put event channel, blocking if none exists. func (s *Storage) Next(query string, index *ReaderIndex) (next Result) { next = <-PutEvents[query] - index.inc() + index.Inc() + return } -// Retrieve the next result from a put event channel, returning an empty result -// if nothing exists. +// Retrieve the next result from a put event channel, returning an empty result if nothing exists. func (s *Storage) NextOrEmpty(query string, index *ReaderIndex) (next Result) { select { case next = <-PutEvents[query]: - index.inc() + // Only increment the read counter if something consumed the event. + index.Inc() default: } + return } -// Put a new compound result. -func (s *Storage) Put(query string, value string, values ...interface{}) Result { - s.NewResults(query) - result := (*s)[query].put(value, values...) - - // Send a non-blocking put event. Put events are lossy and clients may lose - // information if not actively listening. +// Put a new result. +func (s *Storage) Put( + query, value string, + persistence bool, + values ...interface{}, +) (result Result, err error) { + // Initialize the result. + s.newResults(query, len(values)) + result = (*s)[query].put(value, values...) + + // Send a non-blocking put event. Put events are lossy and clients may lose information if not + // actively listening. select { case PutEvents[query] <- result: default: } - return result + // Persist data to disk. + if persistence { + err = s.save() + } + + return } -// Assigns labels to a results series. +// Assigns explicit labels to a results series. func (s *Storage) PutLabels(query string, labels []string) { - s.NewResults(query) + s.newResults(query, len(labels)) (*s)[query].Labels = labels } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 39c8d16..c2a60ca 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -16,12 +16,12 @@ func testResults() Results { return Results{ Labels: []string{"foo", "bar"}, Results: []Result{ - Result{ + { Time: testTime, Value: "foo", Values: nil, }, - Result{ + { Time: testTime.Add(time.Second * 30), Value: "bar", Values: nil,