From 39a091528a23b108aeb7402e8b15a58355b10e54 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Wed, 14 Jun 2023 14:21:39 +0530 Subject: [PATCH] chore: remove deprecated merged schema field (#3482) --- warehouse/internal/model/upload.go | 1 - warehouse/internal/repo/upload.go | 6 ------ warehouse/internal/repo/upload_test.go | 3 --- .../testdata/sql/processing_stats_test.sql | 20 +++++++++---------- warehouse/testdata/sql/seed_tracker_test.sql | 4 ++-- warehouse/upload.go | 13 ------------ 6 files changed, 12 insertions(+), 35 deletions(-) diff --git a/warehouse/internal/model/upload.go b/warehouse/internal/model/upload.go index daab522e98..a96d0c07f1 100644 --- a/warehouse/internal/model/upload.go +++ b/warehouse/internal/model/upload.go @@ -72,7 +72,6 @@ type Upload struct { Attempts int64 UploadSchema Schema - MergedSchema Schema } type Timings []map[string]time.Time diff --git a/warehouse/internal/repo/upload.go b/warehouse/internal/repo/upload.go index 48f48aceea..26a3c6fd4f 100644 --- a/warehouse/internal/repo/upload.go +++ b/warehouse/internal/repo/upload.go @@ -25,7 +25,6 @@ const ( id, status, schema, - mergedSchema, namespace, workspace_id, source_id, @@ -455,7 +454,6 @@ func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error func scanUpload(scan scanFn, upload *model.Upload) error { var ( schema []byte - mergedSchema []byte firstTiming sql.NullString lastTiming sql.NullString firstEventAt, lastEventAt sql.NullTime @@ -467,7 +465,6 @@ func scanUpload(scan scanFn, upload *model.Upload) error { &upload.ID, &upload.Status, &schema, - &mergedSchema, &upload.Namespace, &upload.WorkspaceID, &upload.SourceID, @@ -495,9 +492,6 @@ func scanUpload(scan scanFn, upload *model.Upload) error { if err := json.Unmarshal(schema, &upload.UploadSchema); err != nil { return fmt.Errorf("unmarshal upload schema: %w", err) } - if err := json.Unmarshal(mergedSchema, &upload.MergedSchema); err != nil { - return fmt.Errorf("unmarshal merged schema: %w", err) - } var metadata UploadMetadata if err := json.Unmarshal(metadataRaw, &metadata); err != nil { return fmt.Errorf("unmarshal metadata: %w", err) diff --git a/warehouse/internal/repo/upload_test.go b/warehouse/internal/repo/upload_test.go index fc02028291..52324d550f 100644 --- a/warehouse/internal/repo/upload_test.go +++ b/warehouse/internal/repo/upload_test.go @@ -93,7 +93,6 @@ func TestUploads_Count(t *testing.T) { uploads[i].ID = id uploads[i].Error = []byte("{}") uploads[i].UploadSchema = model.Schema{} - uploads[i].MergedSchema = model.Schema{} uploads[i].LoadFileType = "csv" uploads[i].StagingFileStartID = int64(i + 1) uploads[i].StagingFileEndID = int64(i + 1) @@ -202,7 +201,6 @@ func TestUploads_Get(t *testing.T) { ogUpload.SourceTaskRunID = "source_task_run_id" ogUpload.SourceJobID = "source_job_id" ogUpload.SourceJobRunID = "source_job_run_id" - ogUpload.MergedSchema = model.Schema{} t.Run("Get", func(t *testing.T) { upload, err := repoUpload.Get(ctx, id) @@ -684,7 +682,6 @@ func TestUploads_Processing(t *testing.T) { uploads[i].LoadFileType = "csv" uploads[i].StagingFileStartID = int64(i + 1) uploads[i].StagingFileEndID = int64(i + 1) - uploads[i].MergedSchema = model.Schema{} require.NoError(t, err) } diff --git a/warehouse/testdata/sql/processing_stats_test.sql b/warehouse/testdata/sql/processing_stats_test.sql index 04c5aeea00..ae78d584c5 100644 --- a/warehouse/testdata/sql/processing_stats_test.sql +++ b/warehouse/testdata/sql/processing_stats_test.sql @@ -2,51 +2,51 @@ BEGIN; INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type, start_staging_file_id, end_staging_file_id, start_load_file_id, end_load_file_id, status, schema, error, first_event_at, last_event_at, - last_exec_at, timings, created_at, updated_at, metadata,mergedschema, + last_exec_at, timings, created_at, updated_at, metadata, in_progress, workspace_id) VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exporting_data', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 20:53:37"}', - '{}', TRUE, 'test-workspaceID'), + TRUE, 'test-workspaceID'), (2, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exported_data', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 20:53:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (3, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'aborted', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 20:53:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (4, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 20:53:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (5, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 20:54:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (6, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2022-12-06 21:54:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (7, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": "2050-12-06 20:53:37"}', - '{}',FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (8, 'test-sourceID', 'test-namespace', 'test-destinationID', 'test-destinationType-1', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{"nextRetryTime": ""}', - '{}', FALSE, 'test-workspaceID'), + FALSE, 'test-workspaceID'), (9, 'test-sourceID', 'test-namespace', 'test-destinationID', 'test-destinationType-2', 0, 0, 0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685', '{}', - '{}',FALSE, 'test-workspaceID'); + FALSE, 'test-workspaceID'); COMMIT; diff --git a/warehouse/testdata/sql/seed_tracker_test.sql b/warehouse/testdata/sql/seed_tracker_test.sql index a35bcd69b4..5e41a5ed1a 100644 --- a/warehouse/testdata/sql/seed_tracker_test.sql +++ b/warehouse/testdata/sql/seed_tracker_test.sql @@ -15,9 +15,9 @@ VALUES (1, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded' '2022-12-06 15:27:37.100685', NOW(), '{}'); INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type, start_staging_file_id, end_staging_file_id, start_load_file_id, end_load_file_id, status, schema, error, first_event_at, - last_event_at, last_exec_at, timings, created_at, updated_at, metadata,mergedschema, + last_event_at, last_exec_at, timings, created_at, updated_at, metadata, in_progress, workspace_id) VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exported_data', '{}', - '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}','{}', TRUE, + '{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}', TRUE, 'test-workspaceID'); COMMIT; diff --git a/warehouse/upload.go b/warehouse/upload.go index 818c7b1fb4..8a609936be 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -126,7 +126,6 @@ const ( UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" - MergedSchemaField = "mergedschema" UploadLastExecAtField = "last_exec_at" UploadInProgress = "in_progress" ) @@ -255,9 +254,6 @@ func (job *UploadJob) generateUploadSchema() error { return fmt.Errorf("consolidate staging files schema using warehouse schema: %w", err) } - if err := job.setMergedSchema(job.schemaHandle.uploadSchema); err != nil { - return fmt.Errorf("set merged schema: %w", err) - } if err := job.setUploadSchema(job.schemaHandle.uploadSchema); err != nil { return fmt.Errorf("set upload schema: %w", err) } @@ -1471,15 +1467,6 @@ func (job *UploadJob) setUploadSchema(consolidatedSchema model.Schema) error { return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadSchemaField, Value: marshalledSchema}}}) } -func (job *UploadJob) setMergedSchema(mergedSchema model.Schema) error { - marshalledSchema, err := json.Marshal(mergedSchema) - if err != nil { - panic(err) - } - job.upload.MergedSchema = mergedSchema - return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: MergedSchemaField, Value: marshalledSchema}}}) -} - // Set LoadFileIDs func (job *UploadJob) setLoadFileIDs(startLoadFileID, endLoadFileID int64) error { if startLoadFileID > endLoadFileID {