Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipelineImpl: de-pointerize plugin interface fields #113

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@

initProvider *data.InitProvider

importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
importer importers.Importer
processors []processors.Processor
exporter exporters.Exporter
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state
Expand All @@ -72,30 +72,30 @@
}

func (p *pipelineImpl) registerLifecycleCallbacks() {
if v, ok := (*p.importer).(conduit.Completed); ok {
if v, ok := p.importer.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.Completed); ok {
if v, ok := processor.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}
if v, ok := (*p.exporter).(conduit.Completed); ok {
if v, ok := p.exporter.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}

func (p *pipelineImpl) registerPluginMetricsCallbacks() {
var collectors []prometheus.Collector
if v, ok := (*p.importer).(conduit.PluginMetrics); ok {
if v, ok := p.importer.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.PluginMetrics); ok {
if v, ok := processor.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
}
if v, ok := (*p.exporter).(conduit.PluginMetrics); ok {
if v, ok := p.exporter.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, c := range collectors {
Expand All @@ -108,8 +108,8 @@
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
}

Check warning on line 112 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L111-L112

Added lines #L111 - L112 were not covered by tests

lgr := log.New()
lgr.SetOutput(p.logger.Out)
Expand All @@ -122,8 +122,8 @@
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
}

Check warning on line 126 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}

return lgr, config, nil
Expand All @@ -143,23 +143,23 @@
}
var parts []overridePart

if v, ok := (*p.importer).(conduit.RoundRequestor); ok {
if v, ok := p.importer.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Importer,
t: plugins.Importer,
})
}
for idx, processor := range p.processors {
if v, ok := (*processor).(conduit.RoundRequestor); ok {
if v, ok := processor.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Processors[idx],
t: plugins.Processor,
})
}
}
if v, ok := (*p.exporter).(conduit.RoundRequestor); ok {
if v, ok := p.exporter.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Exporter,
Expand All @@ -173,8 +173,8 @@
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
if err != nil {
return 0, err
}

Check warning on line 177 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L176-L177

Added lines #L176 - L177 were not covered by tests
rnd, err := part.RoundRequest(config)
if err != nil {
return 0, err
Expand Down Expand Up @@ -204,16 +204,16 @@
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

Check warning on line 208 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L207-L208

Added lines #L207 - L208 were not covered by tests
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

Check warning on line 216 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L215-L216

Added lines #L215 - L216 were not covered by tests

return telemetryClient, nil
}
Expand All @@ -232,12 +232,12 @@
var err error
profFile, err := os.Create(p.cfg.CPUProfile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to create profile: %w", err)

Check warning on line 235 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L235

Added line #L235 was not covered by tests
}
p.profFile = profFile
err = pprof.StartCPUProfile(profFile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to start pprof: %w", err)

Check warning on line 240 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L240

Added line #L240 was not covered by tests
}
}

Expand Down Expand Up @@ -286,7 +286,7 @@
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)

Check warning on line 289 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L289

Added line #L289 was not covered by tests
} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
Expand All @@ -304,16 +304,16 @@
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}

Check warning on line 308 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L307-L308

Added lines #L307 - L308 were not covered by tests
err = (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 312 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L311-L312

Added lines #L311 - L312 were not covered by tests
genesis, err := (*p.importer).GetGenesis()
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}

Check warning on line 316 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L315-L316

Added lines #L315 - L316 were not covered by tests
(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
Expand All @@ -326,8 +326,8 @@
p.pipelineMetadata.Network = genesis.Network
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
return fmt.Errorf("Pipeline.Init() failed to write metadata to file: %w", err)
}

Check warning on line 330 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L329-L330

Added lines #L329 - L330 were not covered by tests

p.logger.Infof("Initialized Importer: %s", p.cfg.Importer.Name)
}
Expand All @@ -337,11 +337,11 @@
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)

Check warning on line 340 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L340

Added line #L340 was not covered by tests
}
err = (*processor).Init(p.ctx, *p.initProvider, config, logger)
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)

Check warning on line 344 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L344

Added line #L344 was not covered by tests
}
p.logger.Infof("Initialized Processor: %s", ncPair.Name)
}
Expand All @@ -350,12 +350,12 @@
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 354 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L353-L354

Added lines #L353 - L354 were not covered by tests
err = (*p.exporter).Init(p.ctx, *p.initProvider, config, logger)
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}

Check warning on line 358 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L357-L358

Added lines #L357 - L358 were not covered by tests
p.logger.Infof("Initialized Exporter: %s", p.cfg.Exporter.Name)
}

Expand Down Expand Up @@ -388,20 +388,20 @@
}
}

if err := (*p.importer).Close(); err != nil {
if err := p.importer.Close(); err != nil {

Check warning on line 391 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L391

Added line #L391 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)

Check warning on line 393 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L393

Added line #L393 was not covered by tests
}

for _, processor := range p.processors {
if err := (*processor).Close(); err != nil {
if err := processor.Close(); err != nil {

Check warning on line 397 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L397

Added line #L397 was not covered by tests
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)

Check warning on line 399 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L399

Added line #L399 was not covered by tests
}
}

if err := (*p.exporter).Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err)
if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)

Check warning on line 404 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L403-L404

Added lines #L403 - L404 were not covered by tests
}
}

Expand Down Expand Up @@ -460,7 +460,7 @@
p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound)
// fetch block
importStart := time.Now()
blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound)
blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand All @@ -477,18 +477,18 @@
start := time.Now()
for _, proc := range p.processors {
processorStart := time.Now()
blkData, err = (*proc).Process(blkData)
blkData, err = proc.Process(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
retry++
goto pipelineRun
}
metrics.ProcessorTimeSeconds.WithLabelValues((*proc).Metadata().Name).Observe(time.Since(processorStart).Seconds())
metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(time.Since(processorStart).Seconds())
}
// run through exporter
exporterStart := time.Now()
err = (*p.exporter).Receive(blkData)
err = p.exporter.Receive(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand Down Expand Up @@ -563,47 +563,44 @@
logger: logger,
initProvider: nil,
importer: nil,
processors: []*processors.Processor{},
processors: []processors.Processor{},
exporter: nil,
}

importerName := cfg.Importer.Name

importerBuilder, err := importers.ImporterBuilderByName(importerName)
importerConstructor, err := importers.ImporterConstructorByName(importerName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

importer := importerBuilder.New()
pipeline.importer = &importer
pipeline.importer = importerConstructor.New()

Check warning on line 577 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L577

Added line #L577 was not covered by tests
logger.Infof("Found Importer: %s", importerName)

// ---

for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorBuilder, err := processors.ProcessorBuilderByName(processorName)
processorConstructor, err := processors.ProcessorConstructorByName(processorName)

Check warning on line 585 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L585

Added line #L585 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

processor := processorBuilder.New()
pipeline.processors = append(pipeline.processors, &processor)
pipeline.processors = append(pipeline.processors, processorConstructor.New())

Check warning on line 590 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L590

Added line #L590 was not covered by tests
logger.Infof("Found Processor: %s", processorName)
}

// ---

exporterName := cfg.Exporter.Name

exporterBuilder, err := exporters.ExporterBuilderByName(exporterName)
exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)

Check warning on line 598 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L598

Added line #L598 was not covered by tests
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

exporter := exporterBuilder.New()
pipeline.exporter = &exporter
pipeline.exporter = exporterConstructor.New()

Check warning on line 603 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L603

Added line #L603 was not covered by tests
logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
Expand All @@ -615,22 +612,22 @@
logger.Infof("Creating PID file at: %s", pidFilePath)
fout, err := os.Create(pidFilePath)
if err != nil {
err = fmt.Errorf("%s: could not create pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 618 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L615-L618

Added lines #L615 - L618 were not covered by tests

if _, err = fmt.Fprintf(fout, "%d", os.Getpid()); err != nil {
err = fmt.Errorf("%s: could not write pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 624 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L621-L624

Added lines #L621 - L624 were not covered by tests

err = fout.Close()
if err != nil {
err = fmt.Errorf("%s: could not close pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

Check warning on line 631 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L628-L631

Added lines #L628 - L631 were not covered by tests
return err
}
32 changes: 16 additions & 16 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func mockPipeline(t *testing.T, dataDir string) (*pipelineImpl, *test.Hook, *moc
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down Expand Up @@ -271,9 +271,9 @@ func TestPipelineRun(t *testing.T) {
cf: cf,
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{
NextRound: 0,
Expand Down Expand Up @@ -371,9 +371,9 @@ func TestPipelineErrors(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{},
}
Expand Down Expand Up @@ -440,9 +440,9 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
cfg: &data.Config{},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor, &pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor, pProcessor},
exporter: pExporter,
}

// Each plugin implements the Completed interface, so there should be 4
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestGenesisHash(t *testing.T) {

// mock a different genesis hash
var pImporter importers.Importer = &mockImporter{genesis: sdk.Genesis{Network: "dev"}}
pImpl.importer = &pImporter
pImpl.importer = pImporter
err = pImpl.Init()
assert.Contains(t, err.Error(), "genesis hash in metadata does not match")
}
Expand Down Expand Up @@ -797,9 +797,9 @@ func TestPipelineRetryVariables(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func Register(name string, constructor ExporterConstructor) {
Exporters[name] = constructor
}

// ExporterBuilderByName returns a Processor constructor for the name provided
func ExporterBuilderByName(name string) (ExporterConstructor, error) {
// ExporterConstructorByName returns a Processor constructor for the name provided
func ExporterConstructorByName(name string) (ExporterConstructor, error) {
constructor, ok := Exporters[name]
if !ok {
return nil, fmt.Errorf("no Exporter Constructor for %s", name)
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func TestExporterByNameSuccess(t *testing.T) {
me := mockExporter{}
Register("foobar", &mockExporterConstructor{&me})

expC, err := ExporterBuilderByName("foobar")
expC, err := ExporterConstructorByName("foobar")
assert.NoError(t, err)
exp := expC.New()
assert.Implements(t, (*Exporter)(nil), exp)
}

func TestExporterByNameNotFound(t *testing.T) {
_, err := ExporterBuilderByName("barfoo")
_, err := ExporterConstructorByName("barfoo")
expectedErr := "no Exporter Constructor for barfoo"
assert.EqualError(t, err, expectedErr)
}
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ne = nc.New()
func TestExporterBuilderByName(t *testing.T) {
// init() has already registered the noop exporter
assert.Contains(t, exporters.Exporters, metadata.Name)
neBuilder, err := exporters.ExporterBuilderByName(metadata.Name)
neBuilder, err := exporters.ExporterConstructorByName(metadata.Name)
assert.NoError(t, err)
ne := neBuilder.New()
assert.Implements(t, (*exporters.Exporter)(nil), ne)
Expand Down
12 changes: 6 additions & 6 deletions conduit/plugins/importers/importer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
)

// Constructor must be implemented by each Importer.
// ImporterConstructor must be implemented by each Importer.
// It provides a basic no-arg constructor for instances of an ImporterImpl.
type Constructor interface {
type ImporterConstructor interface {
// New should return an instantiation of a Importer.
// Configuration values should be passed and can be processed during `Init()`.
New() Importer
Expand All @@ -21,20 +21,20 @@ func (f ImporterConstructorFunc) New() Importer {
}

// Importers are the constructors to build importer plugins.
var Importers = make(map[string]Constructor)
var Importers = make(map[string]ImporterConstructor)

// Register is used to register Constructor implementations. This mechanism allows
// for loose coupling between the configuration and the implementation. It is extremely similar to the way sql.DB
// drivers are configured and used.
func Register(name string, constructor Constructor) {
func Register(name string, constructor ImporterConstructor) {
if _, ok := Importers[name]; ok {
panic(fmt.Errorf("importer %s already registered", name))
}
Importers[name] = constructor
}

// ImporterBuilderByName returns a Importer constructor for the name provided
func ImporterBuilderByName(name string) (Constructor, error) {
// ImporterConstructorByName returns a Importer constructor for the name provided
func ImporterConstructorByName(name string) (ImporterConstructor, error) {
constructor, ok := Importers[name]
if !ok {
return nil, fmt.Errorf("no Importer Constructor for %s", name)
Expand Down
Loading
Loading