Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix postgres statements case sensitivity #3007

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/gen/go/db/dbschemas/postgresql/system.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/pkg/dbschemas/sql/postgresql/queries/system.sql
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ domain_defs AS (
rct.schema_name,
rct.type_name,
rct.type,
'CREATE DOMAIN ' || rct.schema_name || '.' || rct.type_name || ' AS ' ||
'CREATE DOMAIN ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ' ||
pg_catalog.format_type(t.typbasetype, t.typtypmod) ||
CASE
WHEN t.typnotnull THEN ' NOT NULL' ELSE ''
Expand All @@ -489,7 +489,7 @@ enum_defs AS (
rct.schema_name,
rct.type_name,
rct.type,
'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS ENUM (' ||
'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ENUM (' ||
string_agg('''' || e.enumlabel || '''', ', ') || ');' AS definition
FROM
relevant_custom_types rct
Expand All @@ -507,7 +507,7 @@ composite_defs AS (
rct.schema_name,
rct.type_name,
rct.type,
'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS (' ||
'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS (' ||
string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ') || ');' AS definition
FROM
relevant_custom_types rct
Expand Down Expand Up @@ -655,7 +655,7 @@ SELECT
cws.sequence_schema_name,
cws.sequence_name,
(
'CREATE SEQUENCE ' || cws.sequence_schema_name || '.' || cws.sequence_name ||
'CREATE SEQUENCE ' || quote_ident(cws.sequence_schema_name) || '.' || quote_ident(cws.sequence_name) ||
' START WITH ' || seqs.start_value ||
' INCREMENT BY ' || seqs.increment_by ||
' MINVALUE ' || seqs.min_value ||
Expand Down
10 changes: 5 additions & 5 deletions backend/pkg/sqlmanager/postgres/postgres-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ BEGIN
SELECT 1
FROM pg_constraint
WHERE conname = '%s'
AND connamespace = '%s'::regnamespace
AND connamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')
AND conrelid = (
SELECT oid
FROM pg_class
WHERE relname = '%s'
AND relnamespace = '%s'::regnamespace
AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')
)
) THEN
%s
Expand Down Expand Up @@ -691,7 +691,7 @@ func buildAlterStatementByConstraint(
return "", errors.New("unable to build alter statement as constraint is nil")
}
return fmt.Sprintf(
"ALTER TABLE %q.%q ADD CONSTRAINT %s %s;",
"ALTER TABLE %q.%q ADD CONSTRAINT %q %s;",
constraint.SchemaName, constraint.TableName, constraint.ConstraintName, constraint.ConstraintDefinition,
), nil
}
Expand Down Expand Up @@ -919,7 +919,7 @@ func EscapePgColumn(col string) string {
func BuildPgIdentityColumnResetCurrentSql(
schema, table, column string,
) string {
return fmt.Sprintf("SELECT setval(pg_get_serial_sequence('%s.%s', '%s'), COALESCE((SELECT MAX(%q) FROM %q.%q), 1));", schema, table, column, column, schema, table)
return fmt.Sprintf("SELECT setval(pg_get_serial_sequence('%q.%q', '%s'), COALESCE((SELECT MAX(%q) FROM %q.%q), 1));", schema, table, column, column, schema, table)
}

func BuildPgInsertIdentityAlwaysSql(
Expand All @@ -930,7 +930,7 @@ func BuildPgInsertIdentityAlwaysSql(
}

func BuildPgResetSequenceSql(sequenceName string) string {
return fmt.Sprintf("ALTER SEQUENCE %s RESTART;", sequenceName)
return fmt.Sprintf("ALTER SEQUENCE %q RESTART;", sequenceName)
}

func GetPostgresColumnOverrideAndResetProperties(columnInfo *sqlmanager_shared.DatabaseSchemaRow) (needsOverride, needsReset bool) {
Expand Down
8 changes: 7 additions & 1 deletion worker/internal/cmds/worker/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
cloudlicense "github.com/nucleuscloud/neosync/internal/ee/cloud-license"
"github.com/nucleuscloud/neosync/internal/ee/license"
neosyncotel "github.com/nucleuscloud/neosync/internal/otel"
neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis"
accountstatus_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/account-status"
genbenthosconfigs_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/gen-benthos-configs"
jobhooks_by_timing_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/jobhooks-by-timing"
Expand Down Expand Up @@ -285,6 +286,10 @@ func serve(ctx context.Context) error {
sqlmanager := sql_manager.NewSqlManager(sql_manager.WithConnectionManager(sqlconnmanager))

redisconfig := shared.GetRedisConfig()
redisclient, err := neosync_redis.GetRedisClient(redisconfig)
if err != nil {
return err
}

genbenthosActivity := genbenthosconfigs_activity.New(
jobclient,
Expand All @@ -308,12 +313,13 @@ func serve(ctx context.Context) error {
accountStatusActivity := accountstatus_activity.New(userclient)
runPostTableSyncActivity := posttablesync_activity.New(jobclient, sqlmanager, connclient)
jobhookByTimingActivity := jobhooks_by_timing_activity.New(jobclient, connclient, sqlmanager, cascadelicense)
redisCleanUpActivity := syncrediscleanup_activity.New(redisclient)

w.RegisterWorkflow(datasync_workflow.Workflow)
w.RegisterActivity(syncActivity.Sync)
w.RegisterActivity(retrieveActivityOpts.RetrieveActivityOptions)
w.RegisterActivity(runSqlInitTableStatements.RunSqlInitTableStatements)
w.RegisterActivity(syncrediscleanup_activity.DeleteRedisHash)
w.RegisterActivity(redisCleanUpActivity.DeleteRedisHash)
w.RegisterActivity(genbenthosActivity.GenerateBenthosConfigs)
w.RegisterActivity(accountStatusActivity.CheckAccountStatus)
w.RegisterActivity(runPostTableSyncActivity.RunPostTableSync)
Expand Down
5 changes: 2 additions & 3 deletions worker/internal/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
"github.com/redis/go-redis/v9"
)

func GetRedisClient() (redis.UniversalClient, error) {
redisConfig := shared.GetRedisConfig()
func GetRedisClient(redisConfig *shared.RedisConfig) (redis.UniversalClient, error) {
if redisConfig == nil {
return nil, fmt.Errorf("missing redis config. this operation requires redis.")
return nil, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional?

}

var tlsConf *tls.Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
temporallogger "github.com/nucleuscloud/neosync/worker/internal/temporal-logger"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
Expand Down Expand Up @@ -42,6 +41,17 @@ type RunPostTableSyncRequest struct {
AccountId string
}
type RunPostTableSyncResponse struct {
Errors []*PostTableSyncError
}

type PostTableSyncError struct {
ConnectionId string
Errors []*StatementError
}

type StatementError struct {
Statement string
Error string
}

func (a *Activity) RunPostTableSync(
Expand Down Expand Up @@ -102,6 +112,7 @@ func (a *Activity) RunPostTableSync(
}
}()

errors := []*PostTableSyncError{}
for destConnectionId, destCfg := range config.DestinationConfigs {
slogger.Debug(fmt.Sprintf("found %d post table sync statements", len(destCfg.Statements)), "destinationConnectionId", destConnectionId)
if len(destCfg.Statements) == 0 {
Expand All @@ -111,6 +122,9 @@ func (a *Activity) RunPostTableSync(
if err != nil {
return nil, fmt.Errorf("unable to get destination connection (%s) by id: %w", destConnectionId, err)
}
execErrors := &PostTableSyncError{
ConnectionId: destConnectionId,
}
switch destinationConnection.GetConnectionConfig().GetConfig().(type) {
case *mgmtv1alpha1.ConnectionConfig_PgConfig, *mgmtv1alpha1.ConnectionConfig_MysqlConfig, *mgmtv1alpha1.ConnectionConfig_MssqlConfig:
destDb, err := a.sqlmanagerclient.NewSqlConnection(ctx, session, destinationConnection, slogger)
Expand All @@ -119,17 +133,27 @@ func (a *Activity) RunPostTableSync(
continue
}
destconns = append(destconns, destDb)
err = destDb.Db().BatchExec(ctx, 5, destCfg.Statements, &sqlmanager_shared.BatchExecOpts{})
if err != nil {
slogger.Error("unable to exec destination statement", "connectionId", destConnectionId, "error", err.Error())
continue
for _, stmt := range destCfg.Statements {
err = destDb.Db().Exec(ctx, stmt)
if err != nil {
slogger.Error("unable to exec destination statement", "connectionId", destConnectionId, "error", err.Error())
execErrors.Errors = append(execErrors.Errors, &StatementError{
Statement: stmt,
Error: err.Error(),
})
}
}
default:
slogger.Warn("unsupported destination type", "connectionId", destConnectionId)
}
if len(execErrors.Errors) > 0 {
errors = append(errors, execErrors)
}
}

return &RunPostTableSyncResponse{}, nil
return &RunPostTableSyncResponse{
Errors: errors,
}, nil
}

func runContextNotFound(err error) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import (
"fmt"
"time"

neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis"
temporallogger "github.com/nucleuscloud/neosync/worker/internal/temporal-logger"
redis "github.com/redis/go-redis/v9"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
)

type Activity struct {
redisclient redis.UniversalClient
}

func New(
redisclient redis.UniversalClient,
) *Activity {
return &Activity{
redisclient: redisclient,
}
}

type DeleteRedisHashRequest struct {
JobId string
HashKey string
Expand All @@ -20,7 +31,7 @@ type DeleteRedisHashRequest struct {
type DeleteRedisHashResponse struct {
}

func DeleteRedisHash(
func (a *Activity) DeleteRedisHash(
ctx context.Context,
req *DeleteRedisHashRequest,
) (*DeleteRedisHashResponse, error) {
Expand Down Expand Up @@ -50,17 +61,16 @@ func DeleteRedisHash(
"RedisHashKey", req.HashKey,
)

// todo: this should be factored out of here and live on the activity itself
redisClient, err := neosync_redis.GetRedisClient()
if err != nil {
return nil, err
if a.redisclient == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay so I'm guessing the way this works is that you are providing an empty redis client but the activity is never invoked via the workflow unless we detect redis clients, yes?

return nil, fmt.Errorf("missing redis client. this operation requires redis.")
}
slogger.Debug("redis client created")
slogger.Debug("redis client provided")

err = deleteRedisHashByKey(ctx, redisClient, req.HashKey)
err := deleteRedisHashByKey(ctx, a.redisclient, req.HashKey)
if err != nil {
return nil, err
}
slogger.Debug("deleted redis key")

return &DeleteRedisHashResponse{}, nil
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
CREATE SCHEMA IF NOT EXISTS alltypes;
CREATE SCHEMA IF NOT EXISTS "CaPiTaL";
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,31 @@ INSERT INTO alltypes.json_data (data) VALUES (
}
}'
);

CREATE SCHEMA IF NOT EXISTS "CaPiTaL";
CREATE TABLE IF NOT EXISTS "CaPiTaL"."BadName" (
"ID" BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
"NAME" text UNIQUE
);

INSERT INTO "CaPiTaL"."BadName" ("NAME")
VALUES
('Xk7pQ9nM3v'),
('Rt5wLjH2yB'),
('Zc8fAe4dN6'),
('Ym9gKu3sW5'),
('Vb4nPx7tJ2');

CREATE TABLE "CaPiTaL"."Bad Name 123!@#" (
"ID" SERIAL PRIMARY KEY,
"NAME" text REFERENCES "CaPiTaL"."BadName" ("NAME")
);


INSERT INTO "CaPiTaL"."Bad Name 123!@#" ("NAME")
VALUES
('Xk7pQ9nM3v'),
('Rt5wLjH2yB'),
('Zc8fAe4dN6'),
('Ym9gKu3sW5'),
('Vb4nPx7tJ2');
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP SCHEMA IF EXISTS alltypes CASCADE;
DROP SCHEMA IF EXISTS "CaPiTaL" CASCADE;
3 changes: 2 additions & 1 deletion worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func runRedisCleanUpActivity(
}
logger.Debug("executing redis clean up activity")
var resp *syncrediscleanup_activity.DeleteRedisHashResponse
var redisCleanUpActivity *syncrediscleanup_activity.Activity
err := workflow.ExecuteActivity(
workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
Expand All @@ -420,7 +421,7 @@ func runRedisCleanUpActivity(
},
HeartbeatTimeout: 1 * time.Minute,
}),
syncrediscleanup_activity.DeleteRedisHash,
redisCleanUpActivity.DeleteRedisHash,
&syncrediscleanup_activity.DeleteRedisHashRequest{
JobId: jobId,
HashKey: cfg.Key,
Expand Down
Loading
Loading