package chevalier

import (
	"fmt"
	"strconv"
	"time"

	"github.com/anchor/elastigo/api"
	es "github.com/anchor/elastigo/core"
)

// ElasticsearchSource is the type used to serialize sources for
// indexing.
type ElasticsearchSource struct {
	Origin string
	// Address in Vaultaire.
	Address string
	Source  map[string]string `json:"source"`
}

// ElasticsearchOrigin stores metadata for each origin.
type ElasticsearchOrigin struct {
	Origin      string    `json:"origin"`
	Count       uint64    `json:"count"`
	Address     uint64    `json:"address"`
	LastUpdated time.Time `json:"last_updated"`
}

// NewElasticsearchOrigin builds an ElasticsearchOrigin from an origin
// name, a source count and a last-updated timestamp.
func NewElasticsearchOrigin(origin string, count uint64, updated time.Time) *ElasticsearchOrigin {
	o := new(ElasticsearchOrigin)
	o.Origin = origin
	o.Count = count
	o.LastUpdated = updated
	return o
}

// GetID returns a (probably) unique ID for an ElasticsearchSource: the
// origin and address joined by a slash.
func (s *ElasticsearchSource) GetID() string {
	k := fmt.Sprintf("%v/%v", s.Origin, s.Address)
	return k
}

// Unmarshal turns an ElasticsearchSource (presumably itself unmarshaled
// from a JSON object stored in Elasticsearch) into the equivalent
// DataSource.
func (s *ElasticsearchSource) Unmarshal() (*DataSource, error) {
	tags := make([]*DataSource_Tag, len(s.Source))
	idx := 0
	for field, value := range s.Source {
		tags[idx] = NewDataSourceTag(field, value)
		idx++
	}
	pb := NewDataSource(tags)
	addr, err := strconv.ParseUint(s.Address, 10, 64)
	if err != nil {
		return nil, err
	}
	pb.Address = &addr
	return pb, nil
}
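
// A minimal round-trip sketch (the origin, address and tag values below
// are illustrative placeholders, not values used by this package):
//
//	src := &ElasticsearchSource{
//		Origin:  "myorigin",
//		Address: "12345",
//		Source:  map[string]string{"metric": "cpu"},
//	}
//	ds, err := src.Unmarshal()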

// ElasticsearchWriter maintains context for writes to the index.
type ElasticsearchWriter struct {
	indexer   *es.BulkIndexer
	indexName string
	// Metadata index
	metaIndex  string
	dataType   string
	originType string
	done       chan bool
}

// NewElasticsearchWriter builds a new Writer. retrySeconds is for the
// bulk indexer. index and dataType can be anything as long as they're
// consistent.
func NewElasticsearchWriter(host string, maxConns int, retrySeconds int, index, metaIndex, dataType string) *ElasticsearchWriter {
	writer := new(ElasticsearchWriter)
	api.Domain = host
	writer.indexer = es.NewBulkIndexerErrors(maxConns, retrySeconds)
	writer.indexName = index
	writer.metaIndex = metaIndex
	writer.dataType = dataType
	writer.originType = "chevalier_origin"
	writer.done = make(chan bool)
	writer.indexer.Run(writer.done)
	return writer
}
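
// A minimal construction sketch (the host, connection count, retry
// interval and index names below are illustrative placeholders):
//
//	w := NewElasticsearchWriter("localhost:9200", 4, 60, "chevalier", "chevalier_metadata", "datasource")
//	defer w.Shutdown()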

// UpdateOrigin upserts the metadata document for an origin, recording
// its source count and the current time as last_updated.
func (w *ElasticsearchWriter) UpdateOrigin(origin string, count uint64) error {
	o := NewElasticsearchOrigin(origin, count, time.Now())
	update := map[string]interface{}{
		"doc":           o,
		"doc_as_upsert": true,
	}
	err := w.indexer.Update(w.metaIndex, w.originType, origin, "", nil, update, true)
	return err
}

// Write queues an ElasticsearchSource for writing by the bulk indexer.
// Non-blocking.
func (w *ElasticsearchWriter) Write(origin string, source *ElasticsearchSource) error {
	update := map[string]interface{}{
		"doc":           source,
		"doc_as_upsert": true,
	}
	err := w.indexer.Update(w.indexName, w.dataType, source.GetID(), "", nil, update, true)
	return err
}
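
// A minimal write sketch (origin, address and tag values below are
// illustrative placeholders; w is an *ElasticsearchWriter):
//
//	src := &ElasticsearchSource{
//		Origin:  "myorigin",
//		Address: "67890",
//		Source:  map[string]string{"hostname": "example.com"},
//	}
//	err := w.Write("myorigin", src)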

// Shutdown signals the bulk indexer to flush all pending writes.
func (w *ElasticsearchWriter) Shutdown() {
	w.done <- true
}

// GetErrorChan returns the channel the bulk indexer writes errors to.
func (w *ElasticsearchWriter) GetErrorChan() chan *es.ErrorBuffer {
	return w.indexer.ErrorChannel
}