-
Notifications
You must be signed in to change notification settings - Fork 117
/
main.go
157 lines (143 loc) · 4.92 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"os/signal"
"strings"
"syscall"
"github.com/mailgun/holster/v4/setter"
"github.com/mailgun/kafka-pixy/config"
"github.com/mailgun/kafka-pixy/logging"
"github.com/mailgun/kafka-pixy/service"
log "github.com/sirupsen/logrus"
)
// Values of the command line flags registered in init. makeConfig and main
// only act on a flag whose value is non-zero ("" and false mean "not given"),
// with the exception of cmdLoggingJSONCfg, which main always hands to
// logging.Init.
var (
// Configuration file path and API listen addresses
// (-grpcAddr, -config, -tcpAddr, -unixAddr).
cmdGRPCAddr string
cmdConfig string
cmdTCPAddr string
cmdUnixAddr string
// Comma separated peer lists (-kafkaPeers, -zookeeperPeers); the ZooKeeper
// list may be followed by a chroot path starting with "/".
cmdKafkaPeers string
cmdZookeeperPeers string
// PID file path and logging configuration (-pidFile, -logging).
cmdPIDFile string
cmdLoggingJSONCfg string
// Kafka TLS settings (-tls, -caCertFile, -clientCertFile,
// -clientCertKeyFile, -insecureTLS).
cmdTLSEnabled bool
cmdCACertFile string
cmdClientCertFile string
cmdClientCertKeyFile string
cmdInsecure bool
)
// init registers the command line flags and binds each one to its cmd*
// package variable. Parsing is intentionally left to main: flag.Parse called
// during package initialization runs before a test binary has registered the
// testing flags (Go 1.13+), which makes `go test` fail for the package.
func init() {
	flag.StringVar(&cmdConfig, "config", "", "YAML configuration file, refer to https://github.com/mailgun/kafka-pixy/blob/master/default.yaml for a list of available configuration options")
	flag.StringVar(&cmdGRPCAddr, "grpcAddr", "", "TCP address that the gRPC API should listen on")
	flag.StringVar(&cmdTCPAddr, "tcpAddr", "", "TCP address that the HTTP API should listen on")
	flag.StringVar(&cmdUnixAddr, "unixAddr", "", "Unix domain socket address that the HTTP API should listen on")
	flag.StringVar(&cmdKafkaPeers, "kafkaPeers", "", "Comma separated list of brokers")
	flag.StringVar(&cmdZookeeperPeers, "zookeeperPeers", "", "Comma separated list of ZooKeeper nodes followed by optional chroot")
	flag.StringVar(&cmdPIDFile, "pidFile", "", "Path to the PID file")
	flag.StringVar(&cmdLoggingJSONCfg, "logging", "", "Logging configuration")
	flag.BoolVar(&cmdTLSEnabled, "tls", false, "Enable TLS (Kafka consumer/producer)")
	flag.StringVar(&cmdCACertFile, "caCertFile", "", "CA certificate file")
	flag.StringVar(&cmdClientCertFile, "clientCertFile", "", "Client certificate file")
	flag.StringVar(&cmdClientCertKeyFile, "clientCertKeyFile", "", "Client certificate key file")
	flag.BoolVar(&cmdInsecure, "insecureTLS", false, "Disable TLS hostname verification")
}

// main parses the command line, builds the configuration, initializes
// logging, optionally writes a PID file, starts the service and then blocks
// until SIGINT, SIGQUIT or SIGTERM arrives, at which point the service is
// stopped. Any failure during startup terminates the process with exit
// code 1.
func main() {
	// Parse here rather than in init so that every flag (including those a
	// test binary registers) is defined before parsing happens.
	flag.Parse()

	cfg, err := makeConfig()
	if err != nil {
		// The logger is not initialized yet, so report straight to stderr.
		fmt.Fprintf(os.Stderr, "Failed to load config: err=(%s)\n", err)
		os.Exit(1)
	}
	if err := logging.Init(cmdLoggingJSONCfg, cfg); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to initialize logger: err=(%s)\n", err)
		os.Exit(1)
	}

	if cmdPIDFile != "" {
		if err := writePID(cmdPIDFile); err != nil {
			log.Errorf("Failed to write PID file: err=(%s)", err)
			os.Exit(1)
		}
	}

	// Clean up the unix domain socket file in case we failed to clean up on
	// shutdown the last time. Otherwise the service won't be able to listen
	// on this address and as a result will fail to start up. A removal
	// failure is only logged; startup is still attempted.
	if cfg.UnixAddr != "" {
		if err := os.Remove(cfg.UnixAddr); err != nil && !os.IsNotExist(err) {
			log.Errorf("Cannot remove %s: err=(%s)", cfg.UnixAddr, err)
		}
	}

	log.Infof("Starting with config: %+v", cfg)
	svc, err := service.Spawn(cfg)
	if err != nil {
		log.Errorf("Failed to start service: err=(%s)", err)
		os.Exit(1)
	}

	// Spawn OS signal listener to ensure graceful stop. The channel is
	// buffered so a signal delivered before the receive below is not lost.
	osSigCh := make(chan os.Signal, 1)
	signal.Notify(osSigCh, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)

	// Wait for a quit signal and terminate the service when it is received.
	<-osSigCh
	svc.Stop()
}
// makeConfig assembles the application configuration from the command line.
//
// The starting point is the YAML file named by -config when that flag is
// set, otherwise config.DefaultApp("default"). Each remaining flag that was
// given a non-zero value then overrides the corresponding setting; flags
// left empty/false keep whatever the starting configuration holds. Kafka and
// ZooKeeper flags are applied to the cfg.Proxies entry selected by
// cfg.DefaultCluster. Finally a console logger with "info" severity is
// passed to setter.SetDefault as the default for cfg.Logging.
//
// An error is returned only when the YAML file cannot be loaded.
func makeConfig() (*config.App, error) {
	var (
		cfg *config.App
		err error
	)
	if cmdConfig == "" {
		cfg = config.DefaultApp("default")
	} else if cfg, err = config.FromYAMLFile(cmdConfig); err != nil {
		return nil, err
	}

	// Listen address overrides.
	addrOverrides := []struct {
		value string
		dst   *string
	}{
		{cmdGRPCAddr, &cfg.GRPCAddr},
		{cmdTCPAddr, &cfg.TCPAddr},
		{cmdUnixAddr, &cfg.UnixAddr},
	}
	for _, o := range addrOverrides {
		if o.value != "" {
			*o.dst = o.value
		}
	}

	// All cluster level flags target the default cluster's proxy settings.
	proxy := cfg.Proxies[cfg.DefaultCluster]

	// Kafka overrides.
	if cmdKafkaPeers != "" {
		proxy.Kafka.SeedPeers = strings.Split(cmdKafkaPeers, ",")
	}
	if cmdTLSEnabled {
		proxy.Kafka.TLSEnabled = cmdTLSEnabled
	}
	if cmdCACertFile != "" {
		proxy.Kafka.CACertFile = cmdCACertFile
	}
	if cmdClientCertFile != "" {
		proxy.Kafka.ClientCertFile = cmdClientCertFile
	}
	if cmdClientCertKeyFile != "" {
		proxy.Kafka.ClientCertKeyFile = cmdClientCertKeyFile
	}
	if cmdInsecure {
		proxy.Kafka.InsecureSkipVerify = cmdInsecure
	}

	// ZooKeeper override: everything from the first "/" onwards is the
	// chroot, the part before it is the comma separated peer list. Without
	// a "/" the chroot is left as it was.
	if cmdZookeeperPeers != "" {
		peerList := cmdZookeeperPeers
		if slash := strings.Index(cmdZookeeperPeers, "/"); slash >= 0 {
			peerList = cmdZookeeperPeers[:slash]
			proxy.ZooKeeper.Chroot = cmdZookeeperPeers[slash:]
		}
		proxy.ZooKeeper.SeedPeers = strings.Split(peerList, ",")
	}

	consoleLogger := []config.LoggerCfg{{Name: "console", Severity: "info"}}
	setter.SetDefault(&cfg.Logging, consoleLogger)
	return cfg, nil
}
// writePID stores the current process ID, formatted as a decimal number with
// no trailing newline, in the file at path. The file is created with mode
// 0644 (or truncated if it already exists). The error from writing the file
// is returned unchanged.
func writePID(path string) error {
	contents := fmt.Sprintf("%d", os.Getpid())
	return ioutil.WriteFile(path, []byte(contents), 0644)
}