Skip to content

Commit

Permalink
add IngestWorkers config value
Browse files Browse the repository at this point in the history
  • Loading branch information
lezhnev74 committed Jan 17, 2024
1 parent 93815f2 commit 4490ac9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 2 deletions.
4 changes: 2 additions & 2 deletions heaplog/heaplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"heaplog/storage"
"log"
"regexp"
"runtime"
"time"
)

Expand All @@ -33,6 +32,7 @@ func NewHeaplog(
tokenizerFunc func(input string) []string,
unboundTokenizerFunc func(input string) []string,
indexSegmentSize int64,
ingestWorkers int,
) (*Heaplog, error) {
s, err := storage.NewStorage(storageRoot, ingestFlushTick, searchFlushTick)
if err != nil {
Expand All @@ -42,7 +42,7 @@ func NewHeaplog(

_scanner := scanner.NewScanner(dateLayout, messageStartPattern, 10_000_000, 100_000_000)
_indexer := indexer.NewIndexer(_scanner, tokenizerFunc)
ingestor := ingest.NewIngestor(s, _indexer, indexSegmentSize, runtime.NumCPU())
ingestor := ingest.NewIngestor(s, _indexer, indexSegmentSize, ingestWorkers)
discover := ingest.NewDiscover(globs, s)

_selector := search.NewSegmentSelector(s, unboundTokenizerFunc, tokenizerFunc)
Expand Down
2 changes: 2 additions & 0 deletions heaplog/heaplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"heaplog/test"
"os"
"regexp"
"runtime"
"testing"
"time"
)
Expand Down Expand Up @@ -178,6 +179,7 @@ func initHeaplog(t *testing.T) *Heaplog {
boundTokenizerFunc,
unboundTokenizerFunc,
80, // small segments will contain a single message
runtime.NumCPU(),
)
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions ui/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Config struct {
// the pattern of a date in a message
// see https://go.dev/src/time/format.go
DateFormat string `validate:"required" yaml:"DateFormat"`
// sets parallel degree of ingesting.
// defaults to the number of cores is omitted or 0.
IngestWorkers uint
}

var DefaultCfg = Config{
Expand Down
21 changes: 21 additions & 0 deletions ui/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"time"
)
Expand All @@ -22,6 +23,11 @@ func buildHeaplog(cfg Config) *heaplog.Heaplog {
tokenizerFunc := func(input string) []string { return tokenizer.TokenizeS2(input, 4, maxTokenLen) }
unboundTokenizerFunc := func(input string) []string { return tokenizer.TokenizeS2(input, 1, maxTokenLen) }

ingestWorkers := int(cfg.IngestWorkers)
if ingestWorkers == 0 {
ingestWorkers = runtime.NumCPU()
}

hl, err := heaplog.NewHeaplog(
cfg.StoragePath,
regexp.MustCompile(cfg.MessageStartRE),
Expand All @@ -32,6 +38,7 @@ func buildHeaplog(cfg Config) *heaplog.Heaplog {
tokenizerFunc,
unboundTokenizerFunc,
50_000_000,
ingestWorkers,
)

if err != nil {
Expand All @@ -54,6 +61,20 @@ var runCmd = &cobra.Command{
Short: "Run heaplog daemon",
PreRun: allowConfigFlags,
Run: func(cmd *cobra.Command, args []string) {

// // -- Start Tracing
// traceFile, _ := os.Create("trace")
// trace.Start(traceFile)
// c := make(chan os.Signal, 1)
// signal.Notify(c, os.Interrupt)
// go func() {
// <-c
// trace.Stop()
// traceFile.Close()
// os.Exit(0)
// }()
// // -- End Tracing

hl := buildHeaplog(LoadConfig(true))
go hl.Background()

Expand Down

0 comments on commit 4490ac9

Please sign in to comment.