diff --git a/cmd/monitoring/main.go b/cmd/monitoring/main.go index 37c447f5d..c380f79a0 100644 --- a/cmd/monitoring/main.go +++ b/cmd/monitoring/main.go @@ -2,35 +2,141 @@ package main import ( "context" + "os" + "os/signal" + "sync" + "syscall" relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring" + relayConfig "github.com/smartcontractkit/chainlink-relay/pkg/monitoring/config" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring" "github.com/smartcontractkit/chainlink/core/logger" "go.uber.org/zap/zapcore" ) func main() { - ctx := context.Background() - coreLog := logger.NewLogger(loggerConfig{}).With("project", "solana") log := logWrapper{coreLog} - solanaConfig, err := monitoring.ParseSolanaConfig() + chainConfig, err := monitoring.ParseSolanaConfig() + if err != nil { + log.Fatalw("failed to parse solana-specific config", "error", err) + } + + sourceFactory := monitoring.NewSolanaSourceFactory( + chainConfig, + logWrapper{coreLog.With("component", "source")}, + ) + + wg := &sync.WaitGroup{} + defer wg.Wait() + + bgCtx, cancelBgCtx := context.WithCancel(context.Background()) + defer cancelBgCtx() + + cfg, err := relayConfig.Parse() + if err != nil { + log.Fatalw("failed to parse generic configuration", "error", err) + } + + schemaRegistry := relayMonitoring.NewSchemaRegistry(cfg.SchemaRegistry, log) + + transmissionSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.TransmissionTopic+"-value", relayMonitoring.TransmissionAvroSchema) + if err != nil { + log.Fatalw("failed to prepare transmission schema", "error", err) + } + configSetSimplifiedSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetSimplifiedTopic+"-value", relayMonitoring.ConfigSetSimplifiedAvroSchema) + if err != nil { + log.Fatalw("failed to prepare config_set_simplified schema", "error", err) + } + + producer, err := relayMonitoring.NewProducer(bgCtx, log.With("component", "producer"), cfg.Kafka) if err != nil { - log.Fatalw("failed to parse solana specific configuration", "error", err) + log.Fatalw("failed to create kafka producer", "error", err) + } + + if cfg.Feature.TestOnlyFakeReaders { + sourceFactory = relayMonitoring.NewRandomDataSourceFactory(bgCtx, wg, log.With("component", "rand-source")) + } + + balancesSourceFactory := monitoring.NewBalancesSourceFactory(chainConfig, log.With("component", "balances-source")) + if cfg.Feature.TestOnlyFakeReaders { + balancesSourceFactory = monitoring.NewFakeBalancesSourceFactory(log.With("component", "fake-balances-source")) } - solanaSourceFactory := monitoring.NewSolanaSourceFactory(logWrapper{coreLog.With("component", "source")}) + metrics := relayMonitoring.DefaultMetrics + + prometheusExporterFactory := relayMonitoring.NewPrometheusExporterFactory( + log.With("component", "prometheus-exporter"), + metrics, + ) + kafkaExporterFactory := relayMonitoring.NewKafkaExporterFactory( + log.With("component", "kafka-exporter"), + producer, + + transmissionSchema, + configSetSimplifiedSchema, - relayMonitoring.Facade( - ctx, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + ) + + balancesPrometheusExporterFactory := monitoring.NewPrometheusExporterFactory( + log.With("component", "balances-prometheus-exporter"), + monitoring.DefaultMetrics, + ) + + monitor := relayMonitoring.NewMultiFeedMonitor( + chainConfig, log, - solanaConfig, - solanaSourceFactory, - monitoring.SolanaFeedParser, + []relayMonitoring.SourceFactory{sourceFactory, balancesSourceFactory}, + []relayMonitoring.ExporterFactory{prometheusExporterFactory, kafkaExporterFactory, balancesPrometheusExporterFactory}, + ) + + rddSource := relayMonitoring.NewRDDSource(cfg.Feeds.URL, monitoring.SolanaFeedParser) + if cfg.Feature.TestOnlyFakeRdd { + // Generate between 2 and 10 random feeds every RDDPollInterval. + rddSource = monitoring.NewFakeRDDSource(2, 10) + } + rddPoller := relayMonitoring.NewSourcePoller( + rddSource, + log.With("component", "rdd-poller"), + cfg.Feeds.RDDPollInterval, + cfg.Feeds.RDDReadTimeout, + 0, // no buffering! + ) + wg.Add(1) + go func() { + defer wg.Done() + rddPoller.Run(bgCtx) + }() + + manager := relayMonitoring.NewManager( + log.With("component", "manager"), + rddPoller, ) + wg.Add(1) + go func() { + defer wg.Done() + manager.Run(bgCtx, monitor.Run) + }() + + // Configure HTTP server + http := relayMonitoring.NewHttpServer(bgCtx, cfg.Http.Address, log.With("component", "http-server")) + http.Handle("/metrics", metrics.HTTPHandler()) + http.Handle("/debug", manager.HTTPHandler()) + wg.Add(1) + go func() { + defer wg.Done() + http.Run(bgCtx) + }() + + // Handle signals from the OS + osSignalsCh := make(chan os.Signal, 1) + signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM) + sig := <-osSignalsCh + log.Infow("received signal. Stopping", "signal", sig) - log.Info("monitor stopped") } // logger config diff --git a/go.mod b/go.mod index e6bd2dc5e..a8efd76c8 100644 --- a/go.mod +++ b/go.mod @@ -14,25 +14,20 @@ require ( require github.com/satori/go.uuid v1.2.0 require ( - github.com/confluentinc/confluent-kafka-go v1.7.0 github.com/davecgh/go-spew v1.1.1 github.com/gagliardetto/gofuzz v1.2.2 github.com/gagliardetto/treeout v0.1.4 - github.com/golang/protobuf v1.5.2 - github.com/linkedin/goavro v2.1.0+incompatible - github.com/mr-tron/base58 v1.2.0 github.com/onsi/ginkgo/v2 v2.0.0 github.com/onsi/gomega v1.17.0 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.11.0 - github.com/riferrei/srclient v0.4.0 + github.com/prometheus/client_golang v1.12.0 github.com/rs/zerolog v1.26.1 + github.com/smartcontractkit/chainlink-relay v0.0.0-20220126142536-3aae7ed1e37c github.com/smartcontractkit/integrations-framework v1.0.37-0.20220125141905-c76d1f04870f go.uber.org/atomic v1.9.0 go.uber.org/zap v1.19.1 golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -66,6 +61,7 @@ require ( github.com/cavaliercoder/grab v2.0.0+incompatible // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect + github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect github.com/containerd/containerd v1.5.9 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect github.com/deckarep/golang-set v1.7.1 // indirect @@ -77,7 +73,7 @@ require ( github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/ethereum/go-ethereum v1.10.11 // indirect + github.com/ethereum/go-ethereum v1.10.15 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect github.com/fatih/camelcase v1.0.0 // indirect @@ -97,6 +93,7 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/go-cmp v0.5.6 // indirect @@ -124,6 +121,7 @@ require ( github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lib/pq v1.10.3 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect + github.com/linkedin/goavro v2.1.0+incompatible // indirect github.com/linkedin/goavro/v2 v2.9.7 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/magiconair/properties v1.8.5 // indirect @@ -146,6 +144,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect + github.com/mr-tron/base58 v1.2.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect @@ -155,11 +154,13 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/riferrei/srclient v0.4.1-0.20211229125508-8edc580da179 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.2 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rubenv/sql-migrate v0.0.0-20211023115951-9f02b1e13857 // indirect github.com/russross/blackfriday v1.6.0 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect github.com/shirou/gopsutil v3.21.10+incompatible // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect @@ -187,18 +188,18 @@ require ( go.uber.org/ratelimit v0.2.0 // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect - golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect + golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.43.0 // indirect + google.golang.org/protobuf v1.27.1 // indirect gopkg.in/gorp.v1 v1.7.2 // indirect gopkg.in/guregu/null.v4 v4.0.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.66.2 // indirect - gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect helm.sh/helm/v3 v3.7.2 // indirect diff --git a/go.sum b/go.sum index 0ebbfee57..c5186a978 100644 --- a/go.sum +++ b/go.sum @@ -348,8 +348,8 @@ github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u9 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0= -github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM= -github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= +github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= +github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ= github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q= github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE= @@ -583,8 +583,9 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj github.com/ethereum-optimism/go-optimistic-ethereum-utils v0.1.0 h1:+Pj8lKxF/2v5Frwrlted7XxcdlK7UtBIyfmrB+CrhD8= github.com/ethereum/go-ethereum v1.9.18/go.mod h1:JSSTypSMTkGZtAdAChH2wP5dZEvPGh3nUTuDpH+hNrg= github.com/ethereum/go-ethereum v1.9.24/go.mod h1:JIfVb6esrqALTExdz9hRYvrP0xBDf6wCncIu1hNwHpM= -github.com/ethereum/go-ethereum v1.10.11 h1:KKIcwpmur9iTaVbR2dxlHu+peHVhU+/KX//NWvT1n9U= github.com/ethereum/go-ethereum v1.10.11/go.mod h1:W3yfrFyL9C1pHcwY5hmRHVDaorTiQxhYBkKyu5mEDHw= +github.com/ethereum/go-ethereum v1.10.15 h1:E9o0kMbD8HXhp7g6UwIwntY05WTDheCGziMhegcBsQw= +github.com/ethereum/go-ethereum v1.10.15/go.mod h1:W3yfrFyL9C1pHcwY5hmRHVDaorTiQxhYBkKyu5mEDHw= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -1804,8 +1805,9 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.8.0/go.mod h1:O9VU6huf47PktckDQfMTX0Y8tY0/7TSWwj+ITvv0TnM= -github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.12.0 h1:C+UIj/QWtmqY13Arb8kwMt5j34/0Z2iKamrJ+ryC0Gg= +github.com/prometheus/client_golang v1.12.0/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -1850,8 +1852,8 @@ github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38i github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= -github.com/riferrei/srclient v0.4.0 h1:lms2bs8BXZNRlSEQioqXjMrPYlFeT9yoeCe22yb51rM= -github.com/riferrei/srclient v0.4.0/go.mod h1:SmCz0lrYQ1pLqXlYq0yPnRccHLGh+llDA0i6hecPeW8= +github.com/riferrei/srclient v0.4.1-0.20211229125508-8edc580da179 h1:eImEYUKu9U/1xijKfmDWJd9xkrX2sLa+y4+Fv7/aA+c= +github.com/riferrei/srclient v0.4.1-0.20211229125508-8edc580da179/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= @@ -1884,6 +1886,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= @@ -1938,6 +1942,8 @@ github.com/smartcontractkit/chainlink v0.8.10-0.20200825114219-81dd2fc95bac/go.m github.com/smartcontractkit/chainlink v0.9.5-0.20201207211610-6c7fee37d5b7/go.mod h1:kmdLJbVZRCnBLiL6gG+U+1+0ofT3bB48DOF8tjQvcoI= github.com/smartcontractkit/chainlink v1.0.1-0.20211209223503-68928efa429a h1:no2PzLdUgYp4fLkVTGCiAuFx2ySAi2cjRjCmFeuViMc= github.com/smartcontractkit/chainlink v1.0.1-0.20211209223503-68928efa429a/go.mod h1:PSKo1vbT0/cUSbgH/rKNTDwWJyX8bc6M9/YVRe/nrXU= +github.com/smartcontractkit/chainlink-relay v0.0.0-20220126142536-3aae7ed1e37c h1:ccrPbNlQYFqHSf9+q4hZzglgzAdxMyFkFlBSftpjnUQ= +github.com/smartcontractkit/chainlink-relay v0.0.0-20220126142536-3aae7ed1e37c/go.mod h1:ojPwXEIk6xHFAoVMM5PoybiN/ici+FrVs5pI9/lq/WI= github.com/smartcontractkit/helmenv v1.0.27 h1:BVxTQZQlFElh6YVK8mY9LSljX7uyx4jyImmHh/fMqEo= github.com/smartcontractkit/helmenv v1.0.27/go.mod h1:ef0doolSZf8ckqaWMIK2M+EPXdIKYVzttd6EXaCgCK4= github.com/smartcontractkit/integrations-framework v1.0.37-0.20220125141905-c76d1f04870f h1:H/9yoU5QNrrTcq0wc9xxWwqwtqhDEuK8QxxgRkoqlRw= @@ -2220,8 +2226,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -2573,8 +2579,8 @@ golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk= -golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2914,7 +2920,6 @@ gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mN gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4= -gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= diff --git a/pkg/monitoring/chain_reader.go b/pkg/monitoring/chain_reader.go deleted file mode 100644 index d3e245874..000000000 --- a/pkg/monitoring/chain_reader.go +++ /dev/null @@ -1,60 +0,0 @@ -package monitoring - -import ( - "context" - "fmt" - - "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" - pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" -) - -const ( - commitment = rpc.CommitmentConfirmed -) - -// ChainReader is a wrapper on top of the chain-specific RCP client. -type ChainReader interface { - Read(ctx context.Context, address []byte) (interface{}, error) -} - -type TransmissionEnvelope struct { - Answer pkgSolana.Answer - BlockNumber uint64 -} - -func NewTransmissionReader(client *rpc.Client) ChainReader { - return &trReader{client} -} - -type trReader struct { - client *rpc.Client -} - -func (t *trReader) Read(ctx context.Context, transmissionsAccountRaw []byte) (interface{}, error) { - transmissionsAccount := solana.PublicKeyFromBytes(transmissionsAccountRaw) - answer, blockNum, err := pkgSolana.GetLatestTransmission(ctx, t.client, transmissionsAccount, commitment) - return TransmissionEnvelope{answer, blockNum}, err -} - -func NewStateReader(client *rpc.Client) ChainReader { - return &stReader{client} -} - -type stReader struct { - client *rpc.Client -} - -type StateEnvelope struct { - State pkgSolana.State - BlockNumber uint64 -} - -func (s *stReader) Read(ctx context.Context, stateAccountRaw []byte) (interface{}, error) { - stateAccount := solana.PublicKeyFromBytes(stateAccountRaw) - state, blockNum, err := pkgSolana.GetState(ctx, s.client, stateAccount, commitment) - if err != nil { - return nil, fmt.Errorf("failed to fetch state : %w", err) - } - return StateEnvelope{state, blockNum}, nil -} diff --git a/pkg/monitoring/feed.go b/pkg/monitoring/feed.go index fe1c0ac50..41ea942c8 100644 --- a/pkg/monitoring/feed.go +++ b/pkg/monitoring/feed.go @@ -28,6 +28,14 @@ type SolanaFeedConfig struct { var _ relayMonitoring.FeedConfig = SolanaFeedConfig{} +// GetID returns the state account's address as that uniquely +// identifies a feed on Solana. In Solana, a program is stateless and we +// use the same program for all feeds so we can't use the program +// account's address. +func (s SolanaFeedConfig) GetID() string { + return s.StateAccountBase58 +} + func (s SolanaFeedConfig) GetName() string { return s.Name } @@ -56,12 +64,16 @@ func (s SolanaFeedConfig) GetContractStatus() string { return s.ContractStatus } +// GetID returns the state account's address as that uniquely +// identifies a feed on Solana. In Solana, a program is stateless and we +// use the same program for all feeds so we can't use the program +// account's address. func (s SolanaFeedConfig) GetContractAddress() string { - return s.ContractAddress.String() + return s.StateAccountBase58 } func (s SolanaFeedConfig) GetContractAddressBytes() []byte { - return s.ContractAddress.Bytes() + return s.StateAccount.Bytes() } func (s SolanaFeedConfig) ToMapping() map[string]interface{} { diff --git a/pkg/monitoring/metrics.go b/pkg/monitoring/metrics.go index 17246a932..db32076f4 100644 --- a/pkg/monitoring/metrics.go +++ b/pkg/monitoring/metrics.go @@ -1,142 +1,66 @@ package monitoring import ( - "math/big" - "net/http" + "fmt" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) -type Metrics interface { - SetHeadTrackerCurrentHead(blockNumber uint64, networkName, chainID, networkID string) - SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) - SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string) - SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - // Cleanup deletes all the metrics - Cleanup(networkName, networkID, chainID, oracleName, sender, feedName, feedPath, symbol, contractType, contractStatus, contractAddress, feedID string) - // Exposes the accumulated metrics to HTTP. - HTTPHandler() http.Handler +var BalanceAccountNames = []string{ + "contract", + "state", + "transmissions", + "token_vault", + "requester_access_controller", + "billing_access_controller", } -var ( - headTrackerCurrentHead = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "head_tracker_current_head", - Help: "Tracks the current block height that the monitoring instance has processed.", - }, - []string{"network_name", "chain_id", "network_id"}, - ) - feedContractMetadata = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "feed_contract_metadata", - Help: "Exposes metadata for individual feeds. It should simply be set to 1, as the relevant info is in the labels.", - }, - []string{"chain_id", "contract_address", "feed_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name", "symbol"}, - ) - nodeMetadata = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "node_metadata", - Help: "Exposes metadata for node operators. It should simply be set to 1, as the relevant info is in the labels.", - }, - []string{"chain_id", "network_id", "network_name", "oracle_name", "sender"}, - ) - offchainAggregatorAnswers = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "offchain_aggregator_answers", - Help: "Reports the latest answer for a contract.", - }, - []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, - ) - offchainAggregatorAnswersTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "offchain_aggregator_answers_total", - Help: "Bump this metric every time there is a transmission on chain.", - }, - []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, - ) - offchainAggregatorSubmissionReceivedValues = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "offchain_aggregator_submission_received_values", - Help: "Report individual node observations for the latest transmission on chain. (Should be 1 time series per node per contract)", - }, - []string{"contract_address", "feed_id", "sender", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, - ) - offchainAggregatorAnswerStalled = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "offchain_aggregator_answer_stalled", - Help: "Set to 1 if the heartbeat interval has passed on a feed without a transmission. Set to 0 otherwise.", - }, - []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, - ) -) - -var DefaultMetrics Metrics - -func init() { - prometheus.MustRegister(headTrackerCurrentHead) - prometheus.MustRegister(feedContractMetadata) - prometheus.MustRegister(nodeMetadata) - prometheus.MustRegister(offchainAggregatorAnswers) - prometheus.MustRegister(offchainAggregatorAnswersTotal) - prometheus.MustRegister(offchainAggregatorSubmissionReceivedValues) - prometheus.MustRegister(offchainAggregatorAnswerStalled) - - DefaultMetrics = &defaultMetrics{} +var gauges map[string]*prometheus.GaugeVec + +var labelNames = []string{ + "account_address", + "feed_id", + "chain_id", + "contract_status", + "contract_type", + "feed_name", + "feed_path", + "network_id", + "network_name", } -type defaultMetrics struct{} - -func (d *defaultMetrics) SetHeadTrackerCurrentHead(blockNumber uint64, networkName, chainID, networkID string) { - headTrackerCurrentHead.WithLabelValues(networkName, chainID, networkID).Set(float64(blockNumber)) -} - -func (d *defaultMetrics) SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) { - feedContractMetadata.WithLabelValues(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol).Set(1) -} - -func (d *defaultMetrics) SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string) { - nodeMetadata.WithLabelValues(chainID, networkID, networkName, oracleName, sender).Set(1) +func init() { + gauges = map[string]*prometheus.GaugeVec{} + for _, name := range BalanceAccountNames { + gauges[name] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: fmt.Sprintf("sol_balance_%s", name), + }, + labelNames, + ) + prometheus.MustRegister(gauges[name]) + } } -func (d *defaultMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorAnswers.WithLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(answer.Int64())) +type Metrics interface { + SetBalance(balance uint64, balanceAccountName, accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) + Cleanup(accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) } -func (d *defaultMetrics) IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorAnswersTotal.WithLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Inc() -} +type defaultMetrics struct{} -func (d *defaultMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorSubmissionReceivedValues.WithLabelValues(contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(value.Int64())) -} +var DefaultMetrics = &defaultMetrics{} -func (d *defaultMetrics) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - var value float64 = 0 - if isSet { - value = 1 +func (d *defaultMetrics) SetBalance(balance uint64, balanceAccountName, accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + gauge, found := gauges[balanceAccountName] + if !found { + panic(fmt.Sprintf("gauge not know %s", balanceAccountName)) } - offchainAggregatorAnswerStalled.WithLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(value) + gauge.WithLabelValues(accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(balance)) } -func (d *defaultMetrics) Cleanup( - networkName, networkID, chainID, oracleName, sender string, - feedName, feedPath, symbol, contractType, contractStatus string, - contractAddress, feedID string, -) { - // TODO (dru) can delete fail?! - _ = headTrackerCurrentHead.DeleteLabelValues(networkName, chainID, networkID) - _ = feedContractMetadata.DeleteLabelValues(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol) - _ = nodeMetadata.DeleteLabelValues(chainID, networkID, networkName, oracleName, sender) - _ = offchainAggregatorAnswers.DeleteLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName) - _ = offchainAggregatorAnswersTotal.DeleteLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName) - _ = offchainAggregatorSubmissionReceivedValues.DeleteLabelValues(contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName) - _ = offchainAggregatorAnswerStalled.DeleteLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName) -} - -func (d *defaultMetrics) HTTPHandler() http.Handler { - return promhttp.Handler() +func (d *defaultMetrics) Cleanup(accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + for _, name := range BalanceAccountNames { + _ = gauges[name].DeleteLabelValues(accountAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName) + } } diff --git a/pkg/monitoring/prometheus_exporter.go b/pkg/monitoring/prometheus_exporter.go new file mode 100644 index 000000000..8d6b1f09f --- /dev/null +++ b/pkg/monitoring/prometheus_exporter.go @@ -0,0 +1,100 @@ +package monitoring + +import ( + "context" + "sync" + + relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring" +) + +func NewPrometheusExporterFactory( + log relayMonitoring.Logger, + metrics Metrics, +) relayMonitoring.ExporterFactory { + return &prometheusExporterFactory{ + log, + metrics, + } +} + +type prometheusExporterFactory struct { + log relayMonitoring.Logger + metrics Metrics +} + +func (p *prometheusExporterFactory) NewExporter( + chainConfig relayMonitoring.ChainConfig, + feedConfig relayMonitoring.FeedConfig, +) (relayMonitoring.Exporter, error) { + return &prometheusExporter{ + chainConfig, + feedConfig, + p.log, + p.metrics, + sync.Mutex{}, + make(map[string]struct{}), + }, nil +} + +type prometheusExporter struct { + chainConfig relayMonitoring.ChainConfig + feedConfig relayMonitoring.FeedConfig + + log relayMonitoring.Logger + metrics Metrics + + addressesMu sync.Mutex + addressesSet map[string]struct{} +} + +func (p *prometheusExporter) Export(ctx context.Context, data interface{}) { + balances, isBalances := data.(Balances) + if !isBalances { + return + } + for _, key := range BalanceAccountNames { + address, okAddress := balances.Addresses[key] + value, okValue := balances.Values[key] + gauge, okGauge := gauges[key] + if !okAddress || !okValue || !okGauge { + p.log.Errorw("mismatch address and balance for key", "key", key, "address", address, "value", value, "gauge", gauge) + continue + } + p.metrics.SetBalance( + value, + key, + address.String(), + p.feedConfig.GetContractAddress(), + p.chainConfig.GetChainID(), + p.feedConfig.GetContractStatus(), + p.feedConfig.GetContractType(), + p.feedConfig.GetName(), + p.feedConfig.GetPath(), + p.chainConfig.GetNetworkID(), + p.chainConfig.GetNetworkName(), + ) + } + p.addressesMu.Lock() + defer p.addressesMu.Unlock() + for _, address := range balances.Addresses { + p.addressesSet[address.String()] = struct{}{} + } +} + +func (p *prometheusExporter) Cleanup(_ context.Context) { + p.addressesMu.Lock() + defer p.addressesMu.Unlock() + for address := range p.addressesSet { + p.metrics.Cleanup( + address, + p.feedConfig.GetContractAddress(), + p.chainConfig.GetChainID(), + p.feedConfig.GetContractStatus(), + p.feedConfig.GetContractType(), + p.feedConfig.GetName(), + p.feedConfig.GetPath(), + p.chainConfig.GetNetworkID(), + p.chainConfig.GetNetworkName(), + ) + } +} diff --git a/pkg/monitoring/source.go b/pkg/monitoring/source.go index b1a27fa45..190f896a5 100644 --- a/pkg/monitoring/source.go +++ b/pkg/monitoring/source.go @@ -3,17 +3,33 @@ package monitoring import ( "context" "fmt" + "math/big" + "time" + "github.com/gagliardetto/solana-go/rpc" relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring" pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" + "github.com/smartcontractkit/libocr/offchainreporting2/types" ) -func NewSolanaSourceFactory(log relayMonitoring.Logger) relayMonitoring.SourceFactory { - return &sourceFactory{log} +const ( + commitment = rpc.CommitmentConfirmed +) + +func NewSolanaSourceFactory( + solanaConfig SolanaConfig, + log relayMonitoring.Logger, +) relayMonitoring.SourceFactory { + client := rpc.New(solanaConfig.RPCEndpoint) + return &sourceFactory{ + client, + log, + } } type sourceFactory struct { - log relayMonitoring.Logger + client *rpc.Client + log relayMonitoring.Logger } func (s *sourceFactory) NewSource( @@ -28,51 +44,58 @@ func (s *sourceFactory) NewSource( if !ok { return nil, fmt.Errorf("expected feedConfig to be of type SolanaFeedConfig not %T", feedConfig) } - spec := pkgSolana.OCR2Spec{ - ProgramID: solanaFeedConfig.ContractAddress, - StateID: solanaFeedConfig.StateAccount, - } - client := pkgSolana.NewClient(solanaConfig.RPCEndpoint) - tracker := pkgSolana.NewTracker(spec, client, nil, &logAdapter{s.log}) return &solanaSource{ - &tracker, + s.client, solanaConfig, solanaFeedConfig, }, nil } type solanaSource struct { - tracker *pkgSolana.ContractTracker + client *rpc.Client solanaConfig SolanaConfig feedConfig SolanaFeedConfig } func (s *solanaSource) Fetch(ctx context.Context) (interface{}, error) { - changedInBlock, _, err := s.tracker.LatestConfigDetails(ctx) + state, blockNum, err := pkgSolana.GetState(ctx, s.client, s.feedConfig.StateAccount, commitment) if err != nil { - return relayMonitoring.Envelope{}, fmt.Errorf("failed to fetch latest config details from on-chain: %w", err) + return nil, fmt.Errorf("failed to state from on-chain: %w", err) } - cfg, err := s.tracker.LatestConfig(ctx, changedInBlock) + contractConfig, err := pkgSolana.ConfigFromState(state) if err != nil { - return relayMonitoring.Envelope{}, fmt.Errorf("failed to read latest config from on-chain: %w", err) + return nil, fmt.Errorf("failed to decode ContractConfig from on-chain state: %w", err) } - configDigest, epoch, round, latestAnswer, latestTimestamp, err := s.tracker.LatestTransmissionDetails(ctx) + answer, blockNum, err := pkgSolana.GetLatestTransmission(ctx, s.client, state.Transmissions, commitment) if err != nil { - return relayMonitoring.Envelope{}, fmt.Errorf("failed to read latest transmission from on-chain: %w", err) + return nil, fmt.Errorf("failed to fetch latest on-chain transmission: %w", err) + } + linkBalanceRes, err := s.client.GetTokenAccountBalance(ctx, state.Config.TokenVault, commitment) + if err != nil { + return nil, fmt.Errorf("failed to read the feed's link balance: %w", err) + } + if linkBalanceRes.Value == nil { + return nil, fmt.Errorf("link balance not found for token vault") + } + linkBalance, success := big.NewInt(0).SetString(linkBalanceRes.Value.Amount, 10) + if !success { + return nil, fmt.Errorf("failed to parse link balance value: %s", linkBalanceRes.Value.Amount) } - transmitter := s.tracker.FromAccount() - return relayMonitoring.Envelope{ - configDigest, - epoch, - round, - latestAnswer, - latestTimestamp, + ConfigDigest: state.Config.LatestConfigDigest, + Epoch: state.Config.Epoch, + Round: state.Config.Round, + + LatestAnswer: answer.Data, + LatestTimestamp: time.Unix(int64(answer.Timestamp), 0), - cfg, + // latest contract config + ContractConfig: contractConfig, - changedInBlock, - transmitter, + // extra + BlockNumber: blockNum, + Transmitter: types.Account(state.Config.LatestTransmitter.String()), + LinkBalance: linkBalance.Uint64(), }, nil } diff --git a/pkg/monitoring/source_balances.go b/pkg/monitoring/source_balances.go new file mode 100644 index 000000000..be5481ec1 --- /dev/null +++ b/pkg/monitoring/source_balances.go @@ -0,0 +1,103 @@ +package monitoring + +import ( + "context" + "fmt" + "sync" + + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring" + pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" +) + +func NewBalancesSourceFactory( + solanaConfig SolanaConfig, + log relayMonitoring.Logger, +) relayMonitoring.SourceFactory { + client := rpc.New(solanaConfig.RPCEndpoint) + return &balancesSourceFactory{ + client, + log, + } +} + +type balancesSourceFactory struct { + client *rpc.Client + log relayMonitoring.Logger +} + +func (s *balancesSourceFactory) NewSource( + chainConfig relayMonitoring.ChainConfig, + feedConfig relayMonitoring.FeedConfig, +) (relayMonitoring.Source, error) { + solanaConfig, ok := chainConfig.(SolanaConfig) + if !ok { + return nil, fmt.Errorf("expected chainConfig to be of type SolanaConfig not %T", chainConfig) + } + solanaFeedConfig, ok := feedConfig.(SolanaFeedConfig) + if !ok { + return nil, fmt.Errorf("expected feedConfig to be of type SolanaFeedConfig not %T", feedConfig) + } + return &balancesSource{ + s.client, + s.log, + solanaConfig, + solanaFeedConfig, + }, nil +} + +type balancesSource struct { + client *rpc.Client + log relayMonitoring.Logger + solanaConfig SolanaConfig + feedConfig SolanaFeedConfig +} + +type Balances struct { + Values map[string]uint64 + Addresses map[string]solana.PublicKey +} + +func (s *balancesSource) Fetch(ctx context.Context) (interface{}, error) { + state, _, err := pkgSolana.GetState(ctx, s.client, s.feedConfig.StateAccount, rpc.CommitmentConfirmed) + if err != nil { + return nil, fmt.Errorf("failed to get contract state: %w", err) + } + isErr := false + balances := Balances{ + Values: make(map[string]uint64), + Addresses: make(map[string]solana.PublicKey), + } + balancesMu := &sync.Mutex{} + wg := &sync.WaitGroup{} + wg.Add(len(BalanceAccountNames)) + for key, address := range map[string]solana.PublicKey{ + "contract": s.feedConfig.ContractAddress, + "state": s.feedConfig.StateAccount, + "transmissions": state.Transmissions, + "token_vault": state.Config.TokenVault, + "requester_access_controller": state.Config.RequesterAccessController, + "billing_access_controller": state.Config.BillingAccessController, + } { + go func(key string, address solana.PublicKey) { + defer wg.Done() + res, err := s.client.GetBalance(ctx, address, rpc.CommitmentProcessed) + balancesMu.Lock() + defer balancesMu.Unlock() + if err != nil { + s.log.Errorw("failed to read the sol balance", "key", key, "address", address.String(), "error", err) + isErr = true + return + } + balances.Values[key] = res.Value + balances.Addresses[key] = address + }(key, address) + } + + wg.Wait() + if isErr { + return Balances{}, fmt.Errorf("error while fetching balances") + } + return balances, nil +} diff --git a/pkg/monitoring/testutils.go b/pkg/monitoring/testutils.go index 4871b97e5..7a105a411 100644 --- a/pkg/monitoring/testutils.go +++ b/pkg/monitoring/testutils.go @@ -1,11 +1,13 @@ package monitoring import ( + "context" "fmt" "math/rand" "time" "github.com/gagliardetto/solana-go" + relayMonitoring "github.com/smartcontractkit/chainlink-relay/pkg/monitoring" ) func generatePublicKey() solana.PublicKey { @@ -57,6 +59,58 @@ func generate32ByteArr() [32]byte { return out } +func NewFakeRDDSource(minFeeds, maxFeeds uint8) relayMonitoring.Source { + return &fakeRddSource{minFeeds, maxFeeds} +} + +type fakeRddSource struct { + minFeeds, maxFeeds uint8 +} + +func (f *fakeRddSource) Fetch(_ context.Context) (interface{}, error) { + numFeeds := int(f.minFeeds) + rand.Intn(int(f.maxFeeds-f.minFeeds)) + feeds := make([]relayMonitoring.FeedConfig, numFeeds) + for i := 0; i < numFeeds; i++ { + feeds[i] = generateSolanaFeedConfig() + } + return feeds, nil +} + +func NewFakeBalancesSourceFactory(log relayMonitoring.Logger) relayMonitoring.SourceFactory { + return &fakeSourceFactory{log} +} + +type fakeSourceFactory struct { + log relayMonitoring.Logger +} + +func (f *fakeSourceFactory) NewSource( + _ relayMonitoring.ChainConfig, + _ relayMonitoring.FeedConfig, +) (relayMonitoring.Source, error) { + return &fakeSource{f.log}, nil +} + +type fakeSource struct { + log relayMonitoring.Logger +} + +func (f *fakeSource) Fetch(ctx context.Context) (interface{}, error) { + return generateBalances(), nil +} + +func generateBalances() Balances { + out := Balances{ + make(map[string]uint64), + make(map[string]solana.PublicKey), + } + for _, key := range BalanceAccountNames { + out.Values[key] = rand.Uint64() + out.Addresses[key] = generatePublicKey() + } + return out +} + // This utilities are used primarely in tests but are present in the monitoring package because they are not inside a file ending in _test.go. // This is done in order to expose NewRandomDataReader for use in cmd/monitoring. // The following code is added to comply with the "unused" linter: @@ -65,4 +119,7 @@ var ( _ = generatePublicKey() _ = generateSolanaFeedConfig() _ = generate32ByteArr() + _ = fakeRddSource{} + _ = fakeSourceFactory{} + _ = fakeSource{} ) diff --git a/pkg/solana/config_digester_test.go b/pkg/solana/config_digester_test.go index a1abc7037..40f73dd6e 100644 --- a/pkg/solana/config_digester_test.go +++ b/pkg/solana/config_digester_test.go @@ -19,7 +19,7 @@ func TestConfigDigester(t *testing.T) { var state State err = bin.NewBorshDecoder(mockState.Raw).Decode(&state) require.NoError(t, err) - config, err := configFromState(state) + config, err := ConfigFromState(state) require.NoError(t, err) actualDigest, err := digester.ConfigDigest(config) diff --git a/pkg/solana/config_tracker.go b/pkg/solana/config_tracker.go index a6ce3d27a..b5c1107bc 100644 --- a/pkg/solana/config_tracker.go +++ b/pkg/solana/config_tracker.go @@ -19,7 +19,7 @@ func (c *ContractTracker) LatestConfigDetails(ctx context.Context) (changedInBlo return state.Config.LatestConfigBlockNumber, state.Config.LatestConfigDigest, err } -func configFromState(state State) (types.ContractConfig, error) { +func ConfigFromState(state State) (types.ContractConfig, error) { pubKeys := []types.OnchainPublicKey{} accounts := []types.Account{} for _, o := range state.Oracles.Data() { @@ -57,7 +57,7 @@ func (c *ContractTracker) LatestConfig(ctx context.Context, changedInBlock uint6 if err != nil { return types.ContractConfig{}, err } - return configFromState(state) + return ConfigFromState(state) } // LatestBlockHeight returns the height of the most recent block in the chain.