Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Exports from multiple regions (#465)
Browse files Browse the repository at this point in the history
* Exports from multiple regions

* Allow exportconfig to have a list of regions as part of the input
* Defaults to the single output region if not specfied
* Add multi input region config to admin console

/fixes #445

* rename region to output region
  • Loading branch information
mikehelmick authored May 27, 2020
1 parent b17dd45 commit 4e5e8f7
Show file tree
Hide file tree
Showing 17 changed files with 301 additions and 79 deletions.
5 changes: 4 additions & 1 deletion internal/admin/authorizedapps/form.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ func (f *formData) PopulateAuthorizedApp(a *model.AuthorizedApp) error {
a.Platform = f.Platform
a.AllowedRegions = make(map[string]struct{})
for _, region := range strings.Split(f.AllowedRegions, "\n") {
a.AllowedRegions[region] = struct{}{}
a.AllowedRegions[strings.TrimSpace(region)] = struct{}{}
}
// SafetyNet
a.SafetyNetDisabled = f.SafetyNetDisabled
a.SafetyNetApkDigestSHA256 = strings.Split(f.SafetyNetApkDigestSHA256, "\n")
for i, s := range a.SafetyNetApkDigestSHA256 {
a.SafetyNetApkDigestSHA256[i] = strings.TrimSpace(s)
}
a.SafetyNetBasicIntegrity = f.SafetyNetBasicIntegrity
a.SafetyNetCTSProfileMatch = f.SafetyNetCTSProfileMatch
var err error
Expand Down
10 changes: 8 additions & 2 deletions internal/admin/exports/form.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
package exports

import (
"strings"
"time"

"github.com/google/exposure-notifications-server/internal/admin"
"github.com/google/exposure-notifications-server/internal/export/model"
)

type formData struct {
Region string `form:"Region"`
OutputRegion string `form:"OutputRegion"`
InputRegions string `form:"InputRegions"`
BucketName string `form:"BucketName"`
FilenameRoot string `form:"FilenameRoot"`
Period time.Duration `form:"Period"`
Expand All @@ -46,7 +48,11 @@ func (f *formData) PopulateExportConfig(ec *model.ExportConfig) error {
ec.BucketName = f.BucketName
ec.FilenameRoot = f.FilenameRoot
ec.Period = f.Period
ec.Region = f.Region
ec.OutputRegion = f.OutputRegion
ec.InputRegions = strings.Split(f.InputRegions, "\n")
for i, s := range ec.InputRegions {
ec.InputRegions[i] = strings.TrimSpace(s)
}
ec.From = from
ec.Thru = thru
ec.SignatureInfoIDs = f.SigInfoIDs
Expand Down
2 changes: 1 addition & 1 deletion internal/admin/exports/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestRenderSignatureInfo(t *testing.T) {
m["export"] = exportConfig

sigInfos := []*model.SignatureInfo{
&model.SignatureInfo{ID: 5},
{ID: 5},
}
usedSigInfos := map[int64]bool{5: true}
m["usedSigInfos"] = usedSigInfos
Expand Down
3 changes: 2 additions & 1 deletion internal/export/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *Server) maybeCreateBatches(ctx context.Context, ec *model.ExportConfig,
FilenameRoot: ec.FilenameRoot,
StartTimestamp: br.start,
EndTimestamp: br.end,
Region: ec.Region,
OutputRegion: ec.OutputRegion,
InputRegions: ec.InputRegions,
Status: model.ExportBatchOpen,
SignatureInfoIDs: infoIds,
})
Expand Down
59 changes: 30 additions & 29 deletions internal/export/database/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func (db *ExportDB) AddExportConfig(ctx context.Context, ec *model.ExportConfig)
row := tx.QueryRow(ctx, `
INSERT INTO
ExportConfig
(bucket_name, filename_root, period_seconds, region, from_timestamp, thru_timestamp, signature_info_ids)
(bucket_name, filename_root, period_seconds, output_region, from_timestamp, thru_timestamp, signature_info_ids, input_regions)
VALUES
($1, $2, $3, $4, $5, $6, $7)
($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING config_id
`, ec.BucketName, ec.FilenameRoot, int(ec.Period.Seconds()), ec.Region,
ec.From, thru, ec.SignatureInfoIDs)
`, ec.BucketName, ec.FilenameRoot, int(ec.Period.Seconds()), ec.OutputRegion,
ec.From, thru, ec.SignatureInfoIDs, ec.InputRegions)

if err := row.Scan(&ec.ConfigID); err != nil {
return fmt.Errorf("fetching config_id: %w", err)
Expand All @@ -85,10 +85,10 @@ func (db *ExportDB) UpdateExportConfig(ctx context.Context, ec *model.ExportConf
UPDATE
ExportConfig
SET
bucket_name = $1, filename_root = $2, period_seconds = $3, region = $4, from_timestamp = $5, thru_timestamp = $6, signature_info_ids = $7
WHERE config_id = $8
`, ec.BucketName, ec.FilenameRoot, int(ec.Period.Seconds()), ec.Region,
ec.From, thru, ec.SignatureInfoIDs, ec.ConfigID)
bucket_name = $1, filename_root = $2, period_seconds = $3, output_region = $4, from_timestamp = $5, thru_timestamp = $6, signature_info_ids = $7, input_regions = $8
WHERE config_id = $9
`, ec.BucketName, ec.FilenameRoot, int(ec.Period.Seconds()), ec.OutputRegion,
ec.From, thru, ec.SignatureInfoIDs, ec.InputRegions, ec.ConfigID)
if err != nil {
return fmt.Errorf("updating signatureinfo: %w", err)
}
Expand All @@ -108,7 +108,7 @@ func (db *ExportDB) GetExportConfig(ctx context.Context, id int64) (*model.Expor

row := conn.QueryRow(ctx, `
SELECT
config_id, bucket_name, filename_root, period_seconds, region, from_timestamp, thru_timestamp, signature_info_ids
config_id, bucket_name, filename_root, period_seconds, output_region, from_timestamp, thru_timestamp, signature_info_ids, input_regions
FROM
ExportConfig
WHERE
Expand All @@ -126,7 +126,7 @@ func (db *ExportDB) GetAllExportConfigs(ctx context.Context) ([]*model.ExportCon

rows, err := conn.Query(ctx, `
SELECT
config_id, bucket_name, filename_root, period_seconds, region, from_timestamp, thru_timestamp, signature_info_ids
config_id, bucket_name, filename_root, period_seconds, output_region, from_timestamp, thru_timestamp, signature_info_ids, input_regions
FROM
ExportConfig`)
if err != nil {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (db *ExportDB) IterateExportConfigs(ctx context.Context, t time.Time, f fun

rows, err := conn.Query(ctx, `
SELECT
config_id, bucket_name, filename_root, period_seconds, region, from_timestamp, thru_timestamp, signature_info_ids
config_id, bucket_name, filename_root, period_seconds, output_region, from_timestamp, thru_timestamp, signature_info_ids, input_regions
FROM
ExportConfig
WHERE
Expand Down Expand Up @@ -194,7 +194,7 @@ func scanOneExportConfig(row pgx.Row) (*model.ExportConfig, error) {
periodSeconds int
thru *time.Time
)
if err := row.Scan(&m.ConfigID, &m.BucketName, &m.FilenameRoot, &periodSeconds, &m.Region, &m.From, &thru, &m.SignatureInfoIDs); err != nil {
if err := row.Scan(&m.ConfigID, &m.BucketName, &m.FilenameRoot, &periodSeconds, &m.OutputRegion, &m.From, &thru, &m.SignatureInfoIDs, &m.InputRegions); err != nil {
return nil, err
}
m.Period = time.Duration(periodSeconds) * time.Second
Expand Down Expand Up @@ -393,17 +393,17 @@ func (db *ExportDB) AddExportBatches(ctx context.Context, batches []*model.Expor
_, err := tx.Prepare(ctx, stmtName, `
INSERT INTO
ExportBatch
(config_id, bucket_name, filename_root, start_timestamp, end_timestamp, region, status, signature_info_ids)
(config_id, bucket_name, filename_root, start_timestamp, end_timestamp, output_region, status, signature_info_ids, input_regions)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8)
($1, $2, $3, $4, $5, $6, $7, $8, $9)
`)
if err != nil {
return err
}

for _, eb := range batches {
if _, err := tx.Exec(ctx, stmtName,
eb.ConfigID, eb.BucketName, eb.FilenameRoot, eb.StartTimestamp, eb.EndTimestamp, eb.Region, eb.Status, eb.SignatureInfoIDs); err != nil {
eb.ConfigID, eb.BucketName, eb.FilenameRoot, eb.StartTimestamp, eb.EndTimestamp, eb.OutputRegion, eb.Status, eb.SignatureInfoIDs, eb.InputRegions); err != nil {
return err
}
}
Expand Down Expand Up @@ -535,7 +535,7 @@ type queryRowFn func(ctx context.Context, query string, args ...interface{}) pgx
func lookupExportBatch(ctx context.Context, batchID int64, queryRow queryRowFn) (*model.ExportBatch, error) {
row := queryRow(ctx, `
SELECT
batch_id, config_id, bucket_name, filename_root, start_timestamp, end_timestamp, region, status, lease_expires, signature_info_ids
batch_id, config_id, bucket_name, filename_root, start_timestamp, end_timestamp, output_region, status, lease_expires, signature_info_ids, input_regions
FROM
ExportBatch
WHERE
Expand All @@ -545,7 +545,7 @@ func lookupExportBatch(ctx context.Context, batchID int64, queryRow queryRowFn)

var expires *time.Time
eb := model.ExportBatch{}
if err := row.Scan(&eb.BatchID, &eb.ConfigID, &eb.BucketName, &eb.FilenameRoot, &eb.StartTimestamp, &eb.EndTimestamp, &eb.Region, &eb.Status, &expires, &eb.SignatureInfoIDs); err != nil {
if err := row.Scan(&eb.BatchID, &eb.ConfigID, &eb.BucketName, &eb.FilenameRoot, &eb.StartTimestamp, &eb.EndTimestamp, &eb.OutputRegion, &eb.Status, &expires, &eb.SignatureInfoIDs, &eb.InputRegions); err != nil {
if err == pgx.ErrNoRows {
return nil, database.ErrNotFound
}
Expand All @@ -563,13 +563,14 @@ func (db *ExportDB) FinalizeBatch(ctx context.Context, eb *model.ExportBatch, fi
// Update ExportFile for the files created.
for i, file := range files {
ef := model.ExportFile{
BucketName: eb.BucketName,
Filename: file,
BatchID: eb.BatchID,
Region: eb.Region,
BatchNum: i + 1,
BatchSize: batchSize,
Status: model.ExportBatchComplete,
BucketName: eb.BucketName,
Filename: file,
BatchID: eb.BatchID,
OutputRegion: eb.OutputRegion,
InputRegions: eb.InputRegions,
BatchNum: i + 1,
BatchSize: batchSize,
Status: model.ExportBatchComplete,
}
if err := addExportFile(ctx, tx, &ef); err != nil {
if err == database.ErrKeyConflict {
Expand Down Expand Up @@ -648,7 +649,7 @@ func (db *ExportDB) LookupExportFile(ctx context.Context, filename string) (*mod

row := conn.QueryRow(ctx, `
SELECT
bucket_name, filename, batch_id, region, batch_num, batch_size, status
bucket_name, filename, batch_id, output_region, batch_num, batch_size, status, input_regions
FROM
ExportFile
WHERE
Expand All @@ -657,7 +658,7 @@ func (db *ExportDB) LookupExportFile(ctx context.Context, filename string) (*mod
`, filename)

ef := model.ExportFile{}
if err := row.Scan(&ef.BucketName, &ef.Filename, &ef.BatchID, &ef.Region, &ef.BatchNum, &ef.BatchSize, &ef.Status); err != nil {
if err := row.Scan(&ef.BucketName, &ef.Filename, &ef.BatchID, &ef.OutputRegion, &ef.BatchNum, &ef.BatchSize, &ef.Status, &ef.InputRegions); err != nil {
if err == pgx.ErrNoRows {
return nil, database.ErrNotFound
}
Expand Down Expand Up @@ -762,11 +763,11 @@ func addExportFile(ctx context.Context, tx pgx.Tx, ef *model.ExportFile) error {
tag, err := tx.Exec(ctx, `
INSERT INTO
ExportFile
(bucket_name, filename, batch_id, region, batch_num, batch_size, status)
(bucket_name, filename, batch_id, output_region, batch_num, batch_size, status, input_regions)
VALUES
($1, $2, $3, $4, $5, $6, $7)
($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (filename) DO NOTHING
`, ef.BucketName, ef.Filename, ef.BatchID, ef.Region, ef.BatchNum, ef.BatchSize, ef.Status)
`, ef.BucketName, ef.Filename, ef.BatchID, ef.OutputRegion, ef.BatchNum, ef.BatchSize, ef.Status, ef.InputRegions)
if err != nil {
return fmt.Errorf("inserting to ExportFile: %w", err)
}
Expand Down
44 changes: 23 additions & 21 deletions internal/export/database/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func TestAddGetUpdateExportConfig(t *testing.T) {
BucketName: "mocked",
FilenameRoot: "root",
Period: 3 * time.Hour,
Region: "i1",
OutputRegion: "i1",
InputRegions: []string{"US"},
From: fromTime,
Thru: thruTime,
SignatureInfoIDs: []int64{42, 84},
Expand All @@ -154,6 +155,7 @@ func TestAddGetUpdateExportConfig(t *testing.T) {
want.Period = 15 * time.Minute
want.Thru = time.Time{}
want.SignatureInfoIDs = []int64{1, 2, 3, 4, 5}
want.InputRegions = []string{"US", "CA"}

if err := exportDB.UpdateExportConfig(ctx, want); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -199,7 +201,7 @@ func TestIterateExportConfigs(t *testing.T) {
}
for _, ec := range ecs {
ec.Period = time.Hour
ec.Region = "R"
ec.OutputRegion = "R"
if err := New(testDB).AddExportConfig(ctx, ec); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -231,7 +233,7 @@ func TestBatches(t *testing.T) {
BucketName: "mocked",
FilenameRoot: "root",
Period: time.Hour,
Region: "R",
OutputRegion: "R",
From: now,
Thru: now.Add(time.Hour),
SignatureInfoIDs: []int64{1, 2, 3, 4},
Expand All @@ -249,7 +251,7 @@ func TestBatches(t *testing.T) {
ConfigID: config.ConfigID,
BucketName: config.BucketName,
FilenameRoot: config.FilenameRoot,
Region: config.Region,
OutputRegion: config.OutputRegion,
Status: model.ExportBatchOpen,
StartTimestamp: start,
EndTimestamp: end,
Expand Down Expand Up @@ -281,10 +283,10 @@ func TestBatches(t *testing.T) {
t.Fatal("could not lease a batch")
}
if got.ConfigID != config.ConfigID || got.FilenameRoot != config.FilenameRoot ||
got.Region != config.Region || got.BucketName != config.BucketName {
got.OutputRegion != config.OutputRegion || got.BucketName != config.BucketName {
t.Errorf("LeaseBatch: got (%d, %q, %q, %q), want (%d, %q, %q, %q)",
got.ConfigID, got.BucketName, got.FilenameRoot, got.Region,
config.ConfigID, config.BucketName, config.FilenameRoot, config.Region)
got.ConfigID, got.BucketName, got.FilenameRoot, got.OutputRegion,
config.ConfigID, config.BucketName, config.FilenameRoot, config.OutputRegion)
}
if got.Status != model.ExportBatchPending {
t.Errorf("LeaseBatch: got status %q, want pending", got.Status)
Expand Down Expand Up @@ -342,7 +344,7 @@ func TestFinalizeBatch(t *testing.T) {
BucketName: "some-bucket",
FilenameRoot: "filename-root",
Period: time.Minute,
Region: "US",
OutputRegion: "US",
}
if err := exportDB.AddExportConfig(ctx, ec); err != nil {
t.Fatal(err)
Expand All @@ -355,7 +357,7 @@ func TestFinalizeBatch(t *testing.T) {
FilenameRoot: ec.FilenameRoot,
StartTimestamp: now.Add(-2 * time.Hour),
EndTimestamp: now.Add(-time.Hour),
Region: ec.Region,
OutputRegion: ec.OutputRegion,
Status: model.ExportBatchOpen,
}
if err := exportDB.AddExportBatches(ctx, []*model.ExportBatch{eb}); err != nil {
Expand Down Expand Up @@ -409,13 +411,13 @@ func TestFinalizeBatch(t *testing.T) {
t.Fatal(err)
}
want := &model.ExportFile{
BucketName: eb.BucketName,
Filename: filename,
BatchID: eb.BatchID,
Region: eb.Region,
BatchNum: i + 1,
BatchSize: batchSize,
Status: model.ExportBatchComplete,
BucketName: eb.BucketName,
Filename: filename,
BatchID: eb.BatchID,
OutputRegion: eb.OutputRegion,
BatchNum: i + 1,
BatchSize: batchSize,
Status: model.ExportBatchComplete,
}
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("mismatch for %q (-want, +got):\n%s", filename, diff)
Expand All @@ -436,7 +438,7 @@ func TestKeysInBatch(t *testing.T) {
BucketName: "bucket-name",
FilenameRoot: "filename-root",
Period: 3600 * time.Second,
Region: "US",
OutputRegion: "US",
From: now.Add(-24 * time.Hour),
}
if err := New(testDB).AddExportConfig(ctx, ec); err != nil {
Expand All @@ -452,7 +454,7 @@ func TestKeysInBatch(t *testing.T) {
FilenameRoot: ec.FilenameRoot,
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
Region: ec.Region,
OutputRegion: ec.OutputRegion,
Status: model.ExportBatchOpen,
}
if err := New(testDB).AddExportBatches(ctx, []*model.ExportBatch{eb}); err != nil {
Expand All @@ -462,14 +464,14 @@ func TestKeysInBatch(t *testing.T) {
// Create key aligned with the StartTimestamp
sek := &publishmodel.Exposure{
ExposureKey: []byte("aaa"),
Regions: []string{ec.Region},
Regions: []string{ec.OutputRegion},
CreatedAt: startTimestamp,
}

// Create key aligned with the EndTimestamp
eek := &publishmodel.Exposure{
ExposureKey: []byte("bbb"),
Regions: []string{ec.Region},
Regions: []string{ec.OutputRegion},
CreatedAt: endTimestamp,
}

Expand All @@ -496,7 +498,7 @@ func TestKeysInBatch(t *testing.T) {
// Lookup the keys; they must be only the key created_at the startTimestamp
// (because start is inclusive, end is exclusive).
criteria := publishdb.IterateExposuresCriteria{
IncludeRegions: []string{leased.Region},
IncludeRegions: []string{leased.OutputRegion},
SinceTimestamp: leased.StartTimestamp,
UntilTimestamp: leased.EndTimestamp,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/export/exportfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func marshalContents(eb *model.ExportBatch, exposures []*publishmodel.Exposure,
pbeke := export.TemporaryExposureKeyExport{
StartTimestamp: proto.Uint64(uint64(eb.StartTimestamp.Unix())),
EndTimestamp: proto.Uint64(uint64(eb.EndTimestamp.Unix())),
Region: proto.String(eb.Region),
Region: proto.String(eb.OutputRegion),
BatchNum: proto.Int32(int32(batchNum)),
BatchSize: proto.Int32(int32(batchSize)),
Keys: pbeks,
Expand Down
2 changes: 1 addition & 1 deletion internal/export/exportfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestMarshalUnmarshalExportFile(t *testing.T) {
FilenameRoot: "files",
StartTimestamp: batchStartTime,
EndTimestamp: batchEndTime,
Region: "US",
OutputRegion: "US",
Status: "",
LeaseExpires: time.Time{},
SignatureInfoIDs: []int64{1, 2},
Expand Down
Loading

0 comments on commit 4e5e8f7

Please sign in to comment.