main.go
package main

import (
	"flag"
	"os"
	"path/filepath"
	"time"

	log "github.com/Sirupsen/logrus"
	colorable "github.com/mattn/go-colorable"
	elastic "gopkg.in/olivere/elastic.v5"
)
const srcExtension = "pep.xml"

var (
	pepxml        = flag.String("pepxml", "", "Path to the pepXML file to index")
	host          = flag.String("host", "http://localhost:9200", "Elasticsearch host with port and protocol information")
	index         = flag.String("index", "promec", "Index name in Elasticsearch where the XML data will be indexed")
	dataType      = flag.String("datatype", "search_hit", "Data type to be used under the index")
	timeZone      = flag.String("timezone", "Europe/Oslo", "Timezone used when parsing dates from the pepXML file")
	bulkSize      = flag.Int("bulksize", 500, "Number of requests to send in one bulk request")
	loglevel      = flag.String("loglevel", "info", "Log level used for printing logs")
	dirName       = flag.String("directory", "", "Directory path to watch for pepXML files to index")
	sleepInterval = flag.Int64("sleep-interval", 30, "Sleep interval in seconds")
	waitMode      = flag.Bool("wait-mode", false, "Indexer will wait for the file to be created")
	logformat     = flag.String("logformat", "text", "Log format: json or text")
	retry         = flag.Int("retry", 5, "How many times to retry connecting to Elasticsearch, with a 10-second interval between attempts")
)
// getElasticsearchClient connects to Elasticsearch, retrying up to *retry
// times with a 10-second pause between failed attempts.
func getElasticsearchClient() (*elastic.Client, error) {
	var client *elastic.Client
	var err error
	for i := 0; i <= *retry; i++ {
		client, err = elastic.NewClient(
			elastic.SetURL(*host),
			elastic.SetSniff(false),
			elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(10*time.Millisecond, 10*time.Second))))
		if err == nil {
			return client, nil
		}
		log.Warn("Failed to connect to Elasticsearch '", err, "', sleeping before retrying..")
		time.Sleep(10 * time.Second)
	}
	return nil, err
}
func main() {
	flag.Parse()

	// Set up the correct log format and level
	if *logformat == "text" {
		log.SetFormatter(&log.TextFormatter{ForceColors: true})
		log.SetOutput(colorable.NewColorableStdout())
	} else {
		log.SetFormatter(&log.JSONFormatter{})
	}
	lvl, err := log.ParseLevel(*loglevel)
	if err != nil {
		log.WithFields(log.Fields{
			"detail": err,
		}).Warn("Could not parse log level, using default")
		log.SetLevel(log.InfoLevel)
	} else {
		log.SetLevel(lvl)
	}

	if *dirName == "" && *pepxml == "" {
		log.Fatal("Provide at least a directory path or a pepXML file to index")
	} else if *dirName != "" && *pepxml != "" {
		log.Fatal("Provide either a directory path or a pepXML file to index, not both")
	}
	interval := time.Duration(*sleepInterval)

	// Create the Elasticsearch client
	var client *elastic.Client
	client, err = getElasticsearchClient()
	if err != nil {
		log.Fatal("Failed to create Elasticsearch client ", err)
	}

	if *dirName == "" {
		// We are running in single-file indexing mode
		log.Info("Promec Indexer started to index file ", *pepxml)
		// Read the XML data into a map
		xmlMap, err := readCometXML(*pepxml, *waitMode)
		if err != nil {
			log.Error("Failed to read pepXML file ", *pepxml, " ", err)
			os.Exit(1)
		}
		// Convert the XML map to the Elasticsearch bulk index format and index it
		err = indexELSData(xmlMap, *host, *index, *dataType, *bulkSize, *timeZone, *pepxml, client)
		if err != nil {
			log.Error("Failed to ingest data for file ", *pepxml, " ", err)
			os.Exit(1)
		}
		log.Info("Successfully indexed data from ", *pepxml)
	} else {
		// We are watching the directory for new pepXML files
		log.Info("Promec Indexer is watching directory \"", *dirName, "\"")
		for {
			// Get the files which have not been processed yet
			files, err := watchDir(*dirName, client, *index)
			if err != nil {
				log.Error("Error in watching directory ", err)
				// Sleep for the predefined interval and retry
				time.Sleep(interval * time.Second)
				continue
			}
			log.Debug("Got files to index ", files)
			for _, file := range files {
				fPath := filepath.Join(*dirName, file)
				xmlMap, err := readCometXML(fPath, *waitMode)
				if err != nil {
					log.Error(err)
					continue
				}
				err = indexELSData(xmlMap, *host, *index, *dataType, *bulkSize, *timeZone, file, client)
				if err != nil {
					log.Error("Failed to ingest data for file ", file, " ", err)
					continue
				}
				log.Info("Successfully indexed data from ", file)
			}
			if len(files) == 0 {
				log.Info("Found no files, sleeping..")
			}
			time.Sleep(interval * time.Second)
		}
	}
}
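
The bulk indexing step that indexELSData performs is defined elsewhere in the package, so the sketch below is only a minimal, self-contained illustration of the gopkg.in/olivere/elastic.v5 Bulk API that the bulksize flag implies. The document field names and the flush strategy are assumptions for illustration, not the repository's code; only the index name "promec" and type "search_hit" come from the flag defaults above.

package main

import (
	"context"
	"log"
	"time"

	elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
	// Connect without sniffing, as the indexer above does.
	client, err := elastic.NewClient(
		elastic.SetURL("http://localhost:9200"),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder documents standing in for the parsed pepXML search hits.
	docs := []map[string]interface{}{
		{"spectrum": "example_1", "retention_time_sec": 1200.5, "indexed_at": time.Now()},
		{"spectrum": "example_2", "retention_time_sec": 1310.2, "indexed_at": time.Now()},
	}

	// Queue every document on one bulk request and send it in a single call;
	// the real indexer would flush a request every bulkSize documents instead.
	bulk := client.Bulk()
	for _, doc := range docs {
		bulk.Add(elastic.NewBulkIndexRequest().
			Index("promec").
			Type("search_hit").
			Doc(doc))
	}
	if _, err := bulk.Do(context.Background()); err != nil {
		log.Fatal("bulk indexing failed: ", err)
	}
}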