Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add Lifecycle service #1835

Merged
merged 48 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
908c4f4
wip
raulb Aug 30, 2024
9386361
update method calls
raulb Aug 30, 2024
51043b6
use configuration
raulb Sep 3, 2024
8d64430
Merge branch 'main' into raul/recovery-mechanism
raulb Sep 4, 2024
b860284
wip refactor
raulb Sep 6, 2024
fd66596
wip
raulb Sep 6, 2024
8e2de1b
move stream
raulb Sep 9, 2024
f21f901
add lifecycleService to runtime
raulb Sep 9, 2024
9867f23
fix
raulb Sep 9, 2024
0ef5142
fix imports
raulb Sep 9, 2024
80cd33b
no errors on lifecycle service
raulb Sep 10, 2024
a9f30b3
fix wait
raulb Sep 10, 2024
599cd81
update status through its method
raulb Sep 10, 2024
871fc0b
it compiles
raulb Sep 10, 2024
60547de
update interfaces
raulb Sep 10, 2024
b2a41ee
more test fixes
raulb Sep 11, 2024
d6a958b
update mocks
raulb Sep 11, 2024
fbdbfa3
add mocks and update tests
raulb Sep 11, 2024
9952975
fix lint
raulb Sep 11, 2024
49b90f2
remove GetInstances
raulb Sep 11, 2024
58dd785
delete key instead
raulb Sep 11, 2024
ae85b9d
ignore test files
raulb Sep 11, 2024
542c895
delete test files
raulb Sep 11, 2024
d19f89d
Merge branch 'main' into raul/recovery-mechanism
raulb Sep 11, 2024
0f91751
Merge branch 'raul/recovery-mechanism' into raul/recovery-mechanism-r…
raulb Sep 12, 2024
fdcc578
Merge branch 'main' into raul/recovery-mechanism
raulb Sep 12, 2024
531414f
Merge branch 'raul/recovery-mechanism' into raul/recovery-mechanism-r…
raulb Sep 12, 2024
0c932fe
ensures runningPipeline is locked
raulb Sep 12, 2024
17140fb
fix not initialized map
raulb Sep 12, 2024
1de69c4
fix tests
raulb Sep 13, 2024
7d9210c
add back the needed test file
raulb Sep 13, 2024
07b8018
update comment
raulb Sep 13, 2024
435709f
add comment
raulb Sep 13, 2024
99b215a
locks while retrieving the pipeline
raulb Sep 13, 2024
0521ff8
update comment
raulb Sep 13, 2024
4763ac5
unsure about the need for this
raulb Sep 13, 2024
ea29909
remove backoff from pipeline service
raulb Sep 13, 2024
97d24a4
update method name on lifecycle service
raulb Sep 13, 2024
7d4da27
lock running pipelines and optimize stopAll
raulb Sep 13, 2024
fc3f3b6
remove redundant call to update status
raulb Sep 13, 2024
6e60956
fix typo
raulb Sep 13, 2024
55ba024
pr feedback
raulb Sep 13, 2024
3596fb8
uses csync Map
raulb Sep 13, 2024
b17554f
Merge branch 'main' into raul/recovery-mechanism-refactor
raulb Sep 13, 2024
1bf2136
go mod tidy
raulb Sep 13, 2024
3c67005
Merge branch 'raul/recovery-mechanism-refactor' of github.com:Conduit…
raulb Sep 13, 2024
fd57ecd
update conduit-commons
lovromazgon Sep 13, 2024
08ed5e8
go mod tidy
lovromazgon Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ escape_analysis.txt

# Compiled test wasm processors
pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm

# Test data
**/test/*.txt

# this one is needed for integration tests
!pkg/provisioning/test/source-file.txt
30 changes: 21 additions & 9 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/metrics"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/metrics/prometheus"
"github.com/conduitio/conduit/pkg/lifecycle"
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector"
Expand All @@ -63,6 +64,7 @@ import (
"github.com/conduitio/conduit/pkg/web/ui"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
grpcruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/jpillora/backoff"
"github.com/piotrkowalczuk/promgrpc/v4"
promclient "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -94,6 +96,7 @@ type Runtime struct {
pipelineService *pipeline.Service
connectorService *connector.Service
processorService *processor.Service
lifecycleService *lifecycle.Service

connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService
Expand Down Expand Up @@ -201,13 +204,21 @@ func createServices(r *Runtime) error {
tokenService,
)

plService := pipeline.NewService(r.logger, r.DB)
errRecovery := r.Config.Pipelines.ErrorRecovery
backoffCfg := &backoff.Backoff{
Min: errRecovery.MinDelay,
Max: errRecovery.MaxDelay,
Factor: float64(errRecovery.BackoffFactor),
Jitter: true,
}

plService := pipeline.NewService(r.logger, r.DB, backoffCfg)
connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
procService := processor.NewService(r.logger, r.DB, procPluginService)
lifecycleService := lifecycle.NewService(r.logger, backoffCfg, connService, procService, connPluginService, plService)
provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)

provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, r.Config.Pipelines.Path)

orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService)
orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)

r.Orchestrator = orc
r.ProvisionService = provisionService
Expand All @@ -220,6 +231,7 @@ func createServices(r *Runtime) error {
r.processorPluginService = procPluginService
r.connSchemaService = connSchemaService
r.procSchemaService = procSchemaService
r.lifecycleService = lifecycleService

return nil
}
Expand Down Expand Up @@ -411,12 +423,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
// t.Err() can be nil, when we had a call: t.Kill(nil)
// t.Err() will be context.Canceled, if the tomb's context was canceled
if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) {
r.pipelineService.StopAll(ctx, pipeline.ErrGracefulShutdown)
r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown)
} else {
// tomb died due to a real error
r.pipelineService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
}
err := r.pipelineService.Wait(exitTimeout)
err := r.lifecycleService.Wait(exitTimeout)
t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
Expand Down Expand Up @@ -758,7 +770,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}

if r.Config.Pipelines.ExitOnError {
r.pipelineService.OnFailure(func(e pipeline.FailureEvent) {
r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Expand All @@ -785,7 +797,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}
}

err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService)
err = r.lifecycleService.Run(ctx)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("pipeline failed to be started")
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/dlq.go → pkg/lifecycle/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline
package lifecycle

import (
"bytes"
Expand All @@ -21,7 +21,7 @@ import (
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/pipeline/stream"
"github.com/conduitio/conduit/pkg/lifecycle/stream"
)

// DLQDestination is a DLQ handler that forwards DLQ records to a destination
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/dlq_test.go → pkg/lifecycle/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline
package lifecycle

import (
"context"
Expand All @@ -22,7 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
streammock "github.com/conduitio/conduit/pkg/pipeline/stream/mock"
streammock "github.com/conduitio/conduit/pkg/lifecycle/stream/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
Loading