Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry: Push telemetry event to OpenSearch on pipeline startup #74

Merged
merged 30 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e698ba1
Add telemetry option to config and initialize OpenSearch client
algochoi May 9, 2023
56e54a8
Add telemetry option to example config
algochoi May 9, 2023
88620f2
Fix json marshalling
algochoi May 9, 2023
2d9f4b9
Add telemetry GUID to pipeline metadata file
algochoi May 12, 2023
f5412f6
Remove debug log
algochoi May 12, 2023
b97c94e
Add simple tests and refactor
algochoi May 15, 2023
a68e65e
Merge branch 'master' into telemetry-configs
algochoi May 15, 2023
6ac3c15
Add initial readme
algochoi May 15, 2023
922f9bd
Fix lint errors
algochoi May 15, 2023
471d114
Refactor client to interface
algochoi May 16, 2023
acbf6f7
Fix pipeline telemetry test
algochoi May 16, 2023
a30719d
Merge branch 'master' into telemetry-configs
algochoi May 16, 2023
31cd2fb
Clean up test file
algochoi May 19, 2023
b0bb38a
Fix README
algochoi May 19, 2023
55520d6
Clean up unused code in tests
algochoi May 19, 2023
80b82ff
Update conduit.yml example
algochoi May 19, 2023
b4acb9a
Remove SetTelemetryClient from InitProvider interface
algochoi May 22, 2023
ec53b18
Move telemetry credentials and uri to config.yml instead of hardcodin…
algochoi May 23, 2023
30adcc3
Update README
algochoi May 24, 2023
234ca36
Add conduit version string to event and fix tests
algochoi May 24, 2023
96a7c9c
Update conduit.yml examples
algochoi May 24, 2023
585339d
Fix gci lint errors
algochoi May 24, 2023
4b12960
Change client in pipeline to interface
algochoi May 24, 2023
6c33757
Fix tests
algochoi May 25, 2023
cce5f71
Merge branch 'master' into telemetry-configs
algochoi Jun 6, 2023
61f2530
Add extra argument for telemetry client initialization
algochoi Jun 6, 2023
925e5eb
Default telemetry to be off until we point to opensearch production s…
algochoi Jun 14, 2023
105bfc5
Merge branch 'master' into telemetry-configs
algochoi Jun 14, 2023
f32193f
Respond to PR comments by fixing README and use opensearch v2
algochoi Jun 16, 2023
c337917
Merge branch 'master' into telemetry-configs
winder Jun 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conduit/data/block_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package data

import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/telemetry"
)

// RoundProvider is the interface which all data types sent to Exporters should implement
Expand All @@ -16,6 +18,7 @@ type InitProvider interface {
GetGenesis() *sdk.Genesis
SetGenesis(*sdk.Genesis)
NextDBRound() sdk.Round
GetTelemetryClient() *telemetry.OpenSearchClient
}

// BlockData is provided to the Exporter on each round.
Expand Down
11 changes: 11 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ type Metrics struct {
Prefix string `yaml:"prefix"`
}

// Telemetry configs for sending Telemetry to OpenSearch
type Telemetry struct {
Enabled bool `yaml:"enabled"`
URI string `yaml:"uri"`
Index string `yaml:"index"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -61,6 +70,8 @@ type Config struct {
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
RetryDelay time.Duration `yaml:"retry-delay"`

Telemetry Telemetry `yaml:"telemetry"`
}

// Valid validates pipeline config
Expand Down
1 change: 0 additions & 1 deletion conduit/data/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ exporters:
name: "noop"
config:
connectionstring: ""`, "field exporters not found"},

{"config not configs", `---
log-level: info
importer:
Expand Down
19 changes: 14 additions & 5 deletions conduit/init_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ package conduit

import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/telemetry"
)

// PipelineInitProvider algod based init provider
type PipelineInitProvider struct {
currentRound *sdk.Round
genesis *sdk.Genesis
currentRound *sdk.Round
genesis *sdk.Genesis
telemetryClient *telemetry.OpenSearchClient
}

// MakePipelineInitProvider constructs an init provider.
func MakePipelineInitProvider(currentRound *sdk.Round, genesis *sdk.Genesis) *PipelineInitProvider {
func MakePipelineInitProvider(currentRound *sdk.Round, genesis *sdk.Genesis, client *telemetry.OpenSearchClient) *PipelineInitProvider {
return &PipelineInitProvider{
currentRound: currentRound,
genesis: genesis,
currentRound: currentRound,
genesis: genesis,
telemetryClient: client,
}
}

Expand All @@ -32,3 +36,8 @@ func (a *PipelineInitProvider) GetGenesis() *sdk.Genesis {
func (a *PipelineInitProvider) NextDBRound() sdk.Round {
return *a.currentRound
}

// GetTelemetryClient gets the telemetry state in the init provider
func (a *PipelineInitProvider) GetTelemetryClient() *telemetry.OpenSearchClient {
return a.telemetryClient
}
1 change: 1 addition & 0 deletions conduit/pipeline/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type state struct {
GenesisHash string `json:"genesis-hash"`
Network string `json:"network"`
NextRound uint64 `json:"next-round"`
TelemetryID string `json:"telemetry-id,omitempty"`
}

// encodeToFile writes the state object to the dataDir
Expand Down
3 changes: 3 additions & 0 deletions conduit/pipeline/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ func TestBlockMetaDataFile(t *testing.T) {
assert.Equal(t, pipelineMetadata.GenesisHash, metaData.GenesisHash)
assert.Equal(t, pipelineMetadata.NextRound, metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)
assert.Equal(t, pipelineMetadata.TelemetryID, metaData.TelemetryID)

// Test that file encodes correctly
pipelineMetadata.GenesisHash = "HASH"
pipelineMetadata.NextRound = 7
pipelineMetadata.TelemetryID = "SOME_ID"
err = pipelineMetadata.encodeToFile(datadir)
assert.NoError(t, err)
metaData, err = readBlockMetadata(datadir)
assert.NoError(t, err)
assert.Equal(t, "HASH", metaData.GenesisHash)
assert.Equal(t, uint64(7), metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)
assert.Equal(t, pipelineMetadata.TelemetryID, metaData.TelemetryID)
}
42 changes: 40 additions & 2 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/algorand/conduit/conduit/plugins/exporters"
"github.com/algorand/conduit/conduit/plugins/importers"
"github.com/algorand/conduit/conduit/plugins/processors"
"github.com/algorand/conduit/conduit/telemetry"
)

// Pipeline is a struct that orchestrates the entire
Expand Down Expand Up @@ -195,6 +196,25 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
return pluginOverride, nil
}

// initializeTelemetry initializes telemetry and reads or sets the GUID in the metadata.
func (p *pipelineImpl) initializeTelemetry() (*telemetry.OpenSearchClient, error) {
algochoi marked this conversation as resolved.
Show resolved Hide resolved
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on the events we add in the future, an implementation that puts them in the log file could be helpful in this case.

}
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

return telemetryClient, nil
}

// Init prepares the pipeline for processing block data
func (p *pipelineImpl) Init() error {
p.logger.Infof("Starting Pipeline Initialization")
Expand Down Expand Up @@ -254,8 +274,26 @@ func (p *pipelineImpl) Init() error {

// InitProvider
round := sdk.Round(p.pipelineMetadata.NextRound)
// Initial genesis object is nil--gets updated after importer.Init
var initProvider data.InitProvider = conduit.MakePipelineInitProvider(&round, nil)

// Initialize Telemetry
var telemetryClient *telemetry.OpenSearchClient
if p.cfg.Telemetry.Enabled {
// If telemetry cannot be initialized, log a warning and continue
// pipeline initialization.
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warn("Telemetry initialization failed. Continuing without telemetry.")
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
if telemetryErr = telemetryClient.SendEvent(event); telemetryErr != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, without config modifications, we'll always log a warning here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipeline tries to initialize the client based on the config file. Since the default config file doesn't specify a URI, we don't send an event for now. We can change the default config.yml when we have an OpenSearch production server ready or alternatively hardcode the URI/credentials in like we do in algod.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the Telemetry initialization failed warning, or failed to send telemetry event?

Let's default Enabled to false, so this block wouldn't be hit anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the Telemetry initialization failed warning, or failed to send telemetry event?

Currently you'll get both warnings - it might be too redundant so I'll change it to send one or the other.

Let's default Enabled to false, so this block wouldn't be hit anyway.

Good point, will do 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to come up with a design that lets us gracefully disable telemetry. Keep in mind that plugins are going to be grabbing the telemetryClient object and they wont be able to check for Enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to come up with a design that lets us gracefully disable telemetry. Keep in mind that plugins are going to be grabbing the telemetryClient object and they wont be able to check for Enabled.

The telemetryClient object has its own Enabled bool: https://github.com/algorand/conduit/pull/74/files#diff-ada99fa4b32a319d368310f322561a3e2292fefcd48533e763dff3069fe64cf0R11

This reads the config file upon pipeline initialization, checks if the config file has set the telemetry enabled bool to true, and initializes the client object (which then sets the client's bool to true). I think the current implementation lets us dynamically disable telemetry on the client as well.

p.logger.Warnf("failed to send telemetry event: %s", telemetryErr)
}
winder marked this conversation as resolved.
Show resolved Hide resolved
}

// Initial genesis object is nil and gets updated after importer.Init
var initProvider data.InitProvider = conduit.MakePipelineInitProvider(&round, nil, telemetryClient)
p.initProvider = &initProvider

// Initialize Importer
Expand Down
30 changes: 30 additions & 0 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,36 @@ func TestPipelineMetricsConfigs(t *testing.T) {
assert.Equal(t, pImpl.cfg.Metrics.Prefix, prefixOverride)
}

func TestPipelineTelemetryConfigs(t *testing.T) {
pImpl, _, _, _, _ := mockPipeline(t, "")

// telemetry OFF, check that client is nil
pImpl.cfg.Telemetry = data.Telemetry{
Enabled: false,
}
pImpl.Init()
client := (*pImpl.initProvider).GetTelemetryClient()
assert.Nil(t, client)

// telemetry ON
pImpl.cfg.Telemetry = data.Telemetry{
Enabled: true,
URI: "test-uri",
Index: "test-index",
UserName: "test-username",
Password: "test-password",
}
pImpl.Init()
client = (*pImpl.initProvider).GetTelemetryClient()
assert.NotNil(t, client)
assert.NotNil(t, client.Client)
assert.Equal(t, true, client.TelemetryConfig.Enable)
assert.Equal(t, "test-uri", client.TelemetryConfig.URI)
assert.Equal(t, "test-index", client.TelemetryConfig.Index)
assert.Equal(t, "test-username", client.TelemetryConfig.UserName)
assert.Equal(t, "test-password", client.TelemetryConfig.Password)
}

func TestRoundOverrideValidConflict(t *testing.T) {
t.Run("processor_no_conflict", func(t *testing.T) {
pImpl, _, mImporter, mProcessor, _ := mockPipeline(t, "")
Expand Down
10 changes: 5 additions & 5 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestExporterInitDefaults(t *testing.T) {
defer fileExp.Close()
pcfg := plugins.MakePluginConfig(fmt.Sprintf("block-dir: %s", tc.blockdir))
pcfg.DataDir = tempdir
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), pcfg, logger)
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), pcfg, logger)
require.NoError(t, err)
pluginConfig := fileExp.Config()
assert.Contains(t, pluginConfig, fmt.Sprintf("block-dir: %s", tc.expected))
Expand All @@ -94,14 +94,14 @@ func TestExporterInit(t *testing.T) {
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
pluginConfig := fileExp.Config()
configWithDefault := config + "filename-pattern: '%[1]d_block.json'\n" + "drop-certificate: false\n"
assert.Equal(t, configWithDefault, string(pluginConfig))
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
fileExp.Close()
}
Expand All @@ -125,7 +125,7 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds

// initialize
rnd := sdk.Round(0)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil), plugins.MakePluginConfig(config), logger)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, err)

// incorrect round
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil), plugins.MakePluginConfig(config), logger)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

Expand Down
14 changes: 7 additions & 7 deletions conduit/plugins/exporters/postgresql/postgresql_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ func TestExporterMetadata(t *testing.T) {
func TestConnectDisconnectSuccess(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true\nconnection-string: ''")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))
assert.NoError(t, pgsqlExp.Close())
}

func TestConnectUnmarshalFailure(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("'")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connect failure in unmarshalConfig")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), cfg, logger), "connect failure in unmarshalConfig")
}

func TestConnectDbFailure(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connection string is empty for postgres")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), cfg, logger), "connection string is empty for postgres")
}

func TestConfigDefault(t *testing.T) {
Expand All @@ -70,7 +70,7 @@ func TestConfigDefault(t *testing.T) {
func TestReceiveInvalidBlock(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))
invalidBlock := data.BlockData{
BlockHeader: sdk.BlockHeader{
Round: 1,
Expand All @@ -86,7 +86,7 @@ func TestReceiveInvalidBlock(t *testing.T) {
func TestReceiveAddBlockSuccess(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))

block := data.BlockData{
BlockHeader: sdk.BlockHeader{},
Expand All @@ -102,7 +102,7 @@ func TestPostgresqlExporterInit(t *testing.T) {
cfg := plugins.MakePluginConfig("test: true")

// genesis hash mismatch
initProvider := conduit.MakePipelineInitProvider(&round, &sdk.Genesis{})
initProvider := conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil)
initProvider.SetGenesis(&sdk.Genesis{
Network: "test",
})
Expand All @@ -111,7 +111,7 @@ func TestPostgresqlExporterInit(t *testing.T) {

// incorrect round
round = 1
err = pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger)
err = pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger)
assert.Contains(t, err.Error(), "initializing block round 1 but next round to account is 0")
}

Expand Down
Loading