Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/observer/docker] add ListAndWatch to observer #5851

Merged
merged 3 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ INTEGRATION_TEST_MODULES := \
receiver/zookeeperreceiver \
receiver/kafkametricsreceiver \
receiver/nginxreceiver \
internal/common
internal/common \
extension/observer/dockerobserver

.DEFAULT_GOAL := all

Expand Down Expand Up @@ -292,4 +293,4 @@ multimod-verify: install-tools

.PHONY: multimod-prerelease
multimod-prerelease: install-tools
multimod prerelease -v ./versions.yaml -m contrib-base
multimod prerelease -v ./versions.yaml -m contrib-base
21 changes: 21 additions & 0 deletions extension/observer/dockerobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dockerobserver

import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -54,4 +56,23 @@ type Config struct {
// through the docker event listener example: cache_sync_interval: "20m"
// Default: "60m"
CacheSyncInterval time.Duration `mapstructure:"cache_sync_interval"`

// Docker client API version. Default is 1.22
DockerAPIVersion float64 `mapstructure:"api_version"`
}

func (config Config) Validate() error {
if config.Endpoint == "" {
return errors.New("endpoint must be specified")
}
if config.DockerAPIVersion < minimalRequiredDockerAPIVersion {
return fmt.Errorf("api_version must be at least %v", minimalRequiredDockerAPIVersion)
}
if config.Timeout == 0 {
return fmt.Errorf("timeout must be specified")
}
if config.CacheSyncInterval == 0 {
return fmt.Errorf("cache_sync_interval must be specified")
}
return nil
}
18 changes: 18 additions & 0 deletions extension/observer/dockerobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ func TestLoadConfig(t *testing.T) {
UseHostnameIfPresent: true,
UseHostBindings: true,
IgnoreNonHostBindings: true,
DockerAPIVersion: 1.22,
},
ext1)
}

func TestValidateConfig(t *testing.T) {
cfg := &Config{}
assert.Equal(t, "endpoint must be specified", cfg.Validate().Error())

cfg = &Config{Endpoint: "someEndpoint"}
assert.Equal(t, "api_version must be at least 1.22", cfg.Validate().Error())

cfg = &Config{Endpoint: "someEndpoint", DockerAPIVersion: 1.22}
assert.Equal(t, "timeout must be specified", cfg.Validate().Error())

cfg = &Config{Endpoint: "someEndpoint", DockerAPIVersion: 1.22, Timeout: 5 * time.Minute}
assert.Equal(t, "cache_sync_interval must be specified", cfg.Validate().Error())

cfg = &Config{Endpoint: "someEndpoint", DockerAPIVersion: 1.22, Timeout: 5 * time.Minute, CacheSyncInterval: 5 * time.Minute}
assert.Nil(t, cfg.Validate())
}
183 changes: 166 additions & 17 deletions extension/observer/dockerobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,189 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

dtypes "github.com/docker/docker/api/types"
"github.com/docker/go-connections/nat"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
docker "github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
)

var _ (component.Extension) = (*dockerObserver)(nil)
mstumpfx marked this conversation as resolved.
Show resolved Hide resolved
var _ observer.Observable = (*dockerObserver)(nil)

const (
defaultDockerAPIVersion = 1.22
minimalRequiredDockerAPIVersion = 1.22
)

type dockerObserver struct {
logger *zap.Logger
config *Config
logger *zap.Logger
config *Config
cancel func()
existingEndpoints map[string][]observer.Endpoint
ctx context.Context
dClient *docker.Client
}

// Start will instantiate required components needed by the Docker observer
func (d *dockerObserver) Start(ctx context.Context, host component.Host) error {
dCtx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
d.ctx = dCtx
mstumpfx marked this conversation as resolved.
Show resolved Hide resolved
var err error

// Create new Docker client
dConfig, err := docker.NewConfig(d.config.Endpoint, d.config.Timeout, d.config.ExcludedImages, d.config.DockerAPIVersion)
if err != nil {
return err
}

d.dClient, err = docker.NewDockerClient(dConfig, d.logger)
if err != nil {
return fmt.Errorf("could not create docker client: %w", err)
}

// Load initial set of containers
err = d.dClient.LoadContainerList(d.ctx)
if err != nil {
return fmt.Errorf("could not load initial list of containers: %w", err)
}

d.existingEndpoints = make(map[string][]observer.Endpoint)

return nil
}

func (d *dockerObserver) Shutdown(ctx context.Context) error {
d.cancel()
return nil
}

// ListAndWatch provides initial state sync as well as change notification.
// Emits initial list of endpoints loaded upon extension Start. It then goes into
// a loop to sync the container cache periodically and change endpoints.
// TODO: Watch docker events to notify listener of changed endpoints as
// events stream
func (d *dockerObserver) ListAndWatch(listener observer.Notify) {
mstumpfx marked this conversation as resolved.
Show resolved Hide resolved
d.emitContainerEndpoints(listener)

go func() {
ticker := time.NewTicker(d.config.CacheSyncInterval)
defer ticker.Stop()
for {
select {
case <-d.ctx.Done():
return
case <-ticker.C:
err := d.syncContainerList(listener)
if err != nil {
d.logger.Error("Could not sync container cache", zap.Error(err))
}
}
}
// TODO: Implement event loop to watch container events to add/remove/update
// endpoints as they occur.
}()
}

// emitContainerEndpoints notifies the listener of all changes
// by loading all current containers the client has cached and
// creating endpoints for each.
func (d *dockerObserver) emitContainerEndpoints(listener observer.Notify) {
for _, c := range d.dClient.Containers() {
endpointsMap := d.endpointsForContainer(c.ContainerJSON)
d.updateEndpointsByContainerID(listener, c.ContainerJSON.ID, endpointsMap)
}
}

// syncContainerList refreshes the client's container cache and
// uses the listener to notify endpoint changes.
func (d *dockerObserver) syncContainerList(listener observer.Notify) error {
err := d.dClient.LoadContainerList(d.ctx)
if err != nil {
return err
}
d.emitContainerEndpoints(listener)
return nil
}

// updateEndpointsByID uses the listener to add / remove / update endpoints by container ID.
// latestEndpointsMap is a map of latest endpoints for the given container ID.
// If an endpoint is in the cache but NOT in latestEndpoints, the endpoint will be removed
func (d *dockerObserver) updateEndpointsByContainerID(listener observer.Notify, cid string, latestEndpointsMap map[observer.EndpointID]observer.Endpoint) {
var removedEndpoints, addedEndpoints, updatedEndpoints []observer.Endpoint

if latestEndpointsMap != nil || len(latestEndpointsMap) != 0 {
// map of EndpointID to endpoint to help with lookups
existingEndpointsMap := make(map[observer.EndpointID]observer.Endpoint)
if endpoints, ok := d.existingEndpoints[cid]; ok {
for _, e := range endpoints {
existingEndpointsMap[e.ID] = e
}
}

// If the endpoint is present in existingEndpoints but is not
// present in latestEndpointsMap, then it needs to be removed.
for id, e := range existingEndpointsMap {
if _, ok := latestEndpointsMap[id]; !ok {
removedEndpoints = append(removedEndpoints, e)
}
}

// if the endpoint is present in latestEndpointsMap, check if it exists
// already in existingEndpoints.
for _, e := range latestEndpointsMap {
// If it does not exist already, it is a new endpoint. Add it.
if existingEndpoint, ok := existingEndpointsMap[e.ID]; !ok {
addedEndpoints = append(addedEndpoints, e)
} else {
// if it already exists, see if it's equal.
// if it's not equal, update it
// if its equal, no-op.
if !reflect.DeepEqual(existingEndpoint, e) {
updatedEndpoints = append(updatedEndpoints, e)
}
}
}

// reset endpoints for this container
d.existingEndpoints[cid] = nil
// set the current known endpoints to the latest endpoints
for _, e := range latestEndpointsMap {
d.existingEndpoints[cid] = append(d.existingEndpoints[cid], e)
}
} else {
// if latestEndpointsMap is nil, we are removing all endpoints for the container
removedEndpoints = append(removedEndpoints, d.existingEndpoints[cid]...)
delete(d.existingEndpoints, cid)
}

if len(removedEndpoints) > 0 {
listener.OnRemove(removedEndpoints)
}

if len(addedEndpoints) > 0 {
listener.OnAdd(addedEndpoints)
}

if len(updatedEndpoints) > 0 {
listener.OnChange(updatedEndpoints)
}
}

// endpointsForContainer generates a list of observer.Endpoint given a Docker ContainerJSON.
// This function will only generate endpoints if a container is in the Running state and not Paused.
func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) []observer.Endpoint {
cEndpoints := make([]observer.Endpoint, 0)
func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) map[observer.EndpointID]observer.Endpoint {
endpointsMap := make(map[observer.EndpointID]observer.Endpoint)

if !c.State.Running || c.State.Running && c.State.Paused {
return cEndpoints
return endpointsMap
}

knownPorts := map[nat.Port]bool{}
Expand All @@ -64,15 +215,19 @@ func (d *dockerObserver) endpointsForContainer(c *dtypes.ContainerJSON) []observ
if endpoint == nil {
continue
}
cEndpoints = append(cEndpoints, *endpoint)
endpointsMap[endpoint.ID] = *endpoint
}

if len(endpointsMap) == 0 {
return nil
}

for _, e := range cEndpoints {
for _, e := range endpointsMap {
s, _ := json.MarshalIndent(e, "", "\t")
d.logger.Debug("Discovered Docker container endpoint", zap.Any("endpoint", s))
}

return cEndpoints
return endpointsMap
}

// endpointForPort creates an observer.Endpoint for a given port that is exposed in a Docker container.
Expand Down Expand Up @@ -105,38 +260,32 @@ func (d *dockerObserver) endpointForPort(portObj nat.Port, c *dtypes.ContainerJS
Transport: portProtoToTransport(proto),
Labels: c.Config.Labels,
}
var target string

// Set our target and hostname based on config settings
// Set our hostname based on config settings
if d.config.UseHostnameIfPresent && c.Config.Hostname != "" {
target = c.Config.Hostname
details.Host = c.Config.Hostname
} else {
// Use the IP Address of the first network we iterate over.
// This can be made configurable if so desired.
for _, n := range c.NetworkSettings.Networks {
target = n.IPAddress
details.Host = n.IPAddress
break
}

// If we still haven't gotten a host at this point and we are using
// host bindings, just make it localhost.
if target == "" && d.config.UseHostBindings {
target = "127.0.0.1"
if details.Host == "" && d.config.UseHostBindings {
details.Host = "127.0.0.1"
}
}

// If we are using HostBindings & port and IP are set, use those
if d.config.UseHostBindings && mappedPort != 0 && mappedIP != "" {
details.Host = mappedIP
target = mappedIP
details.Port = mappedPort
details.AlternatePort = port
if details.Host == "0.0.0.0" {
details.Host = "127.0.0.1"
target = "127.0.0.1"
}
} else {
details.Port = port
Expand All @@ -145,7 +294,7 @@ func (d *dockerObserver) endpointForPort(portObj nat.Port, c *dtypes.ContainerJS

endpoint = observer.Endpoint{
ID: id,
Target: target,
Target: fmt.Sprintf("%s:%d", details.Host, details.Port),
Details: details,
}

Expand Down
Loading