Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

Commit

Permalink
Merge pull request #1260 from AlmogBaku/es_sink_improvments
Browse files Browse the repository at this point in the history
Improve ES Sink:
  • Loading branch information
piosz authored Sep 2, 2016
2 parents 0e22d39 + e6425d1 commit 2a7fbcf
Show file tree
Hide file tree
Showing 198 changed files with 1,593 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ eventer
*.un~
Session.vim
.netrwhist
.idea
18 changes: 13 additions & 5 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions common/elasticsearch/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch

import (
"fmt"
awsauth "github.com/smartystreets/go-aws-auth"
"net/http"
"os"
)

type AWSSigningTransport struct {
HTTPClient *http.Client
Credentials awsauth.Credentials
}

// RoundTrip implementation
func (a AWSSigningTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return a.HTTPClient.Do(awsauth.Sign4(req, a.Credentials))
}

func createAWSClient() (*http.Client, error) {
id := os.Getenv("AWS_ACCESS_KEY_ID")
if id == "" {
id = os.Getenv("AWS_ACCESS_KEY")
}

secret := os.Getenv("AWS_SECRET_ACCESS_KEY")
if secret == "" {
secret = os.Getenv("AWS_SECRET_KEY")
}

if id == "" || secret == "" {
return nil, fmt.Errorf("Failed to configure AWS authentication. Both `AWS_ACCESS_KEY_ID` and " +
"`AWS_SECRET_ACCESS_KEY` environment veriables required")
}

signingTransport := AWSSigningTransport{
Credentials: awsauth.Credentials{
AccessKeyID: id,
SecretAccessKey: secret,
},
HTTPClient: http.DefaultClient,
}
return &http.Client{Transport: http.RoundTripper(signingTransport)}, nil
}
51 changes: 34 additions & 17 deletions common/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"github.com/pborman/uuid"

"gopkg.in/olivere/elastic.v3"
"os"
)

const (
Expand Down Expand Up @@ -53,14 +55,14 @@ func SaveDataIntoES(esClient *elastic.Client, indexName string, typeName string,
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("failed to create Index in ES cluster: %s", err)
return fmt.Errorf("Failed to create Index in ES cluster: %s", err)
}
}
indexID := uuid.NewUUID()
_, err = esClient.Index().
Index(indexName).
Type(typeName).
Id(string(indexID)).
Id(indexID.String()).
BodyJson(sinkData).
Do()
if err != nil {
Expand All @@ -76,7 +78,7 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
var esConfig ElasticSearchConfig
opts, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, fmt.Errorf("failed to parser url's query string: %s", err)
return nil, fmt.Errorf("Failed to parser url's query string: %s", err)
}

// set the index for es,the default value is "heapster"
Expand All @@ -87,12 +89,15 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {

// Set the URL endpoints of the ES's nodes. Notice that when sniffing is
// enabled, these URLs are used to initially sniff the cluster on startup.
if len(opts["nodes"]) < 1 {
var startupFns []elastic.ClientOptionFunc
if len(opts["nodes"]) > 0 {
startupFns = append(startupFns, elastic.SetURL(opts["nodes"]...))
} else if uri.Opaque != "" {
startupFns = append(startupFns, elastic.SetURL(uri.Opaque))
} else {
return nil, fmt.Errorf("There is no node assigned for connecting ES cluster")
}

startupFns := []elastic.ClientOptionFunc{elastic.SetURL(opts["nodes"]...)}

// If the ES cluster needs authentication, the username and secret
// should be set in sink config.Else, set the Authenticate flag to false
if len(opts["esUserName"]) > 0 && len(opts["esUserSecret"]) > 0 {
Expand All @@ -115,14 +120,6 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheck(healthCheck))
}

if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}

if len(opts["startupHealthcheckTimeout"]) > 0 {
timeout, err := time.ParseDuration(opts["startupHealthcheckTimeout"][0] + "s")
if err != nil {
Expand All @@ -131,12 +128,32 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheckTimeoutStartup(timeout))
}

if os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "" ||
os.Getenv("AWS_SECRET_ACCESS_KEY") != "" || os.Getenv("AWS_SECRET_KEY") != "" {
glog.Info("Configuring with AWS credentials..")

awsClient, err := createAWSClient()
if err != nil {
return nil, err
}

startupFns = append(startupFns, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
} else {
if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}
}

esConfig.EsClient, err = elastic.NewClient(startupFns...)
if err != nil {
return nil, fmt.Errorf("failed to create ElasticSearch client: %v", err)
return nil, fmt.Errorf("Failed to create ElasticSearch client: %v", err)
}

glog.V(2).Infof("elasticsearch sink configure successfully")
glog.V(2).Infof("ElasticSearch sink configure successfully")

return &esConfig, nil
}
2 changes: 1 addition & 1 deletion common/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
)

func TestCreateElasticSearchConfig(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions docs/sink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,18 @@ The following options are available:
### Elasticsearch
This sink supports monitoring metrics and events. To use the ElasticSearch
sink add the following flag:

```
--sink=elasticsearch:<ES_SERVER_URL>[?<OPTIONS>]

```
Normally an ElasticSearch cluster has multiple nodes or a proxy, so these need
to be configured for the ElasticSearch sink. To do this, you can set
`ES_SERVER_URL` to a dummy value, and use the `?nodes=` query value for each
additional node in the cluster. For example:

```
--sink=elasticsearch:?nodes=foo.com:9200&nodes=bar.com:9200
```
(*) Notice that using the `?nodes` notation will override the `ES_SERVER_URL`


Besides this, the following options can be set in query string:

Expand All @@ -189,6 +192,32 @@ Like this:

--sink="elasticsearch:?nodes=0.0.0.0:9200&Index=testEvent"

#### AWS Integration
In order to use AWS Managed Elastic we need to use one of the following methods:

1. Making sure the public IPs of the Heapster are allowed on the ElasticSearch's Access Policy

-OR-

2. Configuring an Access Policy with IAM
1. Configure the ElasticSearch cluster policy with IAM User
2. Create a secret that stores the IAM credentials
3. Expose the credentials to the environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`

```
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.secret
```

## Using multiple sinks

Heapster can be configured to send k8s metrics and events to multiple sinks by specifying the`--sink=...` flag multiple times.
Expand Down
11 changes: 7 additions & 4 deletions events/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
event_core "k8s.io/heapster/events/core"
"k8s.io/heapster/metrics/core"
Expand Down Expand Up @@ -86,7 +86,10 @@ func (sink *elasticSearchSink) ExportEvents(eventBatch *event_core.EventBatch) {
if err != nil {
glog.Warningf("Failed to convert event to point: %v", err)
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err = sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
}

Expand All @@ -102,12 +105,12 @@ func NewElasticSearchSink(uri *url.URL) (event_core.EventSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.Warning("Failed to config ElasticSearch")
return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Info("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion events/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/events/core"
kube_api "k8s.io/kubernetes/pkg/api"
Expand Down
16 changes: 11 additions & 5 deletions metrics/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down Expand Up @@ -58,7 +58,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
for _, metric := range metricSet.LabeledMetrics {
labels := make(map[string]string)
Expand All @@ -76,7 +79,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
}
}
Expand All @@ -93,12 +99,12 @@ func NewElasticSearchSink(uri *url.URL) (core.DataSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.Warningf("Failed to config ElasticSearch: %v", err)
return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Info("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion metrics/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/gedex/inflector/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2a7fbcf

Please sign in to comment.