Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow changes in RDD #81

Merged
merged 9 commits into from
Jan 11, 2022
5 changes: 3 additions & 2 deletions cmd/monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ KAFKA_TRANSMISSION_TOPIC="transmission_topic" \
SCHEMA_REGISTRY_URL="http://localhost:8989" \
SCHEMA_REGISTRY_USERNAME="" \
SCHEMA_REGISTRY_PASSWORD="" \
FEEDS_FILE_PATH="/tmp/feeds.json" \
HTTP_ADDRESS="localhost:3000" \
FEATURE_TEST_MODE=true \
FEEDS_URL="http://localhost:4000" \
FEATURE_TEST_ONLY_FAKE_READERS=true \
FEATURE_TEST_ONLY_FAKE_RDD=true \
go run ./cmd/monitoring/main.go
```

Expand Down
46 changes: 37 additions & 9 deletions cmd/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalw("failed to start http server", "address", cfg.Http.Address, "error", err)
} else {
log.Info("http server closed")
log.Info("http server stopped")
}
}()

Expand All @@ -71,7 +71,7 @@ func main() {
}

var transmissionReader, stateReader monitoring.AccountReader
if cfg.Feature.TestMode {
if cfg.Feature.TestOnlyFakeReaders {
transmissionReader = monitoring.NewRandomDataReader(bgCtx, wg, "transmission", log.With("component", "rand-reader", "account", "transmissions"))
stateReader = monitoring.NewRandomDataReader(bgCtx, wg, "state", log.With("component", "rand-reader", "account", "state"))
} else {
Expand All @@ -81,7 +81,6 @@ func main() {

monitor := monitoring.NewMultiFeedMonitor(
cfg.Solana,
cfg.Feeds.Feeds,

log,
transmissionReader, stateReader,
Expand All @@ -96,11 +95,40 @@ func main() {
configSetSimplifiedSchema,
transmissionSchema,
)
wg.Add(1)
go func() {
defer wg.Done()
monitor.Start(bgCtx, wg)
}()

if cfg.Feeds.FilePath != "" {
wg.Add(1)
go func() {
defer wg.Done()
monitor.Start(bgCtx, wg, cfg.Feeds.Feeds)
}()
} else if cfg.Feeds.URL != "" {
source := monitoring.NewRDDSource(cfg.Feeds.URL)
if cfg.Feature.TestOnlyFakeRdd {
source = monitoring.NewFakeRDDSource(2, 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could u not add this to the statement where we check the flag?

}
rddPoller := monitoring.NewSourcePoller(
source,
log.With("component", "rdd-poller"),
cfg.Feeds.RDDPollInterval,
cfg.Feeds.RDDReadTimeout,
0, // no buffering!
)
wg.Add(1)
go func() {
defer wg.Done()
rddPoller.Start(bgCtx)
}()
manager := monitoring.NewManager(
log.With("component", "manager"),
rddPoller,
)
wg.Add(1)
go func() {
defer wg.Done()
manager.Start(bgCtx, wg, monitor.Start)
}()
} // the config package makes sure there is either a FilePath or a URL set!

osSignalsCh := make(chan os.Signal, 1)
signal.Notify(osSignalsCh, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -112,7 +140,7 @@ func main() {
log.Errorw("failed to shut http server down", "error", err)
}
wg.Wait()
log.Info("monitor stopped")
log.Info("process stopped")
}

// logger config
Expand Down
17 changes: 12 additions & 5 deletions pkg/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,33 @@ func parseEnvVars(cfg *Config) error {
if err != nil {
return fmt.Errorf("failed to parse env var FEEDS_RDD_READ_TIMEOUT, see https://pkg.go.dev/time#ParseDuration: %w", err)
}
cfg.Feeds.RddReadTimeout = readTimeout
cfg.Feeds.RDDReadTimeout = readTimeout
}
if value, isPresent := os.LookupEnv("FEEDS_RDD_POLL_INTERVAL"); isPresent {
pollInterval, err := time.ParseDuration(value)
if err != nil {
return fmt.Errorf("failed to parse env var FEEDS_RDD_POLL_INTERVAL, see https://pkg.go.dev/time#ParseDuration: %w", err)
}
cfg.Feeds.RddPollInterval = pollInterval
cfg.Feeds.RDDPollInterval = pollInterval
}

if value, isPresent := os.LookupEnv("HTTP_ADDRESS"); isPresent {
cfg.Http.Address = value
}

if value, isPresent := os.LookupEnv("FEATURE_TEST_MODE"); isPresent {
if value, isPresent := os.LookupEnv("FEATURE_TEST_ONLY_FAKE_READERS"); isPresent {
isTestMode, err := strconv.ParseBool(value)
if err != nil {
return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_MODE")
return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_ONLY_FAKE_READERS")
}
cfg.Feature.TestMode = isTestMode
cfg.Feature.TestOnlyFakeReaders = isTestMode
}
if value, isPresent := os.LookupEnv("FEATURE_TEST_ONLY_FAKE_RDD"); isPresent {
isTestMode, err := strconv.ParseBool(value)
if err != nil {
return fmt.Errorf("failed to parse boolean env var '%s'. See https://pkg.go.dev/strconv#ParseBool", "FEATURE_TEST_ONLY_FAKE_RDD")
}
cfg.Feature.TestOnlyFakeRdd = isTestMode
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/monitoring/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ func applyDefaults(cfg *Config) {
if cfg.Solana.PollInterval == 0 {
cfg.Solana.PollInterval = 5 * time.Second
}
if cfg.Feeds.RddReadTimeout == 0 {
cfg.Feeds.RddReadTimeout = 1 * time.Second
if cfg.Feeds.RDDReadTimeout == 0 {
cfg.Feeds.RDDReadTimeout = 1 * time.Second
}
if cfg.Feeds.RddPollInterval == 0 {
cfg.Feeds.RddPollInterval = 10 * time.Second
if cfg.Feeds.RDDPollInterval == 0 {
cfg.Feeds.RDDPollInterval = 10 * time.Second
}
}
81 changes: 36 additions & 45 deletions pkg/monitoring/config/feeds.go
Original file line number Diff line number Diff line change
@@ -1,74 +1,65 @@
package config

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"

"github.com/gagliardetto/solana-go"
)

func populateFeeds(cfg *Config) error {
feeds := []jsonFeedConfig{}
if cfg.Feeds.URL != "" {
rddCtx, cancel := context.WithTimeout(context.Background(), cfg.Feeds.RddReadTimeout)
defer cancel()
readFeedsReq, err := http.NewRequestWithContext(rddCtx, http.MethodGet, cfg.Feeds.URL, nil)
if err != nil {
return fmt.Errorf("unable to build a request to the RDD URL '%s': %w", cfg.Feeds.URL, err)
}
httpClient := &http.Client{}
res, err := httpClient.Do(readFeedsReq)
if err != nil {
return fmt.Errorf("unable to fetch RDD data from URL '%s': %w", cfg.Feeds.URL, err)
}
defer res.Body.Close()
decoder := json.NewDecoder(res.Body)
if err := decoder.Decode(&feeds); err != nil {
return fmt.Errorf("unable to unmarshal feeds config from RDD URL '%s': %w", cfg.Feeds.URL, err)
}
} else if cfg.Feeds.FilePath != "" {
contents, err := os.ReadFile(cfg.Feeds.FilePath)
if err != nil {
return fmt.Errorf("unable to read feeds file '%s': %w", cfg.Feeds.FilePath, err)
}
if err = json.Unmarshal(contents, &feeds); err != nil {
return fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.Feeds.FilePath, err)
}
if cfg.Feeds.FilePath == "" {
return nil
}
contents, err := os.ReadFile(cfg.Feeds.FilePath)
if err != nil {
return fmt.Errorf("unable to read feeds file '%s': %w", cfg.Feeds.FilePath, err)
}
rawFeeds := []RawFeedConfig{}
if err = json.Unmarshal(contents, &rawFeeds); err != nil {
return fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.Feeds.FilePath, err)
}
feeds, err := NewFeeds(rawFeeds)
if err != nil {
return err
}
cfg.Feeds.Feeds = feeds
return nil
}

cfg.Feeds.Feeds = make([]Feed, len(feeds))
for i, feed := range feeds {
contractAddress, err := solana.PublicKeyFromBase58(feed.ContractAddressBase58)
func NewFeeds(rawFeeds []RawFeedConfig) ([]Feed, error) {
feeds := make([]Feed, len(rawFeeds))
for i, rawFeed := range rawFeeds {
contractAddress, err := solana.PublicKeyFromBase58(rawFeed.ContractAddressBase58)
if err != nil {
return fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", feed.ContractAddressBase58, i, err)
return nil, fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", rawFeed.ContractAddressBase58, i, err)
}
transmissionsAccount, err := solana.PublicKeyFromBase58(feed.TransmissionsAccountBase58)
transmissionsAccount, err := solana.PublicKeyFromBase58(rawFeed.TransmissionsAccountBase58)
if err != nil {
return fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", feed.TransmissionsAccountBase58, i, err)
return nil, fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", rawFeed.TransmissionsAccountBase58, i, err)
}
stateAccount, err := solana.PublicKeyFromBase58(feed.StateAccountBase58)
stateAccount, err := solana.PublicKeyFromBase58(rawFeed.StateAccountBase58)
if err != nil {
return fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", feed.StateAccountBase58, i, err)
return nil, fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", rawFeed.StateAccountBase58, i, err)
}
cfg.Feeds.Feeds[i] = Feed{
feed.FeedName,
feed.FeedPath,
feed.Symbol,
feed.Heartbeat,
feed.ContractType,
feed.ContractStatus,
feeds[i] = Feed{
rawFeed.FeedName,
rawFeed.FeedPath,
rawFeed.Symbol,
rawFeed.Heartbeat,
rawFeed.ContractType,
rawFeed.ContractStatus,
contractAddress,
transmissionsAccount,
stateAccount,
}
}
return nil
return feeds, nil
}

type jsonFeedConfig struct {
// RawFeedConfig should only be used for deserializing responses from the RDD.
type RawFeedConfig struct {
FeedName string `json:"name,omitempty"`
FeedPath string `json:"path,omitempty"`
Symbol string `json:"symbol,omitempty"`
Expand Down
12 changes: 8 additions & 4 deletions pkg/monitoring/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ type SchemaRegistry struct {
}

type Feeds struct {
URL string
// If URL is set, the RDD tracker will start and override any feed configs extracted from FilePath!
FilePath string
URL string
RDDReadTimeout time.Duration
RDDPollInterval time.Duration
Feeds []Feed
RddReadTimeout time.Duration
RddPollInterval time.Duration
}

type Feed struct {
Expand All @@ -74,5 +75,8 @@ type Http struct {
}

type Feature struct {
TestMode bool
// If set, the monitor will not read from a chain instead from a source of random state snapshots.
TestOnlyFakeReaders bool
// If set, the monitor will not read from the RDD, instead it will get data from a local source of random feeds configurations.
TestOnlyFakeRdd bool
}
14 changes: 10 additions & 4 deletions pkg/monitoring/feed_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@ func TestFeedMonitor(t *testing.T) {
readTimeout := 1 * time.Second
var bufferCapacity uint32 = 0 // no buffering

transmissionPoller := NewPoller(
transmissionPoller := NewSourcePoller(
NewSolanaSource(
transmissionAccount,
transmissionReader,
),
logger.NewNullLogger(),
transmissionAccount, transmissionReader,
pollInterval, readTimeout,
bufferCapacity,
)
statePoller := NewPoller(
statePoller := NewSourcePoller(
NewSolanaSource(
stateAccount,
stateReader,
),
logger.NewNullLogger(),
stateAccount, stateReader,
pollInterval, readTimeout,
bufferCapacity,
)
Expand Down
Loading