Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mysql and Mssql processors #3023

Merged
merged 20 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions internal/benthos/benthos-builder/benthos-builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (b *BuilderProvider) registerStandardBuilders(
connectionclient mgmtv1alpha1connect.ConnectionServiceClient,
redisConfig *shared.RedisConfig,
selectQueryBuilder bb_shared.SelectQueryMapBuilder,
rawSqlInsertMode bool,
) error {
sourceConnectionType := bb_internal.GetConnectionType(sourceConnection)
jobType := bb_internal.GetJobType(job)
Expand All @@ -111,22 +110,17 @@ func (b *BuilderProvider) registerStandardBuilders(
connectionTypes = append(connectionTypes, bb_internal.GetConnectionType(dest))
}

sqlSyncOptions := []bb_conns.SqlSyncOption{}
if rawSqlInsertMode {
sqlSyncOptions = append(sqlSyncOptions, bb_conns.WithRawInsertMode())
}

if jobType == bb_internal.JobTypeSync {
for _, connectionType := range connectionTypes {
switch connectionType {
case bb_internal.ConnectionTypePostgres:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.PostgresDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.PostgresDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeMysql:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MysqlDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MysqlDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeMssql:
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MssqlDriver, selectQueryBuilder, sqlSyncOptions...)
sqlbuilder := bb_conns.NewSqlSyncBuilder(transformerclient, sqlmanagerclient, redisConfig, sqlmanager_shared.MssqlDriver, selectQueryBuilder)
b.Register(bb_internal.JobTypeSync, connectionType, sqlbuilder)
case bb_internal.ConnectionTypeAwsS3:
b.Register(bb_internal.JobTypeSync, bb_internal.ConnectionTypeAwsS3, bb_conns.NewAwsS3SyncBuilder())
Expand Down Expand Up @@ -217,7 +211,6 @@ type WorkerBenthosConfig struct {
func NewWorkerBenthosConfigManager(
config *WorkerBenthosConfig,
) (*BenthosConfigManager, error) {
rawInsertMode := false
provider := NewBuilderProvider(config.Logger)
err := provider.registerStandardBuilders(
config.Job,
Expand All @@ -228,7 +221,6 @@ func NewWorkerBenthosConfigManager(
config.Connectionclient,
config.RedisConfig,
config.SelectQueryBuilder,
rawInsertMode,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -269,7 +261,6 @@ type CliBenthosConfig struct {
func NewCliBenthosConfigManager(
config *CliBenthosConfig,
) (*BenthosConfigManager, error) {
rawInsertMode := true
destinationProvider := NewBuilderProvider(config.Logger)
err := destinationProvider.registerStandardBuilders(
config.Job,
Expand All @@ -280,7 +271,6 @@ func NewCliBenthosConfigManager(
nil,
config.RedisConfig,
nil,
rawInsertMode,
)
if err != nil {
return nil, err
Expand Down
27 changes: 7 additions & 20 deletions internal/benthos/benthos-builder/builders/aws-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,6 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
storageClass = convertToS3StorageClass(destinationOpts.GetStorageClass()).String()
}

processors := []*neosync_benthos.BatchProcessor{}
if isPooledSqlRawConfigured(benthosConfig.Config) {
processors = append(processors, &neosync_benthos.BatchProcessor{SqlToJson: &neosync_benthos.SqlToJsonConfig{}})
}

standardProcessors := []*neosync_benthos.BatchProcessor{
{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
}
processors = append(processors, standardProcessors...)

config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
Expand All @@ -97,9 +86,13 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
Path: strings.Join(s3pathpieces, "/"),
ContentType: "application/gzip",
Batching: &neosync_benthos.Batching{
Count: batchingConfig.BatchCount,
Period: batchingConfig.BatchPeriod,
Processors: processors,
Count: batchingConfig.BatchCount,
Period: batchingConfig.BatchPeriod,
Processors: []*neosync_benthos.BatchProcessor{
{NeosyncToJson: &neosync_benthos.NeosyncToJsonConfig{}},
{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
},
},
Credentials: buildBenthosS3Credentials(connAwsS3Config.Credentials),
Region: connAwsS3Config.GetRegion(),
Expand All @@ -120,12 +113,6 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b
return config, nil
}

// isPooledSqlRawConfigured reports whether cfg's stream input is set and
// carries a PooledSqlRaw input. A nil cfg or a nil stream input yields false.
func isPooledSqlRawConfigured(cfg *neosync_benthos.BenthosConfig) bool {
	if cfg == nil {
		return false
	}
	input := cfg.StreamConfig.Input
	if input == nil {
		return false
	}
	return input.Inputs.PooledSqlRaw != nil
}

type S3StorageClass int

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,6 @@ func Test_convertUserDefinedFunctionConfig(t *testing.T) {
require.Equal(t, resp, expected)
}

// Test_buildPlainInsertArgs checks that nil and empty column lists produce an
// empty mapping, and that a populated list is rendered in order as quoted
// fields inside a root array.
func Test_buildPlainInsertArgs(t *testing.T) {
	for _, noCols := range [][]string{nil, {}} {
		require.Empty(t, buildPlainInsertArgs(noCols))
	}

	actual := buildPlainInsertArgs([]string{"foo", "bar", "baz"})
	require.Equal(t, actual, `root = [this."foo", this."bar", this."baz"]`)
}

func Test_buildPlainColumns(t *testing.T) {
require.Empty(t, buildPlainColumns(nil))
require.Empty(t, buildPlainColumns([]*mgmtv1alpha1.JobMapping{}))
Expand Down
33 changes: 6 additions & 27 deletions internal/benthos/benthos-builder/builders/generate-ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
)

type generateAIBuilder struct {
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
aiGroupedTableCols map[string][]string
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
}

func NewGenerateAIBuilder(
Expand All @@ -32,10 +31,9 @@ func NewGenerateAIBuilder(
driver string,
) bb_internal.BenthosBuilder {
return &generateAIBuilder{
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
connectionclient: connectionclient,
aiGroupedTableCols: map[string][]string{},
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
connectionclient: connectionclient,
}
}

Expand Down Expand Up @@ -123,16 +121,6 @@ func (b *generateAIBuilder) BuildSourceConfigs(ctx context.Context, params *bb_i
userBatchSize,
)

// builds a map of table key to columns for AI Generated schemas as they are calculated lazily instead of via job mappings
aiGroupedTableCols := map[string][]string{}
for _, agm := range mappings {
key := neosync_benthos.BuildBenthosTable(agm.Schema, agm.Table)
for _, col := range agm.Columns {
aiGroupedTableCols[key] = append(aiGroupedTableCols[key], col.Column)
}
}
b.aiGroupedTableCols = aiGroupedTableCols

return sourceResponses, nil
}

Expand Down Expand Up @@ -217,12 +205,6 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *
if err != nil {
return nil, fmt.Errorf("unable to parse destination options: %w", err)
}
tableKey := neosync_benthos.BuildBenthosTable(benthosConfig.TableSchema, benthosConfig.TableName)

cols, ok := b.aiGroupedTableCols[tableKey]
if !ok {
return nil, fmt.Errorf("unable to find table columns for key (%s) when building destination connection", tableKey)
}

processorConfigs := []neosync_benthos.ProcessorConfig{}
for _, pc := range benthosConfig.Processors {
Expand All @@ -244,12 +226,9 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *
ConnectionId: params.DestConnection.GetId(),
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: cols,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

ArgsMapping: buildPlainInsertArgs(cols),

Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Expand Down
24 changes: 14 additions & 10 deletions internal/benthos/benthos-builder/builders/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type generateBuilder struct {
transformerclient mgmtv1alpha1connect.TransformersServiceClient
sqlmanagerclient sqlmanager.SqlManagerClient
connectionclient mgmtv1alpha1connect.ConnectionServiceClient
driver string
}

func NewGenerateBuilder(
Expand Down Expand Up @@ -54,6 +55,7 @@ func (b *generateBuilder) BuildSourceConfigs(ctx context.Context, params *bb_int
return nil, fmt.Errorf("unable to create new sql db: %w", err)
}
defer db.Db().Close()
b.driver = db.Driver()

groupedMappings := groupMappingsByTable(job.Mappings)
groupedTableMapping := getTableMappingsMap(groupedMappings)
Expand Down Expand Up @@ -179,6 +181,11 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb
processorConfigs = append(processorConfigs, *pc)
}

sqlProcessor, err := getSqlBatchProcessors(b.driver, benthosConfig.Columns, map[string]string{}, benthosConfig.ColumnDefaultProperties)
if err != nil {
return nil, err
}

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
Expand All @@ -193,18 +200,15 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb
PooledSqlInsert: &neosync_benthos.PooledSqlInsert{
ConnectionId: params.DestConnection.GetId(),

Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: benthosConfig.Columns,
ColumnDefaultProperties: benthosConfig.ColumnDefaultProperties,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns),
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
TruncateOnRetry: destOpts.Truncate,

Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
Processors: []*neosync_benthos.BatchProcessor{sqlProcessor},
},
MaxInFlight: int(destOpts.MaxInFlight),
},
Expand Down
34 changes: 23 additions & 11 deletions internal/benthos/benthos-builder/builders/sql-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,6 @@ func getMapValuesCount[K comparable, V any](m map[K][]V) int {
return count
}

// buildPlainInsertArgs renders a mapping expression that selects the given
// columns, in order, from the current message as an array, for example
// `root = [this."a", this."b"]`. Column names are quoted with Go's %q rules.
// It returns an empty string when cols is nil or empty.
func buildPlainInsertArgs(cols []string) string {
	if len(cols) == 0 {
		return ""
	}

	var expr strings.Builder
	expr.WriteString("root = [")
	for i, name := range cols {
		if i > 0 {
			expr.WriteString(", ")
		}
		fmt.Fprintf(&expr, "this.%q", name)
	}
	expr.WriteString("]")
	return expr.String()
}

func buildPlainColumns(mappings []*mgmtv1alpha1.JobMapping) []string {
columns := make([]string, len(mappings))
for idx := range mappings {
Expand Down Expand Up @@ -439,6 +428,7 @@ func getColumnDefaultProperties(
if !ok {
return nil, fmt.Errorf("transformer missing for column: %s", cName)
}

var hasDefaultTransformer bool
if jmTransformer != nil && isDefaultJobMappingTransformer(jmTransformer) {
hasDefaultTransformer = true
Expand Down Expand Up @@ -906,3 +896,25 @@ func cleanPostgresType(dataType string) string {
}
return strings.TrimSpace(dataType[:parenIndex])
}

// shouldOverrideColumnDefault reports whether any non-nil entry in
// columnDefaults is flagged NeedsOverride while not having a default
// transformer. Nil entries are skipped; an empty or nil map yields false.
func shouldOverrideColumnDefault(columnDefaults map[string]*neosync_benthos.ColumnDefaultProperties) bool {
	for _, props := range columnDefaults {
		if props == nil || props.HasDefaultTransformer {
			continue
		}
		if props.NeedsOverride {
			return true
		}
	}
	return false
}

// getSqlBatchProcessors returns the batch processor matching the given SQL
// driver: NeosyncToPgx for Postgres, NeosyncToMysql for MySQL, and
// NeosyncToMssql for MSSQL. The columns, columnDataTypes, and
// columnDefaultProperties arguments are passed through unchanged into the
// selected processor's config. Any other driver value returns a nil processor
// and an "unsupported driver" error.
func getSqlBatchProcessors(driver string, columns []string, columnDataTypes map[string]string, columnDefaultProperties map[string]*neosync_benthos.ColumnDefaultProperties) (*neosync_benthos.BatchProcessor, error) {
switch driver {
case sqlmanager_shared.PostgresDriver:
return &neosync_benthos.BatchProcessor{NeosyncToPgx: &neosync_benthos.NeosyncToPgxConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so clean.

case sqlmanager_shared.MysqlDriver:
return &neosync_benthos.BatchProcessor{NeosyncToMysql: &neosync_benthos.NeosyncToMysqlConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
case sqlmanager_shared.MssqlDriver:
return &neosync_benthos.BatchProcessor{NeosyncToMssql: &neosync_benthos.NeosyncToMssqlConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil
default:
return nil, fmt.Errorf("unsupported driver %q when attempting to get sql batch processors", driver)
}
}
Loading
Loading