Skip to content

Commit

Permalink
Storage persistence (#33)
Browse files Browse the repository at this point in the history
* Restructuring some of storage

* Clean-up, disable writes

* New storage initialization

* Avoid busy loop for raw results

* POC results saving and loading

* Clean-ups

* Make history usage a flag

* Clean-up

* Initialize labels with indexes

* Fix bugs, account for reader index reset with previous results

* Fix channel closing and index range get

* Clean-up

* Bug-fixing

* Update readme
  • Loading branch information
spacez320 authored Feb 9, 2024
1 parent 5465721 commit e72f04b
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 201 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ Cryptarch's interactive window. The examples above use stream displays.

![Demo of graph display](https://raw.githubusercontent.com/spacez320/cryptarch/master/media/graph-display.gif)

### Persistence

Cryptarch, by default, will store results and load them when re-executing the same query. Storage is
located in the user's cache directory.

See: <https://pkg.go.dev/os#UserCacheDir>

### More Examples

> The examples below have been tested on `GNU bash, version 5.2.15(1)-release`.
Expand All @@ -56,11 +63,11 @@ Cryptarch's interactive window. The examples above use stream displays.
# See help.
cryptarch -h

# Execute `whoami` once, printing results to the console.
# Execute `whoami` once, printing results to the console and waiting for a user to `^C`.
cryptarch -q 'whoami'

# Execute `uptime` continuously, printing results to the console.
cryptarch -q 'uptime' -t -1
# Execute `uptime` continuously, printing results to the console, without using persistence.
cryptarch -q 'uptime' -t -1 -e=false

# Get the size of an NVME used space and output it to a table.
cryptarch -q 'df -h | grep nvme0n1p2 | awk '\''{print $3}'\''' -r 3 -v "NVME Used Space" -t -1
Expand Down
104 changes: 62 additions & 42 deletions internal/lib/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ var (
interruptChan = make(chan bool) // Channel for interrupting displays.
)

// Starts the display. Applies contextual logic depending on the provided
// display driver. Expects a function to execute within a goroutine to update
// the display.
// Starts the display. Applies contextual logic depending on the provided display driver. Expects a
// function to execute within a goroutine to update the display.
func display(driver DisplayDriver, displayUpdateFunc func()) {
// Execute the update function.
go displayUpdateFunc()
Expand All @@ -85,44 +84,61 @@ func helpText() string {
return HELP_TEXT + fmt.Sprintf(
"\nQuery: %v | Labels: %v | Filters: %v",
currentCtx.Value("query"),
currentCtx.Value("labels"),
store.GetLabels(currentCtx.Value("query").(string)),
currentCtx.Value("filters"))
}

// Presents raw output.
func RawDisplay(query string) {
go func() {
for {
fmt.Println(GetResult(query))
}
}()
var (
reader = readerIndexes[query] // Reader index for the query.
)

// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Dec()

// Load existing results.
for _, result := range store.GetToIndex(query, reader) {
fmt.Println(result)
}

// Load new results.
for {
fmt.Println(GetResult(query))
}
}

// Update the results pane with new results as they are generated.
func StreamDisplay(query string) {
var (
reader = readerIndexes[query] // Reader index for the query.
)

// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Dec()

// Initialize the display.
resultsView, _, _ := initDisplayTviewText(helpText())

// Start the display.
display(
DISPLAY_TVIEW,
func() {
// Print labels as the first line, if they are present.
if labels := store.GetLabels(query); len(labels) > 0 {
appTview.QueueUpdateDraw(func() {
fmt.Fprintln(resultsView, labels)
})
}
// Print labels as the first line.
appTview.QueueUpdateDraw(func() {
fmt.Fprintln(resultsView, store.GetLabels(query))
})

// Print all previous results.
for _, result := range store.GetToIndex(query, readerIndexes[query]) {
for _, result := range store.GetToIndex(query, reader) {
fmt.Fprintln(resultsView, result.Value)
}

// Print results.
for {
// Listen for an interrupt event to stop result consumption in
// preparation for some display change.
// Listen for an interrupt to stop result consumption for some display change.
select {
case <-interruptChan:
// We've received an interrupt.
Expand All @@ -142,10 +158,15 @@ func StreamDisplay(query string) {
// Creates a table of results for the results pane.
func TableDisplay(query string, filters []string) {
var (
reader = readerIndexes[query] // Reader index for the query.
tableCellPadding = strings.Repeat(" ", TABLE_PADDING) // Padding to add to table cell content.
valueIndexes = []int{} // Indexes of the result values to add to the table.
)

// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Dec()

// Initialize the display.
resultsView, _, _ := initDisplayTviewTable(helpText())

Expand All @@ -158,29 +179,26 @@ func TableDisplay(query string, filters []string) {
i = 0 // Used to determine the next row index.
)

// Determine the value indexes to populate into the graph. If no filter is
// provided, the index is assumed to be zero.
// Determine the value indexes to populate into the graph. If no filter is provided, the index
// is assumed to be zero.
if len(filters) > 0 {
for _, filter := range filters {
valueIndexes = append(valueIndexes, store.GetValueIndex(query, filter))
}
}

// Create the table header.
if labels := store.GetLabels(query); len(labels) > 0 {
appTview.QueueUpdateDraw(func() {
// Row to contain the labels.
headerRow := resultsView.InsertRow(i)
appTview.QueueUpdateDraw(func() {
// Row to contain the labels.
headerRow := resultsView.InsertRow(i)

for j, label := range FilterSlice(labels, valueIndexes) {
headerRow.SetCellSimple(i, j, tableCellPadding+label+tableCellPadding)
}
})
i += 1
}
for j, label := range FilterSlice(store.GetLabels(query), valueIndexes) {
headerRow.SetCellSimple(i, j, tableCellPadding+label+tableCellPadding)
}
})
i += 1

// Print all previous results.
for _, result := range store.GetToIndex(query, readerIndexes[query]) {
for _, result := range store.GetToIndex(query, reader) {
appTview.QueueUpdateDraw(func() {
var (
row = resultsView.InsertRow(i) // Row to contain the result.
Expand All @@ -204,8 +222,7 @@ func TableDisplay(query string, filters []string) {

// Print results.
for {
// Listen for an interrupt event to stop result consumption in
// preparation for some display change.
// Listen for an interrupt to stop result consumption for some display change.
select {
case <-interruptChan:
// We've received an interrupt.
Expand Down Expand Up @@ -243,11 +260,15 @@ func TableDisplay(query string, filters []string) {
// Creates a graph of results for the results pane.
func GraphDisplay(query string, filters []string) {
var (
valueIndex = 0 // Index of the result value to graph.
reader = readerIndexes[query] // Reader index for the query.
valueIndex = 0 // Index of the result value to graph.
)

// Determine the values to populate into the graph. If no filter is provided,
// the first value is taken.
// Wait for the first result to appear to synchronize storage.
GetResultWait(query)
reader.Dec()

// Determine the values to populate into the graph. If none is provided, the first value is taken.
if len(filters) > 0 {
valueIndex = store.GetValueIndex(query, filters[0])
}
Expand Down Expand Up @@ -278,7 +299,7 @@ func GraphDisplay(query string, filters []string) {
DISPLAY_TERMDASH,
func() {
// Print all previous results.
for _, result := range store.GetToIndex(query, readerIndexes[query]) {
for _, result := range store.GetToIndex(query, reader) {
// We can display the next result.
value := result.Values[valueIndex]

Expand All @@ -291,8 +312,7 @@ func GraphDisplay(query string, filters []string) {
}

for {
// Listen for an interrupt event to stop result consumption in
// preparation for some display change.
// Listen for an interrupt to stop result consumption for some display change.
select {
case <-interruptChan:
// We've received an interrupt.
Expand All @@ -315,7 +335,7 @@ func GraphDisplay(query string, filters []string) {
},
)

// Initialize the display. This must happen after the display function is
// invoked, otherwise data will never appear.
// Initialize the display. This must happen after the display function is invoked, otherwise data
// will never appear.
initDisplayTermdash(resultWidget, helpWidget, &logsWidgetWriter.text)
}
38 changes: 27 additions & 11 deletions internal/lib/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ const (
func runQuery(
query string,
attempts, delay int,
history bool,
doneChan, pauseChan chan bool,
queryFunc func(string),
queryFunc func(string, bool),
) {
// This loop executes as long as attempts has not been reached, or
// indefinitely if attempts is less than zero.
Expand All @@ -34,7 +35,7 @@ func runQuery(
// message from the pause channel.
<-pauseChan
default:
queryFunc(query)
queryFunc(query, history)

// This is not the last execution--add a delay.
if i != attempts {
Expand All @@ -47,16 +48,16 @@ func runQuery(
}

// Executes a query as a process to profile.
func runQueryProfile(pid string) {
func runQueryProfile(pid string, history bool) {
slog.Debug(fmt.Sprintf("Executing profile for PID: '%s' ...", pid))

pidInt, err := strconv.Atoi(pid)
e(err)
AddResult(pid, runProfile(pidInt))
AddResult(pid, runProfile(pidInt), history)
}

// Executes a query as a command to exec.
func runQueryExec(query string) {
func runQueryExec(query string, history bool) {
slog.Debug(fmt.Sprintf("Executing query: '%s' ...", query))

// Prepare query execution.
Expand All @@ -82,7 +83,7 @@ func runQueryExec(query string) {
// Interpret results.
cmd_output, cmd_output_err := io.ReadAll(stdout)
e(cmd_output_err)
AddResult(query, string(cmd_output))
AddResult(query, string(cmd_output), history)
slog.Debug(fmt.Sprintf("Query '%s' result is: %s", query, cmd_output))

// Clean-up.
Expand All @@ -91,11 +92,10 @@ func runQueryExec(query string) {

// Entrypoint for 'query' mode.
func Query(
queryMode int,
queryMode, attempts, delay int,
queries []string,
attempts int,
delay int,
port string,
history bool,
) (chan bool, map[string]chan bool) {
var (
doneQueriesChan = make(chan bool) // Signals overall completion.
Expand All @@ -114,10 +114,26 @@ func Query(
switch queryMode {
case QUERY_MODE_COMMAND:
slog.Debug("Executing in query mode command.")
go runQuery(query, attempts, delay, doneQueryChan, pauseQueryChans[query], runQueryExec)
go runQuery(
query,
attempts,
delay,
history,
doneQueryChan,
pauseQueryChans[query],
runQueryExec,
)
case QUERY_MODE_PROFILE:
slog.Debug("Executing in query mode profile.")
go runQuery(query, attempts, delay, doneQueryChan, pauseQueryChans[query], runQueryProfile)
go runQuery(
query,
attempts,
delay,
history,
doneQueryChan,
pauseQueryChans[query],
runQueryProfile,
)
}
}

Expand Down
Loading

0 comments on commit e72f04b

Please sign in to comment.