This repository has been archived by the owner on Jun 13, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathmain.go
204 lines (179 loc) · 8.55 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package main
import (
"encoding/json"
"errors"
"os"
"os/signal"
"runtime/pprof"
"strings"
"syscall"
"time"
"code.cloudfoundry.org/lager"
"github.com/Azure/oms-log-analytics-firehose-nozzle/caching"
"github.com/Azure/oms-log-analytics-firehose-nozzle/client"
"github.com/Azure/oms-log-analytics-firehose-nozzle/firehose"
"github.com/Azure/oms-log-analytics-firehose-nozzle/omsnozzle"
"github.com/cloudfoundry-community/go-cfclient"
"gopkg.in/alecthomas/kingpin.v2"
)
// Identity of this nozzle and of the records it writes.
const (
	version                = "1.1.2"      // reported by --version
	firehoseSubscriptionID = "oms-nozzle" // subscription ID used when connecting to the firehose
	omsTypePrefix          = "CF_"        // prefix of message types in OMS Log Analytics
)

// Accepted ranges for user overrides; main falls back to defaults outside them.
// They are deliberately untyped so they compare against both the float64
// seconds of a time.Duration and the int batch size.
const (
	minOMSPostTimeoutSeconds = 1     // lower limit for OMS_POST_TIMEOUT, in seconds
	maxOMSPostTimeoutSeconds = 60    // upper limit for OMS_POST_TIMEOUT, in seconds
	ceilingMaxMsgNumPerBatch = 10000 // upper limit for OMS_MAX_MSG_NUM_PER_BATCH
)

// Event type names searched for in the upper-cased EVENT_FILTER value.
const (
	metricEventType = "METRIC" // metric events
	logEventType    = "LOG"    // stdout/stderr log events
	httpEventType   = "HTTP"   // HTTP start/stop events
)
// Command-line configuration. Every flag can also be supplied through the
// environment variable named in its OverrideDefaultFromEnvar call; flags
// marked Required() must be provided one way or the other, the rest are optional.
var (
// Cloud Controller API URL. When empty, main falls back to the cf_api field
// of the VCAP_APPLICATION environment variable.
apiAddress = kingpin.Flag("api-addr", "Api URL").OverrideDefaultFromEnvar("API_ADDR").String()
// Traffic controller URL. When empty, main derives it from apiAddress by
// replacing "https://api." with "wss://doppler." and appending ":443".
dopplerAddress = kingpin.Flag("doppler-addr", "Traffic controller URL").OverrideDefaultFromEnvar("DOPPLER_ADDR").String()
// Credentials placed in the cfclient.Config shared by the firehose and caching clients.
cfUser = kingpin.Flag("firehose-user", "CF user with admin and firehose access").OverrideDefaultFromEnvar("FIREHOSE_USER").Required().String()
cfPassword = kingpin.Flag("firehose-user-password", "Password of the CF user").OverrideDefaultFromEnvar("FIREHOSE_USER_PASSWORD").Required().String()
// Name of this CF environment; passed to the caching client.
environment = kingpin.Flag("cf-environment", "CF environment name").OverrideDefaultFromEnvar("CF_ENVIRONMENT").Default("cf").String()
// OMS Log Analytics workspace ID and key; passed to client.NewOmsClient.
omsWorkspace = kingpin.Flag("oms-workspace", "OMS workspace ID").OverrideDefaultFromEnvar("OMS_WORKSPACE").Required().String()
omsKey = kingpin.Flag("oms-key", "OMS workspace key").OverrideDefaultFromEnvar("OMS_KEY").Required().String()
// Validated in main against minOMSPostTimeoutSeconds..maxOMSPostTimeoutSeconds;
// out-of-range values are reset to 5s.
omsPostTimeout = kingpin.Flag("oms-post-timeout", "HTTP timeout for posting events to OMS Log Analytics").Default("5s").OverrideDefaultFromEnvar("OMS_POST_TIMEOUT").Duration()
omsBatchTime = kingpin.Flag("oms-batch-time", "Interval to post an OMS batch").Default("5s").OverrideDefaultFromEnvar("OMS_BATCH_TIME").Duration()
// Must be in (0, ceilingMaxMsgNumPerBatch]; otherwise main resets it to 1000.
omsMaxMsgNumPerBatch = kingpin.Flag("oms-max-msg-num-per-batch", "Max number of messages per OMS batch").Default("1000").OverrideDefaultFromEnvar("OMS_MAX_MSG_NUM_PER_BATCH").Int()
// Comma separated white list of orgs/spaces/apps, passed to the caching client.
// NOTE(review): the flag is named "appFilter" while its env var is
// SPACE_WHITELIST; both names are public, so they are left as they are.
spaceFilter = kingpin.Flag("appFilter", "Comma separated white list of orgs/spaces/apps").Default("").OverrideDefaultFromEnvar("SPACE_WHITELIST").String()
// Comma separated list of event types to exclude. For now use metric,log,http
// and revisit later. main upper-cases the value and matches each type by substring.
eventFilter = kingpin.Flag("eventFilter", "Comma separated list of types to exclude").Default("").OverrideDefaultFromEnvar("EVENT_FILTER").String()
// Applied to the CF API client configuration.
skipSslValidation = kingpin.Flag("skip-ssl-validation", "Skip SSL validation").Default("false").OverrideDefaultFromEnvar("SKIP_SSL_VALIDATION").Bool()
// Keep-alive duration passed to the firehose consumer.
idleTimeout = kingpin.Flag("idle-timeout", "Keep Alive duration for the firehose consumer").Default("25s").OverrideDefaultFromEnvar("IDLE_TIMEOUT").Duration()
// DEBUG, INFO or ERROR, matched case-insensitively; any other value results in INFO.
logLevel = kingpin.Flag("log-level", "Log level: DEBUG, INFO, ERROR").Default("INFO").OverrideDefaultFromEnvar("LOG_LEVEL").String()
// Periodic received/sent event counters; both are copied into the nozzle config.
logEventCount = kingpin.Flag("log-event-count", "Whether to log the total count of received and sent events to OMS").Default("false").OverrideDefaultFromEnvar("LOG_EVENT_COUNT").Bool()
logEventCountInterval = kingpin.Flag("log-event-count-interval", "The interval to log the total count of received and sent events to OMS").Default("60s").OverrideDefaultFromEnvar("LOG_EVENT_COUNT_INTERVAL").Duration()
// Derived from eventFilter in main and copied into the nozzle configuration;
// true asks the nozzle to exclude that event type.
excludeMetricEvents = false
excludeLogEvents = false
excludeHttpEvents = false
)
func main() {
kingpin.Version(version)
kingpin.Parse()
logger := lager.NewLogger("oms-nozzle")
level := lager.INFO
switch strings.ToUpper(*logLevel) {
case "DEBUG":
level = lager.DEBUG
case "ERROR":
level = lager.ERROR
}
logger.RegisterSink(lager.NewWriterSink(os.Stdout, level))
// enable thread dump
threadDumpChan := registerGoRoutineDumpSignalChannel()
defer close(threadDumpChan)
go dumpGoRoutine(threadDumpChan)
var VCAP_APPLICATION map[string]*json.RawMessage
err := json.Unmarshal([]byte(os.Getenv("VCAP_APPLICATION")), &VCAP_APPLICATION)
if err != nil {
logger.Error("environment variable unmarshal failed", errors.New(err.Error()))
}
var ENVIRONMENT_API_ADDR string
err = json.Unmarshal(*VCAP_APPLICATION["cf_api"], &ENVIRONMENT_API_ADDR)
if err != nil {
logger.Error("environment variable unmarshal failed", errors.New(err.Error()))
}
if len(*apiAddress) <= 0 {
apiAddress = &ENVIRONMENT_API_ADDR
}
logger.Info("config", lager.Data{"API_ADDR": *apiAddress})
if len(*dopplerAddress) <= 0 {
temp := strings.Replace(*apiAddress, "https://api.", "wss://doppler.", 1) + ":443"
dopplerAddress = &temp
}
logger.Info("config", lager.Data{"DOPPLER_ADDR": *dopplerAddress})
if maxOMSPostTimeoutSeconds >= omsPostTimeout.Seconds() && minOMSPostTimeoutSeconds <= omsPostTimeout.Seconds() {
logger.Info("config", lager.Data{"OMS_POST_TIMEOUT": (*omsPostTimeout).String()})
} else {
logger.Info("invalid OMS_POST_TIMEOUT value, set to default",
lager.Data{"invalid value": (*omsPostTimeout).String()},
lager.Data{"min seconds": minOMSPostTimeoutSeconds},
lager.Data{"max seconds": maxOMSPostTimeoutSeconds},
lager.Data{"default seconds": 5})
*omsPostTimeout = time.Duration(5) * time.Second
}
logger.Info("config", lager.Data{"SKIP_SSL_VALIDATION": *skipSslValidation})
logger.Info("config", lager.Data{"IDLE_TIMEOUT": (*idleTimeout).String()})
logger.Info("config", lager.Data{"OMS_BATCH_TIME": (*omsBatchTime).String()})
logger.Info("config", lager.Data{"CF_ENVIRONMENT": *environment})
if ceilingMaxMsgNumPerBatch >= *omsMaxMsgNumPerBatch && *omsMaxMsgNumPerBatch > 0 {
logger.Info("config", lager.Data{"OMS_MAX_MSG_NUM_PER_BATCH": *omsMaxMsgNumPerBatch})
} else {
logger.Info("invalid OMS_MAX_MSG_NUM_PER_BATCH value, set to default",
lager.Data{"invalid value": *omsMaxMsgNumPerBatch},
lager.Data{"max value": ceilingMaxMsgNumPerBatch},
lager.Data{"default value": 1000})
*omsMaxMsgNumPerBatch = 1000
}
logger.Info("config", lager.Data{"LOG_EVENT_COUNT": *logEventCount})
logger.Info("config", lager.Data{"LOG_EVENT_COUNT_INTERVAL": (*logEventCountInterval).String()})
if len(*eventFilter) > 0 {
*eventFilter = strings.ToUpper(*eventFilter)
// by default we don't filter any events
if strings.Contains(*eventFilter, metricEventType) {
excludeMetricEvents = true
}
if strings.Contains(*eventFilter, logEventType) {
excludeLogEvents = true
}
if strings.Contains(*eventFilter, httpEventType) {
excludeHttpEvents = true
}
logger.Info("config", lager.Data{"EVENT_FILTER": *eventFilter},
lager.Data{"excludeMetricEvents": excludeMetricEvents},
lager.Data{"excludeLogEvents": excludeLogEvents},
lager.Data{"excludeHTTPEvents": excludeHttpEvents})
} else {
logger.Info("config EVENT_FILTER is nil. all events will be published")
}
cfClientConfig := &cfclient.Config{
ApiAddress: *apiAddress,
Username: *cfUser,
Password: *cfPassword,
SkipSslValidation: *skipSslValidation,
}
firehoseConfig := &firehose.FirehoseConfig{
SubscriptionId: firehoseSubscriptionID,
TrafficControllerUrl: *dopplerAddress,
IdleTimeout: *idleTimeout,
}
firehoseClient := firehose.NewClient(cfClientConfig, firehoseConfig, logger)
omsClient := client.NewOmsClient(*omsWorkspace, *omsKey, *omsPostTimeout, logger)
nozzleConfig := &omsnozzle.NozzleConfig{
OmsTypePrefix: omsTypePrefix,
OmsBatchTime: *omsBatchTime,
OmsMaxMsgNumPerBatch: *omsMaxMsgNumPerBatch,
ExcludeMetricEvents: excludeMetricEvents,
ExcludeLogEvents: excludeLogEvents,
ExcludeHttpEvents: excludeHttpEvents,
LogEventCount: *logEventCount,
LogEventCountInterval: *logEventCountInterval,
}
cachingClient := caching.NewCaching(cfClientConfig, logger, *environment, *spaceFilter)
nozzle := omsnozzle.NewOmsNozzle(logger, firehoseClient, omsClient, nozzleConfig, cachingClient)
nozzle.Start()
}
// registerGoRoutineDumpSignalChannel creates a channel subscribed to SIGUSR1
// and returns it. os/signal never blocks when delivering, so the channel gets
// a one-slot buffer to hold a pending signal while the receiver is busy.
func registerGoRoutineDumpSignalChannel() chan os.Signal {
	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)
	return sigusr1
}
// dumpGoRoutine writes a stack trace of every goroutine to stdout each time a
// value arrives on dumpChan, and returns once dumpChan is closed. Debug level 2
// prints stacks in the same format Go uses for an unrecovered panic.
func dumpGoRoutine(dumpChan chan os.Signal) {
	for {
		if _, open := <-dumpChan; !open {
			return
		}
		if profile := pprof.Lookup("goroutine"); profile != nil {
			// Best-effort diagnostics: there is nowhere useful to report a
			// failed write to stdout.
			_ = profile.WriteTo(os.Stdout, 2)
		}
	}
}