Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEOS-1641: Fixes stale DB Connections during Data Sync Workflow #2995

Merged
merged 44 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
9e16b57
renames connectiontunnelmanager to just connectionmanager
nickzelei Nov 26, 2024
7df69cb
fixes all non-test build errors
nickzelei Nov 26, 2024
8e3703c
Removes commented code in sql manager
nickzelei Nov 26, 2024
7f584a6
gets sql-manager integration tests working
nickzelei Nov 26, 2024
441d323
regen mockery
nickzelei Nov 26, 2024
738189a
refactors new sql manager to be parameterless
nickzelei Nov 26, 2024
6164361
adds ability to handle shutting down automatically
nickzelei Nov 26, 2024
57df068
updates manager to be more robust
nickzelei Nov 26, 2024
8f08a0b
updates sql manager inits to use proper closer
nickzelei Nov 26, 2024
c7e5f45
Removes unused generic param
nickzelei Nov 26, 2024
bae5add
updates benthos to take in simpler provider interface for sql
nickzelei Nov 27, 2024
1c6f087
updates connection manager to have session be an interface
nickzelei Nov 27, 2024
4761b49
reworking conn manager to have session
nickzelei Nov 27, 2024
8faad26
fixes build errors from reworking session
nickzelei Nov 27, 2024
42f8e17
fixes remaining build errors
nickzelei Nov 27, 2024
b882757
fixes one round of test failures
nickzelei Nov 27, 2024
c986a49
fixing more test failures
nickzelei Nov 27, 2024
bc4b48f
another round of test fixes
nickzelei Nov 27, 2024
04a7500
fixes more error
nickzelei Nov 27, 2024
6af4e4c
removing commented code
nickzelei Nov 27, 2024
a30fbf7
fixing bad session generation
nickzelei Nov 27, 2024
795e198
adds connection reaper debug statements
nickzelei Nov 27, 2024
aa8bbac
properly cleaning up connections during sync
nickzelei Nov 27, 2024
1eb4a05
fixes connection service
nickzelei Nov 27, 2024
b78492d
fixes golint
nickzelei Nov 27, 2024
509d314
fixes unregistered post sync activity in test
nickzelei Nov 27, 2024
fc6765e
fixes race condition of manager
nickzelei Nov 27, 2024
4a3b461
rolls back to original reap time
nickzelei Nov 27, 2024
f0a99f4
adds configuration for reaper duration
nickzelei Dec 2, 2024
c6752c4
readds connection timeout
nickzelei Dec 2, 2024
060066b
adds session unit tests
nickzelei Dec 2, 2024
7322017
adds check for nil options
nickzelei Dec 2, 2024
dade253
updates places to use consts for postgres
nickzelei Dec 2, 2024
6be85c3
update one more spot
nickzelei Dec 2, 2024
375c7db
fixes redis testcontainer
nickzelei Dec 2, 2024
50d1079
uncomments connection data tests
nickzelei Dec 2, 2024
8b5cf4a
gets conn data tests working
nickzelei Dec 2, 2024
eab0e9f
Adds debug logs to getconnection
nickzelei Dec 2, 2024
c1fff42
updates conn manager to handle more logging
nickzelei Dec 2, 2024
98c910b
handles setting bool
nickzelei Dec 2, 2024
c6fbab8
Adds shutdown
nickzelei Dec 2, 2024
801a186
updates shutdown and reaper logic
nickzelei Dec 2, 2024
de9ccfa
go mod tidy
nickzelei Dec 2, 2024
4ee4822
Fixes mocks
nickzelei Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ issues:
- keycloak
# # Excluding configuration per-path, per-linter, per-text and per-source
exclude-rules:
- path: _test\.go
linters:
- gomnd
- lll
- goconst
- gosec
- path: ^mock_.*\.go$
text: "exclude mock files"
linters:
- gocritic
- gofmt
- goimports
- gosimple
- stylecheck
- whitespace

run:
timeout: 10m
Expand Down
2 changes: 1 addition & 1 deletion .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ packages:
interfaces:
ManagerInterface:
ClientInterface:
github.com/nucleuscloud/neosync/internal/connection-tunnel-manager:
github.com/nucleuscloud/neosync/internal/connection-manager:
interfaces:
ConnectionProvider:
github.com/nucleuscloud/neosync/worker/pkg/benthos/dynamodb:
Expand Down
24 changes: 17 additions & 7 deletions backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"

"connectrpc.com/connect"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/auth0/go-jwt-middleware/v2/validator"
"github.com/go-logr/logr"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
"github.com/nucleuscloud/neosync/internal/connectrpc/validate"
http_client "github.com/nucleuscloud/neosync/internal/http/client"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
Expand Down Expand Up @@ -464,14 +464,24 @@ func serve(ctx context.Context) error {
pgquerier := pg_queries.New()
mysqlquerier := mysql_queries.New()
sqlConnector := &sqlconnect.SqlOpenConnector{}
pgpoolmap := &sync.Map{}
mysqlpoolmap := &sync.Map{}
mssqlpoolmap := &sync.Map{}
mssqlquerier := mssql_queries.New()
sqlmanager := sql_manager.NewSqlManager(pgpoolmap, pgquerier, mysqlpoolmap, mysqlquerier, mssqlpoolmap, mssqlquerier, sqlConnector)

sqlmanager := sql_manager.NewSqlManager(
sql_manager.WithPostgresQuerier(pgquerier),
sql_manager.WithMysqlQuerier(mysqlquerier),
sql_manager.WithMssqlQuerier(mssqlquerier),
sql_manager.WithConnectionManagerOpts(connectionmanager.WithCloseOnRelease()),
)
mongoconnector := mongoconnect.NewConnector()
connectionService := v1alpha1_connectionservice.New(&v1alpha1_connectionservice.Config{}, db, useraccountService, sqlConnector, pgquerier,
mysqlquerier, mssqlquerier, mongoconnector, awsManager)
connectionService := v1alpha1_connectionservice.New(
&v1alpha1_connectionservice.Config{},
db,
useraccountService,
mongoconnector,
awsManager,
sqlmanager,
&sqlconnect.SqlOpenConnector{},
)
api.Handle(
mgmtv1alpha1connect.NewConnectionServiceHandler(
connectionService,
Expand Down
3 changes: 2 additions & 1 deletion backend/pkg/dbconnect-config/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/pkg/clienttls"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
)

type pgConnectConfig struct {
Expand Down Expand Up @@ -37,7 +38,7 @@ func NewFromPostgresConnection(
}

pgurl := url.URL{
Scheme: "postgres",
Scheme: sqlmanager_shared.DefaultPostgresDriver,
Host: host,
}
if cc.Connection.GetUser() != "" && cc.Connection.GetPass() != "" {
Expand Down
49 changes: 13 additions & 36 deletions backend/pkg/integration-test/integration-test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"net/http"
"net/http/httptest"
"sync"
"testing"

"connectrpc.com/connect"
Expand All @@ -26,7 +25,6 @@ import (
clientmanager "github.com/nucleuscloud/neosync/backend/internal/temporal/clientmanager"
"github.com/nucleuscloud/neosync/backend/internal/utils"
"github.com/nucleuscloud/neosync/backend/pkg/mongoconnect"
mssql_queries "github.com/nucleuscloud/neosync/backend/pkg/mssql-querier"
"github.com/nucleuscloud/neosync/backend/pkg/sqlconnect"
"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
v1alpha_anonymizationservice "github.com/nucleuscloud/neosync/backend/services/mgmt/v1alpha1/anonymization-service"
Expand All @@ -37,6 +35,7 @@ import (
v1alpha1_useraccountservice "github.com/nucleuscloud/neosync/backend/services/mgmt/v1alpha1/user-account-service"
awsmanager "github.com/nucleuscloud/neosync/internal/aws"
"github.com/nucleuscloud/neosync/internal/billing"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
http_client "github.com/nucleuscloud/neosync/internal/http/client"
neomigrate "github.com/nucleuscloud/neosync/internal/migrate"
Expand Down Expand Up @@ -188,16 +187,16 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
nil,
)

sqlmanagerclient := NewTestSqlManagerClient()

authdConnectionService := v1alpha1_connectionservice.New(
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
authdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)

neoCloudAuthdUserService := v1alpha1_useraccountservice.New(
Expand All @@ -221,12 +220,10 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
neoCloudAuthdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)
neoCloudJobHookService := jobhooks.New(
neosyncdb.New(pgcontainer.DB, db_queries.New()),
Expand All @@ -239,12 +236,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
s.Mocks.TemporalClientManager,
neoCloudConnectionService,
neoCloudAuthdUserService,
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
neoCloudJobHookService,
)

Expand All @@ -262,12 +254,10 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
&v1alpha1_connectionservice.Config{},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
unauthdUserService,
&sqlconnect.SqlOpenConnector{},
pg_queries.New(),
mysql_queries.New(),
mssql_queries.New(),
mongoconnect.NewConnector(),
awsmanager.New(),
sqlmanagerclient,
&sqlconnect.SqlOpenConnector{},
)

jobhookService := jobhooks.New(
Expand All @@ -281,12 +271,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
s.Mocks.TemporalClientManager,
unauthdConnectionsService,
unauthdUserService,
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
jobhookService,
)

Expand All @@ -300,12 +285,7 @@ func (s *NeosyncApiTestClient) Setup(ctx context.Context, t *testing.T) error {
pg_queries.New(),
mysql_queries.New(),
mongoconnect.NewConnector(),
sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
),
sqlmanagerclient,
neosync_gcp.NewManager(),
)

Expand Down Expand Up @@ -478,9 +458,6 @@ func startHTTPServer(tb testing.TB, h http.Handler) *httptest.Server {

func NewTestSqlManagerClient() *sqlmanager.SqlManager {
return sqlmanager.NewSqlManager(
&sync.Map{}, pg_queries.New(),
&sync.Map{}, mysql_queries.New(),
&sync.Map{}, mssql_queries.New(),
&sqlconnect.SqlOpenConnector{},
sqlmanager.WithConnectionManagerOpts(connectionmanager.WithCloseOnRelease()),
)
}
33 changes: 16 additions & 17 deletions backend/pkg/sqlconnect/mock_SqlConnector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 21 additions & 8 deletions backend/pkg/sqlconnect/sql-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/pkg/clienttls"
dbconnectconfig "github.com/nucleuscloud/neosync/backend/pkg/dbconnect-config"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
tun "github.com/nucleuscloud/neosync/internal/sshtunnel"
"github.com/nucleuscloud/neosync/internal/sshtunnel/connectors/mssqltunconnector"
"github.com/nucleuscloud/neosync/internal/sshtunnel/connectors/mysqltunconnector"
Expand All @@ -39,6 +40,8 @@ type SqlConnectorOption func(*sqlConnectorOptions)
type sqlConnectorOptions struct {
mysqlDisableParseTime bool
postgresDriver string

connectionTimeoutSeconds *uint32
}

// WithMysqlParseTimeDisabled disables MySQL time parsing
Expand All @@ -51,26 +54,36 @@ func WithMysqlParseTimeDisabled() SqlConnectorOption {
// WithPostgresDriver overrides default postgres driver
func WithDefaultPostgresDriver() SqlConnectorOption {
return func(opts *sqlConnectorOptions) {
opts.postgresDriver = "postgres"
opts.postgresDriver = sqlmanager_shared.DefaultPostgresDriver
}
}

// Provide an integer number that corresponds to the number of seconds to wait before timing out attempting to connect.
// Ex: 10 == 10 seconds
func WithConnectionTimeout(timeoutSeconds uint32) SqlConnectorOption {
return func(sco *sqlConnectorOptions) {
sco.connectionTimeoutSeconds = &timeoutSeconds
}
}

type SqlConnector interface {
NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error)
NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error)
}

type SqlOpenConnector struct{}

func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) {
func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) {
if cc == nil {
return nil, errors.New("connectionConfig was nil, expected *mgmtv1alpha1.ConnectionConfig")
}

options := sqlConnectorOptions{
postgresDriver: "pgx",
postgresDriver: sqlmanager_shared.PostgresDriver,
}
for _, opt := range opts {
opt(&options)
if opt != nil {
opt(&options)
}
}

dbconnopts, err := getConnectionOptsFromConnectionConfig(cc)
Expand All @@ -86,7 +99,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
return nil, fmt.Errorf("unable to upsert client tls files: %w", err)
}
}
connDetails, err := dbconnectconfig.NewFromPostgresConnection(config, connectionTimeout, logger)
connDetails, err := dbconnectconfig.NewFromPostgresConnection(config, options.connectionTimeoutSeconds, logger)
if err != nil {
return nil, err
}
Expand All @@ -107,7 +120,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
return newStdlibContainer(options.postgresDriver, dsn, dbconnopts), nil
}
case *mgmtv1alpha1.ConnectionConfig_MysqlConfig:
connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, connectionTimeout, logger, options.mysqlDisableParseTime)
connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, options.connectionTimeoutSeconds, logger, options.mysqlDisableParseTime)
if err != nil {
return nil, err
}
Expand All @@ -127,7 +140,7 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio
}
return newStdlibContainer("mysql", dsn, dbconnopts), nil
case *mgmtv1alpha1.ConnectionConfig_MssqlConfig:
connDetails, err := dbconnectconfig.NewFromMssqlConnection(config, connectionTimeout)
connDetails, err := dbconnectconfig.NewFromMssqlConnection(config, options.connectionTimeoutSeconds)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading