-
Notifications
You must be signed in to change notification settings - Fork 6
/
environment.go
197 lines (166 loc) · 5.11 KB
/
environment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package main
import (
"github.com/docker/docker/daemon/logger"
"strings"
"github.com/Sirupsen/logrus"
"errors"
"os"
)
/**
ENVIRONMENT VARIABLES
*/
const (
	// ENV_LOG_LEVEL names the variable holding the minimum level at which the
	// plugin ITSELF logs. It does not affect the log level of the containers.
	ENV_LOG_LEVEL string = "LOG_LEVEL"
	// DEFAULT_LOG_LEVEL is the plugin log level used when none is configured.
	DEFAULT_LOG_LEVEL string = "info"

	// ENV_TOPIC names the variable the user can set to override the topic
	// name, which allows per-container topics.
	ENV_TOPIC string = "LOG_TOPIC"
	// DEFAULT_TOPIC is the topic used when ENV_TOPIC is not set.
	DEFAULT_TOPIC string = "dockerlogs"

	// ENV_LOG_TAG names the variable holding the value the logs are tagged with.
	ENV_LOG_TAG string = "LOG_TAG"
	// DEFAULT_LOG_TAG is the tag used when ENV_LOG_TAG is not set.
	DEFAULT_LOG_TAG string = "common"

	// ENV_PARTITION_STRATEGY names the variable selecting the partition strategy.
	ENV_PARTITION_STRATEGY string = "PARTITION_STRATEGY"

	// ENV_KEY_STRATEGY names the variable selecting the key strategy.
	ENV_KEY_STRATEGY string = "KEY_STRATEGY"

	// ENV_KAFKA_BROKER_ADDR names the variable listing the Kafka brokers in the
	// usual host:port,host:port format. It must *always* be set.
	ENV_KAFKA_BROKER_ADDR string = "KAFKA_BROKER_ADDR"
)
/**
Special values for the ENV_TOPIC
*/
const (
	// TOPIC__CONTAINERNAME is the special ENV_TOPIC value that sets the topic
	// name to be the container name.
	TOPIC__CONTAINERNAME = "$CONTAINERNAME"

	// TOPIC__CONTAINERID is the special ENV_TOPIC value that sets the topic
	// name to be the container id.
	TOPIC__CONTAINERID = "$CONTAINERID"
)
/**
Available values for partition strategy
*/
const (
	// PARTITION_STRATEGY__ROUND_ROBIN is the textual name of the round-robin
	// partition strategy.
	PARTITION_STRATEGY__ROUND_ROBIN string = "round_robin"

	// PARTITION_STRATEGY__KEY_HASH is the textual name of the key-hash
	// partition strategy.
	PARTITION_STRATEGY__KEY_HASH string = "key_hash"

	// DEFAULT_PARTITION_STRATEGY is the strategy name used when none is
	// configured.
	DEFAULT_PARTITION_STRATEGY string = PARTITION_STRATEGY__ROUND_ROBIN
)
/**
Available values for key strategy
*/
const (
	// KEY_STRATEGY__CONTAINER_ID is the textual name of the strategy that keys
	// by container id.
	KEY_STRATEGY__CONTAINER_ID string = "key_by_container_id"

	// KEY_STRATEGY__TIMESTAMP is the textual name of the strategy that keys by
	// timestamp.
	KEY_STRATEGY__TIMESTAMP string = "key_by_timestamp"

	// DEFAULT_KEY_STRATEGY is the strategy name used when none is configured.
	DEFAULT_KEY_STRATEGY string = KEY_STRATEGY__TIMESTAMP
)
/**
Environment variable value mappings
*/
// KeyStrategy is the parsed form of the key strategy setting.
type KeyStrategy int

const (
	// KEY_BY_CONTAINER_ID is the value mapped from "key_by_container_id".
	KEY_BY_CONTAINER_ID KeyStrategy = iota
	// KEY_BY_TIMESTAMP is the value mapped from "key_by_timestamp".
	KEY_BY_TIMESTAMP
)

// TAG is a plain string constant; it is not part of the KeyStrategy
// enumeration.
const TAG string = "common"
// PartitionStrategy is the parsed form of the partition strategy setting.
type PartitionStrategy int

const (
	// PARTITION_ROUND_ROBIN is the value mapped from "round_robin".
	PARTITION_ROUND_ROBIN PartitionStrategy = iota
	// PARTITION_KEY_HASH is the value mapped from "key_hash".
	PARTITION_KEY_HASH
)
// getPartitionStrategyEnv resolves the partition strategy from the
// ENV_PARTITION_STRATEGY environment variable, falling back to
// DEFAULT_PARTITION_STRATEGY when the variable is empty or unset.
//
// An unrecognised value is fatal: the error is logged and the process exits
// with status 1.
func getPartitionStrategyEnv() PartitionStrategy {
	partitionStrategyStr := os.Getenv(ENV_PARTITION_STRATEGY)
	if partitionStrategyStr == "" {
		partitionStrategyStr = DEFAULT_PARTITION_STRATEGY
	}

	partitionStrat, err := getPartitionStrategyFromString(partitionStrategyStr)
	if err != nil {
		// Attach the error as a field rather than as a second argument:
		// logrus.Error formats its arguments like fmt.Sprint, which inserts no
		// space between a string and an error, gluing the two texts together.
		logrus.WithField("err", err).Error("unknown partition strategy")
		os.Exit(1)
	}
	return partitionStrat
}
// getKafkaBrokersEnv returns the Kafka broker addresses listed in the
// ENV_KAFKA_BROKER_ADDR environment variable (comma-separated host:port
// entries).
//
// Whitespace around each entry is trimmed and empty entries are dropped, so
// values such as "a:9092, b:9092" or a trailing comma do not produce unusable
// addresses. If the variable is unset or yields no addresses at all, the
// problem is logged and the process exits with status 1.
func getKafkaBrokersEnv() []string {
	parts := strings.Split(os.Getenv(ENV_KAFKA_BROKER_ADDR), ",")

	addrs := make([]string, 0, len(parts))
	for _, part := range parts {
		if addr := strings.TrimSpace(part); addr != "" {
			addrs = append(addrs, addr)
		}
	}

	// Covers both the unset variable (Split yields a single empty entry) and
	// values made up solely of separators and whitespace.
	if len(addrs) == 0 {
		logrus.Error("Missing environment var " + ENV_KAFKA_BROKER_ADDR)
		os.Exit(1)
	}
	return addrs
}
// getKafkaTopicEnv returns the topic configured through the ENV_TOPIC
// environment variable, or DEFAULT_TOPIC when that variable is empty or unset.
func getKafkaTopicEnv() string {
	if topic := os.Getenv(ENV_TOPIC); topic != "" {
		return topic
	}
	return DEFAULT_TOPIC
}
// getKeyStrategyEnv resolves the key strategy from the ENV_KEY_STRATEGY
// environment variable, falling back to DEFAULT_KEY_STRATEGY when the variable
// is empty or unset.
//
// An unrecognised value is fatal: the error is logged and the process exits
// with status 1.
func getKeyStrategyEnv() KeyStrategy {
	keyStrategyVar := os.Getenv(ENV_KEY_STRATEGY)
	if keyStrategyVar == "" {
		keyStrategyVar = DEFAULT_KEY_STRATEGY
	}

	keyStrat, err := getKeyStrategyFromString(keyStrategyVar)
	if err != nil {
		// Attach the error as a field rather than as a second argument:
		// logrus.Error formats its arguments like fmt.Sprint, which inserts no
		// space between a string and an error, gluing the two texts together.
		logrus.WithField("err", err).Error("unknown key strategy")
		os.Exit(1)
	}
	return keyStrat
}
// getKeyTagEnv returns the tag configured through the ENV_LOG_TAG environment
// variable, or DEFAULT_LOG_TAG when that variable is empty or unset.
func getKeyTagEnv() string {
	if configured := os.Getenv(ENV_LOG_TAG); configured != "" {
		return configured
	}
	return DEFAULT_LOG_TAG
}
// getKeyStrategyFromString maps the textual name of a key strategy onto its
// KeyStrategy value. Matching ignores letter case and surrounding whitespace.
// An unrecognised name yields the zero KeyStrategy together with an error that
// quotes the input as given and lists the accepted names.
func getKeyStrategyFromString(keyStrategyString string) (KeyStrategy, error) {
	// Normalise once so the comparisons below succeed no matter how the value
	// was typed.
	normalized := strings.TrimSpace(strings.ToLower(keyStrategyString))

	if normalized == KEY_STRATEGY__CONTAINER_ID {
		return KEY_BY_CONTAINER_ID, nil
	}
	if normalized == KEY_STRATEGY__TIMESTAMP {
		return KEY_BY_TIMESTAMP, nil
	}

	msg := "Unknown keying strategy " + keyStrategyString + ". Expected: key_by_container_id,key_by_timestamp"
	return 0, errors.New(msg)
}
// getPartitionStrategyFromString maps the textual name of a partition strategy
// onto its PartitionStrategy value. Matching ignores letter case and
// surrounding whitespace. An unrecognised name yields the zero
// PartitionStrategy together with an error that quotes the input as given and
// lists the accepted names.
func getPartitionStrategyFromString(partitionStrategy string) (PartitionStrategy, error) {
	// Lowercase and trim so the value matches no matter how it was typed.
	switch strings.TrimSpace(strings.ToLower(partitionStrategy)) {
	case PARTITION_STRATEGY__ROUND_ROBIN:
		return PARTITION_ROUND_ROBIN, nil
	case PARTITION_STRATEGY__KEY_HASH:
		return PARTITION_KEY_HASH, nil
	default:
		// The accepted names are taken from the constants so the message cannot
		// drift from what the cases above actually match.
		expected := PARTITION_STRATEGY__ROUND_ROBIN + "," + PARTITION_STRATEGY__KEY_HASH
		return 0, errors.New("Unknown partition strategy " + partitionStrategy + ". Expected: " + expected)
	}
}
// getEnvVarOrDefault looks up envVarName among the container's environment
// entries (logCtx.ContainerEnv, each expected as KEY=VALUE) and returns its
// value, or defaultValue when no entry matches.
//
// The name of each entry is upper-cased before being compared with
// envVarName, and when several entries match the last one wins. Every match
// is logged at info level.
func getEnvVarOrDefault(logCtx logger.Info, envVarName string, defaultValue string) string {
	result := defaultValue
	for _, entry := range logCtx.ContainerEnv {
		// Split on the first '=' only: the value itself (a topic name, for
		// instance) may legitimately contain further '=' characters.
		pair := strings.SplitN(entry, "=", 2)
		if len(pair) != 2 {
			// A bare key without a value; nothing to read.
			continue
		}

		key, val := pair[0], pair[1]
		if strings.ToUpper(key) != envVarName {
			continue
		}

		logrus.WithField(envVarName, val).Info("environment property overriden for container")
		result = val
	}
	return result
}
// getHostname returns the host name reported by the operating system, or an
// empty string when it cannot be retrieved.
func getHostname() string {
	name, err := os.Hostname()
	if err == nil {
		return name
	}

	// Log at debug level only: if the lookup keeps failing for some reason
	// (permissions perhaps?), we do not want to flood the system.
	logrus.WithField("err", err).Debug("Unable to retrieve hostname")
	return ""
}