-
Notifications
You must be signed in to change notification settings - Fork 123
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SNOW-829454: Async API Example (#813)
- Loading branch information
1 parent
c6902de
commit 2f493c8
Showing
3 changed files
with
165 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
async |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
include ../../gosnowflake.mak | ||
CMD_TARGET=async | ||
|
||
## Install | ||
install: cinstall | ||
|
||
## Run | ||
run: crun | ||
|
||
## Lint | ||
lint: clint | ||
|
||
## Format source codes | ||
fmt: cfmt | ||
|
||
.PHONY: install run lint fmt |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"database/sql/driver" | ||
"flag" | ||
"fmt" | ||
sf "github.com/snowflakedb/gosnowflake" | ||
"io" | ||
"log" | ||
"os" | ||
"strconv" | ||
"strings" | ||
) | ||
|
||
func getDSN() (string, *sf.Config, error) { | ||
env := func(k string, failOnMissing bool) string { | ||
if value := os.Getenv(k); value != "" { | ||
return value | ||
} | ||
if failOnMissing { | ||
log.Fatalf("%v environment variable is not set.", k) | ||
} | ||
return "" | ||
} | ||
|
||
account := env("SNOWFLAKE_TEST_ACCOUNT", true) | ||
user := env("SNOWFLAKE_TEST_USER", true) | ||
password := env("SNOWFLAKE_TEST_PASSWORD", true) | ||
host := env("SNOWFLAKE_TEST_HOST", false) | ||
portStr := env("SNOWFLAKE_TEST_PORT", false) | ||
protocol := env("SNOWFLAKE_TEST_PROTOCOL", false) | ||
|
||
port := 443 // snowflake default port | ||
var err error | ||
if len(portStr) > 0 { | ||
port, err = strconv.Atoi(portStr) | ||
if err != nil { | ||
return "", nil, err | ||
} | ||
} | ||
|
||
cfg := &sf.Config{ | ||
Account: account, | ||
User: user, | ||
Password: password, | ||
Host: host, | ||
Port: port, | ||
Protocol: protocol, | ||
} | ||
|
||
dsn, err := sf.DSN(cfg) | ||
return dsn, cfg, err | ||
} | ||
|
||
func main() { | ||
if !flag.Parsed() { | ||
flag.Parse() | ||
} | ||
|
||
dsn, cfg, err := getDSN() | ||
if err != nil { | ||
log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err) | ||
} | ||
|
||
db, err := sql.Open("snowflake", dsn) | ||
if err != nil { | ||
log.Fatalf("failed to connect. %v, err: %v", dsn, err) | ||
} | ||
defer db.Close() | ||
|
||
fmt.Println("Lets simulate long running query by passing execution logic as a function") | ||
driverRows := runAsyncDriverQuery(db, "CALL SYSTEM$WAIT(10, 'SECONDS')") | ||
fmt.Println("The query is running asynchronously - you can continue your workflow after starting the query") | ||
printDriverRowsResult(driverRows) | ||
|
||
fmt.Println("Lets simulate long running query using the standard sql package") | ||
sqlRows := runAsyncSQLQuery(db, "CALL SYSTEM$WAIT(10, 'SECONDS')") | ||
fmt.Println("The query is running asynchronously - you can continue your workflow after starting the query") | ||
printSQLRowsResult(sqlRows) | ||
} | ||
|
||
func runAsyncDriverQuery(db *sql.DB, query string) driver.Rows { | ||
// Enable asynchronous mode | ||
ctx := sf.WithAsyncMode(context.Background()) | ||
|
||
// Establish a connection | ||
conn, _ := db.Conn(ctx) | ||
var rows driver.Rows | ||
|
||
// Unwrap connection | ||
err := conn.Raw(func(x interface{}) error { | ||
var err error | ||
// Execute asynchronous query | ||
rows, err = x.(driver.QueryerContext).QueryContext(ctx, query, nil) | ||
|
||
return err | ||
}) | ||
|
||
if err != nil { | ||
log.Fatalf("unable to run the query. err: %v", err) | ||
} | ||
|
||
return rows | ||
} | ||
|
||
func runAsyncSQLQuery(db *sql.DB, query string) *sql.Rows { | ||
// Enable asynchronous mode | ||
ctx := sf.WithAsyncMode(context.Background()) | ||
|
||
// Execute asynchronous query | ||
rows, err := db.QueryContext(ctx, query) | ||
|
||
if err != nil { | ||
log.Fatalf("unable to run the query. err: %v", err) | ||
} | ||
|
||
return rows | ||
} | ||
|
||
func printDriverRowsResult(rows driver.Rows) { | ||
fmt.Println(strings.Join(rows.Columns(), ", ")) | ||
|
||
dest := make([]driver.Value, 1) | ||
for rows.Next(dest) != io.EOF { | ||
for val := range dest { | ||
fmt.Printf("%v\n", dest[val]) | ||
} | ||
} | ||
} | ||
|
||
func printSQLRowsResult(rows *sql.Rows) { | ||
cols, err := rows.Columns() | ||
if err != nil { | ||
log.Fatalf("failed to get columns. err: %v", err) | ||
} | ||
fmt.Println(strings.Join(cols, ", ")) | ||
|
||
var val string | ||
for rows.Next() { | ||
err := rows.Scan(&val) | ||
if err != nil { | ||
log.Fatalf("failed to scan rows. err: %v", err) | ||
} | ||
fmt.Printf("%v\n", val) | ||
} | ||
} |