diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 013844aaed..626740de75 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -80,6 +80,11 @@ jobs:
   integration-tests:
     name: Integration Tests
     runs-on: ubuntu-latest
+    permissions:
+      id-token: write # required to request the OIDC JWT used to retrieve AWS credentials
+      contents: read # required for actions/checkout
+    env:
+      AWS_REGION: "us-west-2"
     steps:
       - name: Checkout
        uses: actions/checkout@v4
@@ -90,11 +95,21 @@ jobs:
           go-version-file: go.mod
           cache-dependency-path: go.sum

+      - name: Configure AWS Credentials
+        uses: aws-actions/configure-aws-credentials@v4
+        with:
+          aws-region: ${{ env.AWS_REGION }}
+          role-to-assume: ${{ vars.INTEGRATION_TEST_AWS_ROLE_ARN }}
+          role-session-name: NeosyncCiIntegrationTests
+
       - name: Run Integration Tests
         run: |
           go test -race -timeout 1800s -coverprofile=integration-coverage.out -covermode=atomic -run TestIntegrationTestSuite ./...
         env:
           INTEGRATION_TESTS_ENABLED: 1
+          S3_INTEGRATION_TESTS_ENABLED: 1
+          TEST_S3_REGION: ${{ env.AWS_REGION }}
+          TEST_S3_BUCKET: ${{ vars.INTEGRATION_TEST_BUCKET_NAME }}

       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v5
diff --git a/backend/pkg/integration-test/integration-test-util.go b/backend/pkg/integration-test/integration-test-util.go
index 01e50da9f1..6af841741e 100644
--- a/backend/pkg/integration-test/integration-test-util.go
+++ b/backend/pkg/integration-test/integration-test-util.go
@@ -76,6 +76,36 @@ func CreateMysqlConnection(
 	return resp.Msg.GetConnection()
 }

+func CreateS3Connection(
+	ctx context.Context,
+	t *testing.T,
+	connclient mgmtv1alpha1connect.ConnectionServiceClient,
+	accountId, name string,
+	bucket string,
+	region *string,
+) *mgmtv1alpha1.Connection {
+	resp, err := connclient.CreateConnection(
+		ctx,
+		connect.NewRequest(&mgmtv1alpha1.CreateConnectionRequest{
+			AccountId: accountId,
+			Name:      name,
+			ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{
+				Config: &mgmtv1alpha1.ConnectionConfig_AwsS3Config{
+					AwsS3Config: &mgmtv1alpha1.AwsS3ConnectionConfig{
+						Bucket:      bucket,
+						PathPrefix:  nil,
+						Region:      region,
+						Endpoint:    nil,
+						Credentials: nil,
+					},
+				},
+			},
+		}),
+	)
+	RequireNoErrResp(t, resp, err)
+	return resp.Msg.GetConnection()
+}
+
 func SetUser(ctx context.Context, t *testing.T, client mgmtv1alpha1connect.UserAccountServiceClient) string {
 	resp, err := client.SetUser(ctx, connect.NewRequest(&mgmtv1alpha1.SetUserRequest{}))
 	RequireNoErrResp(t, resp, err)
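The new `CreateS3Connection` helper rounds out the existing Postgres and Mysql helpers. A minimal usage sketch, assuming a test that already holds a connection client and account id, and the `testutil.GetTestAwsS3Config()` helper this PR calls elsewhere (fields beyond `Bucket` and `Region` are not shown in the diff):

```go
// Hedged sketch of registering an S3 connection in an integration test.
// GetTestAwsS3Config is assumed to read TEST_S3_BUCKET / TEST_S3_REGION,
// matching the env the workflow above exports.
awsCfg := testutil.GetTestAwsS3Config()
s3Conn := tcneosyncapi.CreateS3Connection(
	ctx, t,
	connclient,
	accountId,
	"s3-conn", // connection name; illustrative
	awsCfg.Bucket,
	&awsCfg.Region, // region is an optional *string in the proto config
)
// s3Conn.Id is what jobs below reference as a source or destination.
```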
+ Return("sync-job", nil). + Once() + s.Mocks.TemporalClientManager. + On( + "CreateSchedule", mock.Anything, mock.Anything, mock.Anything, mock.Anything, + ). + Return(returnId, nil). + Once() +} + func (s *NeosyncApiTestClient) InitializeTest(ctx context.Context, t testing.TB) error { err := neomigrate.Up(ctx, s.Pgcontainer.URL, s.migrationsDir, testutil.GetTestLogger(t)) if err != nil { diff --git a/backend/services/mgmt/v1alpha1/integration_tests/jobs-service_integration_test.go b/backend/services/mgmt/v1alpha1/integration_tests/jobs-service_integration_test.go index 3c98a387de..1b4b2e6d9d 100644 --- a/backend/services/mgmt/v1alpha1/integration_tests/jobs-service_integration_test.go +++ b/backend/services/mgmt/v1alpha1/integration_tests/jobs-service_integration_test.go @@ -9,7 +9,6 @@ import ( "github.com/google/uuid" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -29,7 +28,7 @@ func (s *IntegrationTestSuite) Test_CreateJob_Ok() { srcconn := s.createPostgresConnection(s.UnauthdClients.Connections, accountId, "source", "test") destconn := s.createPostgresConnection(s.UnauthdClients.Connections, accountId, "dest", "test2") - s.mockTemporalForCreateJob("test-id") + s.MockTemporalForCreateJob("test-id") resp, err := s.UnauthdClients.Jobs.CreateJob(s.ctx, connect.NewRequest(&mgmtv1alpha1.CreateJobRequest{ AccountId: accountId, @@ -59,27 +58,6 @@ func (s *IntegrationTestSuite) Test_CreateJob_Ok() { require.NotNil(s.T(), resp.Msg.GetJob()) } -func (s *IntegrationTestSuite) mockTemporalForCreateJob(returnId string) { - s.Mocks.TemporalClientManager. - On( - "DoesAccountHaveNamespace", mock.Anything, mock.Anything, mock.Anything, - ). - Return(true, nil). - Once() - s.Mocks.TemporalClientManager. - On( - "GetSyncJobTaskQueue", mock.Anything, mock.Anything, mock.Anything, - ). - Return("sync-job", nil). - Once() - s.Mocks.TemporalClientManager. - On( - "CreateSchedule", mock.Anything, mock.Anything, mock.Anything, mock.Anything, - ). - Return(returnId, nil). 
- Once() -} - func (s *IntegrationTestSuite) Test_JobService_JobHooks() { t := s.T() ctx := s.ctx @@ -131,7 +109,7 @@ func (s *IntegrationTestSuite) Test_JobService_JobHooks() { srcconn := s.createPostgresConnection(s.NeosyncCloudClients.GetConnectionClient(testAuthUserId), accountId, "source", "test") destconn := s.createPostgresConnection(s.NeosyncCloudClients.GetConnectionClient(testAuthUserId), accountId, "dest", "test2") - s.mockTemporalForCreateJob("test-id") + s.MockTemporalForCreateJob("test-id") jobResp, err := client.CreateJob(ctx, connect.NewRequest(&mgmtv1alpha1.CreateJobRequest{ JobName: "cloud-testjob-1", AccountId: accountId, diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go index 8131a76e4f..d24aa3a16b 100644 --- a/cli/internal/cmds/neosync/sync/sync_integration_test.go +++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go @@ -2,17 +2,20 @@ package sync_cmd import ( "context" + "fmt" "testing" + "connectrpc.com/connect" + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" tcneosyncapi "github.com/nucleuscloud/neosync/backend/pkg/integration-test" - "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" - "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager" "github.com/nucleuscloud/neosync/cli/internal/output" connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager" - "github.com/nucleuscloud/neosync/internal/connection-manager/providers/sqlprovider" "github.com/nucleuscloud/neosync/internal/testutil" tcmysql "github.com/nucleuscloud/neosync/internal/testutil/testcontainers/mysql" tcpostgres "github.com/nucleuscloud/neosync/internal/testutil/testcontainers/postgres" + mysqlalltypes "github.com/nucleuscloud/neosync/internal/testutil/testdata/mysql/alltypes" + pgalltypes "github.com/nucleuscloud/neosync/internal/testutil/testdata/postgres/alltypes" + tcworkflow "github.com/nucleuscloud/neosync/worker/pkg/integration-test" "github.com/stretchr/testify/require" ) @@ -28,36 +31,57 @@ func Test_Sync(t *testing.T) { neosyncApi, err := tcneosyncapi.NewNeosyncApiTestClient(ctx, t, tcneosyncapi.WithMigrationsDirectory(neosyncDbMigrationsPath)) if err != nil { - panic(err) + t.Fatal(err) } connclient := neosyncApi.UnauthdClients.Connections conndataclient := neosyncApi.UnauthdClients.ConnectionData - connmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(&sqlconnect.SqlOpenConnector{})) - sqlmanagerclient := sqlmanager.NewSqlManager(sqlmanager.WithConnectionManager(connmanager)) + jobclient := neosyncApi.UnauthdClients.Jobs + dbManagers := tcworkflow.NewTestDatabaseManagers(t) + connmanager := dbManagers.SqlConnManager + sqlmanagerclient := dbManagers.SqlManager accountId := tcneosyncapi.CreatePersonalAccount(ctx, t, neosyncApi.UnauthdClients.Users) + awsS3Config := testutil.GetTestAwsS3Config() + s3Conn := tcneosyncapi.CreateS3Connection( + ctx, + t, + connclient, + accountId, + "s3-conn", + awsS3Config.Bucket, + &awsS3Config.Region, + ) outputType := output.PlainOutput t.Run("postgres", func(t *testing.T) { t.Parallel() postgres, err := tcpostgres.NewPostgresTestSyncContainer(ctx, []tcpostgres.Option{}, []tcpostgres.Option{}) if err != nil { - panic(err) + t.Fatal(err) } testdataFolder := "../../../../../internal/testutil/testdata/postgres" - err = postgres.Source.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-tables.sql", "alltypes/create-tables.sql"}) - if err != nil { - panic(err) - } - err = 
postgres.Target.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-schema.sql", "alltypes/create-schema.sql"}) - if err != nil { - panic(err) - } sourceConn := tcneosyncapi.CreatePostgresConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "postgres-source", postgres.Source.URL) - t.Run("sync", func(t *testing.T) { + t.Run("postgres_sync", func(t *testing.T) { + // can't be run in parallel yet + // right now CLI sync and init schema takes everything in source and copies it to target since there are no job mappings defined by the user + // so it can't be scoped to specific schema + // t.Parallel() + err = postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources") + if err != nil { + t.Fatal(err) + } + err = postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, "alltypes") + if err != nil { + t.Fatal(err) + } + err = postgres.Target.CreateSchemas(ctx, []string{"humanresources", "alltypes"}) + if err != nil { + t.Fatal(err) + } + testlogger := testutil.GetTestLogger(t) cmdConfig := &cmdConfig{ Source: &sourceConfig{ @@ -97,7 +121,108 @@ func Test_Sync(t *testing.T) { require.NoError(t, err) require.Greater(t, rowCount, 1) - rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_postgres_types;") + rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_data_types;") + err = rows.Scan(&rowCount) + require.NoError(t, err) + require.Greater(t, rowCount, 1) + }) + + t.Run("S3_end_to_end", func(t *testing.T) { + t.Parallel() + ok := testutil.ShouldRunS3IntegrationTest() + if !ok { + return + } + + alltypesSchema := "alltypes_s3_pg" + err := postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema) + if err != nil { + t.Fatal(err) + } + + err = postgres.Target.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema) + if err != nil { + t.Fatal(err) + } + + neosyncApi.MockTemporalForCreateJob("cli-test-sync") + job, err := jobclient.CreateJob(ctx, connect.NewRequest(&mgmtv1alpha1.CreateJobRequest{ + AccountId: accountId, + JobName: "S3 to PG", + Source: &mgmtv1alpha1.JobSource{ + Options: &mgmtv1alpha1.JobSourceOptions{ + Config: &mgmtv1alpha1.JobSourceOptions_Postgres{ + Postgres: &mgmtv1alpha1.PostgresSourceConnectionOptions{ + ConnectionId: sourceConn.Id, + Schemas: []*mgmtv1alpha1.PostgresSourceSchemaOption{}, + SubsetByForeignKeyConstraints: true, + }, + }, + }, + }, + Destinations: []*mgmtv1alpha1.CreateJobDestination{ + { + ConnectionId: s3Conn.Id, + Options: &mgmtv1alpha1.JobDestinationOptions{ + Config: &mgmtv1alpha1.JobDestinationOptions_AwsS3Options{ + AwsS3Options: &mgmtv1alpha1.AwsS3DestinationConnectionOptions{}, + }, + }, + }, + }, + Mappings: pgalltypes.GetDefaultSyncJobMappings(alltypesSchema), + })) + require.NoError(t, err) + + t.Run("Postgres_to_S3", func(t *testing.T) { + testworkflow := tcworkflow.NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers) + testworkflow.RequireActivitiesCompletedSuccessfully(t) + testworkflow.ExecuteTestDataSyncWorkflow(job.Msg.GetJob().GetId()) + require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. 
Test: pg_to_s3") + err = testworkflow.TestEnv.GetWorkflowError() + require.NoError(t, err, "Received Temporal Workflow Error", "testName", "pg_to_s3") + }) + + t.Run("S3_to_Postgres", func(t *testing.T) { + testlogger := testutil.GetTestLogger(t) + cmdConfig := &cmdConfig{ + Source: &sourceConfig{ + ConnectionId: s3Conn.Id, + ConnectionOpts: &connectionOpts{ + JobId: &job.Msg.Job.Id, + }, + }, + Destination: &sqlDestinationConfig{ + ConnectionUrl: postgres.Target.URL, + Driver: postgresDriver, + InitSchema: false, + TruncateBeforeInsert: true, + TruncateCascade: true, + }, + OutputType: &outputType, + AccountId: &accountId, + } + sync := &clisync{ + connectiondataclient: conndataclient, + connectionclient: connclient, + sqlmanagerclient: sqlmanagerclient, + ctx: ctx, + logger: testlogger, + cmd: cmdConfig, + connmanager: connmanager, + session: connectionmanager.NewUniqueSession(), + } + err := sync.configureAndRunSync() + require.NoError(t, err) + }) + + var rowCount int + rows := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema)) + err = rows.Scan(&rowCount) + require.NoError(t, err) + require.Greater(t, rowCount, 1) + + rows = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema)) err = rows.Scan(&rowCount) require.NoError(t, err) require.Greater(t, rowCount, 1) @@ -106,7 +231,7 @@ func Test_Sync(t *testing.T) { t.Cleanup(func() { err := postgres.TearDown(ctx) if err != nil { - panic(err) + t.Fatal(err) } }) }) @@ -115,22 +240,30 @@ func Test_Sync(t *testing.T) { t.Parallel() mysql, err := tcmysql.NewMysqlTestSyncContainer(ctx, []tcmysql.Option{}, []tcmysql.Option{}) if err != nil { - panic(err) + t.Fatal(err) } testdataFolder := "../../../../../internal/testutil/testdata/mysql" - err = mysql.Source.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-tables.sql", "alltypes/create-tables.sql"}) - if err != nil { - panic(err) - } - err = mysql.Target.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-schema.sql", "alltypes/create-schema.sql"}) - if err != nil { - panic(err) - } sourceConn := tcneosyncapi.CreateMysqlConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "mysql-source", mysql.Source.URL) - t.Run("sync", func(t *testing.T) { - discardLogger := testutil.GetTestLogger(t) + t.Run("mysql_sync", func(t *testing.T) { + // can't be run in parallel yet + // right now CLI sync and init schema takes everything in source and copies it to target since there are no job mappings defined by the user + // so it can't be scoped to specific schema + // t.Parallel() + err = mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources") + if err != nil { + t.Fatal(err) + } + err = mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, "alltypes") + if err != nil { + t.Fatal(err) + } + err = mysql.Target.CreateDatabases(ctx, []string{"humanresources", "alltypes"}) + if err != nil { + t.Fatal(err) + } + testlogger := testutil.GetTestLogger(t) cmdConfig := &cmdConfig{ Source: &sourceConfig{ ConnectionId: sourceConn.Id, @@ -149,7 +282,7 @@ func Test_Sync(t *testing.T) { connectionclient: connclient, sqlmanagerclient: sqlmanagerclient, ctx: ctx, - logger: discardLogger, + logger: testlogger, cmd: cmdConfig, connmanager: connmanager, session: connectionmanager.NewUniqueSession(), @@ -174,10 +307,110 @@ func Test_Sync(t *testing.T) { require.Greater(t, 
rowCount, 1)
 	})

+	t.Run("S3_end_to_end", func(t *testing.T) {
+		t.Parallel()
+		ok := testutil.ShouldRunS3IntegrationTest()
+		if !ok {
+			return
+		}
+
+		alltypesSchema := "alltypes_s3_mysql"
+		err := mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = mysql.Target.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		neosyncApi.MockTemporalForCreateJob("cli-test-sync")
+		job, err := jobclient.CreateJob(ctx, connect.NewRequest(&mgmtv1alpha1.CreateJobRequest{
+			AccountId: accountId,
+			JobName:   "S3 to Mysql",
+			Source: &mgmtv1alpha1.JobSource{
+				Options: &mgmtv1alpha1.JobSourceOptions{
+					Config: &mgmtv1alpha1.JobSourceOptions_Mysql{
+						Mysql: &mgmtv1alpha1.MysqlSourceConnectionOptions{
+							ConnectionId:                  sourceConn.Id,
+							Schemas:                       []*mgmtv1alpha1.MysqlSourceSchemaOption{},
+							SubsetByForeignKeyConstraints: true,
+						},
+					},
+				},
+			},
+			Destinations: []*mgmtv1alpha1.CreateJobDestination{
+				{
+					ConnectionId: s3Conn.Id,
+					Options: &mgmtv1alpha1.JobDestinationOptions{
+						Config: &mgmtv1alpha1.JobDestinationOptions_AwsS3Options{
+							AwsS3Options: &mgmtv1alpha1.AwsS3DestinationConnectionOptions{},
+						},
+					},
+				},
+			},
+			Mappings: mysqlalltypes.GetDefaultSyncJobMappings(alltypesSchema),
+		}))
+		require.NoError(t, err)
+
+		t.Run("Mysql_to_S3", func(t *testing.T) {
+			testworkflow := tcworkflow.NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers)
+			testworkflow.RequireActivitiesCompletedSuccessfully(t)
+			testworkflow.ExecuteTestDataSyncWorkflow(job.Msg.GetJob().GetId())
+			require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: mysql_to_s3")
+			err = testworkflow.TestEnv.GetWorkflowError()
+			require.NoError(t, err, "Received Temporal Workflow Error", "testName", "mysql_to_s3")
+		})
+
+		t.Run("S3_to_Mysql", func(t *testing.T) {
+			testlogger := testutil.GetTestLogger(t)
+			cmdConfig := &cmdConfig{
+				Source: &sourceConfig{
+					ConnectionId: s3Conn.Id,
+					ConnectionOpts: &connectionOpts{
+						JobId: &job.Msg.Job.Id,
+					},
+				},
+				Destination: &sqlDestinationConfig{
+					ConnectionUrl:        mysql.Target.URL,
+					Driver:               mysqlDriver,
+					InitSchema:           false,
+					TruncateBeforeInsert: true,
+				},
+				OutputType: &outputType,
+				AccountId:  &accountId,
+			}
+			sync := &clisync{
+				connectiondataclient: conndataclient,
+				connectionclient:     connclient,
+				sqlmanagerclient:     sqlmanagerclient,
+				ctx:                  ctx,
+				logger:               testlogger,
+				cmd:                  cmdConfig,
+				connmanager:          connmanager,
+				session:              connectionmanager.NewUniqueSession(),
+			}
+			err := sync.configureAndRunSync()
+			require.NoError(t, err)
+		})
+
+		var rowCount int
+		rows := mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
+		err = rows.Scan(&rowCount)
+		require.NoError(t, err)
+		require.Greater(t, rowCount, 1)
+
+		rows = mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
+		err = rows.Scan(&rowCount)
+		require.NoError(t, err)
+		require.Greater(t, rowCount, 1)
+	})
+
 	t.Cleanup(func() {
 		err := mysql.TearDown(ctx)
 		if err != nil {
-			panic(err)
+			t.Fatal(err)
 		}
 	})
 })
@@ -185,7 +418,7 @@ func Test_Sync(t *testing.T) {
 	t.Cleanup(func() {
 		err = neosyncApi.TearDown(ctx)
 		if err != nil {
-			panic(err)
+			t.Fatal(err)
 		}
 	})
 }
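Both new `S3_end_to_end` subtests gate themselves on `testutil.ShouldRunS3IntegrationTest()` so they no-op when AWS credentials are absent. A plausible shape for that gate, assuming it keys off the `S3_INTEGRATION_TESTS_ENABLED` variable the workflow exports; the real helper lives in `internal/testutil` and may differ:

```go
package testutil

import "os"

// ShouldRunS3IntegrationTest reports whether the S3-backed integration
// tests should run. Sketch only; assumes the env var set in go.yml above.
func ShouldRunS3IntegrationTest() bool {
	return os.Getenv("S3_INTEGRATION_TESTS_ENABLED") == "1"
}
```

Returning early on `!ok` makes gated runs look like passes; `t.Skip()` would report them as skipped instead, which may be preferable.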
diff --git a/internal/testutil/testcontainers/mysql/mysql.go b/internal/testutil/testcontainers/mysql/mysql.go
index a02081f3bf..2a75fa44aa 100644
--- a/internal/testutil/testcontainers/mysql/mysql.go
+++ b/internal/testutil/testcontainers/mysql/mysql.go
@@ -179,3 +179,34 @@ func (m *MysqlTestContainer) RunSqlFiles(ctx context.Context, folder *string, fi
 	}
 	return nil
 }
+
+// Creates the database if needed and runs each SQL file with USE set to that database
+func (m *MysqlTestContainer) RunCreateStmtsInDatabase(ctx context.Context, folder *string, files []string, database string) error {
+	for _, file := range files {
+		filePath := file
+		if folder != nil && *folder != "" {
+			filePath = fmt.Sprintf("./%s/%s", *folder, file)
+		}
+		sqlStr, err := os.ReadFile(filePath)
+		if err != nil {
+			return err
+		}
+
+		setSchemaSql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s; \n USE %s; \n", database, database)
+		_, err = m.DB.ExecContext(ctx, setSchemaSql+string(sqlStr))
+		if err != nil {
+			return fmt.Errorf("unable to exec sql when running mysql sql files: %w", err)
+		}
+	}
+	return nil
+}
+
+func (m *MysqlTestContainer) CreateDatabases(ctx context.Context, databases []string) error {
+	for _, database := range databases {
+		_, err := m.DB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", database))
+		if err != nil {
+			return fmt.Errorf("unable to create database %s: %w", database, err)
+		}
+	}
+	return nil
+}
diff --git a/internal/testutil/testcontainers/postgres/postgres.go b/internal/testutil/testcontainers/postgres/postgres.go
index 6073c53e94..9450cce382 100644
--- a/internal/testutil/testcontainers/postgres/postgres.go
+++ b/internal/testutil/testcontainers/postgres/postgres.go
@@ -179,3 +179,33 @@ func (p *PostgresTestContainer) RunSqlFiles(ctx context.Context, folder *string,
 	}
 	return nil
 }
+
+// Creates the schema if needed and runs each SQL file with search_path set to that schema
+func (p *PostgresTestContainer) RunCreateStmtsInSchema(ctx context.Context, folder *string, files []string, schema string) error {
+	for _, file := range files {
+		filePath := file
+		if folder != nil && *folder != "" {
+			filePath = fmt.Sprintf("./%s/%s", *folder, file)
+		}
+		sqlStr, err := os.ReadFile(filePath)
+		if err != nil {
+			return err
+		}
+		setSchemaSql := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s; \n SET search_path TO %s; \n", schema, schema)
+		_, err = p.DB.Exec(ctx, setSchemaSql+string(sqlStr))
+		if err != nil {
+			return fmt.Errorf("unable to exec sql when running postgres sql files: %w", err)
+		}
+	}
+	return nil
+}
+
+func (p *PostgresTestContainer) CreateSchemas(ctx context.Context, schemas []string) error {
+	for _, schema := range schemas {
+		_, err := p.DB.Exec(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schema))
+		if err != nil {
+			return fmt.Errorf("unable to create schema %s: %w", schema, err)
+		}
+	}
+	return nil
+}
diff --git a/internal/testutil/testdata/gen_jobmappings_config.json b/internal/testutil/testdata/gen_jobmappings_config.json
new file mode 100644
index 0000000000..cf61de37f2
--- /dev/null
+++ b/internal/testutil/testdata/gen_jobmappings_config.json
@@ -0,0 +1,22 @@
+[
+  {
+    "folder": "postgres/alltypes",
+    "sql_file": "create-tables.sql",
+    "driver": "postgres"
+  },
+  {
+    "folder": "postgres/humanresources",
+    "sql_file": "create-tables.sql",
+    "driver": "postgres"
+  },
+  {
+    "folder": "mysql/alltypes",
+    "sql_file": "create-tables.sql",
+    "driver": "mysql"
+  },
+  {
+    "folder": "mysql/humanresources",
+    "sql_file": "create-tables.sql",
+    "driver": "mysql"
+  }
+]
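The two container helpers above are what let a single `create-tables.sql` be installed into arbitrarily named schemas or databases, so the parallel S3 subtests never collide. A usage sketch (paths and schema names illustrative):

```go
// Install the same DDL into two isolated schemas on one Postgres container.
folder := "internal/testutil/testdata/postgres" // illustrative path
for _, schema := range []string{"alltypes_a", "alltypes_b"} {
	if err := pg.Source.RunCreateStmtsInSchema(ctx, &folder, []string{"alltypes/create-tables.sql"}, schema); err != nil {
		t.Fatal(err)
	}
}
// The target only needs empty schemas; the sync populates them.
if err := pg.Target.CreateSchemas(ctx, []string{"alltypes_a", "alltypes_b"}); err != nil {
	t.Fatal(err)
}
```

This works because the helper simply prepends `SET search_path TO <schema>` (or `USE <database>` for MySQL), which is also why this PR strips the hard-coded `alltypes.` qualifiers out of the SQL fixtures.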
diff --git a/internal/testutil/testdata/generators.go b/internal/testutil/testdata/generators.go
new file mode 100644
index 0000000000..c37d56bd4f
--- /dev/null
+++ b/internal/testutil/testdata/generators.go
@@ -0,0 +1,3 @@
+package testutil_testdata
+
+//go:generate go run jobmapping_generator.go gen_jobmappings_config.json $GOPACKAGE
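That `go:generate` directive is what produces the `job_mappings.go` files appearing later in this diff. A sketch of the regeneration flow and how tests consume the output (the `pgalltypes` alias matches the import used in the CLI test above):

```go
// Regenerate after editing any create-tables.sql fixture:
//
//	go generate ./internal/testutil/testdata/...
//
// The generator (next file) reads gen_jobmappings_config.json, parses each
// SQL file with a driver-specific parser, and rewrites <folder>/job_mappings.go.
// Tests then bind the mappings to whatever schema they installed the DDL into:
mappings := pgalltypes.GetDefaultSyncJobMappings("alltypes_s3_pg")
fmt.Println(len(mappings)) // one PASSTHROUGH mapping per parsed column
```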
diff --git a/internal/testutil/testdata/jobmapping_generator.go b/internal/testutil/testdata/jobmapping_generator.go
new file mode 100644
index 0000000000..c8c96dc4c0
--- /dev/null
+++ b/internal/testutil/testdata/jobmapping_generator.go
@@ -0,0 +1,362 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"regexp"
+	"slices"
+	"strings"
+	"text/template"
+
+	"github.com/antlr4-go/antlr/v4"
+	parser "github.com/nucleuscloud/go-antlrv4-parser/tsql"
+	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
+	pg_query "github.com/pganalyze/pg_query_go/v5"
+)
+
+type Input struct {
+	Folder  string `json:"folder"`
+	SqlFile string `json:"sql_file"`
+	Driver  string `json:"driver"`
+}
+
+type Column struct {
+	Name    string
+	TypeStr string
+}
+
+type Table struct {
+	Name    string
+	Columns []*Column
+}
+
+type JobMapping struct {
+	Table       string
+	Column      string
+	Transformer string
+	Config      string
+}
+
+func parsePostgresStatements(sql string) ([]*Table, error) {
+	tree, err := pg_query.Parse(sql)
+	if err != nil {
+		return nil, err
+	}
+
+	tables := []*Table{}
+	for _, stmt := range tree.GetStmts() {
+		s := stmt.GetStmt()
+		switch s.Node.(type) {
+		case *pg_query.Node_CreateStmt:
+			table := s.GetCreateStmt().GetRelation().GetRelname()
+			columns := []*Column{}
+			for _, col := range s.GetCreateStmt().GetTableElts() {
+				if col.GetColumnDef() != nil {
+					columns = append(columns, &Column{
+						Name: col.GetColumnDef().Colname,
+					})
+				}
+			}
+			tables = append(tables, &Table{
+				Name:    table,
+				Columns: columns,
+			})
+		}
+	}
+	return tables, nil
+}
+
+// TODO: fix; this line-based regex parsing is very brittle
+func parseSQLStatements(sql string) []*Table {
+	lines := strings.Split(sql, "\n")
+	tableColumnsMap := make(map[string][]string)
+	var currentTable string
+
+	reCreateTable := regexp.MustCompile(`CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+(\w+)\s*\.\s*(\w+)\s*\(`)
+	reCreateTableNoSchema := regexp.MustCompile(`CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+(\w+)\s*\(`)
+	reColumn := regexp.MustCompile(`^\s*([\w]+)\s+[\w\(\)]+.*`)
+
+	for _, line := range lines {
+		line = strings.TrimSpace(line)
+		if matches := reCreateTable.FindStringSubmatch(line); len(matches) > 2 {
+			currentTable = matches[2]
+		} else if matches := reCreateTableNoSchema.FindStringSubmatch(line); len(matches) > 1 {
+			currentTable = matches[1]
+		} else if currentTable != "" {
+			if matches := reColumn.FindStringSubmatch(line); len(matches) > 1 {
+				columnName := matches[1]
+				if slices.Contains([]string{"primary key", "constraint", "key", "unique", "primary", "alter"}, strings.ToLower(matches[1])) {
+					continue
+				}
+				tableColumnsMap[currentTable] = append(tableColumnsMap[currentTable], columnName)
+			} else if strings.HasPrefix(line, "PRIMARY KEY") || strings.HasPrefix(line, "CONSTRAINT") || strings.HasPrefix(line, "UNIQUE") || strings.HasPrefix(line, "KEY") || strings.HasPrefix(line, "ENGINE") || strings.HasPrefix(line, ")") {
+				// Ignore key constraints and end of table definition
+				if strings.HasPrefix(line, ")") {
+					currentTable = ""
+				}
+			}
+		}
+	}
+	res := []*Table{}
+	for table, cols := range tableColumnsMap {
+		tableCols := []*Column{}
+		for _, c := range cols {
+			tableCols = append(tableCols, &Column{
+				Name: c,
+			})
+		}
+		res = append(res, &Table{
+			Name:    table,
+			Columns: tableCols,
+		})
+	}
+
+	return res
+}
+
+func generateJobMapping(tables []*Table) []*mgmtv1alpha1.JobMapping {
+	mappings := []*mgmtv1alpha1.JobMapping{}
+	for _, t := range tables {
+		for _, c := range t.Columns {
+			mappings = append(mappings, &mgmtv1alpha1.JobMapping{
+				Table:  t.Name,
+				Column: c.Name,
+				Transformer: &mgmtv1alpha1.JobMappingTransformer{
+					Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+				},
+			})
+		}
+	}
+	return mappings
+}
+
+type TemplateData struct {
+	SourceFile      string
+	PackageName     string
+	Mappings        []*mgmtv1alpha1.JobMapping
+	Tables          []*Table
+	GenerateTypeMap bool
+}
+
+func formatJobMappings(pkgName string, sqlFile string, mappings []*mgmtv1alpha1.JobMapping, tables []*Table, generateTypeMap bool) (string, error) {
+	const tmpl = `
+// Code generated by Neosync jobmapping_generator. DO NOT EDIT.
+// source: {{ .SourceFile }}
+
+package {{ .PackageName }}
+
+import (
+	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
+)
+
+func GetDefaultSyncJobMappings(schema string)[]*mgmtv1alpha1.JobMapping {
+	return []*mgmtv1alpha1.JobMapping{
+	{{- range .Mappings }}
+		{
+			Schema: schema,
+			Table:  "{{ .Table }}",
+			Column: "{{ .Column }}",
+			Transformer: &mgmtv1alpha1.JobMappingTransformer{
+				Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH,
+			},
+		},
+	{{- end }}
+	}
+}
+{{ if .GenerateTypeMap }}
+
+func GetTableColumnTypeMap() map[string]map[string]string {
+	return map[string]map[string]string{
+	{{- range .Tables }}
+		"{{ .Name }}": {
+		{{- range .Columns }}
+			"{{ .Name }}": "{{ .TypeStr }}",
+		{{- end }}
+		},
+	{{- end }}
+	}
+}
+{{- end }}
+`
+	data := TemplateData{
+		SourceFile:      sqlFile,
+		PackageName:     pkgName,
+		Mappings:        mappings,
+		Tables:          tables,
+		GenerateTypeMap: generateTypeMap,
+	}
+	t := template.Must(template.New("jobmappings").Parse(tmpl))
+	var out bytes.Buffer
+	err := t.Execute(&out, data)
+	if err != nil {
+		return "", err
+	}
+	return out.String(), nil
+}
+
+func main() {
+	args := os.Args
+	if len(args) < 3 {
+		panic("must provide necessary args")
+	}
+
+	configFile := args[1]
+	gopackage := args[2]
+
+	packageSplit := strings.Split(gopackage, "_")
+	goPkg := packageSplit[len(packageSplit)-1]
+
+	jsonFile, err := os.Open(configFile)
+	if err != nil {
+		fmt.Printf("failed to open file: %s\n", err)
+		return
+	}
+	defer jsonFile.Close()
+
+	byteValue, err := io.ReadAll(jsonFile)
+	if err != nil {
+		fmt.Printf("failed to read file: %s\n", err)
+		return
+	}
+
+	var inputs []Input
+	if err := json.Unmarshal(byteValue, &inputs); err != nil {
+		fmt.Printf("failed to unmarshal JSON: %s\n", err)
+		return
+	}
+	for _, input := range inputs {
+		folderSplit := strings.Split(input.Folder, "/")
+		var goPkgName string
+		if len(folderSplit) == 1 {
+			goPkgName = strings.ReplaceAll(fmt.Sprintf("%s_%s", goPkg, input.Folder), "-", "")
+		} else if len(folderSplit) > 1 {
+			lastTwo := folderSplit[len(folderSplit)-2:]
+			goPkgName = strings.ReplaceAll(strings.Join(lastTwo, "_"), "-", "")
+		}
+		sqlFile, err := os.Open(fmt.Sprintf("%s/%s", input.Folder, input.SqlFile))
+		if err != nil {
+			fmt.Printf("failed to open file: %s\n", err)
+			continue
+		}
+
+		byteValue, err := io.ReadAll(sqlFile)
+		sqlFile.Close()
+		if err != nil {
+			fmt.Printf("failed to read file: %s\n", err)
+			continue
+		}
+
+		sqlContent := string(byteValue)
+
+		var tables []*Table
+		if input.Driver == "postgres" {
+			t, err := parsePostgresStatements(sqlContent)
+			if err != nil {
+				fmt.Println("Error parsing postgres SQL schema:", err)
+				return
+			}
+			tables = t
+		} else if input.Driver == "mysql" {
+			t := 
parseSQLStatements(sqlContent) + tables = t + } else if input.Driver == "sqlserver" { + t := parseTsql(sqlContent) + tables = t + } + + jobMapping := generateJobMapping(tables) + + formattedJobMappings, err := formatJobMappings(goPkgName, input.SqlFile, jobMapping, tables, input.Driver == "sqlserver") + if err != nil { + fmt.Println("Error formatting job mappings:", err) + return + } + + output := fmt.Sprintf("%s/job_mappings.go", input.Folder) + outputFile, err := os.Create(output) + if err != nil { + fmt.Println("Error creating jobmapping.go file:", err) + return + } + + _, err = outputFile.WriteString(formattedJobMappings) + if err != nil { + fmt.Println("Error writing to jobmapping.go file:", err) + return + } + outputFile.Close() + } + + return +} + +type tsqlListener struct { + *parser.BaseTSqlParserListener + inCreate bool + currentTable string + currentCols []*Column + mappings []*Table +} + +func (l *tsqlListener) PushTable() { + l.mappings = append(l.mappings, &Table{ + Name: l.currentTable, + Columns: l.currentCols, + }) + l.currentTable = "" + l.currentCols = []*Column{} + l.inCreate = false +} + +func (l *tsqlListener) PushColumn(name, typeStr string) { + l.currentCols = append(l.currentCols, &Column{ + Name: name, + TypeStr: typeStr, + }) +} + +func (l *tsqlListener) SetTable(schemaTable string) { + split := strings.Split(schemaTable, ".") + if len(split) == 1 { + l.currentTable = split[0] + } else if len(split) > 1 { + l.currentTable = split[1] + } +} + +// EnterCreate_table is called when production create_table is entered. +func (l *tsqlListener) EnterCreate_table(ctx *parser.Create_tableContext) { + l.inCreate = true + table := ctx.Table_name().GetText() + l.SetTable(table) +} + +// ExitCreate_table is called when production create_table is exited. 
+func (l *tsqlListener) ExitCreate_table(ctx *parser.Create_tableContext) { + l.PushTable() +} +func (l *tsqlListener) EnterColumn_definition(ctx *parser.Column_definitionContext) { + l.PushColumn(ctx.Id_().GetText(), ctx.Data_type().GetText()) +} + +func parseTsql(sql string) []*Table { + inputStream := antlr.NewInputStream(sql) + + // create the lexer + lexer := parser.NewTSqlLexer(inputStream) + tokens := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel) + + // create the parser + p := parser.NewTSqlParser(tokens) + + listener := &tsqlListener{} + tree := p.Tsql_file() + antlr.ParseTreeWalkerDefault.Walk(listener, tree) + + return listener.mappings +} diff --git a/internal/testutil/testdata/mysql/alltypes/create-schema.sql b/internal/testutil/testdata/mysql/alltypes/create-schema.sql deleted file mode 100644 index 7fdccc4d07..0000000000 --- a/internal/testutil/testdata/mysql/alltypes/create-schema.sql +++ /dev/null @@ -1,2 +0,0 @@ -CREATE DATABASE IF NOT EXISTS alltypes; - diff --git a/internal/testutil/testdata/mysql/alltypes/create-tables.sql b/internal/testutil/testdata/mysql/alltypes/create-tables.sql index ecbdfdae22..024a32152e 100644 --- a/internal/testutil/testdata/mysql/alltypes/create-tables.sql +++ b/internal/testutil/testdata/mysql/alltypes/create-tables.sql @@ -1,6 +1,3 @@ -CREATE DATABASE IF NOT EXISTS alltypes; - -USE alltypes; CREATE TABLE IF NOT EXISTS all_data_types ( -- Auto-incrementing primary key id INT AUTO_INCREMENT PRIMARY KEY, @@ -76,7 +73,7 @@ INSERT INTO all_data_types ( json_col, set_as_array ) VALUES ( - 127, 32767, 8388607, 2147483647, 9223372036854775807, + 127, 32767, 8388607, 2147483647, 922337203685477580, 1234.56, 3.1415, 3.14159265359, b'10101010', '2023-09-12', '14:30:00', '2023-09-12 14:30:00', 2023, diff --git a/internal/testutil/testdata/mysql/alltypes/job_mappings.go b/internal/testutil/testdata/mysql/alltypes/job_mappings.go new file mode 100644 index 0000000000..0143ff7402 --- /dev/null +++ b/internal/testutil/testdata/mysql/alltypes/job_mappings.go @@ -0,0 +1,263 @@ + +// Code generated by Neosync jobmapping_generator. DO NOT EDIT. 
+// source: create-tables.sql + +package mysql_alltypes + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings(schema string)[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: schema, + Table: "all_data_types", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tinyint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "smallint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "mediumint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "int_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bigint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "decimal_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "float_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "double_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bit_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "date_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "time_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "datetime_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "timestamp_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "year_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "char_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: 
mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "varchar_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "binary_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "varbinary_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tinyblob_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tinytext_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "blob_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "text_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "mediumblob_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "mediumtext_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "longblob_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "longtext_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "enum_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "set_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "json_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "set_as_array", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/internal/testutil/testdata/mysql/alltypes/teardown.sql b/internal/testutil/testdata/mysql/alltypes/teardown.sql deleted file mode 100644 index 0ce3909abc..0000000000 --- a/internal/testutil/testdata/mysql/alltypes/teardown.sql +++ /dev/null @@ -1 +0,0 @@ -DROP DATABASE IF EXISTS alltypes; diff --git 
a/internal/testutil/testdata/mysql/humanresources/create-schema.sql b/internal/testutil/testdata/mysql/humanresources/create-schema.sql deleted file mode 100644 index 8f0d1c740e..0000000000 --- a/internal/testutil/testdata/mysql/humanresources/create-schema.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE DATABASE IF NOT EXISTS humanresources; diff --git a/internal/testutil/testdata/mysql/humanresources/create-tables.sql b/internal/testutil/testdata/mysql/humanresources/create-tables.sql index 9664def324..88769136a2 100644 --- a/internal/testutil/testdata/mysql/humanresources/create-tables.sql +++ b/internal/testutil/testdata/mysql/humanresources/create-tables.sql @@ -1,66 +1,80 @@ -CREATE DATABASE IF NOT EXISTS humanresources; -USE humanresources; -CREATE TABLE regions ( - region_id INT (11) AUTO_INCREMENT PRIMARY KEY, - region_name VARCHAR (25) DEFAULT NULL +CREATE TABLE IF NOT EXISTS regions ( + region_id INT (11) AUTO_INCREMENT PRIMARY KEY, + region_name VARCHAR (25) DEFAULT NULL ); -CREATE TABLE countries ( - country_id CHAR (2) PRIMARY KEY, - country_name VARCHAR (40) DEFAULT NULL, - region_id INT (11) NOT NULL, - FOREIGN KEY (region_id) REFERENCES regions (region_id) ON DELETE CASCADE ON UPDATE CASCADE +CREATE TABLE IF NOT EXISTS countries ( + country_id CHAR (2) PRIMARY KEY, + country_name VARCHAR (40) DEFAULT NULL, + region_id INT (11) NOT NULL ); -CREATE TABLE locations ( - location_id INT (11) AUTO_INCREMENT PRIMARY KEY, - street_address VARCHAR (40) DEFAULT NULL, - postal_code VARCHAR (12) DEFAULT NULL, - city VARCHAR (30) NOT NULL, - state_province VARCHAR (25) DEFAULT NULL, - country_id CHAR (2) NOT NULL, - FOREIGN KEY (country_id) REFERENCES countries (country_id) ON DELETE CASCADE ON UPDATE CASCADE +CREATE TABLE IF NOT EXISTS locations ( + location_id INT (11) AUTO_INCREMENT PRIMARY KEY, + street_address VARCHAR (40) DEFAULT NULL, + postal_code VARCHAR (12) DEFAULT NULL, + city VARCHAR (30) NOT NULL, + state_province VARCHAR (25) DEFAULT NULL, + country_id CHAR (2) NOT NULL ); -CREATE TABLE jobs ( - job_id INT (11) AUTO_INCREMENT PRIMARY KEY, - job_title VARCHAR (35) NOT NULL, - min_salary DECIMAL (8, 2) DEFAULT NULL, - max_salary DECIMAL (8, 2) DEFAULT NULL +CREATE TABLE IF NOT EXISTS jobs ( + job_id INT (11) AUTO_INCREMENT PRIMARY KEY, + job_title VARCHAR (35) NOT NULL, + min_salary DECIMAL (8, 2) DEFAULT NULL, + max_salary DECIMAL (8, 2) DEFAULT NULL ); -CREATE TABLE departments ( - department_id INT (11) AUTO_INCREMENT PRIMARY KEY, - department_name VARCHAR (30) NOT NULL, - location_id INT (11) DEFAULT NULL, - FOREIGN KEY (location_id) REFERENCES locations (location_id) ON DELETE CASCADE ON UPDATE CASCADE +CREATE TABLE IF NOT EXISTS departments ( + department_id INT (11) AUTO_INCREMENT PRIMARY KEY, + department_name VARCHAR (30) NOT NULL, + location_id INT (11) DEFAULT NULL ); -CREATE TABLE employees ( - employee_id INT (11) AUTO_INCREMENT PRIMARY KEY, - first_name VARCHAR (20) DEFAULT NULL, - last_name VARCHAR (25) NOT NULL, - email VARCHAR (100) NOT NULL, - phone_number VARCHAR (20) DEFAULT NULL, - hire_date DATE NOT NULL, - job_id INT (11) NOT NULL, - salary DECIMAL (8, 2) NOT NULL, - manager_id INT (11) DEFAULT NULL, - department_id INT (11) DEFAULT NULL, - FOREIGN KEY (job_id) REFERENCES jobs (job_id) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (department_id) REFERENCES departments (department_id) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (manager_id) REFERENCES employees (employee_id) +CREATE TABLE IF NOT EXISTS employees ( + employee_id INT (11) 
AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR (20) DEFAULT NULL, + last_name VARCHAR (25) NOT NULL, + email VARCHAR (100) NOT NULL, + phone_number VARCHAR (20) DEFAULT NULL, + hire_date DATE NOT NULL, + job_id INT (11) NOT NULL, + salary DECIMAL (8, 2) NOT NULL, + manager_id INT (11) DEFAULT NULL, + department_id INT (11) DEFAULT NULL ); -CREATE TABLE dependents ( - dependent_id INT (11) AUTO_INCREMENT PRIMARY KEY, - first_name VARCHAR (50) NOT NULL, - last_name VARCHAR (50) NOT NULL, - relationship VARCHAR (25) NOT NULL, - employee_id INT (11) NOT NULL, - FOREIGN KEY (employee_id) REFERENCES employees (employee_id) ON DELETE CASCADE ON UPDATE CASCADE +CREATE TABLE IF NOT EXISTS dependents ( + dependent_id INT (11) AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR (50) NOT NULL, + last_name VARCHAR (50) NOT NULL, + relationship VARCHAR (25) NOT NULL, + employee_id INT (11) NOT NULL ); +-- Add foreign keys +ALTER TABLE countries + ADD FOREIGN KEY (region_id) REFERENCES regions (region_id) + ON DELETE CASCADE ON UPDATE CASCADE; + +ALTER TABLE locations + ADD FOREIGN KEY (country_id) REFERENCES countries (country_id) + ON DELETE CASCADE ON UPDATE CASCADE; + +ALTER TABLE departments + ADD FOREIGN KEY (location_id) REFERENCES locations (location_id) + ON DELETE CASCADE ON UPDATE CASCADE; + +ALTER TABLE employees + ADD FOREIGN KEY (job_id) REFERENCES jobs (job_id) + ON DELETE CASCADE ON UPDATE CASCADE, + ADD FOREIGN KEY (department_id) REFERENCES departments (department_id) + ON DELETE CASCADE ON UPDATE CASCADE, + ADD FOREIGN KEY (manager_id) REFERENCES employees (employee_id); + +ALTER TABLE dependents + ADD FOREIGN KEY (employee_id) REFERENCES employees (employee_id) + ON DELETE CASCADE ON UPDATE CASCADE; /*Data for the table regions */ @@ -225,7 +239,7 @@ INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_i -- table with generated columns -CREATE TABLE generated_table ( +CREATE TABLE IF NOT EXISTS generated_table ( -- Auto Incremented column id INT AUTO_INCREMENT PRIMARY KEY, price DECIMAL(10,2) NOT NULL, diff --git a/internal/testutil/testdata/mysql/humanresources/job_mappings.go b/internal/testutil/testdata/mysql/humanresources/job_mappings.go new file mode 100644 index 0000000000..cf821ff31f --- /dev/null +++ b/internal/testutil/testdata/mysql/humanresources/job_mappings.go @@ -0,0 +1,327 @@ + +// Code generated by Neosync jobmapping_generator. DO NOT EDIT. 
+// source: create-tables.sql + +package mysql_humanresources + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings(schema string)[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: schema, + Table: "departments", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "departments", + Column: "department_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "departments", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "email", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "phone_number", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "hire_date", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "manager_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "dependent_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: 
schema, + Table: "dependents", + Column: "relationship", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "price", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "quantity", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "discount_percent", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "total_value", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "discounted_price", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "regions", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "regions", + Column: "region_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "country_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "street_address", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "postal_code", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "city", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "state_province", + 
Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "job_title", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "min_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "max_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/internal/testutil/testdata/mysql/humanresources/teardown.sql b/internal/testutil/testdata/mysql/humanresources/teardown.sql deleted file mode 100644 index 7db214c127..0000000000 --- a/internal/testutil/testdata/mysql/humanresources/teardown.sql +++ /dev/null @@ -1 +0,0 @@ -DROP DATABASE IF EXISTS humanresources; diff --git a/internal/testutil/testdata/postgres/alltypes/create-schema.sql b/internal/testutil/testdata/postgres/alltypes/create-schema.sql deleted file mode 100644 index 5ac3f363a9..0000000000 --- a/internal/testutil/testdata/postgres/alltypes/create-schema.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS alltypes; diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql index ea9c5daf69..1821d4df7e 100644 --- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql +++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql @@ -1,5 +1,4 @@ -CREATE SCHEMA IF NOT EXISTS alltypes; -CREATE TABLE IF NOT EXISTS alltypes.all_postgres_types ( +CREATE TABLE IF NOT EXISTS all_data_types ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- Numeric Types smallint_col SMALLINT, @@ -82,7 +81,7 @@ CREATE TABLE IF NOT EXISTS alltypes.all_postgres_types ( ); -INSERT INTO alltypes.all_postgres_types ( +INSERT INTO all_data_types ( Id, smallint_col, integer_col, @@ -135,7 +134,7 @@ INSERT INTO alltypes.all_postgres_types ( DEFAULT, 32767, -- smallint_col 2147483647, -- integer_col - 9223372036854775807, -- bigint_col + 922337203685477580, -- bigint_col 1234.56, -- decimal_col 99999999.99, -- numeric_col 12345.67, -- real_col @@ -182,21 +181,21 @@ INSERT INTO alltypes.all_postgres_types ( 123456 -- oid_col ); -INSERT INTO alltypes.all_postgres_types ( +INSERT INTO all_data_types ( Id ) VALUES ( DEFAULT ); --- CREATE TABLE IF NOT EXISTS alltypes.time_time ( +-- CREATE TABLE IF NOT EXISTS time_time ( -- id SERIAL PRIMARY KEY, -- timestamp_col TIMESTAMP, -- timestamptz_col TIMESTAMPTZ, -- date_col DATE -- ); --- INSERT INTO alltypes.time_time ( +-- INSERT INTO time_time ( -- timestamp_col, -- timestamptz_col, -- date_col @@ -207,7 +206,7 @@ INSERT INTO alltypes.all_postgres_types ( -- '2024-03-18' -- ); --- INSERT INTO alltypes.time_time ( +-- INSERT INTO time_time ( -- timestamp_col, -- timestamptz_col, -- date_col @@ -219,7 +218,7 @@ INSERT INTO 
alltypes.all_postgres_types ( -- ); -CREATE TABLE IF NOT EXISTS alltypes.array_types ( +CREATE TABLE IF NOT EXISTS array_types ( "id" BIGINT NOT NULL PRIMARY KEY, -- "int_array" _int4, -- "smallint_array" _int2, @@ -256,7 +255,7 @@ CREATE TABLE IF NOT EXISTS alltypes.array_types ( ); -INSERT INTO alltypes.array_types ( +INSERT INTO array_types ( id, -- int_array, smallint_array, bigint_array, -- real_array, @@ -312,28 +311,28 @@ INSERT INTO alltypes.array_types ( ); -CREATE TABLE alltypes.json_data ( +CREATE TABLE json_data ( id SERIAL PRIMARY KEY, data JSONB ); -INSERT INTO alltypes.json_data (data) VALUES ('"Hello, world!"'); -INSERT INTO alltypes.json_data (data) VALUES ('42'); -INSERT INTO alltypes.json_data (data) VALUES ('3.14'); -INSERT INTO alltypes.json_data (data) VALUES ('true'); -INSERT INTO alltypes.json_data (data) VALUES ('false'); -INSERT INTO alltypes.json_data (data) VALUES ('null'); +INSERT INTO json_data (data) VALUES ('"Hello, world!"'); +INSERT INTO json_data (data) VALUES ('42'); +INSERT INTO json_data (data) VALUES ('3.14'); +INSERT INTO json_data (data) VALUES ('true'); +INSERT INTO json_data (data) VALUES ('false'); +INSERT INTO json_data (data) VALUES ('null'); -INSERT INTO alltypes.json_data (data) VALUES ('{"name": "John", "age": 30}'); -INSERT INTO alltypes.json_data (data) VALUES ('{"coords": {"x": 10, "y": 20}}'); +INSERT INTO json_data (data) VALUES ('{"name": "John", "age": 30}'); +INSERT INTO json_data (data) VALUES ('{"coords": {"x": 10, "y": 20}}'); -INSERT INTO alltypes.json_data (data) VALUES ('[1, 2, 3, 4]'); -INSERT INTO alltypes.json_data (data) VALUES ('["apple", "banana", "cherry"]'); +INSERT INTO json_data (data) VALUES ('[1, 2, 3, 4]'); +INSERT INTO json_data (data) VALUES ('["apple", "banana", "cherry"]'); -INSERT INTO alltypes.json_data (data) VALUES ('{"items": ["book", "pen"], "count": 2, "in_stock": true}'); +INSERT INTO json_data (data) VALUES ('{"items": ["book", "pen"], "count": 2, "in_stock": true}'); -INSERT INTO alltypes.json_data (data) VALUES ( +INSERT INTO json_data (data) VALUES ( '{ "user": { "name": "Alice", diff --git a/internal/testutil/testdata/postgres/alltypes/job_mappings.go b/internal/testutil/testdata/postgres/alltypes/job_mappings.go new file mode 100644 index 0000000000..08f933ff49 --- /dev/null +++ b/internal/testutil/testdata/postgres/alltypes/job_mappings.go @@ -0,0 +1,415 @@ + +// Code generated by Neosync jobmapping_generator. DO NOT EDIT. 
+// source: create-tables.sql + +package postgres_alltypes + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings(schema string)[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: schema, + Table: "all_data_types", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "smallint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "integer_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bigint_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "decimal_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "numeric_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "real_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "double_precision_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "serial_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bigserial_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "money_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "char_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "varchar_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "text_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bytea_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "timestamp_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: 
mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "timestamptz_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "date_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "interval_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "boolean_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "uuid_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "inet_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "cidr_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "macaddr_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "bit_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "varbit_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "point_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "line_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "lseg_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "box_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "path_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "polygon_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "circle_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: 
mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "json_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "jsonb_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "int4range_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "int8range_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "numrange_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tsrange_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tstzrange_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "daterange_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "integer_array_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "text_array_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "xml_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "tsvector_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "all_data_types", + Column: "oid_col", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "array_types", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "array_types", + Column: "interval_array", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "json_data", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "json_data", + Column: "data", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: 
mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/internal/testutil/testdata/postgres/alltypes/teardown.sql b/internal/testutil/testdata/postgres/alltypes/teardown.sql deleted file mode 100644 index 7313dd27c3..0000000000 --- a/internal/testutil/testdata/postgres/alltypes/teardown.sql +++ /dev/null @@ -1 +0,0 @@ -DROP SCHEMA IF EXISTS alltypes CASCADE; diff --git a/internal/testutil/testdata/postgres/humanresources/create-schema.sql b/internal/testutil/testdata/postgres/humanresources/create-schema.sql deleted file mode 100644 index 9112cc24aa..0000000000 --- a/internal/testutil/testdata/postgres/humanresources/create-schema.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS humanresources; diff --git a/internal/testutil/testdata/postgres/humanresources/create-tables.sql b/internal/testutil/testdata/postgres/humanresources/create-tables.sql index 56276c8ca0..5ca813ab81 100644 --- a/internal/testutil/testdata/postgres/humanresources/create-tables.sql +++ b/internal/testutil/testdata/postgres/humanresources/create-tables.sql @@ -1,6 +1,3 @@ -CREATE SCHEMA IF NOT EXISTS humanresources; -SET search_path TO humanresources; - CREATE TABLE regions ( region_id SERIAL PRIMARY KEY, region_name CHARACTER VARYING (25) diff --git a/internal/testutil/testdata/postgres/humanresources/job_mappings.go b/internal/testutil/testdata/postgres/humanresources/job_mappings.go new file mode 100644 index 0000000000..cbbd894aa0 --- /dev/null +++ b/internal/testutil/testdata/postgres/humanresources/job_mappings.go @@ -0,0 +1,311 @@ + +// Code generated by Neosync jobmapping_generator. DO NOT EDIT. +// source: create-tables.sql + +package postgres_humanresources + +import ( + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" +) + +func GetDefaultSyncJobMappings(schema string)[]*mgmtv1alpha1.JobMapping { + return []*mgmtv1alpha1.JobMapping{ + { + Schema: schema, + Table: "regions", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "regions", + Column: "region_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "country_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "countries", + Column: "region_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "street_address", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "postal_code", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + 
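With `CREATE SCHEMA` and `SET search_path` dropped from `create-tables.sql`, and the table names left schema-unqualified, the caller now decides where the fixture lands. A sketch of one way a harness might scope the session before applying the DDL (the helper name and identifier quoting are assumptions, not part of this diff):

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
)

// setupSchema is a hypothetical helper: create a per-test schema, point the
// session at it, then apply the now schema-agnostic create-tables.sql body.
func setupSchema(ctx context.Context, db *sql.DB, schema, createTablesSQL string) error {
	if _, err := db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %q", schema)); err != nil {
		return err
	}
	if _, err := db.ExecContext(ctx, fmt.Sprintf("SET search_path TO %q", schema)); err != nil {
		return err
	}
	_, err := db.ExecContext(ctx, createTablesSQL)
	return err
}
```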
}, + { + Schema: schema, + Table: "locations", + Column: "city", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "state_province", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "locations", + Column: "country_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "departments", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "departments", + Column: "department_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "departments", + Column: "location_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "job_title", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "min_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "jobs", + Column: "max_salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "email", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "phone_number", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "hire_date", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "job_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "salary", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + 
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "manager_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "employees", + Column: "department_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "dependent_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "first_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "last_name", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "relationship", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "dependents", + Column: "employee_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "amount", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "status", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + { + Schema: schema, + Table: "generated_table", + Column: "amount_with_tax", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_PASSTHROUGH, + }, + }, + } +} + diff --git a/internal/testutil/testdata/postgres/humanresources/teardown.sql b/internal/testutil/testdata/postgres/humanresources/teardown.sql deleted file mode 100644 index dc2f4c304e..0000000000 --- a/internal/testutil/testdata/postgres/humanresources/teardown.sql +++ /dev/null @@ -1 +0,0 @@ -DROP SCHEMA IF EXISTS humanresources CASCADE; diff --git a/internal/testutil/utils.go b/internal/testutil/utils.go index 5d42c4bb17..9ffb53b79c 100644 --- a/internal/testutil/utils.go +++ b/internal/testutil/utils.go @@ -20,6 +20,30 @@ func ShouldRunIntegrationTest() bool { return true } +func ShouldRunS3IntegrationTest() bool { + evkey := "S3_INTEGRATION_TESTS_ENABLED" + shouldRun := os.Getenv(evkey) + if shouldRun != "1" { + slog.Warn(fmt.Sprintf("skipping S3 integration tests, set %s=1 to enable", evkey)) + return false + } + return true +} + +type AwsS3Config struct { + Bucket string + Region string + AccessKeyId string + SecretAccessKey string +} + +func GetTestAwsS3Config() *AwsS3Config { + return &AwsS3Config{ + Region: os.Getenv("TEST_S3_REGION"), + Bucket: os.Getenv("TEST_S3_BUCKET"), + } +} + // not safe for concurrent use func GetTestLogger(t testing.TB) *slog.Logger { f := 
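The new `ShouldRunS3IntegrationTest` and `GetTestAwsS3Config` helpers mirror the existing `ShouldRunIntegrationTest` gate. Note that only `Bucket` and `Region` are populated from the environment; the credential fields are left empty, which suggests the AWS SDK's default credential chain (e.g. the CI role) is expected to supply credentials. A hypothetical test using them (test name and skip message are assumptions):

```go
package example

import (
	"testing"

	"github.com/nucleuscloud/neosync/internal/testutil"
)

func TestS3Sync(t *testing.T) {
	if !testutil.ShouldRunS3IntegrationTest() {
		t.Skip("S3 integration tests disabled") // the helper already logged which env var to set
	}
	cfg := testutil.GetTestAwsS3Config()
	if cfg.Bucket == "" || cfg.Region == "" {
		t.Fatal("TEST_S3_BUCKET and TEST_S3_REGION must be set")
	}
	_ = cfg
}
```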
slogt.Factory(func(w io.Writer) slog.Handler { @@ -31,6 +55,30 @@ func GetTestLogger(t testing.TB) *slog.Logger { return slogt.New(t, f) } +type FakeEELicense struct { + isValid bool +} + +type Option func(*FakeEELicense) + +func WithIsValid() Option { + return func(f *FakeEELicense) { + f.isValid = true + } +} + +func NewFakeEELicense(opts ...Option) *FakeEELicense { + f := &FakeEELicense{} + for _, opt := range opts { + opt(f) + } + return f +} + +func (f *FakeEELicense) IsValid() bool { + return f.isValid +} + func GetConcurrentTestLogger(t testing.TB) *slog.Logger { if testing.Verbose() { return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})) diff --git a/worker/pkg/integration-test/datasync_workflow.go b/worker/pkg/integration-test/datasync_workflow.go new file mode 100644 index 0000000000..6dfde25e1c --- /dev/null +++ b/worker/pkg/integration-test/datasync_workflow.go @@ -0,0 +1,192 @@ +package integrationtest + +import ( + "fmt" + "testing" + "time" + + tcneosyncapi "github.com/nucleuscloud/neosync/backend/pkg/integration-test" + "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" + sql_manager "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager" + connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager" + "github.com/nucleuscloud/neosync/internal/connection-manager/providers/mongoprovider" + "github.com/nucleuscloud/neosync/internal/connection-manager/providers/sqlprovider" + "github.com/nucleuscloud/neosync/internal/testutil" + neosync_redis "github.com/nucleuscloud/neosync/worker/internal/redis" + neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" + neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + accountstatus_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/account-status" + genbenthosconfigs_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/gen-benthos-configs" + jobhooks_by_timing_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/jobhooks-by-timing" + posttablesync_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/post-table-sync" + runsqlinittablestmts_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/run-sql-init-table-stmts" + "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" + sync_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync" + syncactivityopts_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync-activity-opts" + syncrediscleanup_activity "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/sync-redis-clean-up" + datasync_workflow "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/workflow" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/testsuite" +) + +type Option func(*TestWorkflowEnv) + +type TestWorkflowEnv struct { + neosyncApi *tcneosyncapi.NeosyncApiTestClient + redisconfig *shared.RedisConfig + redisclient redis.UniversalClient + fakeEELicense *testutil.FakeEELicense + TestEnv *testsuite.TestWorkflowEnvironment +} + +// WithRedis creates redis client with provided URL +func WithRedis(url string) Option { + return func(c *TestWorkflowEnv) { + 
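`FakeEELicense` uses the functional-options pattern: the zero value reports an invalid license, and `WithIsValid` flips it, so one suite can exercise both OSS and enterprise code paths. A short sketch of the resulting behavior:

```go
package example

import "github.com/nucleuscloud/neosync/internal/testutil"

func exampleLicenses() (bool, bool) {
	oss := testutil.NewFakeEELicense()                      // IsValid() == false
	ee := testutil.NewFakeEELicense(testutil.WithIsValid()) // IsValid() == true
	return oss.IsValid(), ee.IsValid()
}
```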
c.redisconfig = &shared.RedisConfig{ + Url: url, + Kind: "simple", + Tls: &shared.RedisTlsConfig{ + Enabled: false, + }, + } + } +} + +// WithValidEELicense creates a valid enterprise edition license +func WithValidEELicense() Option { + return func(c *TestWorkflowEnv) { + c.fakeEELicense = testutil.NewFakeEELicense(testutil.WithIsValid()) + } +} + +// NewTestDataSyncWorkflowEnv creates and configures a new test datasync workflow environment +func NewTestDataSyncWorkflowEnv( + t testing.TB, + neosyncApi *tcneosyncapi.NeosyncApiTestClient, + dbManagers *TestDatabaseManagers, + opts ...Option, +) *TestWorkflowEnv { + t.Helper() + + workflowEnv := &TestWorkflowEnv{ + neosyncApi: neosyncApi, + fakeEELicense: testutil.NewFakeEELicense(), + } + + for _, opt := range opts { + opt(workflowEnv) + } + + redisclient, err := neosync_redis.GetRedisClient(workflowEnv.redisconfig) + if err != nil { + t.Fatal(err) + } + workflowEnv.redisclient = redisclient + + connclient := neosyncApi.UnauthdClients.Connections + jobclient := neosyncApi.UnauthdClients.Jobs + transformerclient := neosyncApi.UnauthdClients.Transformers + userclient := neosyncApi.UnauthdClients.Users + + testSuite := &testsuite.WorkflowTestSuite{} + testSuite.SetLogger(log.NewStructuredLogger(testutil.GetConcurrentTestLogger(t))) + env := testSuite.NewTestWorkflowEnvironment() + + genbenthosActivity := genbenthosconfigs_activity.New( + jobclient, + connclient, + transformerclient, + dbManagers.SqlManager, + workflowEnv.redisconfig, + false, + ) + + var activityMeter metric.Meter + syncActivity := sync_activity.New(connclient, jobclient, dbManagers.SqlConnManager, dbManagers.MongoConnManager, activityMeter, sync_activity.NewBenthosStreamManager()) + retrieveActivityOpts := syncactivityopts_activity.New(jobclient) + runSqlInitTableStatements := runsqlinittablestmts_activity.New(jobclient, connclient, dbManagers.SqlManager, workflowEnv.fakeEELicense) + jobhookTimingActivity := jobhooks_by_timing_activity.New(jobclient, connclient, dbManagers.SqlManager, workflowEnv.fakeEELicense) + accountStatusActivity := accountstatus_activity.New(userclient) + posttableSyncActivity := posttablesync_activity.New(jobclient, dbManagers.SqlManager, connclient) + redisCleanUpActivity := syncrediscleanup_activity.New(workflowEnv.redisclient) + + env.RegisterWorkflow(datasync_workflow.Workflow) + env.RegisterActivity(syncActivity.Sync) + env.RegisterActivity(retrieveActivityOpts.RetrieveActivityOptions) + env.RegisterActivity(runSqlInitTableStatements.RunSqlInitTableStatements) + env.RegisterActivity(redisCleanUpActivity.DeleteRedisHash) + env.RegisterActivity(genbenthosActivity.GenerateBenthosConfigs) + env.RegisterActivity(accountStatusActivity.CheckAccountStatus) + env.RegisterActivity(jobhookTimingActivity.RunJobHooksByTiming) + env.RegisterActivity(posttableSyncActivity.RunPostTableSync) + env.SetTestTimeout(600 * time.Second) + + workflowEnv.TestEnv = env + + return workflowEnv +} + +// ExecuteTestDataSyncWorkflow starts the test workflow with the given job ID +func (w *TestWorkflowEnv) ExecuteTestDataSyncWorkflow(jobId string) { + w.TestEnv.SetStartWorkflowOptions(client.StartWorkflowOptions{ID: jobId}) + w.TestEnv.ExecuteWorkflow(datasync_workflow.Workflow, &datasync_workflow.WorkflowRequest{JobId: jobId}) +} + +// RequireActivitiesCompletedSuccessfully verifies all activities completed without errors +// NOTE: this should be called before ExecuteTestDataSyncWorkflow +func (w *TestWorkflowEnv) RequireActivitiesCompletedSuccessfully(t testing.TB) { + 
w.TestEnv.SetOnActivityCompletedListener(func(activityInfo *activity.Info, result converter.EncodedValue, err error) { + require.NoError(t, err, "Activity %s failed", activityInfo.ActivityType.Name) + if activityInfo.ActivityType.Name == "RunPostTableSync" && result.HasValue() { + var postTableSyncResp posttablesync_activity.RunPostTableSyncResponse + decodeErr := result.Get(&postTableSyncResp) + require.NoError(t, decodeErr, "Failed to decode result for activity %s", activityInfo.ActivityType.Name) + require.Emptyf(t, postTableSyncResp.Errors, "Post table sync activity returned errors: %v", formatPostTableSyncErrors(postTableSyncResp.Errors)) + } + }) +} + +func formatPostTableSyncErrors(errors []*posttablesync_activity.PostTableSyncError) []string { + formatted := []string{} + for _, err := range errors { + for _, e := range err.Errors { + formatted = append(formatted, fmt.Sprintf("statement: %s error: %s", e.Statement, e.Error)) + } + } + return formatted +} + +// TestDatabaseManagers holds managers for supported connection types +type TestDatabaseManagers struct { + SqlConnManager *connectionmanager.ConnectionManager[neosync_benthos_sql.SqlDbtx] + SqlManager *sql_manager.SqlManager + MongoConnManager *connectionmanager.ConnectionManager[neosync_benthos_mongodb.MongoClient] +} + +// NewTestDatabaseManagers creates and configures database connection managers for testing +func NewTestDatabaseManagers(t testing.TB) *TestDatabaseManagers { + sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(&sqlconnect.SqlOpenConnector{}), connectionmanager.WithReaperPoll(10*time.Second)) + go sqlconnmanager.Reaper(testutil.GetConcurrentTestLogger(t)) + mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider()) + go mongoconnmanager.Reaper(testutil.GetConcurrentTestLogger(t)) + + t.Cleanup(func() { + sqlconnmanager.Shutdown(testutil.GetConcurrentTestLogger(t)) + mongoconnmanager.Shutdown(testutil.GetConcurrentTestLogger(t)) + }) + + sqlmanager := sql_manager.NewSqlManager( + sql_manager.WithConnectionManager(sqlconnmanager), + ) + return &TestDatabaseManagers{ + SqlConnManager: sqlconnmanager, + SqlManager: sqlmanager, + MongoConnManager: mongoconnmanager, + } +} diff --git a/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go b/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go index 7e1ce3c1d3..fb7ce15f57 100644 --- a/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go +++ b/worker/pkg/workflows/datasync/activities/post-table-sync/activity.go @@ -157,5 +157,9 @@ func (a *Activity) RunPostTableSync( } func runContextNotFound(err error) bool { - return strings.Contains(err.Error(), "unable to find key") + connectErr, ok := err.(*connect.Error) + if ok && connectErr.Code() == connect.CodeNotFound { + return true + } + return strings.Contains(err.Error(), "unable to find key") || strings.Contains(err.Error(), "no run context exists with the provided key") } diff --git a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go index a394eaa976..cbdd2a14ef 100644 --- a/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go +++ b/worker/pkg/workflows/datasync/workflow/workflow_integration_test.go @@ -1665,7 +1665,7 @@ func executeWorkflow( env.RegisterActivity(syncActivity.Sync) env.RegisterActivity(retrieveActivityOpts.RetrieveActivityOptions) 
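Putting the new worker harness together: `NewTestDatabaseManagers` wires the SQL and Mongo connection managers (with background reapers and `t.Cleanup` shutdown), and `NewTestDataSyncWorkflowEnv` registers the datasync workflow plus all activities against a Temporal test environment. A minimal sketch of the intended flow in a test (job setup is elided; the completion listener must be registered before executing, per the note above):

```go
package example

import (
	"testing"

	tcneosyncapi "github.com/nucleuscloud/neosync/backend/pkg/integration-test"
	integrationtest "github.com/nucleuscloud/neosync/worker/pkg/integration-test"
)

// runSyncJob is a hypothetical helper; jobId would come from a previously
// created job via the management API test client.
func runSyncJob(t *testing.T, api *tcneosyncapi.NeosyncApiTestClient, jobId string) {
	dbs := integrationtest.NewTestDatabaseManagers(t)
	env := integrationtest.NewTestDataSyncWorkflowEnv(t, api, dbs,
		integrationtest.WithValidEELicense(), // opt in to enterprise-only activities
	)
	env.RequireActivitiesCompletedSuccessfully(t) // listener first
	env.ExecuteTestDataSyncWorkflow(jobId)
	if err := env.TestEnv.GetWorkflowError(); err != nil {
		t.Fatal(err)
	}
}
```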
 	env.RegisterActivity(runSqlInitTableStatements.RunSqlInitTableStatements)
-	env.RegisterActivity(redisCleanUpActivity)
+	env.RegisterActivity(redisCleanUpActivity.DeleteRedisHash)
 	env.RegisterActivity(genbenthosActivity.GenerateBenthosConfigs)
 	env.RegisterActivity(accountStatusActivity.CheckAccountStatus)
 	env.RegisterActivity(jobhookTimingActivity.RunJobHooksByTiming)
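The final hunk registers the bound method (`redisCleanUpActivity.DeleteRedisHash`) instead of the activity struct, matching how every other activity is registered. Similarly, the `runContextNotFound` change earlier swaps a fragile message match for a typed Connect error check while keeping the string fallbacks. If wrapped errors also need to be recognized, an `errors.As` variant is the usual idiom; a sketch, with the connect import path assumed from the rest of the codebase:

```go
package example

import (
	"errors"

	"connectrpc.com/connect"
)

// isNotFound reports whether err, or anything it wraps, is a Connect NotFound
// error; unlike a direct type assertion, errors.As walks the wrap chain.
func isNotFound(err error) bool {
	var connectErr *connect.Error
	return errors.As(err, &connectErr) && connectErr.Code() == connect.CodeNotFound
}
```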