Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial skeleton of filestream input #21427

Merged
merged 3 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
)

// filestream is the input for reading from files which
// are actively written by other applications.
type filestream struct{}

const pluginName = "filestream"

// Plugin creates a new filestream input plugin for creating a stateful input.
func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
return input.Plugin{
Name: pluginName,
Stability: feature.Experimental,
Deprecated: false,
Info: "filestream input",
Doc: "The filestream input collects logs from the local filestream service",
Manager: &loginp.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
},
}
}

func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) {
panic("TODO: implement me")
}

func (inp *filestream) Name() string { return pluginName }

func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error {
panic("TODO: implement me")
}

func (inp *filestream) Run(
ctx input.Context,
src loginp.Source,
cursor loginp.Cursor,
publisher loginp.Publisher,
) error {
panic("TODO: implement me")
}
124 changes: 124 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input_logfile

import (
"time"

"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"

"github.com/elastic/beats/v7/libbeat/logp"
)

// cleaner removes finished entries from the registry file.
type cleaner struct {
log *logp.Logger
}

// run starts a loop that tries to clean entries from the registry.
// The cleaner locks the store, such that no new states can be created
// during the cleanup phase. Only resources that are finished and whos TTL
// (clean_timeout setting) has expired will be removed.
//
// Resources are considered "Finished" if they do not have a current owner (active input), and
// if they have no pending updates that still need to be written to the registry file after associated
// events have been ACKed by the outputs.
// The event acquisition timestamp is used as reference to clean resources. If a resources was blocked
// for a long time, and the life time has been exhausted, then the resource will be removed immediately
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
started := time.Now()
timed.Periodic(canceler, interval, func() error {
gcStore(c.log, started, store)
return nil
})
}

// gcStore looks for resources to remove and deletes these. `gcStore` receives
// the start timestamp of the cleaner as reference. If we have entries without
// updates in the registry, that are older than `started`, we will use `started
// + ttl` to decide if an entry will be removed. This way old entries are not
// removed immediately on startup if the Beat is down for a longer period of
// time.
func gcStore(log *logp.Logger, started time.Time, store *store) {
log.Debugf("Start store cleanup")
defer log.Debugf("Done store cleanup")

states := store.ephemeralStore
states.mu.Lock()
defer states.mu.Unlock()

keys := gcFind(states.table, started, time.Now())
if len(keys) == 0 {
log.Debug("No entries to remove were found")
return
}

if err := gcClean(store, keys); err != nil {
log.Errorf("Failed to remove all entries from the registry: %+v", err)
}
}

// gcFind searches the store of resources that can be removed. A set of keys to delete is returned.
func gcFind(table map[string]*resource, started, now time.Time) map[string]struct{} {
keys := map[string]struct{}{}
for key, resource := range table {
clean := checkCleanResource(started, now, resource)
if !clean {
// do not clean the resource if it is still live or not serialized to the persistent store yet.
continue
}
keys[key] = struct{}{}
}

return keys
}

// gcClean removes key value pairs in the removeSet from the store.
// If deletion in the persistent store fails the entry is kept in memory and
// eventually cleaned up later.
func gcClean(store *store, removeSet map[string]struct{}) error {
for key := range removeSet {
if err := store.persistentStore.Remove(key); err != nil {
return err
}
delete(store.ephemeralStore.table, key)
}
return nil
}

// checkCleanResource returns true for a key-value pair is assumed to be old,
// if is not in use and there are no more pending updates that still need to be
// written to the persistent store anymore.
func checkCleanResource(started, now time.Time, resource *resource) bool {
if !resource.Finished() {
return false
}

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

ttl := resource.internalState.TTL
reference := resource.internalState.Updated
if started.After(reference) {
reference = started
}

return reference.Add(ttl).Before(now) && resource.stored
}
162 changes: 162 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input_logfile

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
)

func TestGCStore(t *testing.T) {
t.Run("empty store", func(t *testing.T) {
started := time.Now()

backend := createSampleStore(t, nil)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("state is still alive", func(t *testing.T) {
started := time.Now()
const ttl = 60 * time.Second

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl / 2),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

checkEqualStoreState(t, initState, backend.snapshot())
})

t.Run("old state can be removed", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("old state is not removed if cleanup is not active long enough", func(t *testing.T) {
const ttl = 60 * time.Minute
started := time.Now()

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-2 * ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

checkEqualStoreState(t, initState, backend.snapshot())
})

t.Run("old state but resource is accessed", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

// access resource and check it is not gc'ed
res := store.Get("test::key")
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())

// release resource and check it gets gc'ed
res.Release()
want := map[string]state{}
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("old state but resource has pending updates", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

// create pending update operation
res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
require.NoError(t, err)
res.Release()

// cleanup fails
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())

// cancel operation (no more pending operations) and try to gc again
op.done(1)
gcStore(logp.NewLogger("test"), started, store)
want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})
}
Loading