Skip to content

Commit

Permalink
SNOW-1854657 Fallback to rows parser if JSON returned while in Arrow …
Browse files Browse the repository at this point in the history
…batches mode
  • Loading branch information
sfc-gh-pfus committed Jan 24, 2025
1 parent 72a121f commit 116e016
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 86 deletions.
2 changes: 1 addition & 1 deletion async.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (sr *snowflakeRestful) getAsync(
rows.errChannel <- err
return err
}
rows.format = respd.Data.QueryResultFormat
rows.format = resultFormat(respd.Data.QueryResultFormat)
rows.errChannel <- nil // mark query status complete
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (scd *snowflakeChunkDownloader) nextResultSet() error {
}

func (scd *snowflakeChunkDownloader) start() error {
if usesArrowBatches(scd.ctx) {
if usesArrowBatches(scd.ctx) && scd.getQueryResultFormat() == arrowFormat {
return scd.startArrowBatches()
}
scd.CurrentChunkSize = len(scd.RowSet.JSON) // cache the size
Expand Down
66 changes: 0 additions & 66 deletions chunk_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,69 +72,3 @@ func TestWithArrowBatchesWhenQueryReturnsNoRowsAndReadingArrowBatches(t *testing
assertEmptyE(t, batches)
})
}

func TestWithArrowBatchesWhenQueryReturnsSomeRowsInGivenFormatUsingNativeGoSQLInterface(t *testing.T) {
for _, tc := range []struct {
useJSON bool
desc string
}{
{
useJSON: true,
desc: "json",
},
{
useJSON: false,
desc: "arrow",
},
} {
t.Run(tc.desc, func(t *testing.T) {
runDBTest(t, func(dbt *DBTest) {
if tc.useJSON {
dbt.mustExec(forceJSON)
}
var rows driver.Rows
var err error
err = dbt.conn.Raw(func(x interface{}) error {
rows, err = x.(driver.QueryerContext).QueryContext(WithArrowBatches(context.Background()), "SELECT 1", nil)
return err
})
assertNilF(t, err)
defer func() {
assertNilF(t, rows.Close())
}()
values := make([]driver.Value, 1)
assertNotNilE(t, rows.Next(values)) // we deliberately check that there is an error, because we are in arrow batches mode
assertEqualE(t, values[0], nil)
})
})
}
}

func TestWithArrowBatchesWhenQueryReturnsSomeRowsInGivenFormat(t *testing.T) {
for _, tc := range []struct {
useJSON bool
desc string
}{
{
useJSON: true,
desc: "json",
},
{
useJSON: false,
desc: "arrow",
},
} {
t.Run(tc.desc, func(t *testing.T) {
runDBTest(t, func(dbt *DBTest) {
if tc.useJSON {
dbt.mustExec(forceJSON)
}
rows := dbt.mustQueryContext(WithArrowBatches(context.Background()), "SELECT 1")
defer func() {
assertNilF(t, rows.Close())
}()
assertFalseF(t, rows.Next())
})
})
}
}
16 changes: 7 additions & 9 deletions chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,15 +560,13 @@ func testWithArrowBatchesButReturningJSON(t *testing.T, async bool) {
_, err := rows.(SnowflakeRows).GetArrowBatches()
assertNotNilF(t, err)
var se *SnowflakeError
errors.As(err, &se)
assertEqualE(t, se.Message, errJSONResponseInArrowBatchesMode)

ctx = WithRequestID(context.Background(), requestID)
rows2 := sct.mustQueryContext(ctx, "SELECT 'hello'", nil)
defer rows2.Close()
scanValues := make([]driver.Value, 1)
assertNilF(t, rows2.Next(scanValues))
assertEqualE(t, scanValues[0], "hello")
assertTrueE(t, errors.As(err, &se))
assertEqualE(t, se.Message, errMsgNonArrowResponseInArrowBatches)
assertEqualE(t, se.Number, ErrNonArrowResponseInArrowBatches)

v := make([]driver.Value, 1)
assertNilE(t, rows.Next(v))
assertEqualE(t, v[0], "hello")
})
}

Expand Down
1 change: 1 addition & 0 deletions cmd/arrow/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
arrow_batches
transform_batches_to_rows/transform_batches_to_rows
16 changes: 16 additions & 0 deletions cmd/arrow/transform_batches_to_rows/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include ../../../gosnowflake.mak
CMD_TARGET=transform_batches_to_rows

## Install
install: cinstall

## Run
run: crun

## Lint
lint: clint

## Format source codes
fmt: cfmt

.PHONY: install run lint fmt
70 changes: 70 additions & 0 deletions cmd/arrow/transform_batches_to_rows/transform_batches_to_rows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"flag"
sf "github.com/snowflakedb/gosnowflake"
"io"
"log"
)

func main() {
if !flag.Parsed() {
flag.Parse()
}

cfg, err := sf.GetConfigFromEnv([]*sf.ConfigParam{
{Name: "Account", EnvName: "SNOWFLAKE_TEST_ACCOUNT", FailOnMissing: true},
{Name: "User", EnvName: "SNOWFLAKE_TEST_USER", FailOnMissing: true},
{Name: "Password", EnvName: "SNOWFLAKE_TEST_PASSWORD", FailOnMissing: true},
{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
})
if err != nil {
log.Fatalf("failed to create Config, err: %v", err)
}

connector := sf.NewConnector(sf.SnowflakeDriver{}, *cfg)
db := sql.OpenDB(connector)
defer db.Close()

conn, err := db.Conn(context.Background())
if err != nil {
log.Fatalf("cannot create a connection. %v", err)
}
defer conn.Close()

_, err = conn.ExecContext(context.Background(), "ALTER SESSION SET GO_QUERY_RESULT_FORMAT = json")
if err != nil {
log.Fatalf("cannot force JSON as result format. %v", err)
}

var rows driver.Rows
err = conn.Raw(func(x any) error {
rows, err = x.(driver.QueryerContext).QueryContext(sf.WithArrowBatches(context.Background()), "SELECT 1, 'hello' UNION SELECT 2, 'hi' UNION SELECT 3, 'howdy'", nil)
return err
})
if err != nil {
log.Fatalf("cannot run a query. %v", err)
}
defer rows.Close()

_, err = rows.(sf.SnowflakeRows).GetArrowBatches()
var se *sf.SnowflakeError
if !errors.As(err, &se) || se.Number != sf.ErrNonArrowResponseInArrowBatches {
log.Fatalf("expected to fail while retrieving arrow batches")
}

res := make([]driver.Value, 2)
for {
err = rows.Next(res)
if err == io.EOF {
break
}
println(res[0].(string), res[1].(string))
}
}
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (sc *snowflakeConn) queryContextInternal(
rows.sc = sc
rows.queryID = data.Data.QueryID
rows.ctx = ctx
rows.format = data.Data.QueryResultFormat
rows.format = resultFormat(data.Data.QueryResultFormat)

if isMultiStmt(&data.Data) {
// handleMultiQuery is responsible to fill rows with childResults
Expand Down
13 changes: 12 additions & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ the underlying data has already been loaded, and downloads it if not.
Limitations:
1. For some queries Snowflake may decide to return data in JSON format (examples: `SHOW PARAMETERS` or `ls @stage`). You cannot use JSON with Arrow batches context.
1. For some queries Snowflake may decide to return data in JSON format (examples: `SHOW PARAMETERS` or `ls @stage`). You cannot use JSON with Arrow batches context. See alternative below.
2. Snowflake handles timestamps in a range which is broader than available space in Arrow timestamp type. Because of that special treatment should be used (see below).
3. When using numbers, Snowflake chooses the smallest type that covers all values in a batch. So even when your column is NUMBER(38, 0), if all values are 8bits, array.Int8 is used.
Expand Down Expand Up @@ -741,6 +741,17 @@ WHen using NUMBERs with non zero scale, the value is returned as an integer type
Example. When we have a 123.45 value that comes from NUMBER(9, 4), it will be represented as 1234500 with scale equal to 4. It is a client responsibility to interpret it correctly.
Also - see limitations section above.
How to handle JSON responses in Arrow batches:
Due to technical limitations Snowflake backend may return Arrow even if client expects JSON.
In that case Arrow batches are not available and the error with code ErrNonArrowResponseInArrowBatches is returned.
The response is parsed to regular rows.
You can read rows in a way described in transform_batches_to_rows.go example.
This has a very strong limitation though - this is a very low level API (Go driver API), so there are no conversions ready.
All values are returned as strings.
Alternative approach is to rerun a query, but without enabling Arrow batches and use a general Go SQL API instead of driver API.
It can be optimized by using `WithRequestID`, so backend returns results from cache.
# Binding Parameters
Binding allows a SQL statement to use a value that is stored in a Golang variable.
Expand Down
1 change: 1 addition & 0 deletions dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ func GetConfigFromEnv(properties []*ConfigParam) (*Config, error) {
Region: region,
Passcode: passcode,
Application: application,
Params: map[string]*string{},
}
return cfg, nil
}
Expand Down
12 changes: 11 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ const (

// ErrFailedToGetChunk is an error code for the case where it failed to get chunk of result set
ErrFailedToGetChunk = 262000
// ErrNonArrowResponseInArrowBatches is an error code for case where ArrowBatches mode is enabled, but response is not Arrow-based
ErrNonArrowResponseInArrowBatches = 262001

/* transaction*/

Expand Down Expand Up @@ -308,7 +310,7 @@ const (
errMsgFailedToParseTomlFile = "failed to parse toml file. the params %v occurred error with value %v"
errMsgFailedToFindDSNInTomlFile = "failed to find DSN in toml file."
errMsgInvalidPermissionToTomlFile = "file permissions different than read/write for user. Your Permission: %v"
errJSONResponseInArrowBatchesMode = "arrow batches enabled, but the response is not Arrow based"
errMsgNonArrowResponseInArrowBatches = "arrow batches enabled, but the response is not Arrow based"
)

// Returned if a DNS doesn't include account parameter.
Expand Down Expand Up @@ -374,3 +376,11 @@ func errNullValueInMap() *SnowflakeError {
Message: errMsgNullValueInMap,
}
}

func errNonArrowResponseForArrowBatches(queryId string) *SnowflakeError {

Check failure on line 380 in errors.go

View workflow job for this annotation

GitHub Actions / Check linter

func parameter queryId should be queryID
return &SnowflakeError{
QueryID: queryId,
Number: ErrNonArrowResponseInArrowBatches,
Message: errMsgNonArrowResponseInArrowBatches,
}
}
9 changes: 3 additions & 6 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type snowflakeRows struct {
errChannel chan error
location *time.Location
ctx context.Context
format string
format resultFormat
}

func (rows *snowflakeRows) getLocation() *time.Location {
Expand Down Expand Up @@ -169,11 +169,8 @@ func (rows *snowflakeRows) GetArrowBatches() ([]*ArrowBatch, error) {
return nil, err
}

if rows.format != "arrow" {
return nil, (&SnowflakeError{
QueryID: rows.queryID,
Message: errJSONResponseInArrowBatchesMode,
}).exceptionTelemetry(rows.sc)
if rows.format != arrowFormat {
return nil, errNonArrowResponseForArrowBatches(rows.queryID).exceptionTelemetry(rows.sc)
}

return rows.ChunkDownloader.getArrowBatches(), nil
Expand Down

0 comments on commit 116e016

Please sign in to comment.