Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use left joins in subset queries for nullable foreign keys #3034

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions backend/pkg/table-dependency/table-dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ type DependsOn struct {
}

// ForeignKey describes one foreign key of a table: the local columns and the
// table/columns they reference. GetRunConfigs appends to Columns, NotNullable
// and ReferenceColumns in the same step, so the three slices are index-aligned.
type ForeignKey struct {
Columns []string
// ReferenceSchema string need to split out schema and table
Columns []string
// NotNullable[i] is the nullability flag for Columns[i]; presumably true when
// that column is declared NOT NULL (TODO confirm against the constraint loader).
NotNullable []bool
// ReferenceSchema string TODO: need to split out schema and table
// NOTE(review): until that split happens, ReferenceTable appears to carry the
// combined schema and table name; verify against callers.
ReferenceTable string
ReferenceColumns []string
}

type RunConfig struct {
table string // schema.table
table string // schema.table TODO: should use sqlmanager_shared.SchemaTable
selectColumns []string
insertColumns []string
dependsOn []*DependsOn // this should be a list of config names like "table.insert", rename to dependsOnConfigs
Expand Down Expand Up @@ -210,6 +211,7 @@ func GetRunConfigs(
// by checking insert columns, we can skip foreign keys that are not needed for the insert
if slices.Contains(config.insertColumns, col) {
foreignKey.Columns = append(foreignKey.Columns, col)
foreignKey.NotNullable = append(foreignKey.NotNullable, fk.NotNullable[idx])
foreignKey.ReferenceColumns = append(foreignKey.ReferenceColumns, fk.ForeignKey.Columns[idx])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
bb_internal "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/internal"
"github.com/nucleuscloud/neosync/internal/gotypeutil"
"github.com/nucleuscloud/neosync/internal/testutil"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -88,8 +89,8 @@ func Test_ProcessorConfigEmpty(t *testing.T) {
"name": &mgmtv1alpha1.JobMappingTransformer{},
},
}
queryMap := map[string]map[tabledependency.RunType]string{
"public.users": {tabledependency.RunTypeInsert: ""},
queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{
"public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}},
}
runconfigs := []*tabledependency.RunConfig{
tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false),
Expand Down Expand Up @@ -185,8 +186,8 @@ func Test_ProcessorConfigEmptyJavascript(t *testing.T) {
tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false),
}

queryMap := map[string]map[tabledependency.RunType]string{
"public.users": {tabledependency.RunTypeInsert: ""},
queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{
"public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}},
}
logger := testutil.GetTestLogger(t)
connectionId := uuid.NewString()
Expand Down
38 changes: 27 additions & 11 deletions internal/benthos/benthos-builder/builders/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
bb_shared "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/shared"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
)

Expand All @@ -35,7 +36,8 @@ type sqlSyncBuilder struct {
colTransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer // schema.table -> column -> transformer
sqlSourceSchemaColumnInfoMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
// merged source and destination schema, with preference given to the destination schema
mergedSchemaColumnMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
mergedSchemaColumnMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
isNotForeignKeySafeSubsetMap map[string]map[tabledependency.RunType]bool // schema.table -> run type -> true if the query could return rows that violate foreign key constraints
}

type SqlSyncOption func(*SqlSyncOptions)
Expand Down Expand Up @@ -63,12 +65,13 @@ func NewSqlSyncBuilder(
opt(options)
}
return &sqlSyncBuilder{
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
redisConfig: redisConfig,
driver: databaseDriver,
selectQueryBuilder: selectQueryBuilder,
options: options,
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
redisConfig: redisConfig,
driver: databaseDriver,
selectQueryBuilder: selectQueryBuilder,
options: options,
isNotForeignKeySafeSubsetMap: map[string]map[tabledependency.RunType]bool{},
}
}

Expand Down Expand Up @@ -140,6 +143,7 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
if err != nil {
return nil, err
}

primaryKeyToForeignKeysMap := getPrimaryKeyDependencyMap(filteredForeignKeysMap)
b.primaryKeyToForeignKeysMap = primaryKeyToForeignKeysMap

Expand All @@ -148,6 +152,15 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
return nil, fmt.Errorf("unable to build select queries: %w", err)
}

// build map of table to runType to isNotForeignKeySafeSubset
// used in destination config to determine if foreign key violations should be skipped
for table, querymap := range tableRunTypeQueryMap {
b.isNotForeignKeySafeSubsetMap[table] = make(map[tabledependency.RunType]bool)
for runtype, q := range querymap {
b.isNotForeignKeySafeSubsetMap[table][runtype] = q.IsNotForeignKeySafeSubset
}
}

configs, err := buildBenthosSqlSourceConfigResponses(logger, ctx, b.transformerclient, groupedTableMapping, runConfigs, sourceConnection.Id, tableRunTypeQueryMap, groupedColumnInfo, filteredForeignKeysMap, colTransformerMap, job.Id, params.WorkflowId, b.redisConfig, primaryKeyToForeignKeysMap)
if err != nil {
return nil, fmt.Errorf("unable to build benthos sql source config responses: %w", err)
Expand All @@ -172,7 +185,7 @@ func buildBenthosSqlSourceConfigResponses(
groupedTableMapping map[string]*tableMapping,
runconfigs []*tabledependency.RunConfig,
dsnConnectionId string,
tableRunTypeQueryMap map[string]map[tabledependency.RunType]string,
tableRunTypeQueryMap map[string]map[tabledependency.RunType]*querybuilder.SelectQuery,
groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
tableDependencies map[string][]*sqlmanager_shared.ForeignConstraint,
colTransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer,
Expand Down Expand Up @@ -201,7 +214,7 @@ func buildBenthosSqlSourceConfigResponses(
PooledSqlRaw: &neosync_benthos.InputPooledSqlRaw{
ConnectionId: dsnConnectionId,

Query: query,
Query: query.Query,
},
},
},
Expand Down Expand Up @@ -322,6 +335,9 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
return nil, fmt.Errorf("unable to parse destination options: %w", err)
}

// skip foreign key violations when the destination options request it, or when the query could return rows that violate foreign key constraints
skipForeignKeyViolations := destOpts.SkipForeignKeyViolations || b.isNotForeignKeySafeSubsetMap[tableKey][benthosConfig.RunType]

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
if benthosConfig.RunType == tabledependency.RunTypeUpdate {
args := benthosConfig.Columns
Expand All @@ -335,7 +351,7 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: benthosConfig.Columns,
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
SkipForeignKeyViolations: skipForeignKeyViolations,
MaxInFlight: int(destOpts.MaxInFlight),
WhereColumns: benthosConfig.PrimaryKeys,
ArgsMapping: buildPlainInsertArgs(args),
Expand Down Expand Up @@ -419,7 +435,7 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
ColumnsDataTypes: columnTypes,
ColumnDefaultProperties: columnDefaultProperties,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
SkipForeignKeyViolations: skipForeignKeyViolations,
RawInsertMode: b.options.rawInsertMode,
TruncateOnRetry: destOpts.Truncate,
ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns),
Expand Down
3 changes: 2 additions & 1 deletion internal/benthos/benthos-builder/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
)

// Holds the environment variable name and the connection id that should replace it at runtime when the Sync activity is launched
Expand All @@ -27,7 +28,7 @@ type SelectQueryMapBuilder interface {
runConfigs []*tabledependency.RunConfig,
subsetByForeignKeyConstraints bool,
groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
) (map[string]map[tabledependency.RunType]string, error)
) (map[string]map[tabledependency.RunType]*querybuilder.SelectQuery, error)
}

func WithEnvInterpolation(input string) string {
Expand Down
Loading
Loading