diff --git a/backend/gen/go/db/dbschemas/postgresql/system.sql.go b/backend/gen/go/db/dbschemas/postgresql/system.sql.go
index 5ff8088dc0..a826b7b950 100644
--- a/backend/gen/go/db/dbschemas/postgresql/system.sql.go
+++ b/backend/gen/go/db/dbschemas/postgresql/system.sql.go
@@ -143,7 +143,7 @@ SELECT
     cws.sequence_schema_name,
     cws.sequence_name,
     (
-        'CREATE SEQUENCE ' || cws.sequence_schema_name || '.' || cws.sequence_name ||
+        'CREATE SEQUENCE ' || quote_ident(cws.sequence_schema_name) || '.' || quote_ident(cws.sequence_name) ||
         ' START WITH ' || seqs.start_value ||
         ' INCREMENT BY ' || seqs.increment_by ||
         ' MINVALUE ' || seqs.min_value ||
@@ -310,7 +310,7 @@ domain_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE DOMAIN ' || rct.schema_name || '.' || rct.type_name || ' AS ' ||
+        'CREATE DOMAIN ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ' ||
         pg_catalog.format_type(t.typbasetype, t.typtypmod) ||
         CASE WHEN t.typnotnull THEN ' NOT NULL'
         ELSE ''
@@ -330,7 +330,7 @@ enum_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS ENUM (' ||
+        'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ENUM (' ||
         string_agg('''' || e.enumlabel || '''', ', ') ||
         ');' AS definition
     FROM relevant_custom_types rct
@@ -348,7 +348,7 @@ composite_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS (' ||
+        'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS (' ||
         string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ') ||
         ');' AS definition
     FROM relevant_custom_types rct
diff --git a/backend/pkg/dbschemas/sql/postgresql/queries/system.sql b/backend/pkg/dbschemas/sql/postgresql/queries/system.sql
index 7b8d1e50f4..33baf4cc50 100644
--- a/backend/pkg/dbschemas/sql/postgresql/queries/system.sql
+++ b/backend/pkg/dbschemas/sql/postgresql/queries/system.sql
@@ -469,7 +469,7 @@ domain_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE DOMAIN ' || rct.schema_name || '.' || rct.type_name || ' AS ' ||
+        'CREATE DOMAIN ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ' ||
         pg_catalog.format_type(t.typbasetype, t.typtypmod) ||
         CASE WHEN t.typnotnull THEN ' NOT NULL'
         ELSE ''
@@ -489,7 +489,7 @@ enum_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS ENUM (' ||
+        'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS ENUM (' ||
         string_agg('''' || e.enumlabel || '''', ', ') ||
         ');' AS definition
     FROM relevant_custom_types rct
@@ -507,7 +507,7 @@ composite_defs AS (
         rct.schema_name,
         rct.type_name,
         rct.type,
-        'CREATE TYPE ' || rct.schema_name || '.' || rct.type_name || ' AS (' ||
+        'CREATE TYPE ' || quote_ident(rct.schema_name) || '.' || quote_ident(rct.type_name) || ' AS (' ||
         string_agg(a.attname || ' ' || pg_catalog.format_type(a.atttypid, a.atttypmod), ', ') ||
         ');' AS definition
     FROM relevant_custom_types rct
@@ -655,7 +655,7 @@ SELECT
     cws.sequence_schema_name,
     cws.sequence_name,
     (
-        'CREATE SEQUENCE ' || cws.sequence_schema_name || '.' || cws.sequence_name ||
+        'CREATE SEQUENCE ' || quote_ident(cws.sequence_schema_name) || '.' || quote_ident(cws.sequence_name) ||
         ' START WITH ' || seqs.start_value ||
         ' INCREMENT BY ' || seqs.increment_by ||
         ' MINVALUE ' || seqs.min_value ||
diff --git a/backend/pkg/sqlmanager/postgres/postgres-manager.go b/backend/pkg/sqlmanager/postgres/postgres-manager.go
index adf94af042..f1c5d6ea1d 100644
--- a/backend/pkg/sqlmanager/postgres/postgres-manager.go
+++ b/backend/pkg/sqlmanager/postgres/postgres-manager.go
@@ -567,12 +567,12 @@ BEGIN
         SELECT 1
         FROM pg_constraint
         WHERE conname = '%s'
-        AND connamespace = '%s'::regnamespace
+        AND connamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')
         AND conrelid = (
             SELECT oid
             FROM pg_class
             WHERE relname = '%s'
-            AND relnamespace = '%s'::regnamespace
+            AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')
         )
     ) THEN
         %s
@@ -691,7 +691,7 @@ func buildAlterStatementByConstraint(
 		return "", errors.New("unable to build alter statement as constraint is nil")
 	}
 	return fmt.Sprintf(
-		"ALTER TABLE %q.%q ADD CONSTRAINT %s %s;",
+		"ALTER TABLE %q.%q ADD CONSTRAINT %q %s;",
 		constraint.SchemaName, constraint.TableName, constraint.ConstraintName, constraint.ConstraintDefinition,
 	), nil
 }
@@ -919,7 +919,7 @@ func EscapePgColumn(col string) string {
 func BuildPgIdentityColumnResetCurrentSql(
 	schema, table, column string,
 ) string {
-	return fmt.Sprintf("SELECT setval(pg_get_serial_sequence('%s.%s', '%s'), COALESCE((SELECT MAX(%q) FROM %q.%q), 1));", schema, table, column, column, schema, table)
+	return fmt.Sprintf("SELECT setval(pg_get_serial_sequence('%q.%q', '%s'), COALESCE((SELECT MAX(%q) FROM %q.%q), 1));", schema, table, column, column, schema, table)
 }
 
 func BuildPgInsertIdentityAlwaysSql(
@@ -930,7 +930,7 @@ func BuildPgInsertIdentityAlwaysSql(
 }
 
 func BuildPgResetSequenceSql(sequenceName string) string {
-	return fmt.Sprintf("ALTER SEQUENCE %s RESTART;", sequenceName)
+	return fmt.Sprintf("ALTER SEQUENCE %q RESTART;", sequenceName)
 }
 
 func GetPostgresColumnOverrideAndResetProperties(columnInfo *sqlmanager_shared.DatabaseSchemaRow) (needsOverride, needsReset bool) {
diff --git a/worker/internal/cmds/worker/serve/serve.go b/worker/internal/cmds/worker/serve/serve.go
index c07411147b..50186de257 100644
--- a/worker/internal/cmds/worker/serve/serve.go
+++ b/worker/internal/cmds/worker/serve/serve.go
@@ -26,6 +26,7 @@ import (
 	cloudlicense "github.com/nucleuscloud/neosync/internal/ee/cloud-license"
 	"github.com/nucleuscloud/neosync/internal/ee/license"
 	neosyncotel "github.com/nucleuscloud/neosync/internal/otel"
+	neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis"
 	accountstatus_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/account-status"
 	genbenthosconfigs_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/gen-benthos-configs"
 	jobhooks_by_timing_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/jobhooks-by-timing"
@@ -285,6 +286,10 @@ func serve(ctx context.Context) error {
 	sqlmanager := sql_manager.NewSqlManager(sql_manager.WithConnectionManager(sqlconnmanager))
 
 	redisconfig := shared.GetRedisConfig()
+	redisclient, err := neosync_redis.GetRedisClient(redisconfig)
+	if err != nil {
+		return err
+	}
 
 	genbenthosActivity := genbenthosconfigs_activity.New(
 		jobclient,
@@ -308,12 +313,13 @@
 	accountStatusActivity := accountstatus_activity.New(userclient)
 	runPostTableSyncActivity := posttablesync_activity.New(jobclient, sqlmanager, connclient)
 	jobhookByTimingActivity := jobhooks_by_timing_activity.New(jobclient, connclient, sqlmanager, cascadelicense)
+	redisCleanUpActivity := syncrediscleanup_activity.New(redisclient)
 
 	w.RegisterWorkflow(datasync_workflow.Workflow)
 	w.RegisterActivity(syncActivity.Sync)
 	w.RegisterActivity(retrieveActivityOpts.RetrieveActivityOptions)
 	w.RegisterActivity(runSqlInitTableStatements.RunSqlInitTableStatements)
-	w.RegisterActivity(syncrediscleanup_activity.DeleteRedisHash)
+	w.RegisterActivity(redisCleanUpActivity.DeleteRedisHash)
 	w.RegisterActivity(genbenthosActivity.GenerateBenthosConfigs)
 	w.RegisterActivity(accountStatusActivity.CheckAccountStatus)
 	w.RegisterActivity(runPostTableSyncActivity.RunPostTableSync)
diff --git a/worker/internal/redis/client.go b/worker/internal/redis/client.go
index 4fe07183e3..0164091f53 100644
--- a/worker/internal/redis/client.go
+++ b/worker/internal/redis/client.go
@@ -13,10 +13,9 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-func GetRedisClient() (redis.UniversalClient, error) {
-	redisConfig := shared.GetRedisConfig()
+func GetRedisClient(redisConfig *shared.RedisConfig) (redis.UniversalClient, error) {
 	if redisConfig == nil {
-		return nil, fmt.Errorf("missing redis config. this operation requires redis.")
+		return nil, nil
 	}
 
 	var tlsConf *tls.Config
diff --git a/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go b/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go
index 450863e053..7e1ce3c1d3 100644
--- a/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go
+++ b/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go
@@ -10,7 +10,6 @@ import (
 	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
 	"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
 	"github.com/nucleuscloud/neosync/backend/pkg/sqlmanager"
-	sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
 	connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
 	temporallogger "github.com/nucleuscloud/neosync/worker/internal/temporal-logger"
 	"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
@@ -42,6 +41,17 @@ type RunPostTableSyncRequest struct {
 	AccountId string
 }
 type RunPostTableSyncResponse struct {
+	Errors []*PostTableSyncError
+}
+
+type PostTableSyncError struct {
+	ConnectionId string
+	Errors       []*StatementError
+}
+
+type StatementError struct {
+	Statement string
+	Error     string
 }
 
 func (a *Activity) RunPostTableSync(
@@ -102,6 +112,7 @@
 		}
 	}()
 
+	errors := []*PostTableSyncError{}
 	for destConnectionId, destCfg := range config.DestinationConfigs {
 		slogger.Debug(fmt.Sprintf("found %d post table sync statements", len(destCfg.Statements)), "destinationConnectionId", destConnectionId)
 		if len(destCfg.Statements) == 0 {
@@ -111,6 +122,9 @@
 		if err != nil {
 			return nil, fmt.Errorf("unable to get destination connection (%s) by id: %w", destConnectionId, err)
 		}
+		execErrors := &PostTableSyncError{
+			ConnectionId: destConnectionId,
+		}
 		switch destinationConnection.GetConnectionConfig().GetConfig().(type) {
 		case *mgmtv1alpha1.ConnectionConfig_PgConfig, *mgmtv1alpha1.ConnectionConfig_MysqlConfig, *mgmtv1alpha1.ConnectionConfig_MssqlConfig:
 			destDb, err := a.sqlmanagerclient.NewSqlConnection(ctx, session, destinationConnection, slogger)
@@ -119,17 +133,27 @@
 				continue
 			}
 			destconns = append(destconns, destDb)
-			err = destDb.Db().BatchExec(ctx, 5, destCfg.Statements, &sqlmanager_shared.BatchExecOpts{})
-			if err != nil {
-				slogger.Error("unable to exec destination statement", "connectionId", destConnectionId, "error", err.Error())
-				continue
+			for _, stmt := range destCfg.Statements {
+				err = destDb.Db().Exec(ctx, stmt)
+				if err != nil {
+					slogger.Error("unable to exec destination statement", "connectionId", destConnectionId, "error", err.Error())
+					execErrors.Errors = append(execErrors.Errors, &StatementError{
+						Statement: stmt,
+						Error:     err.Error(),
+					})
+				}
 			}
 		default:
 			slogger.Warn("unsupported destination type", "connectionId", destConnectionId)
 		}
+		if len(execErrors.Errors) > 0 {
+			errors = append(errors, execErrors)
+		}
 	}
 
-	return &RunPostTableSyncResponse{}, nil
+	return &RunPostTableSyncResponse{
+		Errors: errors,
+	}, nil
 }
 
 func runContextNotFound(err error) bool {
diff --git a/worker/pkg/workflows/datasync/activities/sync-redis-clean-up/activity.go b/worker/pkg/workflows/datasync/activities/sync-redis-clean-up/activity.go
index 38369c7535..897a2de9cf 100644
--- a/worker/pkg/workflows/datasync/activities/sync-redis-clean-up/activity.go
+++ b/worker/pkg/workflows/datasync/activities/sync-redis-clean-up/activity.go
@@ -5,13 +5,24 @@ import (
 	"fmt"
 	"time"
 
-	neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis"
 	temporallogger "github.com/nucleuscloud/neosync/worker/internal/temporal-logger"
 	redis "github.com/redis/go-redis/v9"
 	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 )
 
+type Activity struct {
+	redisclient redis.UniversalClient
+}
+
+func New(
+	redisclient redis.UniversalClient,
+) *Activity {
+	return &Activity{
+		redisclient: redisclient,
+	}
+}
+
 type DeleteRedisHashRequest struct {
 	JobId   string
 	HashKey string
@@ -20,7 +31,7 @@ type DeleteRedisHashRequest struct {
 type DeleteRedisHashResponse struct {
 }
 
-func DeleteRedisHash(
+func (a *Activity) DeleteRedisHash(
 	ctx context.Context,
 	req *DeleteRedisHashRequest,
 ) (*DeleteRedisHashResponse, error) {
@@ -50,17 +61,16 @@
 		"RedisHashKey", req.HashKey,
 	)
 
-	// todo: this should be factored out of here and live on the activity itself
-	redisClient, err := neosync_redis.GetRedisClient()
-	if err != nil {
-		return nil, err
+	if a.redisclient == nil {
+		return nil, fmt.Errorf("missing redis client. this operation requires redis.")
 	}
-	slogger.Debug("redis client created")
+	slogger.Debug("redis client provided")
 
-	err = deleteRedisHashByKey(ctx, redisClient, req.HashKey)
+	err := deleteRedisHashByKey(ctx, a.redisclient, req.HashKey)
 	if err != nil {
 		return nil, err
 	}
+	slogger.Debug("deleted redis key")
 
 	return &DeleteRedisHashResponse{}, nil
 }
diff --git a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/job_mappings.go b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/job_mappings.go
index 370877fb4a..9635822a96 100644
--- a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/job_mappings.go
+++ b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/job_mappings.go
@@ -682,6 +682,38 @@ func GetDefaultSyncJobMappings()[]*mgmtv1alpha1.JobMapping {
 				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
 			},
 		},
+		{
+			Schema: "CaPiTaL",
+			Table:  "BadName",
+			Column: "ID",
+			Transformer: &mgmtv1alpha1.JobMappingTransformer{
+				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+			},
+		},
+		{
+			Schema: "CaPiTaL",
+			Table:  "BadName",
+			Column: "NAME",
+			Transformer: &mgmtv1alpha1.JobMappingTransformer{
+				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+			},
+		},
+		{
+			Schema: "CaPiTaL",
+			Table:  "Bad Name 123!@#",
+			Column: "ID",
+			Transformer: &mgmtv1alpha1.JobMappingTransformer{
+				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+			},
+		},
+		{
+			Schema: "CaPiTaL",
+			Table:  "Bad Name 123!@#",
+			Column: "NAME",
+			Transformer: &mgmtv1alpha1.JobMappingTransformer{
+				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+			},
+		},
 	}
 }
diff --git a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/schema-create.sql b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/schema-create.sql
index 5ac3f363a9..76e9b6f9a5 100644
--- a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/schema-create.sql
+++ b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/schema-create.sql
@@ -1 +1,2 @@
 CREATE SCHEMA IF NOT EXISTS alltypes;
+CREATE SCHEMA IF NOT EXISTS "CaPiTaL";
diff --git a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/setup.sql b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/setup.sql
index 78d1b9528c..8e699574a2 100644
--- a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/setup.sql
+++ b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/setup.sql
@@ -346,3 +346,31 @@ INSERT INTO alltypes.json_data (data) VALUES (
     }
 }'
 );
+
+CREATE SCHEMA IF NOT EXISTS "CaPiTaL";
+CREATE TABLE IF NOT EXISTS "CaPiTaL"."BadName" (
+    "ID" BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+    "NAME" text UNIQUE
+);
+
+INSERT INTO "CaPiTaL"."BadName" ("NAME")
+VALUES
+    ('Xk7pQ9nM3v'),
+    ('Rt5wLjH2yB'),
+    ('Zc8fAe4dN6'),
+    ('Ym9gKu3sW5'),
+    ('Vb4nPx7tJ2');
+
+CREATE TABLE "CaPiTaL"."Bad Name 123!@#" (
+    "ID" SERIAL PRIMARY KEY,
+    "NAME" text REFERENCES "CaPiTaL"."BadName" ("NAME")
+);
+
+
+INSERT INTO "CaPiTaL"."Bad Name 123!@#" ("NAME")
+VALUES
+    ('Xk7pQ9nM3v'),
+    ('Rt5wLjH2yB'),
+    ('Zc8fAe4dN6'),
+    ('Ym9gKu3sW5'),
+    ('Vb4nPx7tJ2');
diff --git a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/teardown.sql b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/teardown.sql
index 7313dd27c3..8c8b61d90c 100644
--- a/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/teardown.sql
+++ b/worker/pkg/workflows/datasync/workflow/testdata/postgres/all-types/teardown.sql
@@ -1 +1,2 @@
 DROP SCHEMA IF EXISTS alltypes CASCADE;
+DROP SCHEMA IF EXISTS "CaPiTaL" CASCADE;
diff --git a/worker/pkg/workflows/datasync/workflow/workflow.go b/worker/pkg/workflows/datasync/workflow/workflow.go
index 1d01deb6fd..15f68a4ea3 100644
--- a/worker/pkg/workflows/datasync/workflow/workflow.go
+++ b/worker/pkg/workflows/datasync/workflow/workflow.go
@@ -412,6 +412,7 @@ func runRedisCleanUpActivity(
 	}
 	logger.Debug("executing redis clean up activity")
 	var resp *syncrediscleanup_activity.DeleteRedisHashResponse
+	var redisCleanUpActivity *syncrediscleanup_activity.Activity
 	err := workflow.ExecuteActivity(
 		workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
 			StartToCloseTimeout: 2 * time.Minute,
@@ -420,7 +421,7 @@
 			},
 			HeartbeatTimeout: 1 * time.Minute,
 		}),
-		syncrediscleanup_activity.DeleteRedisHash,
+		redisCleanUpActivity.DeleteRedisHash,
 		&syncrediscleanup_activity.DeleteRedisHashRequest{
 			JobId:   jobId,
 			HashKey: cfg.Key,
diff --git a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go
index cf53a9640b..a394eaa976 100644
--- a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go
+++ b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/nucleuscloud/neosync/internal/connection-manager/providers/sqlprovider"
 	"github.com/nucleuscloud/neosync/internal/gotypeutil"
 	"github.com/nucleuscloud/neosync/internal/testutil"
+	neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis"
 	accountstatus_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/account-status"
 	genbenthosconfigs_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/gen-benthos-configs"
 	jobhooks_by_timing_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/jobhooks-by-timing"
@@ -56,7 +57,9 @@ import (
 	"connectrpc.com/connect"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/otel/metric"
+	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/converter"
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/testsuite"
 )
@@ -264,7 +267,7 @@ func (s *IntegrationTestSuite) Test_Workflow_Sync_Postgres() {
 			addRunContextProcedureMux(mux)
 			srv := startHTTPServer(t, mux)
-			env := executeWorkflow(t, srv, s.redis.url, jobId)
+			env := executeWorkflow(t, srv, &s.redis.url, jobId, tt.ExpectError)
 			require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", tt.Name))
 			err = env.GetWorkflowError()
 			if tt.ExpectError {
@@ -442,7 +445,7 @@ func (s *IntegrationTestSuite) Test_Workflow_Sync_Mssql() {
 			addRunContextProcedureMux(mux)
 			addEmptyJobHooksProcedureMux(mux)
 			srv := startHTTPServer(t, mux)
-			env := executeWorkflow(t, srv, s.redis.url, "115aaf2c-776e-4847-8268-d914e3c15968")
+			env := executeWorkflow(t, srv, nil, "115aaf2c-776e-4847-8268-d914e3c15968", tt.ExpectError)
 			require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", tt.Name))
 			err := env.GetWorkflowError()
 			if tt.ExpectError {
@@ -636,7 +639,7 @@ func (s *IntegrationTestSuite) Test_Workflow_VirtualForeignKeys_Transform() {
 	addEmptyJobHooksProcedureMux(mux)
 	srv := startHTTPServer(s.T(), mux)
 	testName := "Virtual Foreign Key primary key transform"
-	env := executeWorkflow(s.T(), srv, s.redis.url, "fd4d8660-31a0-48b2-9adf-10f11b94898f")
+	env := executeWorkflow(s.T(), srv, &s.redis.url, "fd4d8660-31a0-48b2-9adf-10f11b94898f", false)
 	require.Truef(s.T(), env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", testName))
 	err = env.GetWorkflowError()
 	require.NoError(s.T(), err, "Received Temporal Workflow Error %s", testName)
@@ -838,7 +841,7 @@ func (s *IntegrationTestSuite) Test_Workflow_Sync_Mysql() {
 			addRunContextProcedureMux(mux)
 			addEmptyJobHooksProcedureMux(mux)
 			srv := startHTTPServer(t, mux)
-			env := executeWorkflow(t, srv, s.redis.url, "115aaf2c-776e-4847-8268-d914e3c15968")
+			env := executeWorkflow(t, srv, &s.redis.url, "115aaf2c-776e-4847-8268-d914e3c15968", tt.ExpectError)
 			require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", tt.Name))
 			err = env.GetWorkflowError()
 			if tt.ExpectError {
@@ -1037,7 +1040,7 @@ func (s *IntegrationTestSuite) Test_Workflow_DynamoDB_Sync() {
 			addRunContextProcedureMux(mux)
 			addEmptyJobHooksProcedureMux(mux)
 			srv := startHTTPServer(t, mux)
-			env := executeWorkflow(t, srv, s.redis.url, jobId)
+			env := executeWorkflow(t, srv, &s.redis.url, jobId, tt.ExpectError)
 			require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", tt.Name))
 			err = env.GetWorkflowError()
 			if tt.ExpectError {
@@ -1322,7 +1325,7 @@ func (s *IntegrationTestSuite) Test_Workflow_MongoDB_Sync() {
 			addRunContextProcedureMux(mux)
 			addEmptyJobHooksProcedureMux(mux)
 			srv := startHTTPServer(t, mux)
-			env := executeWorkflow(t, srv, s.redis.url, jobId)
+			env := executeWorkflow(t, srv, &s.redis.url, jobId, tt.ExpectError)
 			require.Truef(t, env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", tt.Name))
 			err = env.GetWorkflowError()
 			if tt.ExpectError {
@@ -1569,7 +1572,7 @@ func (s *IntegrationTestSuite) Test_Workflow_Generate() {
 	addRunContextProcedureMux(mux)
 	addEmptyJobHooksProcedureMux(mux)
 	srv := startHTTPServer(s.T(), mux)
-	env := executeWorkflow(s.T(), srv, s.redis.url, "115aaf2c-776e-4847-8268-d914e3c15968")
+	env := executeWorkflow(s.T(), srv, &s.redis.url, "115aaf2c-776e-4847-8268-d914e3c15968", false)
 	require.Truef(s.T(), env.IsWorkflowCompleted(), fmt.Sprintf("Workflow did not complete. Test: %s", testName))
 	err = env.GetWorkflowError()
 	require.NoError(s.T(), err, "Received Temporal Workflow Error %s", testName)
@@ -1596,20 +1599,28 @@ func (f *fakeEELicense) IsValid() bool {
 func executeWorkflow(
 	t testing.TB,
 	srv *httptest.Server,
-	redisUrl string,
+	redisUrl *string,
 	jobId string,
+	expectActivityErr bool,
 ) *testsuite.TestWorkflowEnvironment {
 	t.Helper()
 	connclient := mgmtv1alpha1connect.NewConnectionServiceClient(srv.Client(), srv.URL)
 	jobclient := mgmtv1alpha1connect.NewJobServiceClient(srv.Client(), srv.URL)
 	transformerclient := mgmtv1alpha1connect.NewTransformersServiceClient(srv.Client(), srv.URL)
 	userclient := mgmtv1alpha1connect.NewUserAccountServiceClient(srv.Client(), srv.URL)
-	redisconfig := &shared.RedisConfig{
-		Url:  redisUrl,
-		Kind: "simple",
-		Tls: &shared.RedisTlsConfig{
-			Enabled: false,
-		},
+	var redisconfig *shared.RedisConfig
+	if redisUrl != nil && *redisUrl != "" {
+		redisconfig = &shared.RedisConfig{
+			Url:  *redisUrl,
+			Kind: "simple",
+			Tls: &shared.RedisTlsConfig{
+				Enabled: false,
+			},
+		}
+	}
+	redisclient, err := neosync_redis.GetRedisClient(redisconfig)
+	if err != nil {
+		t.Fatal(err)
 	}
 
 	sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(&sqlconnect.SqlOpenConnector{}), connectionmanager.WithReaperPoll(10*time.Second))
@@ -1648,18 +1659,32 @@
 	accountStatusActivity := accountstatus_activity.New(userclient)
 	jobhookTimingActivity := jobhooks_by_timing_activity.New(jobclient, connclient, sqlmanager, &fakeEELicense{})
 	posttableSyncActivity := posttablesync_activity.New(jobclient, sqlmanager, connclient)
+	redisCleanUpActivity := syncrediscleanup_activity.New(redisclient)
 
 	env.RegisterWorkflow(Workflow)
 	env.RegisterActivity(syncActivity.Sync)
 	env.RegisterActivity(retrieveActivityOpts.RetrieveActivityOptions)
 	env.RegisterActivity(runSqlInitTableStatements.RunSqlInitTableStatements)
-	env.RegisterActivity(syncrediscleanup_activity.DeleteRedisHash)
+	env.RegisterActivity(redisCleanUpActivity)
 	env.RegisterActivity(genbenthosActivity.GenerateBenthosConfigs)
 	env.RegisterActivity(accountStatusActivity.CheckAccountStatus)
 	env.RegisterActivity(jobhookTimingActivity.RunJobHooksByTiming)
 	env.RegisterActivity(posttableSyncActivity.RunPostTableSync)
 
 	env.SetTestTimeout(600 * time.Second) // increase the test timeout
+	env.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result converter.EncodedValue, err error) {
+		if !expectActivityErr {
+			require.NoError(t, err, "Activity %s failed", activityInfo.ActivityType.Name)
+		}
+		if activityInfo.ActivityType.Name == "RunPostTableSync" && result.HasValue() {
+			var postTableSyncResp posttablesync_activity.RunPostTableSyncResponse
+			decodeErr := result.Get(&postTableSyncResp)
+			require.NoError(t, decodeErr, "Failed to decode result for activity %s", activityInfo.ActivityType.Name)
+
+			require.Emptyf(t, postTableSyncResp.Errors, "Post table sync activity returned errors: %v", formatPostTableSyncErrors(postTableSyncResp.Errors))
+		}
+	})
+
 	env.SetStartWorkflowOptions(client.StartWorkflowOptions{ID: jobId})
 	env.ExecuteWorkflow(Workflow, &WorkflowRequest{JobId: jobId})
 	return env
@@ -1673,3 +1698,13 @@ func startHTTPServer(tb testing.TB, h http.Handler) *httptest.Server {
 	tb.Cleanup(srv.Close)
 	return srv
 }
+
+func formatPostTableSyncErrors(errors []*posttablesync_activity.PostTableSyncError) []string {
+	formatted := []string{}
+	for _, err := range errors {
+		for _, e := range err.Errors {
+			formatted = append(formatted, fmt.Sprintf("statement: %s error: %s", e.Statement, e.Error))
+		}
+	}
+	return formatted
+}
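
Notes on the patch (illustrative sketches only; every helper name introduced below is invented for illustration and is not part of this diff).

1) Identifier quoting. The thread connecting the SQL and Go changes is that PostgreSQL folds unquoted identifiers to lowercase, so generated DDL against a schema like "CaPiTaL" or a table like "Bad Name 123!@#" fails unless every identifier is double-quoted. The patch uses quote_ident() on the SQL side and fmt's %q verb on the Go side. One caveat worth knowing: %q applies Go string escaping (backslashes), while SQL doubles embedded quotes, so the two agree only for names that contain no double quotes or backslashes. A minimal sketch of the SQL rule, with a hypothetical quotePgIdent helper:

package main

import (
	"fmt"
	"strings"
)

// quotePgIdent is a hypothetical helper (not in the patch) applying
// PostgreSQL's identifier-quoting rule: wrap in double quotes and
// double any embedded double quote. quote_ident() does this server-side.
func quotePgIdent(ident string) string {
	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
}

func main() {
	// Unquoted, CaPiTaL.BadName would be looked up as capital.badname.
	fmt.Println(quotePgIdent("CaPiTaL") + "." + quotePgIdent("Bad Name 123!@#"))
	// Output: "CaPiTaL"."Bad Name 123!@#"

	// %q matches for simple names but diverges on embedded quotes.
	fmt.Printf("%q vs %s\n", `Bad"Name`, quotePgIdent(`Bad"Name`))
	// Output: "Bad\"Name" vs "Bad""Name"
}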
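2) The constraint-existence check in postgres-manager.go. A cast like '%s'::regnamespace parses its input as an identifier, so an unquoted mixed-case name is case-folded ('CaPiTaL' would be looked up as capital) and a nonexistent schema makes the cast raise an error. The subquery form compares nspname against the literal string exactly and yields NULL for a missing schema, so the surrounding EXISTS simply evaluates to false. A sketch contrasting the two fragments (the %s placeholders still assume, as the existing code does, that names contain no single quotes):

package main

import "fmt"

func main() {
	schema := "CaPiTaL"

	// Old: identifier parsing + hard error when the schema is absent.
	// (PostgreSQL's to_regnamespace() would return NULL instead, but the
	// patch opts for an explicit catalog lookup.)
	oldCheck := fmt.Sprintf(`AND connamespace = '%s'::regnamespace`, schema)

	// New: exact string comparison; absent schema means NULL, not an error.
	newCheck := fmt.Sprintf(`AND connamespace = (SELECT oid FROM pg_namespace WHERE nspname = '%s')`, schema)

	fmt.Println(oldCheck)
	fmt.Println(newCheck)
}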
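3) The nil-receiver method value in workflow.go. The workflow now passes redisCleanUpActivity.DeleteRedisHash where redisCleanUpActivity is a nil *Activity. That is safe in Go: taking a method value captures the receiver without dereferencing it, and (as I understand the Temporal Go SDK) ExecuteActivity only uses the function's reflected name to dispatch to the instance registered on the worker; the nil receiver is never invoked. A self-contained demo of the mechanics, using only the standard library:

package main

import (
	"fmt"
	"reflect"
	"runtime"
)

type Activity struct{ client any }

func (a *Activity) DeleteRedisHash() {}

func main() {
	// Legal: the nil receiver is captured, not dereferenced.
	// Only calling fn() would panic.
	var a *Activity
	fn := a.DeleteRedisHash

	// Roughly how an activity name can be derived from a function value.
	name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
	fmt.Println(name) // e.g. main.(*Activity).DeleteRedisHash-fm
}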
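4) Per-statement execution in the post-table-sync activity. The old BatchExec call aborted the whole destination on the first failure; the patch executes each statement individually and accumulates failures into the response, so one bad statement neither masks nor blocks the rest, and callers (like the new test listener) can assert on the collected errors. A generic sketch of the pattern, with a hypothetical Execer interface standing in for destDb.Db():

package main

import (
	"context"
	"fmt"
)

// Execer is a stand-in for the patch's destination connection.
type Execer interface {
	Exec(ctx context.Context, stmt string) error
}

type StatementError struct {
	Statement string
	Error     string
}

// execCollecting runs every statement even when some fail, returning the
// failures instead of stopping at the first one.
func execCollecting(ctx context.Context, db Execer, stmts []string) []*StatementError {
	errs := []*StatementError{}
	for _, stmt := range stmts {
		if err := db.Exec(ctx, stmt); err != nil {
			errs = append(errs, &StatementError{Statement: stmt, Error: err.Error()})
		}
	}
	return errs
}

type fakeDB struct{}

func (fakeDB) Exec(_ context.Context, stmt string) error {
	if stmt == "bad" {
		return fmt.Errorf("syntax error")
	}
	return nil
}

func main() {
	errs := execCollecting(context.Background(), fakeDB{}, []string{"good", "bad", "also good"})
	fmt.Println(len(errs)) // 1: the statements after the failure still ran
}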
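5) Redis as an optional dependency. GetRedisClient now takes the config and returns (nil, nil) when it is absent, so the worker boots without redis, and the hard "missing redis client" failure moves into DeleteRedisHash, which only runs for jobs that actually configured redis keys. A condensed, runnable sketch of that contract using stand-in types (RedisConfig, Client, and Activity here are illustrations, not the patch's types):

package main

import (
	"errors"
	"fmt"
)

type RedisConfig struct{ Url string }
type Client struct{ url string }

// getClient mirrors the patch's GetRedisClient contract: a nil config is
// not an error, it just yields a nil client.
func getClient(cfg *RedisConfig) (*Client, error) {
	if cfg == nil {
		return nil, nil
	}
	return &Client{url: cfg.Url}, nil
}

type Activity struct{ client *Client }

// Run enforces the requirement at the point of use, like DeleteRedisHash.
func (a *Activity) Run() error {
	if a.client == nil {
		return errors.New("missing redis client. this operation requires redis.")
	}
	return nil
}

func main() {
	client, _ := getClient(nil) // unconfigured: no error at startup
	fmt.Println(client == nil)  // true
	fmt.Println((&Activity{client: client}).Run())
}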