Skip to content

Commit

Permalink
Cherry-pick #19408, #19533 to 7.x: Implement memlog on-disk handling (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering committed Jul 8, 2020
1 parent 80735bb commit 5f37ab5
Show file tree
Hide file tree
Showing 45 changed files with 1,307 additions and 33 deletions.
558 changes: 542 additions & 16 deletions libbeat/statestore/backend/memlog/diskstore.go

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions libbeat/statestore/backend/memlog/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"io"

"github.com/elastic/go-structform/gotype"
"github.com/elastic/go-structform/json"
)

type jsonEncoder struct {
out io.Writer
folder *gotype.Iterator
}

func newJSONEncoder(out io.Writer) *jsonEncoder {
e := &jsonEncoder{out: out}
e.reset()
return e
}

func (e *jsonEncoder) reset() {
visitor := json.NewVisitor(e.out)
visitor.SetEscapeHTML(false)

var err error

// create new encoder with custom time.Time encoding
e.folder, err = gotype.NewIterator(visitor)
if err != nil {
panic(err)
}
}

func (e *jsonEncoder) Encode(v interface{}) error {
return e.folder.Fold(v)
}
7 changes: 5 additions & 2 deletions libbeat/statestore/backend/memlog/memlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Settings struct {
// Checkpoint predicate that can trigger a registry file rotation. If not
// configured, memlog will automatically trigger a checkpoint every 10MB.
Checkpoint CheckpointPredicate

// If set memlog will not check the version of the meta file.
IgnoreVersionCheck bool
}

// CheckpointPredicate is the type for configurable checkpoint checks.
Expand All @@ -62,7 +65,7 @@ type CheckpointPredicate func(fileSize uint64) bool

const defaultFileMode os.FileMode = 0600

const defaultBufferSize = 4096
const defaultBufferSize = 4 * 1024

func defaultCheckpoint(filesize uint64) bool {
const limit = 10 * 1 << 20 // set rotation limit to 10MB by default
Expand Down Expand Up @@ -110,7 +113,7 @@ func (r *Registry) Access(name string) (backend.Store, error) {
home := filepath.Join(r.settings.Root, name)
fileMode := r.settings.FileMode
bufSz := r.settings.BufferSize
store, err := openStore(logger, home, fileMode, bufSz, r.settings.Checkpoint)
store, err := openStore(logger, home, fileMode, bufSz, r.settings.IgnoreVersionCheck, r.settings.Checkpoint)
if err != nil {
return nil, err
}
Expand Down
258 changes: 258 additions & 0 deletions libbeat/statestore/backend/memlog/memlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"encoding/json"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/internal/storecompliance"
)

func init() {
logp.DevelopmentSetup()
}

func TestCompliance_Default(t *testing.T) {
storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) {
return New(logp.NewLogger("test"), Settings{Root: testPath})
})
}

func TestCompliance_AlwaysCheckpoint(t *testing.T) {
storecompliance.TestBackendCompliance(t, func(testPath string) (backend.Registry, error) {
return New(logp.NewLogger("test"), Settings{
Root: testPath,
Checkpoint: func(filesize uint64) bool {
return true
},
})
})
}

func TestLoadVersion1(t *testing.T) {
dataHome := "testdata/1"

list, err := ioutil.ReadDir(dataHome)
if err != nil {
t.Fatal(err)
}

cases := list[:0]
for _, info := range list {
if info.IsDir() {
cases = append(cases, info)
}
}

for _, info := range cases {
name := filepath.Base(info.Name())
t.Run(name, func(t *testing.T) {
testLoadVersion1Case(t, filepath.Join(dataHome, info.Name()))
})
}
}

func testLoadVersion1Case(t *testing.T, dataPath string) {
path, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Failed to create temporary test directory: %v", err)
}
defer os.RemoveAll(path)

t.Logf("Test tmp dir: %v", path)

if err := copyPath(path, dataPath); err != nil {
t.Fatalf("Failed to copy test file to the temporary directory: %v", err)
}

// load expected test results
raw, err := ioutil.ReadFile(filepath.Join(path, "expected.json"))
if err != nil {
t.Fatalf("Failed to load expected.json: %v", err)
}

expected := struct {
Txid uint64
Datafile string
Entries map[string]interface{}
}{}
if err := json.Unmarshal(raw, &expected); err != nil {
t.Fatalf("Failed to parse expected.json: %v", err)
}

// load store:
store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, true, func(_ uint64) bool {
return false
})
if err != nil {
t.Fatalf("Failed to load test store: %v", err)
}
defer store.Close()

disk := store.disk
disk.removeOldDataFiles()

// validate store:
assert.Equal(t, expected.Txid, disk.nextTxID-1)
if expected.Datafile != "" {
assert.Equal(t, filepath.Join(path, expected.Datafile), disk.activeDataFile.path)
}

// check all keys in expected are known and do match stored values:
func() {
for key, val := range expected.Entries {
var tmp interface{}
err := store.Get(key, &tmp)
require.NoError(t, err, "error reading entry (key=%v)", key)

assert.Equal(t, val, tmp, "failed when checking key '%s'", key)
}
}()

// check store does not contain any additional keys
func() {
err = store.Each(func(key string, val statestore.ValueDecoder) (bool, error) {
_, exists := expected.Entries[string(key)]
if !exists {
t.Errorf("unexpected key: %s", key)
}
return true, nil
})
assert.NoError(t, err)
}()
}

func TestTxIDLessEqual(t *testing.T) {
cases := map[string]struct {
a, b uint64
want bool
}{
"is equal": {10, 10, true},
"is less": {8, 9, true},
"is bigger": {9, 8, false},
"is less 0 with integer overflow": {
math.MaxUint64 - 2, 0, true,
},
"is less random value with integer overflow": {
math.MaxUint64 - 2, 10, true,
},
"is less with large ids": {
math.MaxUint64 - 10, math.MaxUint64 - 9, true,
},
"is bigger with large ids": {
math.MaxUint64 - 9, math.MaxUint64 - 10, false,
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
got := isTxIDLessEqual(test.a, test.b)
if got != test.want {
t.Fatalf("%v <= %v should be %v", test.a, test.b, test.want)
}
})
}
}

func copyPath(to, from string) error {
info, err := os.Stat(from)
if err != nil {
return err
}

if info.IsDir() {
return copyDir(to, from)
}
if info.Mode().IsRegular() {
return copyFile(to, from)
}

// ignore other file types
return nil
}

func copyDir(to, from string) error {
if !isDir(to) {
info, err := os.Stat(from)
if err != nil {
return err
}

if err := os.MkdirAll(to, info.Mode()); err != nil {
return err
}
}

list, err := ioutil.ReadDir(from)
if err != nil {
return err
}

for _, file := range list {
name := file.Name()
err := copyPath(filepath.Join(to, name), filepath.Join(from, name))
if err != nil {
return err
}
}
return nil
}

func copyFile(to, from string) error {
in, err := os.Open(from)
if err != nil {
return err
}
defer in.Close()

info, err := in.Stat()
if err != nil {
return err
}

out, err := os.OpenFile(to, os.O_CREATE|os.O_RDWR|os.O_TRUNC, info.Mode())
if err != nil {
return err
}
defer out.Close()

_, err = io.Copy(out, in)
return err
}

func isDir(path string) bool {
info, err := os.Stat(path)
return err == nil && info.IsDir()
}

func isFile(path string) bool {
info, err := os.Stat(path)
return err == nil && info.Mode().IsRegular()
}
Loading

0 comments on commit 5f37ab5

Please sign in to comment.