-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mysql and Mssql processors #3023
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
|
||
func getSqlBatchProcessors(driver string, columns []string, columnDataTypes map[string]string, columnDefaultProperties map[string]*neosync_benthos.ColumnDefaultProperties) (*neosync_benthos.BatchProcessor, error) { | ||
switch driver { | ||
case sqlmanager_shared.PostgresDriver, "postgres": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a new constant in the sqlmanager_shared for the postgres driver called DefaultPostgresDriver
func getSqlBatchProcessors(driver string, columns []string, columnDataTypes map[string]string, columnDefaultProperties map[string]*neosync_benthos.ColumnDefaultProperties) (*neosync_benthos.BatchProcessor, error) { | ||
switch driver { | ||
case sqlmanager_shared.PostgresDriver, "postgres": | ||
return &neosync_benthos.BatchProcessor{NeosyncToPgx: &neosync_benthos.NeosyncToPgxConfig{Columns: columns, ColumnDataTypes: columnDataTypes, ColumnDefaultProperties: columnDefaultProperties}}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is so clean.
if err != nil { | ||
return err | ||
} | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, is this intending to return the outer error? if so, the err that is used for RetryInsertRowByRow should be renamed.
"github.com/warpstreamlabs/bento/public/service" | ||
) | ||
|
||
func neosyncToMssqlProcessorConfig() *service.ConfigSpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to see tests for these things via the benthos-way of testing them where they are init'd and run through that, more like an integration test.
See the out output_sql_insert_test.go
for a basic example of what I'm referring to..
creates mysql and mssql processors. untangled messy sql output and insert query builder code
added default value postgres integration test