From 4490ac956df2df4c1659c6a236fe82c404f83cd1 Mon Sep 17 00:00:00 2001 From: lezhnev Date: Wed, 17 Jan 2024 23:01:24 +0100 Subject: [PATCH] add IngestWorkers config value --- heaplog/heaplog.go | 4 ++-- heaplog/heaplog_test.go | 2 ++ ui/config.go | 3 +++ ui/console.go | 21 +++++++++++++++++++++ 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/heaplog/heaplog.go b/heaplog/heaplog.go index e50750b..8e843b0 100644 --- a/heaplog/heaplog.go +++ b/heaplog/heaplog.go @@ -10,7 +10,6 @@ import ( "heaplog/storage" "log" "regexp" - "runtime" "time" ) @@ -33,6 +32,7 @@ func NewHeaplog( tokenizerFunc func(input string) []string, unboundTokenizerFunc func(input string) []string, indexSegmentSize int64, + ingestWorkers int, ) (*Heaplog, error) { s, err := storage.NewStorage(storageRoot, ingestFlushTick, searchFlushTick) if err != nil { @@ -42,7 +42,7 @@ func NewHeaplog( _scanner := scanner.NewScanner(dateLayout, messageStartPattern, 10_000_000, 100_000_000) _indexer := indexer.NewIndexer(_scanner, tokenizerFunc) - ingestor := ingest.NewIngestor(s, _indexer, indexSegmentSize, runtime.NumCPU()) + ingestor := ingest.NewIngestor(s, _indexer, indexSegmentSize, ingestWorkers) discover := ingest.NewDiscover(globs, s) _selector := search.NewSegmentSelector(s, unboundTokenizerFunc, tokenizerFunc) diff --git a/heaplog/heaplog_test.go b/heaplog/heaplog_test.go index c6a825a..4c3606e 100644 --- a/heaplog/heaplog_test.go +++ b/heaplog/heaplog_test.go @@ -7,6 +7,7 @@ import ( "heaplog/test" "os" "regexp" + "runtime" "testing" "time" ) @@ -178,6 +179,7 @@ func initHeaplog(t *testing.T) *Heaplog { boundTokenizerFunc, unboundTokenizerFunc, 80, // small segments will contain a single message + runtime.NumCPU(), ) require.NoError(t, err) diff --git a/ui/config.go b/ui/config.go index 7acbca9..a8574c8 100644 --- a/ui/config.go +++ b/ui/config.go @@ -23,6 +23,9 @@ type Config struct { // the pattern of a date in a message // see https://go.dev/src/time/format.go DateFormat string 
`validate:"required" yaml:"DateFormat"` + // sets parallel degree of ingesting. + // defaults to the number of cores if omitted or 0. + IngestWorkers uint } var DefaultCfg = Config{ diff --git a/ui/console.go b/ui/console.go index f1afa28..59766a4 100644 --- a/ui/console.go +++ b/ui/console.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "strings" "time" ) @@ -22,6 +23,11 @@ func buildHeaplog(cfg Config) *heaplog.Heaplog { tokenizerFunc := func(input string) []string { return tokenizer.TokenizeS2(input, 4, maxTokenLen) } unboundTokenizerFunc := func(input string) []string { return tokenizer.TokenizeS2(input, 1, maxTokenLen) } + ingestWorkers := int(cfg.IngestWorkers) + if ingestWorkers == 0 { + ingestWorkers = runtime.NumCPU() + } + hl, err := heaplog.NewHeaplog( cfg.StoragePath, regexp.MustCompile(cfg.MessageStartRE), @@ -32,6 +38,7 @@ func buildHeaplog(cfg Config) *heaplog.Heaplog { tokenizerFunc, unboundTokenizerFunc, 50_000_000, + ingestWorkers, ) if err != nil { @@ -54,6 +61,20 @@ var runCmd = &cobra.Command{ Short: "Run heaplog daemon", PreRun: allowConfigFlags, Run: func(cmd *cobra.Command, args []string) { + + // // -- Start Tracing + // traceFile, _ := os.Create("trace") + // trace.Start(traceFile) + // c := make(chan os.Signal, 1) + // signal.Notify(c, os.Interrupt) + // go func() { + // <-c + // trace.Stop() + // traceFile.Close() + // os.Exit(0) + // }() + // // -- End Tracing + hl := buildHeaplog(LoadConfig(true)) go hl.Background()