From db459dad47e2af4f83f32185205488bd1112beee Mon Sep 17 00:00:00 2001 From: rsulli-rai <124619653+rsulli-rai@users.noreply.github.com> Date: Wed, 5 Apr 2023 13:54:08 -0600 Subject: [PATCH] Add support for Snowflake data stream APIs (#54) * Add support for Snowflake data stream APIs db-link fqn * typo * Add engine args to create-snowflake-integration * minor * Update changelog * change data stream command args --------- Co-authored-by: Wien Leung --- CHANGELOG.md | 5 ++ go.mod | 2 +- go.sum | 4 +- rai/cmds.go | 73 ++++++++++++++++++- rai/main.go | 44 +++++++++++ .../relationalai/rai-sdk-go/rai/client.go | 67 ++++++++++++++++- .../relationalai/rai-sdk-go/rai/models.go | 51 +++++++++++++ .../relationalai/rai-sdk-go/rai/version.go | 2 +- vendor/modules.txt | 2 +- 9 files changed, 241 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 394edb1..79ace72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v0.1.5-alpha + +* Add Snowflake data stream commands +* Update Snowflake integration creation to require assigning a RAI engine + ## v0.1.4-alpha * Add Snowflake integration commands diff --git a/go.mod b/go.mod index c536037..7eac327 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/pkg/errors v0.9.1 - github.com/relationalai/rai-sdk-go v0.4.4-alpha + github.com/relationalai/rai-sdk-go v0.4.5-alpha github.com/spf13/cobra v1.5.0 ) diff --git a/go.sum b/go.sum index 49b19ea..2ec77ae 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/relationalai/rai-sdk-go v0.4.4-alpha h1:AsZsEWK8K9RPIQ0hFDFuPHqtUrhY3Yz5TeRdhkzPvNw= -github.com/relationalai/rai-sdk-go v0.4.4-alpha/go.mod h1:qfIZ7OiUpM+ZLPB8m358DFQPRPFzMjM4ghX4iKR8UHo= +github.com/relationalai/rai-sdk-go v0.4.5-alpha h1:+F7Z2K7H2q8S1x4Y0P7baz9GyCGOlTIVRmnacwmo228= +github.com/relationalai/rai-sdk-go v0.4.5-alpha/go.mod h1:qfIZ7OiUpM+ZLPB8m358DFQPRPFzMjM4ghX4iKR8UHo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/rai/cmds.go b/rai/cmds.go index fe79fa2..12b656d 100644 --- a/rai/cmds.go +++ b/rai/cmds.go @@ -785,6 +785,7 @@ func updateUser(cmd *cobra.Command, args []string) { func createSnowflakeIntegration(cmd *cobra.Command, args []string) { action := newAction(cmd) name := args[0] + engine := action.getString("engine") account := action.getStringEnv("account", "SNOWSQL_ACCOUNT") adminUsername := action.getStringEnv("admin-username", "SNOWSQL_USER") adminPassword := action.getStringEnv("admin-password", "SNOWSQL_PWD") @@ -796,7 +797,7 @@ func createSnowflakeIntegration(cmd *cobra.Command, args []string) { Username: proxyUsername, Password: proxyPassword} action.Start("Create Snowflake integration '%s' account='%s'", name, account) rsp, err := action.Client().CreateSnowflakeIntegration( - name, account, &adminCreds, &proxyCreds) + name, account, engine, &adminCreds, &proxyCreds) action.Exit(rsp, err) } @@ -837,7 +838,7 @@ func createSnowflakeDatabaseLink(cmd *cobra.Command, args []string) { schema := action.getStringEnv("schema", "SNOWSQL_SCHEMA") role := action.getStringEnv("role", "SNOWSQL_ROLE") username := action.getStringEnv("username", "SNOWSQL_USER") - password := action.getStringEnv("username", "SNOWSQL_PWD") + password := action.getStringEnv("password", "SNOWSQL_PWD") creds := rai.SnowflakeCredentials{Username: username, Password: password} name := fmt.Sprintf("%s.%s", database, schema) action.Start("Create Snowflake database link '%s' (%s)", name, integration) @@ -853,7 +854,7 @@ func deleteSnowflakeDatabaseLink(cmd *cobra.Command, args []string) { schema := action.getStringEnv("schema", "SNOWSQL_SCHEMA") role := action.getStringEnv("role", "SNOWSQL_ROLE") username := action.getStringEnv("username", "SNOWSQL_USER") - password := action.getStringEnv("username", "SNOWSQL_PWD") + password := action.getStringEnv("password", "SNOWSQL_PWD") creds := rai.SnowflakeCredentials{Username: username, Password: password} name := fmt.Sprintf("%s.%s", database, schema) action.Start("Delete Snowflake database link '%s' (%s)", name, integration) @@ -881,6 +882,72 @@ func listSnowflakeDatabaseLinks(cmd *cobra.Command, args []string) { action.Exit(rsp, err) } +// +// Snowflake data streams +// + +func createSnowflakeDataStream(cmd *cobra.Command, args []string) { + action := newAction(cmd) + integration := args[0] + dbLink := args[1] + dataStream := args[2] + role := action.getStringEnv("role", "SNOWSQL_ROLE") + warehouse := action.getStringEnv("warehouse", "SNOWSQL_WAREHOUSE") + username := action.getStringEnv("username", "SNOWSQL_USER") + password := action.getStringEnv("password", "SNOWSQL_PWD") + isView := action.getBool("is-view") + raiDatabase := action.getString("rai-database") + relation := action.getString("rai-relation") + creds := &rai.SnowflakeCredentials{Username: username, Password: password} + + opts := &rai.DataStreamOpts{ + IsView: isView, + RaiDatabase: raiDatabase, + Relation: relation, + ObjectName: dataStream, + Role: role, + Warehouse: warehouse, + } + action.Start("Create Snowflake data stream '%s' (%s)", dataStream, integration) + rsp, err := action.Client().CreateSnowflakeDataStream(integration, dbLink, creds, opts) + action.Exit(rsp, err) +} + +func deleteSnowflakeDataStream(cmd *cobra.Command, args []string) { + action := newAction(cmd) + integration := args[0] + dbLink := args[1] + dataStream := args[2] + role := action.getStringEnv("role", "SNOWSQL_ROLE") + username := action.getStringEnv("username", "SNOWSQL_USER") + password := action.getStringEnv("password", "SNOWSQL_PWD") + creds := rai.SnowflakeCredentials{Username: username, Password: password} + action.Start("Delete Snowflake data stream %s (%s)", dataStream, integration) + err := action.Client().DeleteSnowflakeDataStream( + integration, dbLink, dataStream, role, &creds, + ) + action.Exit(nil, err) +} + +func getSnowflakeDataStream(cmd *cobra.Command, args []string) { + action := newAction(cmd) + integration := args[0] + dbLink := args[1] + dataStream := args[2] + action.Start("Get Snowflake data stream %s (%s)", dataStream, integration) + rsp, err := action.Client().GetSnowflakeDataStream(integration, dbLink, dataStream) + action.Exit(rsp, err) +} + +func listSnowflakeDataStreams(cmd *cobra.Command, args []string) { + action := newAction(cmd) + integration := args[0] + dbLink := args[1] + action.Start("List Snowflake dataStreams linked to %s (%s)", dbLink, integration) + rsp, err := action.Client().ListSnowflakeDataStreams(integration, dbLink) + action.Exit(rsp, err) +} + // // Misc // diff --git a/rai/main.go b/rai/main.go index 0ef97bf..39c47a2 100644 --- a/rai/main.go +++ b/rai/main.go @@ -296,6 +296,8 @@ func addCommands(root *cobra.Command) { Short: "Create a Snowflake integration", Args: cobra.ExactArgs(1), Run: createSnowflakeIntegration} + cmd.Flags().String("engine", "", "default RAI engine for integration initiated actions (required)") + cmd.MarkFlagRequired("engine") cmd.Flags().String("account", "", "Snowflake account (default: SNOWSQL_ACCOUNT env var)") cmd.Flags().String("admin-username", "", "Snowflake admin username (default: SNOWSQL_USER env var") cmd.Flags().String("admin-password", "", "Snowflake admin password (default: SNOWSQL_PWD env var") @@ -367,6 +369,48 @@ func addCommands(root *cobra.Command) { Run: listSnowflakeDatabaseLinks} root.AddCommand(cmd) + // Snowflake Data Streams + cmd = &cobra.Command{ + Use: "create-snowflake-data-stream integration database-link objectName", + Short: "Create Snowflake data stream associated with an integration", + Args: cobra.ExactArgs(3), + Run: createSnowflakeDataStream} + cmd.Flags().String("role", "", "Snowflake role (default: SNOWSQL_ROLE env var)") + cmd.Flags().String("warehouse", "", "Snowflake warehouse (default: SNOWSQL_WAREHOUSE env var)") + cmd.Flags().String("username", "", "Snowflake username (default: SNOWSQL_USER env var)") + cmd.Flags().String("password", "", "Snowflake password (default: SNOWSQL_PWD env var)") + cmd.Flags().Bool("is-view", false, "Snowflake object is a view (default: false)") + cmd.Flags().String("rai-database", "", "RelationalAI target database name") + cmd.MarkFlagRequired("rai-database") + cmd.Flags().String("rai-relation", "", "RelationalAI target relation") + cmd.MarkFlagRequired("rai-relation") + root.AddCommand(cmd) + + cmd = &cobra.Command{ + Use: "delete-snowflake-data-stream integration database-link objectName", + Short: "Delete a Snowflake data stream associated with an integration", + Args: cobra.ExactArgs(3), + Run: deleteSnowflakeDataStream} + cmd.Flags().String("role", "", "Snowflake role (default: SNOWSQL_ROLE env var)") + cmd.Flags().String("warehouse", "", "Snowflake warehouse (default: SNOWSQL_WAREHOUSE env var)") + cmd.Flags().String("username", "", "Snowflake username (default: SNOWSQL_USER env var)") + cmd.Flags().String("password", "", "Snowflake password (default: SNOWSQL_PWD env var)") + root.AddCommand(cmd) + + cmd = &cobra.Command{ + Use: "get-snowflake-data-stream integration database-link objectName", + Short: "Get information about a Snowflake data stream associated with an integration and database-link", + Args: cobra.ExactArgs(3), + Run: getSnowflakeDataStream} + root.AddCommand(cmd) + + cmd = &cobra.Command{ + Use: "list-snowflake-data-streams integration database-link", + Short: "List Snowflake data streams associated with an integration ", + Args: cobra.ExactArgs(2), + Run: listSnowflakeDataStreams} + root.AddCommand(cmd) + // Misc cmd = &cobra.Command{ Use: "get-access-token", diff --git a/vendor/github.com/relationalai/rai-sdk-go/rai/client.go b/vendor/github.com/relationalai/rai-sdk-go/rai/client.go index 9388bdd..d3beb30 100644 --- a/vendor/github.com/relationalai/rai-sdk-go/rai/client.go +++ b/vendor/github.com/relationalai/rai-sdk-go/rai/client.go @@ -1625,13 +1625,14 @@ func (c *Client) UpdateUser(id string, req UpdateUserRequest) (*User, error) { // func (c *Client) CreateSnowflakeIntegration( - name, snowflakeAccount string, adminCreds, proxyCreds *SnowflakeCredentials, + name, snowflakeAccount, raiEngine string, adminCreds, proxyCreds *SnowflakeCredentials, ) (*Integration, error) { var result Integration req := createSnowflakeIntegrationRequest{Name: name} req.Snowflake.Account = snowflakeAccount req.Snowflake.Admin = *adminCreds req.Snowflake.Proxy = *proxyCreds + req.RAI.Engine = raiEngine if err := c.Post(PathIntegrations, nil, &req, &result); err != nil { return nil, err } @@ -1713,3 +1714,67 @@ func (c *Client) ListSnowflakeDatabaseLinks( } return result, nil } + +// +// Snowflake Data Streams +// + +type DataStreamOpts struct { + IsView bool + RaiDatabase string + Relation string + ObjectName string + Role string + Warehouse string +} + +func (c *Client) CreateSnowflakeDataStream( + integration, dbLink string, creds *SnowflakeCredentials, opts *DataStreamOpts, +) (*SnowflakeDataStream, error) { + var result SnowflakeDataStream + path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams") + req := createSnowflakeDataStreamRequest{} + req.Snowflake.Object = opts.ObjectName + req.Snowflake.IsView = opts.IsView + req.Snowflake.Role = opts.Role + req.Snowflake.Warehouse = opts.Warehouse + req.Snowflake.Credentials = *creds + req.RAI.Database = opts.RaiDatabase + req.RAI.Relation = opts.Relation + if err := c.Post(path, nil, &req, &result); err != nil { + return nil, err + } + return &result, nil +} + +func (c *Client) DeleteSnowflakeDataStream( + integration, dbLink, objectName, role string, creds *SnowflakeCredentials, +) error { + path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName) + req := deleteSnowflakeDataStreamRequest{} + req.Snowflake.Role = role + req.Snowflake.Credentials = *creds + return c.Delete(path, nil, &req, nil) +} + +func (c *Client) GetSnowflakeDataStream( + integration, dbLink, objectName string, +) (*SnowflakeDataStream, error) { + var result SnowflakeDataStream + path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName) + if err := c.Get(path, nil, nil, &result); err != nil { + return nil, err + } + return &result, nil +} + +func (c *Client) ListSnowflakeDataStreams( + integration, dbLink string, +) ([]SnowflakeDataStream, error) { + var result []SnowflakeDataStream + path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams") + if err := c.Get(path, nil, nil, &result); err != nil { + return nil, err + } + return result, nil +} diff --git a/vendor/github.com/relationalai/rai-sdk-go/rai/models.go b/vendor/github.com/relationalai/rai-sdk-go/rai/models.go index 1b6faa1..7a510d1 100644 --- a/vendor/github.com/relationalai/rai-sdk-go/rai/models.go +++ b/vendor/github.com/relationalai/rai-sdk-go/rai/models.go @@ -351,6 +351,9 @@ type Integration struct { Snowflake struct { Account string `json:"account"` } `json:"snowflake"` + RAI struct { + Engine string `json:"engine"` + } `json:"rai"` } type SnowflakeCredentials struct { @@ -365,6 +368,9 @@ type createSnowflakeIntegrationRequest struct { Admin SnowflakeCredentials `json:"admin"` // not-persisted Proxy SnowflakeCredentials `json:"proxy"` // persisted } `json:"snowflake"` + RAI struct { + Engine string `json:"engine"` + } `json:"rai"` } type deleteSnowflakeIntegrationRequest struct { @@ -406,3 +412,48 @@ type deleteSnowflakeDatabaseLinkRequest struct { Credentials SnowflakeCredentials `json:"credentials"` // not-persisted } `json:"snowflake"` } + +// +// Snowflake Data Stream +// + +type SnowflakeDataStream struct { + Account string `json:"account"` // partition key + ID string `json:"id"` + Name string `json:"name"` // database.schema.object + Integration string `json:"integration"` + DbLink string `json:"dbLink"` + CreatedBy string `json:"createdBy"` + CreatedOn string `json:"createdOn"` + State string `json:"state"` + Snowflake struct { + Database string `json:"database"` + Schema string `json:"schema"` + Object string `json:"object"` // fully qualified object name + } `json:"snowflake"` + RAI struct { + Database string `json:"database"` + Relation string `json:"relation"` + } `json:"rai"` +} + +type createSnowflakeDataStreamRequest struct { + Snowflake struct { + Object string `json:"object"` // fully qualified object name + IsView bool `json:"isView"` + Role string `json:"role"` + Warehouse string `json:"warehouse"` + Credentials SnowflakeCredentials `json:"credentials"` // not-persisted + } `json:"snowflake"` + RAI struct { + Database string `json:"database"` + Relation string `json:"relation"` + } `json:"rai"` +} + +type deleteSnowflakeDataStreamRequest struct { + Snowflake struct { + Role string `json:"role"` + Credentials SnowflakeCredentials `json:"credentials"` // not-persisted + } `json:"snowflake"` +} diff --git a/vendor/github.com/relationalai/rai-sdk-go/rai/version.go b/vendor/github.com/relationalai/rai-sdk-go/rai/version.go index f5cc31d..1959445 100644 --- a/vendor/github.com/relationalai/rai-sdk-go/rai/version.go +++ b/vendor/github.com/relationalai/rai-sdk-go/rai/version.go @@ -14,4 +14,4 @@ package rai -const Version = "0.4.4-alpha" +const Version = "0.4.5-alpha" diff --git a/vendor/modules.txt b/vendor/modules.txt index cc5bd3a..4e7660c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -49,7 +49,7 @@ github.com/pierrec/lz4/v4/internal/xxh32 # github.com/pkg/errors v0.9.1 ## explicit github.com/pkg/errors -# github.com/relationalai/rai-sdk-go v0.4.4-alpha +# github.com/relationalai/rai-sdk-go v0.4.5-alpha ## explicit; go 1.18 github.com/relationalai/rai-sdk-go/rai github.com/relationalai/rai-sdk-go/rai/pb