Skip to content

Commit

Permalink
feat(spanner): PG JSONB support (#6874)
Browse files Browse the repository at this point in the history
* feat: PG_JSONB protoutils change

* feat: PG_JSONB changes

* feat: PG_JSONB changes

* feat: PG_JSONB changes

* feat: PG_JSONB change formatting

* feat: PG_JSONB change formatting

* fix build

* add tests

* fix lint

* refactoring

Co-authored-by: rahul2393 <irahul@google.com>
Co-authored-by: Rahul Yadav <rahulyadavsep92@gmail.com>
  • Loading branch information
3 people authored Nov 3, 2022
1 parent 3c4b2b3 commit 5b14658
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 0 deletions.
115 changes: 115 additions & 0 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ var (
PRIMARY KEY(AccountId)
)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname)`,
`CREATE TABLE Types (
RowID BIGINT PRIMARY KEY,
String VARCHAR,
Bytes BYTEA,
Int64a BIGINT,
Bool BOOL,
Float64 DOUBLE PRECISION,
Numeric NUMERIC,
JSONB jsonb
)`,
}

singerDBStatements = []string{
Expand Down Expand Up @@ -3453,6 +3463,111 @@ func TestIntegration_PGNumeric(t *testing.T) {
}
}

func TestIntegration_PGJSONB(t *testing.T) {
onlyRunForPGTest(t)
skipEmulatorTest(t)
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTestForPG(ctx, t, DefaultSessionPoolConfig, singerDBPGStatements)
defer cleanup()

type Message struct {
Name string
Body string
Time int64
}
msg := Message{"Alice", "Hello", 1294706395881547000}
jsonStr := `{"Name":"Alice","Body":"Hello","Time":1294706395881547000}`
var unmarshalledJSONstruct interface{}
json.Unmarshal([]byte(jsonStr), &unmarshalledJSONstruct)

tests := []struct {
col string
val interface{}
want interface{}
}{
{col: "JSONB", val: PGJsonB{Value: msg, Valid: true}, want: PGJsonB{Value: unmarshalledJSONstruct, Valid: true}},
{col: "JSONB", val: PGJsonB{Value: msg, Valid: false}, want: PGJsonB{}},
}

// Write rows into table first using DML.
statements := make([]Statement, 0)
for i, test := range tests {
stmt := NewStatement(fmt.Sprintf("INSERT INTO Types (RowId, %s) VALUES ($1, $2)", test.col))
// Note: We are not setting the parameter type here to ensure that it
// can be automatically recognized when it is actually needed.
stmt.Params["p1"] = i
stmt.Params["p2"] = test.val
statements = append(statements, stmt)
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCounts, err := tx.BatchUpdate(ctx, statements)
if err != nil {
return err
}
if len(rowCounts) != len(tests) {
return fmt.Errorf("rowCounts length mismatch\nGot: %v\nWant: %v", len(rowCounts), len(tests))
}
for i, c := range rowCounts {
if c != 1 {
return fmt.Errorf("row count mismatch for row %v:\nGot: %v\nWant: %v", i, c, 1)
}
}
return nil
})
if err != nil {
t.Fatalf("failed to insert values using DML: %v", err)
}
// Delete all the rows so we can insert them using mutations as well.
_, err = client.Apply(ctx, []*Mutation{Delete("Types", AllKeys())})
if err != nil {
t.Fatalf("failed to delete all rows: %v", err)
}

// Verify that we can insert the rows using mutations.
var muts []*Mutation
for i, test := range tests {
muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
}
if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}

for i, test := range tests {
row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
// Create new instance of type of test.want.
want := test.want
if want == nil {
want = test.val
}
gotp := reflect.New(reflect.TypeOf(want))
if err := row.Column(0, gotp.Interface()); err != nil {
t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
continue
}
got := reflect.Indirect(gotp).Interface()

// One of the test cases is checking NaN handling. Given
// NaN!=NaN, we can't use reflect to test for it.
if isNaN(got) && isNaN(want) {
continue
}

// Check non-NaN cases.
if !testEqual(got, want) {
t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
continue
}
}

}

func readPGSingerTable(iter *RowIterator) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
Expand Down
4 changes: 4 additions & 0 deletions spanner/protoutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func jsonType() *sppb.Type {
return &sppb.Type{Code: sppb.TypeCode_JSON}
}

func pgJsonbType() *sppb.Type {
return &sppb.Type{Code: sppb.TypeCode_JSON, TypeAnnotation: sppb.TypeAnnotationCode_PG_JSONB}
}

func bytesProto(b []byte) *proto3.Value {
return &proto3.Value{Kind: &proto3.Value_StringValue{StringValue: base64.StdEncoding.EncodeToString(b)}}
}
Expand Down
Loading

0 comments on commit 5b14658

Please sign in to comment.