Skip to content

Commit

Permalink
move ingester configrution to env vars (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnmahanth committed Dec 12, 2022
1 parent 01e7de6 commit 6e41335
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 55 deletions.
39 changes: 20 additions & 19 deletions deepfence_ingester/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@ RUN apk add --no-cache kafkacat

COPY deepfence_ingester/entrypoint.sh /entrypoint.sh

ENV POSTGRES_USER_DB_HOST=deepfence-postgres \
POSTGRES_USER_DB_PORT=5432 \
POSTGRES_USER_DB_USER=deepfence \
POSTGRES_USER_DB_PASSWORD=deepfence \
POSTGRES_USER_DB_NAME=users \
POSTGRES_USER_DB_SSLMODE=disable \
REDIS_HOST=deepfence-redis \
REDIS_PORT=6379 \
REDIS_DB_NUMBER=0 \
NEO4J_HOST=deepfence-neo4j \
NEO4J_BOLT_PORT=7687 \
NEO4J_USER=neo4j \
NEO4J_PASSWORD=password \
HTTP_LISTEN_ENDPOINT=8090 \
SAAS_DEPLOYMENT=false \
KAFKA_BROKERS=deepfence-kafka-broker:9092 \
KAFKA_TOPIC_PARTITIONS=3 \
KAFKA_TOPIC_REPLICAS=1 \
KAFKA_TOPIC_RETENTION_MS="86400000"
ENV DEEPFENCE_POSTGRES_USER_DB_HOST=deepfence-postgres \
DEEPFENCE_POSTGRES_USER_DB_PORT=5432 \
DEEPFENCE_POSTGRES_USER_DB_USER=deepfence \
DEEPFENCE_POSTGRES_USER_DB_PASSWORD=deepfence \
DEEPFENCE_POSTGRES_USER_DB_NAME=users \
DEEPFENCE_POSTGRES_USER_DB_SSLMODE=disable \
DEEPFENCE_REDIS_HOST=deepfence-redis \
DEEPFENCE_REDIS_PORT=6379 \
DEEPFENCE_REDIS_DB_NUMBER=0 \
DEEPFENCE_NEO4J_HOST=deepfence-neo4j \
DEEPFENCE_NEO4J_BOLT_PORT=7687 \
DEEPFENCE_NEO4J_USER=neo4j \
DEEPFENCE_NEO4J_PASSWORD=password \
DEEPFENCE_METRICS_PORT=8181 \
DEEPFENCE_SAAS_DEPLOYMENT=false \
DEEPFENCE_KAFKA_BROKERS=deepfence-kafka-broker:9092 \
DEEPFENCE_KAFKA_TOPIC_PARTITIONS=3 \
DEEPFENCE_KAFKA_TOPIC_REPLICAS=1 \
DEEPFENCE_KAFKA_TOPIC_RETENTION_MS="86400000" \
DEEPFENCE_DEBUG=false

COPY --from=build /go/deepfence_ingester/deepfence_ingester /usr/local/bin/deepfence_ingester
EXPOSE 8090
Expand Down
2 changes: 1 addition & 1 deletion deepfence_ingester/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh
set -e

until kcat -L -b ${KAFKA_BROKERS};
until kcat -L -b ${DEEPFENCE_KAFKA_BROKERS};
do
sleep 5
done
Expand Down
1 change: 1 addition & 0 deletions deepfence_ingester/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
)

require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.15.12 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/prometheus/client_golang v1.14.0
Expand Down
2 changes: 2 additions & 0 deletions deepfence_ingester/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
Expand Down
12 changes: 6 additions & 6 deletions deepfence_ingester/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"os"
"strconv"
"strings"
"time"

"github.com/twmb/franz-go/pkg/kadm"
Expand All @@ -22,9 +21,9 @@ var kgoLogger kgo.Logger = kgo.BasicLogger(
func() string { return "[" + getCurrentTime() + "]" + " " },
)

func checkKafkaConn() error {
func checkKafkaConn(kafkaBrokers []string) error {
opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(kafkaBrokers, ",")...),
kgo.SeedBrokers(kafkaBrokers...),
kgo.WithLogger(kgoLogger),
}
kClient, err := kgo.NewClient(opts...)
Expand All @@ -35,15 +34,16 @@ func checkKafkaConn() error {
if err := kClient.Ping(context.Background()); err != nil {
return err
}
log.Info("connection successful to kafka brokers " + kafkaBrokers)
log.Infof("connection successful to kafka brokers %s", kafkaBrokers)
return nil
}

func createMissingTopics(topics []string, partitions int32, replicas int16, retention_ms string) error {
func createMissingTopics(kafkaBrokers []string, topics []string,
partitions int32, replicas int16, retention_ms string) error {
log.Infof("create topics with partitions=%d and replicas=%d", partitions, replicas)

opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(kafkaBrokers, ",")...),
kgo.SeedBrokers(kafkaBrokers...),
kgo.WithLogger(kgoLogger),
}
kClient, err := kgo.NewClient(opts...)
Expand Down
5 changes: 2 additions & 3 deletions deepfence_ingester/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package main

import (
"context"
"strings"

"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
"github.com/twmb/franz-go/pkg/kgo"
)

func startKafkaConsumers(
ctx context.Context,
brokers string,
brokers []string,
topics []string,
group string,
) {
Expand All @@ -20,7 +19,7 @@ func startKafkaConsumers(
log.Info("group ID: ", group)

opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(brokers, ",")...),
kgo.SeedBrokers(brokers...),
kgo.ConsumerGroup(group),
kgo.ConsumeTopics(topics...),
kgo.ClientID(group),
Expand Down
61 changes: 38 additions & 23 deletions deepfence_ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,43 @@ package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path"
"runtime"
"strconv"
"strings"
"syscall"

"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
"github.com/kelseyhightower/envconfig"
_ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus/promhttp"
logrus "github.com/sirupsen/logrus"
)

var (
kafkaBrokers string
log *logrus.Logger
cveProcessor *BulkProcessor
complianceProcessor *BulkProcessor
cloudComplianceProcessor *BulkProcessor
secretsProcessor *BulkProcessor
)

func init() {
type config struct {
KafkaBrokers []string `default:"deepfence-kafka-broker:9092" required:"true" split_words:"true"`
KafkaTopicPartitions int32 `default:"1" split_words:"true"`
KafkaTopicReplicas int16 `default:"1" split_words:"true"`
KafkaTopicRetentionMs string `default:"86400000" split_words:"true"`
MetricsPort string `default:"8181" split_words:"true"`
Debug bool `default:"false"`
}

func init() {
// setup logger
log = logrus.New()
debug := os.Getenv("DEBUG")
if strings.ToLower(debug) == "true" {
log.SetLevel(logrus.DebugLevel)
} else {
log.SetLevel(logrus.InfoLevel)
}
log.SetLevel(logrus.InfoLevel)
log.SetOutput(os.Stdout)
log.SetReportCaller(true)
log.SetFormatter(&logrus.TextFormatter{
Expand All @@ -46,18 +49,27 @@ func init() {
return "", " " + path.Base(f.File) + ":" + strconv.Itoa(f.Line)
},
})
}

func main() {

var cfg config
var err error
err = envconfig.Process("DEEPFENCE", &cfg)
if err != nil {
log.Fatal(err.Error())
}

log.Infof("config: %+v", cfg)

kafkaBrokers = os.Getenv("KAFKA_BROKERS")
if kafkaBrokers == "" {
kafkaBrokers = "deepfence-kafka-broker:9092"
if cfg.Debug {
log.SetLevel(logrus.DebugLevel)
}
err := checkKafkaConn()

err = checkKafkaConn(cfg.KafkaBrokers)
if err != nil {
gracefulExit(err)
}
}

func main() {

ctx, cancel := signal.NotifyContext(context.Background(),
os.Interrupt, syscall.SIGTERM)
Expand All @@ -66,7 +78,10 @@ func main() {
// for prometheus metrics
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
srv := &http.Server{Addr: "0.0.0.0:8181", Handler: mux}
srv := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%s", cfg.MetricsPort),
Handler: mux,
}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("Server listen failed: %s", err)
Expand All @@ -78,16 +93,16 @@ func main() {
log.Info("topics list: ", utils.Topics)

//create if any topics is missing
partitions := GetEnvIntWithDefault("KAFKA_TOPIC_PARTITIONS", 1)
replicas := GetEnvIntWithDefault("KAFKA_TOPIC_REPLICAS", 1)
retention_ms := GetEnvStringWithDefault("KAFKA_TOPIC_RETENTION_MS", "86400000")
err := createMissingTopics(utils.Topics, int32(partitions), int16(replicas), retention_ms)
err = createMissingTopics(
cfg.KafkaBrokers,
utils.Topics, cfg.KafkaTopicPartitions,
cfg.KafkaTopicReplicas, cfg.KafkaTopicRetentionMs)
if err != nil {
log.Error(err)
}

// start kafka consumers for all given topics
go startKafkaConsumers(ctx, kafkaBrokers, utils.Topics, "default")
go startKafkaConsumers(ctx, cfg.KafkaBrokers, utils.Topics, "default")

// bulk processors
cveProcessor = NewBulkProcessor(utils.VULNERABILITY_SCAN, commitFuncCVEs)
Expand All @@ -103,7 +118,7 @@ func main() {
secretsProcessor.Start(ctx)

// collect consumer lag for metrics
go getLagByTopic(ctx, kafkaBrokers, "default")
go getLagByTopic(ctx, cfg.KafkaBrokers, "default")

// wait for exit
// flush all data from bulk processor
Expand Down
5 changes: 2 additions & 3 deletions deepfence_ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -78,9 +77,9 @@ var (
})
)

func getLagByTopic(ctx context.Context, kafkaBrokers string, groupID string) {
func getLagByTopic(ctx context.Context, kafkaBrokers []string, groupID string) {
opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(kafkaBrokers, ",")...),
kgo.SeedBrokers(kafkaBrokers...),
kgo.WithLogger(kgoLogger),
}
client, err := kgo.NewClient(opts...)
Expand Down

0 comments on commit 6e41335

Please sign in to comment.