Skip to content

Commit

Permalink
change to WithReadSessionProject option in anticipation of googleapis…
Browse files Browse the repository at this point in the history
  • Loading branch information
juanli16 committed Sep 30, 2024
1 parent 1836363 commit 0746c09
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
20 changes: 7 additions & 13 deletions bigquery/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *readClient) close() error {
}

// sessionForTable establishes a new session to fetch from a table using the Storage API
func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered bool, useClientProject bool) (*readSession, error) {
func (c *readClient) sessionForTable(ctx context.Context, table *Table, rsProjectID string, ordered bool) (*readSession, error) {
tableID, err := table.Identifier(StorageAPIResourceID)
if err != nil {
return nil, err
Expand All @@ -107,17 +107,11 @@ func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered
settings.maxStreamCount = 1
}

// configure where the read session is created
readSessionProjectID := table.ProjectID
if useClientProject {
readSessionProjectID = c.projectID
}

rs := &readSession{
ctx: ctx,
readSessionProjectID: readSessionProjectID,
table: table,
tableID: tableID,
projectID: rsProjectID,
settings: settings,
readRowsFunc: c.rawClient.ReadRows,
createReadSessionFunc: c.rawClient.CreateReadSession,
Expand All @@ -129,10 +123,10 @@ func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered
type readSession struct {
settings readClientSettings

ctx context.Context
readSessionProjectID string
table *Table
tableID string
ctx context.Context
table *Table
tableID string
projectID string

bqSession *storagepb.ReadSession

Expand All @@ -150,7 +144,7 @@ func (rs *readSession) start() error {
}

createReadSessionRequest := &storagepb.CreateReadSessionRequest{
Parent: fmt.Sprintf("projects/%s", rs.readSessionProjectID),
Parent: fmt.Sprintf("projects/%s", rs.projectID),
ReadSession: &storagepb.ReadSession{
Table: rs.tableID,
DataFormat: storagepb.DataFormat_ARROW,
Expand Down
36 changes: 36 additions & 0 deletions bigquery/storage_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -87,6 +88,41 @@ func TestIntegration_StorageReadEmptyResultSet(t *testing.T) {
}
}

func TestIntegration_StorageReadClientProject(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

table := storageOptimizedClient.Dataset("usa_names").Table("usa_1910_current")
table.ProjectID = "bigquery-public-data"

it := table.Read(ctx)
_, err := countIteratorRows(it)
if err != nil {
t.Fatal(err)
}
if !it.IsAccelerated() {
t.Fatal("expected storage api to be used")
}

session := it.arrowIterator.(*storageArrowIterator).rs
expectedPrefix := fmt.Sprintf("projects/%s", storageOptimizedClient.projectID)
if !strings.HasPrefix(session.bqSession.Name, expectedPrefix) {
t.Fatalf("expected read session to have prefix %q: but found %s:", expectedPrefix, session.bqSession.Name)
}

it = table.Read(ctx, WithReadSessionProject("bigquery-public-data"))
_, err = countIteratorRows(it)
if err != nil {
t.Fatal(err)
}
if it.IsAccelerated() {
t.Fatal("expected storage api to not be used")
}
}

func TestIntegration_StorageReadFromSources(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
Expand Down
6 changes: 3 additions & 3 deletions bigquery/storage_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ type storageArrowIterator struct {

var _ ArrowIterator = &storageArrowIterator{}

func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered, useClientProject bool) (*RowIterator, error) {
func newStorageRowIteratorFromTable(ctx context.Context, table *Table, rsProject string, ordered bool) (*RowIterator, error) {
md, err := table.Metadata(ctx)
if err != nil {
return nil, err
}
rs, err := table.c.rc.sessionForTable(ctx, table, ordered, useClientProject)
rs, err := table.c.rc.sessionForTable(ctx, table, rsProject, ordered)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func newStorageRowIteratorFromJob(ctx context.Context, j *Job) (*RowIterator, er
return newStorageRowIteratorFromJob(ctx, lastJob)
}
ordered := query.HasOrderedResults(qcfg.Q)
return newStorageRowIteratorFromTable(ctx, qcfg.Dst, ordered, false)
return newStorageRowIteratorFromTable(ctx, qcfg.Dst, job.projectID, ordered)
}

func resolveLastChildSelectJob(ctx context.Context, job *Job) (*Job, error) {
Expand Down
17 changes: 10 additions & 7 deletions bigquery/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,17 +968,16 @@ func (t *Table) Delete(ctx context.Context) (err error) {
}

type tableReadOption struct {
useClientProject bool
readSessionProject string
}

// TableReadOption allows requests to alter the behavior of reading from a table.
type TableReadOption func(*tableReadOption)

// WithClientProject allows the read session to be created from the client project
// when reading from the table, instead of the table's project.
func WithClientProject() TableReadOption {
// WithReadSessionProject allows to create the read session with the specified project that has the necessary permissions to do so.
func WithReadSessionProject(project string) TableReadOption {
return func(tro *tableReadOption) {
tro.useClientProject = true
tro.readSessionProject = project
}
}

Expand All @@ -988,13 +987,17 @@ func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator
}

func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator {
tro := &tableReadOption{useClientProject: false}
tro := &tableReadOption{}
for _, o := range opts {
o(tro)
}

if tro.readSessionProject == "" {
tro.readSessionProject = t.c.projectID
}

if t.c.isStorageReadAvailable() {
it, err := newStorageRowIteratorFromTable(ctx, t, false, tro.useClientProject)
it, err := newStorageRowIteratorFromTable(ctx, t, tro.readSessionProject, false)
if err == nil {
return it
}
Expand Down

0 comments on commit 0746c09

Please sign in to comment.