Skip to content

Commit

Permalink
chore: remove deprecated merged schema field (#3482)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Jun 14, 2023
1 parent 66e32ad commit 39a0915
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 35 deletions.
1 change: 0 additions & 1 deletion warehouse/internal/model/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ type Upload struct {
Attempts int64

UploadSchema Schema
MergedSchema Schema
}

type Timings []map[string]time.Time
Expand Down
6 changes: 0 additions & 6 deletions warehouse/internal/repo/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const (
id,
status,
schema,
mergedSchema,
namespace,
workspace_id,
source_id,
Expand Down Expand Up @@ -455,7 +454,6 @@ func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error
func scanUpload(scan scanFn, upload *model.Upload) error {
var (
schema []byte
mergedSchema []byte
firstTiming sql.NullString
lastTiming sql.NullString
firstEventAt, lastEventAt sql.NullTime
Expand All @@ -467,7 +465,6 @@ func scanUpload(scan scanFn, upload *model.Upload) error {
&upload.ID,
&upload.Status,
&schema,
&mergedSchema,
&upload.Namespace,
&upload.WorkspaceID,
&upload.SourceID,
Expand Down Expand Up @@ -495,9 +492,6 @@ func scanUpload(scan scanFn, upload *model.Upload) error {
if err := json.Unmarshal(schema, &upload.UploadSchema); err != nil {
return fmt.Errorf("unmarshal upload schema: %w", err)
}
if err := json.Unmarshal(mergedSchema, &upload.MergedSchema); err != nil {
return fmt.Errorf("unmarshal merged schema: %w", err)
}
var metadata UploadMetadata
if err := json.Unmarshal(metadataRaw, &metadata); err != nil {
return fmt.Errorf("unmarshal metadata: %w", err)
Expand Down
3 changes: 0 additions & 3 deletions warehouse/internal/repo/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func TestUploads_Count(t *testing.T) {
uploads[i].ID = id
uploads[i].Error = []byte("{}")
uploads[i].UploadSchema = model.Schema{}
uploads[i].MergedSchema = model.Schema{}
uploads[i].LoadFileType = "csv"
uploads[i].StagingFileStartID = int64(i + 1)
uploads[i].StagingFileEndID = int64(i + 1)
Expand Down Expand Up @@ -202,7 +201,6 @@ func TestUploads_Get(t *testing.T) {
ogUpload.SourceTaskRunID = "source_task_run_id"
ogUpload.SourceJobID = "source_job_id"
ogUpload.SourceJobRunID = "source_job_run_id"
ogUpload.MergedSchema = model.Schema{}

t.Run("Get", func(t *testing.T) {
upload, err := repoUpload.Get(ctx, id)
Expand Down Expand Up @@ -684,7 +682,6 @@ func TestUploads_Processing(t *testing.T) {
uploads[i].LoadFileType = "csv"
uploads[i].StagingFileStartID = int64(i + 1)
uploads[i].StagingFileEndID = int64(i + 1)
uploads[i].MergedSchema = model.Schema{}
require.NoError(t, err)
}

Expand Down
20 changes: 10 additions & 10 deletions warehouse/testdata/sql/processing_stats_test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,51 @@ BEGIN;
INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type,
start_staging_file_id, end_staging_file_id, start_load_file_id,
end_load_file_id, status, schema, error, first_event_at, last_event_at,
last_exec_at, timings, created_at, updated_at, metadata,mergedschema,
last_exec_at, timings, created_at, updated_at, metadata,
in_progress, workspace_id)
VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'exporting_data', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 20:53:37"}',
'{}', TRUE, 'test-workspaceID'),
TRUE, 'test-workspaceID'),
(2, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'exported_data', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 20:53:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(3, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'aborted', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 20:53:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(4, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 20:53:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(5, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 20:54:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(6, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2022-12-06 21:54:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(7, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": "2050-12-06 20:53:37"}',
'{}',FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(8, 'test-sourceID', 'test-namespace', 'test-destinationID', 'test-destinationType-1', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{"nextRetryTime": ""}',
'{}', FALSE, 'test-workspaceID'),
FALSE, 'test-workspaceID'),
(9, 'test-sourceID', 'test-namespace', 'test-destinationID', 'test-destinationType-2', 0, 0,
0, 0, 'waiting', '{}', '{}', NULL, NULL, NULL, NULL,
'2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685',
'{}',
'{}',FALSE, 'test-workspaceID');
FALSE, 'test-workspaceID');
COMMIT;
4 changes: 2 additions & 2 deletions warehouse/testdata/sql/seed_tracker_test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ VALUES (1, 'a.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded'
'2022-12-06 15:27:37.100685', NOW(), '{}');
INSERT INTO wh_uploads(id, source_id, namespace, destination_id, destination_type, start_staging_file_id,
end_staging_file_id, start_load_file_id, end_load_file_id, status, schema, error, first_event_at,
last_event_at, last_exec_at, timings, created_at, updated_at, metadata,mergedschema,
last_event_at, last_exec_at, timings, created_at, updated_at, metadata,
in_progress, workspace_id)
VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES', 0, 0, 0, 0, 'exported_data', '{}',
'{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}','{}', TRUE,
'{}', NULL, NULL, NULL, NULL, '2022-12-06 15:30:00', '2022-12-06 15:45:00', '{}', TRUE,
'test-workspaceID');
COMMIT;
13 changes: 0 additions & 13 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ const (
UploadUpdatedAtField = "updated_at"
UploadTimingsField = "timings"
UploadSchemaField = "schema"
MergedSchemaField = "mergedschema"
UploadLastExecAtField = "last_exec_at"
UploadInProgress = "in_progress"
)
Expand Down Expand Up @@ -255,9 +254,6 @@ func (job *UploadJob) generateUploadSchema() error {
return fmt.Errorf("consolidate staging files schema using warehouse schema: %w", err)
}

if err := job.setMergedSchema(job.schemaHandle.uploadSchema); err != nil {
return fmt.Errorf("set merged schema: %w", err)
}
if err := job.setUploadSchema(job.schemaHandle.uploadSchema); err != nil {
return fmt.Errorf("set upload schema: %w", err)
}
Expand Down Expand Up @@ -1471,15 +1467,6 @@ func (job *UploadJob) setUploadSchema(consolidatedSchema model.Schema) error {
return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadSchemaField, Value: marshalledSchema}}})
}

// setMergedSchema marshals mergedSchema to JSON, records it on the in-memory
// upload, and persists it to the mergedschema column via setUploadColumns.
// It returns any marshalling or persistence error.
//
// NOTE(review): this commit (#3482) deletes this function along with the
// deprecated MergedSchema field; the fix below applies only if it is retained.
func (job *UploadJob) setMergedSchema(mergedSchema model.Schema) error {
	marshalledSchema, err := json.Marshal(mergedSchema)
	if err != nil {
		// A marshal failure is an ordinary runtime error, not a broken
		// programmer invariant: propagate it through the existing error
		// return instead of panicking (library code must not panic).
		return err
	}
	job.upload.MergedSchema = mergedSchema
	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: MergedSchemaField, Value: marshalledSchema}}})
}

// Set LoadFileIDs
func (job *UploadJob) setLoadFileIDs(startLoadFileID, endLoadFileID int64) error {
if startLoadFileID > endLoadFileID {
Expand Down

0 comments on commit 39a0915

Please sign in to comment.