Skip to content

Commit

Permalink
Cursor input skeleton (elastic#19378)
Browse files Browse the repository at this point in the history
This PR is part of introducing a new input architecture in filebeat. The current state of the full implementation can be seen [here](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2) and [sample inputs based on the new API](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/features/input).

The full list of changes will include:
- Introduce v2 API interfaces
- Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality
- Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs.
- Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts.
- Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat.

The change introduces the skeleton and documentation with details for
cursor based inputs. Future updates will add the actual implementation
and tests.
  • Loading branch information
Steffen Siering authored Jun 29, 2020
1 parent 4286e1e commit 13633ce
Show file tree
Hide file tree
Showing 7 changed files with 611 additions and 0 deletions.
45 changes: 45 additions & 0 deletions filebeat/input/v2/input-cursor/clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// cleaner removes finished entries from the registry file.
type cleaner struct {
log *logp.Logger
}

// run starts a loop that tries to clean entries from the registry.
// The cleaner locks the store, such that no new states can be created
// during the cleanup phase. Only resources that are finished and whos TTL
// (clean_timeout setting) has expired will be removed.
//
// Resources are considered "Finished" if they do not have a current owner (active input), and
// if they have no pending updates that still need to be written to the registry file after associated
// events have been ACKed by the outputs.
// The event acquisition timestamp is used as reference to clean resources. If a resources was blocked
// for a long time, and the life time has been exhausted, then the resource will be removed immediately
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
panic("TODO: implement me")
}
43 changes: 43 additions & 0 deletions filebeat/input/v2/input-cursor/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

// Cursor allows the input to check if cursor status has been stored
// in the past and unpack the status into a custom structure.
type Cursor struct {
store *store
resource *resource
}

func makeCursor(store *store, res *resource) Cursor {
return Cursor{store: store, resource: res}
}

// IsNew returns true if no cursor information has been stored
// for the current Source.
func (c Cursor) IsNew() bool { return c.resource.IsNew() }

// Unpack deserialized the cursor state into to. Unpack fails if no pointer is
// given, or if the structure to points to is not compatible with the document
// stored.
func (c Cursor) Unpack(to interface{}) error {
if c.IsNew() {
return nil
}
return c.resource.UnpackCursor(to)
}
58 changes: 58 additions & 0 deletions filebeat/input/v2/input-cursor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package cursor provides an InputManager for use with the v2 API, that is
// capable of storing an internal cursor state between restarts.
//
// The InputManager requires authors to Implement a configuration function and
// the cursor.Input interface. The configuration function returns a slice of
// sources ([]Source) that it has read from the configuration object, and the
// actual Input that will be used to collect events from each configured
// source.
// When Run a go-routine will be started per configured source. If two inputs have
// configured the same source, only one will be active, while the other waits
// for the resource to become free.
// The manager keeps track of the state per source. When publishing an event a
// new cursor value can be passed as well. Future instance of the input can
// read the last published cursor state.
//
// For each source an in-memory and a persitent state are tracked. Internal
// meta updates by the input manager can not be read by Inputs, and will be
// written to the persistent store immediately. Cursor state updates are read
// and update by the input. Cursor updates are written to the persistent store
// only after the events have been ACKed by the output. Internally the input
// manager keeps track of already ACKed updates and pending ACKs.
// In order to guarantee progress even if the pbulishing is slow or blocked, all cursor
// updates are written to the in-memory state immediately. Source without any
// pending updates are in-sync (in-memory state == persistet state). All
// updates are ordered, but we allow the in-memory state to be ahead of the
// persistent state.
// When an input is started, the cursor state is read from the in-memory state.
// This way a new input instance can continue where other inputs have been
// stopped, even if we still have in-flight events from older input instances.
// The coordination between inputs guarantees that all updates are always in
// order.
//
// When a shutdown signal is received, the publisher is directly disconnected
// from the outputs. As all coordination is directly handled by the
// InputManager, shutdown will be immediate (once the input itself has
// returned), and can not be blocked by the outputs.
//
// An input that is about to collect a source that is already collected by
// another input will wait until the other input has returned or the current
// input did receive a shutdown signal.
package cursor
84 changes: 84 additions & 0 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"fmt"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
)

// Input interface for cursor based inputs. This interface must be implemented
// by inputs that with to use the InputManager in order to implement a stateful
// input that can store state between restarts.
type Input interface {
Name() string

// Test checks the configuaration and runs additional checks if the Input can
// actually collect data for the given configuration (e.g. check if host/port or files are
// accessible).
// The input manager will call Test per configured source.
Test(Source, input.TestContext) error

// Run starts the data collection. Run must return an error only if the
// error is fatal making it impossible for the input to recover.
// The input run a go-routine can call Run per configured Source.
Run(input.Context, Source, Cursor, Publisher) error
}

// managedInput implements the v2.Input interface, integrating cursor Inputs
// with the v2 input API.
// The managedInput starts go-routines per configured source.
// If a Run returns the error is 'remembered', but active data collecting
// continues. Only after all Run calls have returned will the managedInput be
// done.
type managedInput struct {
manager *InputManager
userID string
sources []Source
input Input
cleanTimeout time.Duration
}

// Name is required to implement the v2.Input interface
func (inp *managedInput) Name() string { return inp.input.Name() }

// Test runs the Test method for each configured source.
func (inp *managedInput) Test(ctx input.TestContext) error {
panic("TODO: implement me")
}

// Run creates a go-routine per source, waiting until all go-routines have
// returned, either by error, or by shutdown signal.
// If an input panics, we create an error value with stack trace to report the
// issue, but not crash the whole process.
func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
panic("TODO: implement me")
}

func (inp *managedInput) createSourceID(s Source) string {
if inp.userID != "" {
return fmt.Sprintf("%v::%v::%v", inp.manager.Type, inp.userID, s.Name())
}
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}
106 changes: 106 additions & 0 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"

"github.com/elastic/go-concert/unison"
)

// InputManager is used to create, manage, and coordinate stateful inputs and
// their persistent state.
// The InputManager ensures that only one input can be active for a unique source.
// If two inputs have overlapping sources, both can still collect data, but
// only one input will collect from the common source.
//
// The InputManager automatically cleans up old entries without an active
// input, and without any pending update operations for the persistent store.
//
// The Type field is used to create the key name in the persistent store. Users
// are allowed to add a custome per input configuration ID using the `id`
// setting, to collect the same source multiple times, but with different
// state. The key name in the persistent store becomes <Type>-[<ID>]-<Source Name>
type InputManager struct {
Logger *logp.Logger

// StateStore gives the InputManager access to the persitent key value store.
StateStore StateStore

// Type must contain the name of the input type. It is used to create the key name
// for all sources the inputs collect from.
Type string

// DefaultCleanTimeout configures the key/value garbage collection interval.
// The InputManager will only collect keys for the configured 'Type'
DefaultCleanTimeout time.Duration

// Configure returns an array of Sources, and a configured Input instances
// that will be used to collect events from each source.
Configure func(cfg *common.Config) ([]Source, Input, error)

store *store
}

// Source describe a source the input can collect data from.
// The `Name` method must return an unique name, that will be used to identify
// the source in the persistent state store.
type Source interface {
Name() string
}

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
CleanupInterval() time.Duration
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
panic("TODO: implement me")
}

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
panic("TODO: implement me")
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
if err != nil {
log.Infof("Input for resource '%v' has been stopped while waiting", resource.key)
return err
}
}
return nil
}

func releaseResource(resource *resource) {
resource.lock.Unlock()
resource.Release()
}
Loading

0 comments on commit 13633ce

Please sign in to comment.