Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline architecture v2 (preview) #1913

Merged
merged 26 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit. Hold Shift + click to select a range.
f231970
proof of concept for funnel (new stream engine)
lovromazgon Sep 18, 2024
77385c0
tie code into the lifecycle service to test end-to-end
lovromazgon Sep 18, 2024
a1926ac
expand the implementation
lovromazgon Oct 1, 2024
27ce28c
improve performance
lovromazgon Oct 1, 2024
26ede3b
minimize allocations
lovromazgon Oct 1, 2024
b7bcb72
start implementing processor
lovromazgon Oct 1, 2024
e636130
ensure graceful stop
lovromazgon Oct 2, 2024
64c0782
implement processor task
lovromazgon Oct 3, 2024
048b72d
build worker tasks correctly
lovromazgon Oct 3, 2024
aefa646
do not use a sandbox for Run (#1886)
lovromazgon Oct 9, 2024
1b10c59
make graceful shutdown work
lovromazgon Oct 10, 2024
ab6bd70
fix example
lovromazgon Oct 10, 2024
ecb23c1
update tests
lovromazgon Oct 11, 2024
c65a728
fix linter warnings
lovromazgon Oct 11, 2024
ea68ef5
rename introduced lifecycle package
lovromazgon Oct 11, 2024
3448fce
restore original lifecycle package
lovromazgon Oct 11, 2024
f2153ab
Merge branch 'main' into lovro/poc-funnel
lovromazgon Oct 11, 2024
298b9ec
fix code after merge
lovromazgon Oct 11, 2024
365beeb
Merge branch 'main' into lovro/poc-funnel
lovromazgon Oct 15, 2024
882608d
add feature flag
lovromazgon Oct 15, 2024
1bb4635
go mod tidy
lovromazgon Oct 15, 2024
42bff79
make acknowledgment fetching from destination stricter
lovromazgon Oct 24, 2024
8c8ae0b
documentation
lovromazgon Oct 24, 2024
c5daa08
Merge branch 'main' into lovro/poc-funnel
lovromazgon Oct 24, 2024
b1b969b
make generate
lovromazgon Oct 24, 2024
459bc07
Merge branch 'main' into lovro/poc-funnel
raulb Nov 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.1
github.com/rs/zerolog v1.33.0
github.com/sourcegraph/conc v0.3.0
github.com/spf13/cobra v1.8.1
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.8.1
Expand Down Expand Up @@ -312,7 +313,6 @@ require (
github.com/sivchari/containedctx v1.0.3 // indirect
github.com/sivchari/tenv v1.10.0 // indirect
github.com/sonatard/noctx v0.0.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ type Config struct {
}
}

Preview struct {
// PipelineArchV2 enables the new pipeline architecture.
PipelineArchV2 bool
}

dev struct {
cpuprofile string
memprofile string
Expand Down
2 changes: 2 additions & 0 deletions pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func Flags(cfg *Config) *flag.FlagSet {
flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent")
flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")

flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)")

// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
Expand Down
139 changes: 116 additions & 23 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/metrics/prometheus"
"github.com/conduitio/conduit/pkg/lifecycle"
lifecycle_v2 "github.com/conduitio/conduit/pkg/lifecycle-poc"
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector"
Expand Down Expand Up @@ -77,7 +78,7 @@ import (
)

const (
exitTimeout = 10 * time.Second
exitTimeout = 30 * time.Second
)

// Runtime sets up all services for serving and monitoring a Conduit instance.
Expand All @@ -95,7 +96,7 @@ type Runtime struct {
pipelineService *pipeline.Service
connectorService *connector.Service
processorService *processor.Service
lifecycleService *lifecycle.Service
lifecycleService lifecycleService

connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService
Expand All @@ -107,6 +108,14 @@ type Runtime struct {
logger log.CtxLogger
}

// lifecycleService is an interface that we use temporarily to allow for
// both the old and new lifecycle services to be used interchangeably.
//
// It is satisfied by both *lifecycle.Service (v1) and *lifecycle_v2.Service;
// the concrete implementation is selected at startup based on the
// Preview.PipelineArchV2 config flag (see createServices).
type lifecycleService interface {
// Start starts the pipeline with the given ID.
Start(ctx context.Context, pipelineID string) error
// Stop stops the pipeline with the given ID; force presumably skips the
// graceful stop — confirm against the concrete implementations.
Stop(ctx context.Context, pipelineID string, force bool) error
// Init initializes the service at startup — NOTE(review): likely resumes
// previously-running pipelines; confirm against the implementations.
Init(ctx context.Context) error
}

// NewRuntime sets up a Runtime instance and primes it for start.
func NewRuntime(cfg Config) (*Runtime, error) {
if err := cfg.Validate(); err != nil {
Expand Down Expand Up @@ -203,21 +212,28 @@ func createServices(r *Runtime) error {
tokenService,
)

// Error recovery configuration
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
}

plService := pipeline.NewService(r.logger, r.DB)
connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
procService := processor.NewService(r.logger, r.DB, procPluginService)
lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)

var lifecycleService lifecycleService
if r.Config.Preview.PipelineArchV2 {
r.logger.Info(context.Background()).Msg("using lifecycle service v2")
lifecycleService = lifecycle_v2.NewService(r.logger, connService, procService, connPluginService, plService)
} else {
// Error recovery configuration
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
}

lifecycleService = lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
}

provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)
orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)

r.Orchestrator = orc
Expand Down Expand Up @@ -415,6 +431,15 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error
}

// registerCleanup registers a shutdown routine on the tomb, dispatching to
// the cleanup implementation that matches the configured pipeline
// architecture (v2 when the preview flag is enabled, v1 otherwise).
func (r *Runtime) registerCleanup(t *tomb.Tomb) {
	if r.Config.Preview.PipelineArchV2 {
		r.registerCleanupV2(t)
		return
	}
	r.registerCleanupV1(t)
}

func (r *Runtime) registerCleanupV1(t *tomb.Tomb) {
ls := r.lifecycleService.(*lifecycle.Service)
t.Go(func() error {
<-t.Dying()
// start cleanup with a fresh context
Expand All @@ -423,12 +448,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
// t.Err() can be nil, when we had a call: t.Kill(nil)
// t.Err() will be context.Canceled, if the tomb's context was canceled
if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) {
r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown)
ls.StopAll(ctx, pipeline.ErrGracefulShutdown)
} else {
// tomb died due to a real error
r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
ls.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
}
err := r.lifecycleService.Wait(exitTimeout)
err := ls.Wait(exitTimeout)
t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
Expand All @@ -437,6 +462,62 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
})
}

func (r *Runtime) registerCleanupV2(t *tomb.Tomb) {
ls := r.lifecycleService.(*lifecycle_v2.Service)
t.Go(func() error {
<-t.Dying()
// start cleanup with a fresh context
ctx := context.Background()

err := ls.StopAll(ctx, false)
if err != nil {
r.logger.Err(ctx, err).Msg("some pipelines stopped with an error")
}

// Wait for the pipelines to stop
const (
count = 6
interval = exitTimeout / count
)

pipelinesStopped := make(chan struct{})
go func() {
for i := count; i > 0; i-- {
if i == 1 {
// on last try, stop forcefully
_ = ls.StopAll(ctx, true)
}

r.logger.Info(ctx).Msgf("waiting for pipelines to stop running (time left: %s)", time.Duration(i)*interval)
select {
case <-time.After(interval):
case <-pipelinesStopped:
return
}
}
}()

err = ls.Wait(exitTimeout)
switch {
case err != nil && err != context.DeadlineExceeded:
r.logger.Warn(ctx).Err(err).Msg("some pipelines stopped with an error")
case err == context.DeadlineExceeded:
r.logger.Warn(ctx).Msg("some pipelines did not stop in time")
default:
r.logger.Info(ctx).Msg("all pipelines stopped gracefully")
}

pipelinesStopped <- struct{}{}
raulb marked this conversation as resolved.
Show resolved Hide resolved

t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
})

return nil
})
}

// newHTTPMetricsHandler returns the HTTP handler that serves Prometheus
// metrics for this Conduit instance.
func (r *Runtime) newHTTPMetricsHandler() http.Handler {
return promhttp.Handler()
}
Expand Down Expand Up @@ -770,13 +851,25 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}

if r.Config.Pipelines.ExitOnDegraded {
r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
if r.Config.Preview.PipelineArchV2 {
ls := r.lifecycleService.(*lifecycle_v2.Service)
ls.OnFailure(func(e lifecycle_v2.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
} else {
ls := r.lifecycleService.(*lifecycle.Service)
ls.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error))
})
}
}
err = r.pipelineService.Init(ctx)
if err != nil {
Expand Down
23 changes: 17 additions & 6 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package connector

import (
"context"
"strconv"
"sync"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-protocol/pconnector"
Expand Down Expand Up @@ -153,6 +155,7 @@ func (s *Source) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

s.Instance.logger.Debug(ctx).Msg("closing stream")
// close stream
if s.stopStream != nil {
s.stopStream()
Expand Down Expand Up @@ -192,8 +195,9 @@ func (s *Source) Read(ctx context.Context) ([]opencdc.Record, error) {
return nil, err
}

now := strconv.FormatInt(time.Now().UnixNano(), 10)
for _, r := range resp.Records {
s.sanitizeRecord(&r)
s.sanitizeRecord(&r, now)
}

s.Instance.inspector.Send(ctx, resp.Records)
Expand Down Expand Up @@ -375,7 +379,7 @@ func (s *Source) triggerLifecycleEvent(ctx context.Context, oldConfig, newConfig
}
}

func (s *Source) sanitizeRecord(r *opencdc.Record) {
func (s *Source) sanitizeRecord(r *opencdc.Record, now string) {
if r.Key == nil {
r.Key = opencdc.RawData{}
}
Expand All @@ -385,12 +389,19 @@ func (s *Source) sanitizeRecord(r *opencdc.Record) {
if r.Payload.After == nil {
r.Payload.After = opencdc.RawData{}
}

if r.Metadata == nil {
r.Metadata = opencdc.Metadata{}
r.Metadata = opencdc.Metadata{
raulb marked this conversation as resolved.
Show resolved Hide resolved
opencdc.MetadataReadAt: now,
opencdc.MetadataConduitSourceConnectorID: s.Instance.ID,
}
} else {
if r.Metadata[opencdc.MetadataReadAt] == "" {
r.Metadata[opencdc.MetadataReadAt] = now
}
if r.Metadata[opencdc.MetadataConduitSourceConnectorID] == "" {
r.Metadata[opencdc.MetadataConduitSourceConnectorID] = s.Instance.ID
}
}
// source connector ID is added to all records
r.Metadata.SetConduitSourceConnectorID(s.Instance.ID)
}

func (*Source) isEqual(cfg1, cfg2 map[string]string) bool {
Expand Down
10 changes: 7 additions & 3 deletions pkg/foundation/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,18 @@ func (mt *labeledHistogram) WithValues(vs ...string) Histogram {
// RecordBytesHistogram wraps a histogram metric and allows to observe record
// sizes in bytes.
type RecordBytesHistogram struct {
// H is the underlying histogram that receives the observed record sizes.
H Histogram
}

// NewRecordBytesHistogram creates a RecordBytesHistogram that reports record
// sizes to the given histogram.
func NewRecordBytesHistogram(h Histogram) RecordBytesHistogram {
	return RecordBytesHistogram{H: h}
}

// Observe records the size of r in bytes (as computed by SizeOf) in the
// wrapped histogram.
func (m RecordBytesHistogram) Observe(r opencdc.Record) {
m.H.Observe(m.SizeOf(r))
}

func (m RecordBytesHistogram) SizeOf(r opencdc.Record) float64 {
raulb marked this conversation as resolved.
Show resolved Hide resolved
// TODO for now we call method Bytes() on key and payload to get the
// bytes representation. In case of a structured payload or key it
// is marshaled into JSON, which might not be the correct way to
Expand All @@ -429,5 +433,5 @@ func (m RecordBytesHistogram) Observe(r opencdc.Record) {
if r.Payload.After != nil {
bytes += len(r.Payload.After.Bytes())
}
m.h.Observe(float64(bytes))
return float64(bytes)
}
Loading