Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move core monitoring to relay. Add balance monitoring for solana #126

Merged
merged 21 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9e150d2
refactor http server
topliceanu Jan 17, 2022
7c01f75
drop reading feeds from file - unnecessary extra complexity
topliceanu Jan 17, 2022
d990892
move Feeds from config into pkg/monitoring to prepare for abstraction
topliceanu Jan 17, 2022
a5cbd0c
update Metrics to expose a cleanup() and HTTPHandler() method
topliceanu Jan 17, 2022
c5abcea
update Manager to expose an HTTPHandler() method
topliceanu Jan 17, 2022
e904c1f
update benchmarks
topliceanu Jan 17, 2022
2a238f6
refactor AccountReader into ChainReader to make it more abstract
topliceanu Jan 17, 2022
e54d7e2
refactor the monitor to only report the configuration and the
topliceanu Jan 18, 2022
5e3b3dd
extract the solana-specific configuration from the monitoring/config
topliceanu Jan 18, 2022
594271a
replace dependency on the chain specific feed configuration object
topliceanu Jan 18, 2022
ede1c81
replace SolanaConfig with ChainConfig and update the codebase, the tests
topliceanu Jan 19, 2022
a47782c
add Cleanup in the Exporter interface and implement it in the prometheus
topliceanu Jan 19, 2022
52f7aa2
update the monitor to only use a single pipeline to read from the chain
topliceanu Jan 19, 2022
34b5d83
extract Avro DLS to a separate package to make it easier to extract for
topliceanu Jan 19, 2022
5a866dd
introduce a facade to simplify the use of the package
topliceanu Jan 19, 2022
014cdd1
update schemas for backwards compatibility with the previous version
topliceanu Jan 19, 2022
693540f
update testutils to split the generate from solana-specific ones.
topliceanu Jan 20, 2022
326666e
move the core components of the ocr2 monitoring into chainlink-relay
topliceanu Jan 20, 2022
b2c9376
set solana-specific transmission and state addresses in the feed
topliceanu Jan 21, 2022
8bc3389
introduce monitoring for account balances
topliceanu Jan 26, 2022
056ea22
fix lint issues
topliceanu Jan 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 103 additions & 95 deletions cmd/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,145 +2,141 @@ package main

import (
"context"
"errors"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"

"github.com/gagliardetto/solana-go/rpc"
"github.com/prometheus/client_golang/prometheus/promhttp"
relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring"
relayConfig "github.com/smartcontractkit/chainlink-relay/pkg/monitoring/config"
"github.com/smartcontractkit/chainlink-solana/pkg/monitoring"
"github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config"
"github.com/smartcontractkit/chainlink/core/logger"
"go.uber.org/zap/zapcore"
)

func main() {
bgCtx, cancelBgCtx := context.WithCancel(context.Background())
defer cancelBgCtx()
wg := &sync.WaitGroup{}

log := logger.NewLogger(loggerConfig{})
coreLog := logger.NewLogger(loggerConfig{}).With("project", "solana")
log := logWrapper{coreLog}

cfg, err := config.Parse()
chainConfig, err := monitoring.ParseSolanaConfig()
if err != nil {
log.Fatalw("failed to parse configuration", "error", err)
log.Fatalw("failed to parse solana-specific config", "error", err)
}

mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
server := &http.Server{
Addr: cfg.Http.Address,
Handler: mux,
BaseContext: func(_ net.Listener) context.Context { return bgCtx },
sourceFactory := monitoring.NewSolanaSourceFactory(
chainConfig,
logWrapper{coreLog.With("component", "source")},
)

wg := &sync.WaitGroup{}
defer wg.Wait()

bgCtx, cancelBgCtx := context.WithCancel(context.Background())
defer cancelBgCtx()

cfg, err := relayConfig.Parse()
if err != nil {
log.Fatalw("failed to parse generic configuration", "error", err)
}
defer server.Close()
wg.Add(1)
go func() {
defer wg.Done()
log := log.With("component", "http")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalw("failed to start http server", "address", cfg.Http.Address, "error", err)
} else {
log.Info("http server stopped")
}
}()

client := rpc.New(cfg.Solana.RPCEndpoint)
schemaRegistry := relayMonitoring.NewSchemaRegistry(cfg.SchemaRegistry, log)

schemaRegistry := monitoring.NewSchemaRegistry(cfg.SchemaRegistry, log)
transmissionSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.TransmissionTopic+"-value", monitoring.TransmissionAvroSchema)
transmissionSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.TransmissionTopic+"-value", relayMonitoring.TransmissionAvroSchema)
if err != nil {
log.Fatalw("failed to prepare transmission schema", "error", err)
}
configSetSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetTopic+"-value", monitoring.ConfigSetAvroSchema)
configSetSimplifiedSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetSimplifiedTopic+"-value", relayMonitoring.ConfigSetSimplifiedAvroSchema)
if err != nil {
log.Fatalf("failed to prepare config_set schema", "error", err)
log.Fatalw("failed to prepare config_set_simplified schema", "error", err)
}
configSetSimplifiedSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetSimplifiedTopic+"-value", monitoring.ConfigSetSimplifiedAvroSchema)

producer, err := relayMonitoring.NewProducer(bgCtx, log.With("component", "producer"), cfg.Kafka)
if err != nil {
log.Fatalf("failed to prepare config_set_simplified schema", "error", err)
log.Fatalw("failed to create kafka producer", "error", err)
}

producer, err := monitoring.NewProducer(bgCtx, log.With("component", "producer"), cfg.Kafka)
if err != nil {
log.Fatalf("failed to create kafka producer", "error", err)
if cfg.Feature.TestOnlyFakeReaders {
sourceFactory = relayMonitoring.NewRandomDataSourceFactory(bgCtx, wg, log.With("component", "rand-source"))
}

var transmissionReader, stateReader monitoring.AccountReader
balancesSourceFactory := monitoring.NewBalancesSourceFactory(chainConfig, log.With("component", "balances-source"))
if cfg.Feature.TestOnlyFakeReaders {
transmissionReader = monitoring.NewRandomDataReader(bgCtx, wg, "transmission", log.With("component", "rand-reader", "account", "transmissions"))
stateReader = monitoring.NewRandomDataReader(bgCtx, wg, "state", log.With("component", "rand-reader", "account", "state"))
} else {
transmissionReader = monitoring.NewTransmissionReader(client)
stateReader = monitoring.NewStateReader(client)
balancesSourceFactory = monitoring.NewFakeBalancesSourceFactory(log.With("component", "fake-balances-source"))
}

monitor := monitoring.NewMultiFeedMonitor(
cfg.Solana,
metrics := relayMonitoring.DefaultMetrics

log,
transmissionReader, stateReader,
prometheusExporterFactory := relayMonitoring.NewPrometheusExporterFactory(
log.With("component", "prometheus-exporter"),
metrics,
)
kafkaExporterFactory := relayMonitoring.NewKafkaExporterFactory(
log.With("component", "kafka-exporter"),
producer,
monitoring.DefaultMetrics,

cfg.Kafka.ConfigSetTopic,
transmissionSchema,
configSetSimplifiedSchema,

cfg.Kafka.ConfigSetSimplifiedTopic,
cfg.Kafka.TransmissionTopic,
)

configSetSchema,
configSetSimplifiedSchema,
transmissionSchema,
balancesPrometheusExporterFactory := monitoring.NewPrometheusExporterFactory(
log.With("component", "balances-prometheus-exporter"),
monitoring.DefaultMetrics,
)

monitor := relayMonitoring.NewMultiFeedMonitor(
chainConfig,
log,
[]relayMonitoring.SourceFactory{sourceFactory, balancesSourceFactory},
[]relayMonitoring.ExporterFactory{prometheusExporterFactory, kafkaExporterFactory, balancesPrometheusExporterFactory},
)

rddSource := relayMonitoring.NewRDDSource(cfg.Feeds.URL, monitoring.SolanaFeedParser)
if cfg.Feature.TestOnlyFakeRdd {
// Generate between 2 and 10 random feeds every RDDPollInterval.
rddSource = monitoring.NewFakeRDDSource(2, 10)
}
rddPoller := relayMonitoring.NewSourcePoller(
rddSource,
log.With("component", "rdd-poller"),
cfg.Feeds.RDDPollInterval,
cfg.Feeds.RDDReadTimeout,
0, // no buffering!
)
wg.Add(1)
go func() {
defer wg.Done()
rddPoller.Run(bgCtx)
}()

manager := relayMonitoring.NewManager(
log.With("component", "manager"),
rddPoller,
)
wg.Add(1)
go func() {
defer wg.Done()
manager.Run(bgCtx, monitor.Run)
}()

if cfg.Feeds.FilePath != "" {
wg.Add(1)
go func() {
defer wg.Done()
monitor.Start(bgCtx, wg, cfg.Feeds.Feeds)
}()
} else if cfg.Feeds.URL != "" {
source := monitoring.NewRDDSource(cfg.Feeds.URL)
if cfg.Feature.TestOnlyFakeRdd {
source = monitoring.NewFakeRDDSource(2, 10)
}
rddPoller := monitoring.NewSourcePoller(
source,
log.With("component", "rdd-poller"),
cfg.Feeds.RDDPollInterval,
cfg.Feeds.RDDReadTimeout,
0, // no buffering!
)
wg.Add(1)
go func() {
defer wg.Done()
rddPoller.Start(bgCtx)
}()
manager := monitoring.NewManager(
log.With("component", "manager"),
rddPoller,
)
wg.Add(1)
go func() {
defer wg.Done()
manager.Start(bgCtx, wg, monitor.Start)
}()
} // the config package makes sure there is either a FilePath or a URL set!
// Configure HTTP server
http := relayMonitoring.NewHttpServer(bgCtx, cfg.Http.Address, log.With("component", "http-server"))
http.Handle("/metrics", metrics.HTTPHandler())
http.Handle("/debug", manager.HTTPHandler())
wg.Add(1)
go func() {
defer wg.Done()
http.Run(bgCtx)
}()

// Handle signals from the OS
osSignalsCh := make(chan os.Signal, 1)
signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-osSignalsCh
log.Infof("received signal '%v'. Stopping", sig)
log.Infow("received signal. Stopping", "signal", sig)

cancelBgCtx()
if err := server.Shutdown(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
log.Errorw("failed to shut http server down", "error", err)
}
wg.Wait()
log.Info("process stopped")
}

// logger config
Expand Down Expand Up @@ -168,3 +164,15 @@ func (l loggerConfig) LogLevel() zapcore.Level {
func (l loggerConfig) LogUnixTimestamps() bool {
return false // log timestamp in ISO8601
}

type logWrapper struct {
logger.Logger
}

func (l logWrapper) Criticalw(format string, values ...interface{}) {
l.Logger.CriticalW(format, values...)
}

func (l logWrapper) With(values ...interface{}) relayMonitoring.Logger {
return logWrapper{l.Logger.With(values...)}
}
21 changes: 11 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,20 @@ require (
require github.com/satori/go.uuid v1.2.0

require (
github.com/confluentinc/confluent-kafka-go v1.7.0
github.com/davecgh/go-spew v1.1.1
github.com/gagliardetto/gofuzz v1.2.2
github.com/gagliardetto/treeout v0.1.4
github.com/golang/protobuf v1.5.2
github.com/linkedin/goavro v2.1.0+incompatible
github.com/mr-tron/base58 v1.2.0
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.17.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/riferrei/srclient v0.4.0
github.com/prometheus/client_golang v1.12.0
github.com/rs/zerolog v1.26.1
github.com/smartcontractkit/chainlink-relay v0.0.0-20220126142536-3aae7ed1e37c
github.com/smartcontractkit/integrations-framework v1.0.37-0.20220125141905-c76d1f04870f
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -66,6 +61,7 @@ require (
github.com/cavaliercoder/grab v2.0.0+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/containerd/containerd v1.5.9 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
Expand All @@ -77,7 +73,7 @@ require (
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/ethereum/go-ethereum v1.10.11 // indirect
github.com/ethereum/go-ethereum v1.10.15 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
github.com/fatih/camelcase v1.0.0 // indirect
Expand All @@ -97,6 +93,7 @@ require (
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.6 // indirect
Expand Down Expand Up @@ -124,6 +121,7 @@ require (
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.3 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/linkedin/goavro v2.1.0+incompatible // indirect
github.com/linkedin/goavro/v2 v2.9.7 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/magiconair/properties v1.8.5 // indirect
Expand All @@ -146,6 +144,7 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
Expand All @@ -155,11 +154,13 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/riferrei/srclient v0.4.1-0.20211229125508-8edc580da179 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rubenv/sql-migrate v0.0.0-20211023115951-9f02b1e13857 // indirect
github.com/russross/blackfriday v1.6.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/shirou/gopsutil v3.21.10+incompatible // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
Expand Down Expand Up @@ -187,18 +188,18 @@ require (
go.uber.org/ratelimit v0.2.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/gorp.v1 v1.7.2 // indirect
gopkg.in/guregu/null.v4 v4.0.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
helm.sh/helm/v3 v3.7.2 // indirect
Expand Down
Loading