diff --git a/.github/workflows/reviewdog.yml b/.github/workflows/reviewdog.yml new file mode 100644 index 0000000..002dc8e --- /dev/null +++ b/.github/workflows/reviewdog.yml @@ -0,0 +1,27 @@ +name: reviewdog on Pull Request +on: [pull_request] +jobs: + golangci-lint: + name: runner / golangci-lint + runs-on: ubuntu-latest + steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v1 + - name: golangci-lint + uses: reviewdog/action-golangci-lint@v1 + with: + github_token: ${{ secrets.github_token }} + golangci_lint_flags: "--enable-all --exclude-use-default=false" + reporter: github-check + + misspell: + name: runner / misspell + runs-on: ubuntu-latest + steps: + - name: Check out code. + uses: actions/checkout@v1 + - name: misspell + uses: reviewdog/action-misspell@v1 + with: + github_token: ${{ secrets.github_token }} + reporter: github-check diff --git a/.gitignore b/.gitignore index 21036cc..910fc80 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ .envrc .vscode +.bin +*.zip node_modules +apache-livy-0.7.0-incubating-bin jars/target jars/project/target diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..22efde4 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,63 @@ +linters-settings: + golint: + # minimal confidence for issues, default is 0.8 + min-confidence: 1.1 + + lll: + line-length: 180 + + funlen: + lines: 200 + statements: 130 + dupl: + threshold: 150 + gocognit: + min-complexity: 40 + errcheck: + # report about not checking of errors in type assertions: `a := b.(MyStruct)`; + # default is false: such cases aren't reported by default. + check-type-assertions: false + + # report about assignment of errors to blank identifier: `num, _ := strconv.Atoi(numStr)`; + # default is false: such cases aren't reported by default. + check-blank: false + + # [deprecated] comma-separated list of pairs of the form pkg:regex + # the regex is used to ignore names within pkg. (default "fmt:.*"). + # see https://github.com/kisielk/errcheck#the-deprecated-method for details + ignore: "fmt:.*,os:.*,Close" + +linters: + disable: + - dupl + - maligned + - gochecknoglobals + - gomnd + - wsl + - godox + - gocyclo + - interfacer + - unconvert + +issues: + exclude-rules: + - path: _test\.go + linters: + - ineffassign + - gocyclo + - errcheck + - dupl + - gosec + # List of regexps of issue texts to exclude, empty list by default. + # But independently from this option we use default exclude patterns, + # it can be disabled by `exclude-use-default: false`. To list all + # excluded by default patterns execute `golangci-lint run --help` + exclude: + - should have comment or be unexported + - S1000 + - ST1000 + - ST1003 + - ST1020 + - ST1021 + - G304 + - G402 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..dc7a5d7 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ + +livy.install: + wget -q -O tmp.zip http://ftp.kddilabs.jp/infosystems/apache/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip && unzip tmp.zip && rm -f tmp.zip + +livy.start: + apache-livy-0.7.0-incubating-bin/bin/livy-server start + +livy.stop: + apache-livy-0.7.0-incubating-bin/bin/livy-server stop + diff --git a/README.md b/README.md index 617c0c9..51e4446 100644 --- a/README.md +++ b/README.md @@ -1 +1,20 @@ # livy-go + +## Getting Started + +### Livy Setup + +1. livy install +``` +make livy.install +``` + +2. livy start +``` +make livy.start +``` + +3. livy stop +``` +make livy.stop +``` diff --git a/batch.go b/batch.go index 192329f..b8267f2 100644 --- a/batch.go +++ b/batch.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "net/http" - - "github.com/3-shake/livy-go/gensupport" ) type BatchesService struct { @@ -47,7 +45,7 @@ func (c *BatchesListCall) Do() (*Batches, error) { } batches := &Batches{} - err = gensupport.DecodeResponse(batches, res) + err = DecodeResponse(batches, res) if err != nil { return nil, err } @@ -62,7 +60,7 @@ func (c *BatchesListCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type BatchesGetCall struct { @@ -84,7 +82,7 @@ func (c *BatchesGetCall) Do() (*Batch, error) { } batch := &Batch{} - err = gensupport.DecodeResponse(batch, res) + err = DecodeResponse(batch, res) if err != nil { return nil, err } @@ -99,7 +97,7 @@ func (c *BatchesGetCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type BatchState struct { @@ -126,7 +124,7 @@ func (c *BatchesStateCall) Do() (*BatchState, error) { } state := &BatchState{} - err = gensupport.DecodeResponse(state, res) + err = DecodeResponse(state, res) if err != nil { return nil, err } @@ -141,7 +139,7 @@ func (c *BatchesStateCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type BatchesDeleteCall struct { @@ -176,7 +174,7 @@ func (c *BatchesDeleteCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type BatchLogResponse struct { @@ -208,7 +206,7 @@ func (c *BatchesLogCall) Do() (*BatchLogResponse, error) { batchLog := &BatchLogResponse{} - err = gensupport.DecodeResponse(batchLog, res) + err = DecodeResponse(batchLog, res) if err != nil { return nil, err } @@ -225,5 +223,5 @@ func (c *BatchesLogCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } diff --git a/examples/spark.go b/examples/spark.go deleted file mode 100644 index 718bae4..0000000 --- a/examples/spark.go +++ /dev/null @@ -1,151 +0,0 @@ -package main - -import ( - "context" - "fmt" - "os/exec" - "strings" - "time" - - "github.com/k0kubun/pp" - "github.com/lithammer/dedent" - "github.com/3-shake/livy-go" -) - -func SessionsList() { - svc := livy.NewService(context.Background()) - res, err := svc.Sessions.List().Do() - pp.Println(res, err) -} - -func SessionsGet(sessionID int) *livy.Session { - svc := livy.NewService(context.Background()) - res, err := svc.Sessions.Get(sessionID).Do() - pp.Println(res, err) - return res -} - -func SessionsInsert(localJarPath string) *livy.Session { - svc := livy.NewService(context.Background()) - res, _ := svc.Sessions.Insert(&livy.InsertSessionRequest{ - Kind: livy.SessionKind_Spark, - Jars: []string{ - fmt.Sprintf("local://%v", localJarPath), - }, - Conf: map[string]string{ - "spark.driver.extraClassPath": localJarPath, - }, - }).Do() - - pp.Println(localJarPath, res) - return res -} - -func SessionsDelete(sessionID int) { - svc := livy.NewService(context.Background()) - err := svc.Sessions.Delete(sessionID).Do() - pp.Println(err) -} - -func SessionsState(sessionID int) { - svc := livy.NewService(context.Background()) - res, err := svc.Sessions.State(sessionID).Do() - pp.Println(res, err) -} - -func SessionsLog(sessionID int) { - svc := livy.NewService(context.Background()) - res, err := svc.Sessions.Log(sessionID).Do() - pp.Println(res, err) -} - -func StatementsList(sessionID int) { - svc := livy.NewService(context.Background()) - res, err := svc.Statements.List(sessionID).Do() - pp.Println(res, err) -} - -func StatementsGet(sessionID, statementID int) *livy.Statement { - svc := livy.NewService(context.Background()) - res, err := svc.Statements.Get(sessionID, statementID).Do() - pp.Println(res, err) - return res -} - -func StatementsWait(sessionID, statementID int) *livy.Statement { - t := time.NewTicker(5 * time.Second) - defer t.Stop() - - for range t.C { - stmt := StatementsGet(sessionID, statementID) - b, _ := stmt.Output.Data.MarshalJSON() - pp.Println(string(b)) - if stmt.State == livy.StatementState_Available { - return stmt - } - } - - return nil -} - -func StatementsInsert(sessionID int) *livy.Statement { - svc := livy.NewService(context.Background()) - letter := "val NUM_SAMPLES = 100000;\n" + - "val count = sc.parallelize(1 to NUM_SAMPLES).map { i => \n" + - "val x = Math.random();\n" + - "val y = Math.random();\n" + - "if (x*x + y*y < 1) 1 else 0\n" + - "}.reduce(_ + _);\n" + - "println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)" - - letter = "import com.locona.livy._\n" + - "val ds = WordCount.executor()\n" + - "ds.show(false)\n" + - "ds.printSchema" - res, err := svc.Statements.Insert(sessionID, &livy.InsertStatementRequest{ - Code: dedent.Dedent(letter), - }).Do() - pp.Println(res, err) - return res -} - -func BatchesList() { - svc := livy.NewService(context.Background()) - res, err := svc.Batches.List().Do() - pp.Println(res, err) -} - -// Stat is exported out of golang convention, rather than necessity - -func rootPath() (string, error) { - path, err := exec.Command("git", "rev-parse", "--show-toplevel").Output() - if err != nil { - return "", err - } - fmt.Println(string(path)) - return strings.TrimSpace(string(path)), nil -} - -func main() { - rootPath, err := rootPath() - if err != nil { - panic(err) - } - - jar := "/jars/target/scala-2.11/root-assembly-1.0.0-SNAPSHOT.jar" - jarPath := fmt.Sprintf("%v/%v", rootPath, jar) - SessionsInsert(jarPath) - - sessionID := 2 - // sessionID = session.ID - // session := SessionsGet(sessionID) - // SessionsDelete(sessionID) - // SessionsState(sessionID) - // SessionsLog(sessionID) - - // Statement - // StatementsList(sessionID) - statement := StatementsInsert(sessionID) - statement = StatementsWait(sessionID, statement.ID) - pp.Println(statement) -} diff --git a/gensupport/json.go b/gensupport/json.go deleted file mode 100644 index c7a34d7..0000000 --- a/gensupport/json.go +++ /dev/null @@ -1,20 +0,0 @@ -package gensupport - -import ( - "bytes" - "encoding/json" - "io" - "strings" -) - -func JSONReader(v interface{}) (io.Reader, error) { - buf := new(bytes.Buffer) - encoder := json.NewEncoder(buf) - encoder.SetEscapeHTML(false) - err := encoder.Encode(v) - if err != nil { - return nil, err - } - - return bytes.NewBufferString(strings.TrimRight(buf.String(), "\n")), nil -} diff --git a/gensupport/params.go b/gensupport/params.go deleted file mode 100644 index b340def..0000000 --- a/gensupport/params.go +++ /dev/null @@ -1,38 +0,0 @@ -package gensupport - -import ( - "net/url" -) - -// URLParams is a simplified replacement for url.Values -// that safely builds up URL parameters for encoding. -type URLParams map[string][]string - -// Get returns the first value for the given key, or "". -func (u URLParams) Get(key string) string { - vs := u[key] - if len(vs) == 0 { - return "" - } - return vs[0] -} - -// Set sets the key to value. -// It replaces any existing values. -func (u URLParams) Set(key, value string) { - u[key] = []string{value} -} - -// SetMulti sets the key to an array of values. -// It replaces any existing values. -// Note that values must not be modified after calling SetMulti -// so the caller is responsible for making a copy if necessary. -func (u URLParams) SetMulti(key string, values []string) { - u[key] = values -} - -// Encode encodes the values into ``URL encoded'' form -// ("bar=baz&foo=quux") sorted by key. -func (u URLParams) Encode() string { - return url.Values(u).Encode() -} diff --git a/gensupport/send.go b/gensupport/send.go deleted file mode 100644 index 90b57ae..0000000 --- a/gensupport/send.go +++ /dev/null @@ -1,43 +0,0 @@ -package gensupport - -import ( - "context" - "encoding/json" - "net/http" -) - -func SendRequest(client *http.Client, req *http.Request) (*http.Response, error) { - reqHeaders := make(http.Header) - reqHeaders.Set("Content-Type", "application/json") - req.Header = reqHeaders - return client.Do(req) - - // TODO: Send request. - // return send(ctx, client, req) -} - -func send(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { - if client == nil { - client = http.DefaultClient - } - resp, err := client.Do(req.WithContext(ctx)) - // If we got an error, and the context has been canceled, - // the context's error is probably more useful. - if err != nil { - select { - case <-ctx.Done(): - err = ctx.Err() - default: - } - } - return resp, err -} - -// DecodeResponse decodes the body of res into target. If there is no body, -// target is unchanged. -func DecodeResponse(target interface{}, res *http.Response) error { - if res.StatusCode == http.StatusNoContent { - return nil - } - return json.NewDecoder(res.Body).Decode(target) -} diff --git a/go.mod b/go.mod index c82288e..149fd6c 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/3-shake/livy-go -go 1.12 +go 1.14 require ( - github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect + github.com/google/uuid v1.1.1 github.com/k0kubun/pp v3.0.1+incompatible github.com/lithammer/dedent v1.1.0 - github.com/mattn/go-colorable v0.1.4 // indirect - golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect + github.com/mattn/go-colorable v0.1.6 // indirect + github.com/stretchr/testify v1.5.1 ) diff --git a/go.sum b/go.sum index 8bf7e9d..1e4c6be 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,24 @@ -github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= -github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/k0kubun/pp v1.3.0 h1:r9td75hcmetrcVbmsZRjnxcIbI9mhm+/N6iWyG4TWe0= github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg= github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= -github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= -github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= -github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/httputil.go b/httputil.go new file mode 100644 index 0000000..f02d471 --- /dev/null +++ b/httputil.go @@ -0,0 +1,48 @@ +package livy + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + "net/http" + "strings" +) + +type ErrorResponse struct { + ErrorMessage string `json:"error_message"` +} + +func (err *ErrorResponse) Error() string { + return err.ErrorMessage +} + +func JSONReader(v interface{}) (io.Reader, error) { + buf := new(bytes.Buffer) + encoder := json.NewEncoder(buf) + encoder.SetEscapeHTML(false) + err := encoder.Encode(v) + if err != nil { + return nil, err + } + + return bytes.NewBufferString(strings.TrimRight(buf.String(), "\n")), nil +} + +func SendRequest(client *http.Client, req *http.Request) (*http.Response, error) { + reqHeaders := make(http.Header) + reqHeaders.Set("Content-Type", "application/json") + req.Header = reqHeaders + return client.Do(req) +} + +// DecodeResponse decodes the body of res into target. If there is no body, +// target is unchanged. +func DecodeResponse(target interface{}, res *http.Response) error { + if res.StatusCode >= http.StatusBadRequest { + b, _ := ioutil.ReadAll(res.Body) + return &ErrorResponse{string(b)} + } + + return json.NewDecoder(res.Body).Decode(target) +} diff --git a/livy_test.go b/livy_test.go new file mode 100644 index 0000000..734f553 --- /dev/null +++ b/livy_test.go @@ -0,0 +1,32 @@ +package livy_test + +import ( + "context" + "os" + "os/exec" + "strings" + "testing" + + "github.com/3-shake/livy-go" +) + +var service = livy.NewService(context.Background()) + +var RootPath string + +func TestMain(m *testing.M) { + rootPath() + + exitVal := m.Run() + + os.Exit(exitVal) +} + +func rootPath() { + path, err := exec.Command("git", "rev-parse", "--show-toplevel").Output() + if err != nil { + panic(err) + } + + RootPath = strings.TrimSpace(string(path)) +} diff --git a/service.go b/service.go index 222735a..bc1c55f 100644 --- a/service.go +++ b/service.go @@ -21,7 +21,6 @@ type Service struct { func NewService(ctx context.Context) *Service { return New(http.DefaultClient) - } func New(client *http.Client) *Service { diff --git a/session.go b/session.go index 4a6ffa3..2ba81e1 100644 --- a/session.go +++ b/session.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "net/http" - - "github.com/3-shake/livy-go/gensupport" ) const ( @@ -45,16 +43,13 @@ type SessionsService struct { } type Sessions struct { - s *Service From int `json:"from"` Size int `json:"size"` Items []*Session `json:"sessions"` } type SessionsListCall struct { - s *Service - from int - size int + s *Service } type Session struct { @@ -92,7 +87,7 @@ func (c *SessionsListCall) Do() (*Sessions, error) { } sessions := &Sessions{} - err = gensupport.DecodeResponse(sessions, res) + err = DecodeResponse(sessions, res) if err != nil { return nil, err } @@ -107,7 +102,7 @@ func (c *SessionsListCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type SessionsGetCall struct { @@ -132,7 +127,7 @@ func (c *SessionsGetCall) Do() (*Session, error) { session := &Session{} - err = gensupport.DecodeResponse(session, res) + err = DecodeResponse(session, res) if err != nil { return nil, err } @@ -149,7 +144,7 @@ func (c *SessionsGetCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type InsertSessionRequest struct { @@ -205,7 +200,7 @@ func (c *SessionsInsertCall) Do() (*Session, error) { } session := &Session{} - err = gensupport.DecodeResponse(session, res) + err = DecodeResponse(session, res) if err != nil { return nil, err } @@ -215,9 +210,8 @@ func (c *SessionsInsertCall) Do() (*Session, error) { func (c *SessionsInsertCall) doRequest() (*http.Response, error) { url := c.s.BasePath + "/sessions" - var body io.Reader = nil - body, err := gensupport.JSONReader(c.insertSessionRequest) + body, err := JSONReader(c.insertSessionRequest) if err != nil { return nil, err } @@ -227,7 +221,7 @@ func (c *SessionsInsertCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type SessionsDeleteCall struct { @@ -262,7 +256,7 @@ func (c *SessionsDeleteCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type SessionStateResponse struct { @@ -292,7 +286,7 @@ func (c *SessionsGetStateCall) Do() (*SessionStateResponse, error) { sessionState := &SessionStateResponse{} - err = gensupport.DecodeResponse(sessionState, res) + err = DecodeResponse(sessionState, res) if err != nil { return nil, err } @@ -309,7 +303,7 @@ func (c *SessionsGetStateCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type SessionLogResponse struct { @@ -341,7 +335,7 @@ func (c *SessionsLogCall) Do() (*SessionLogResponse, error) { sessionLog := &SessionLogResponse{} - err = gensupport.DecodeResponse(sessionLog, res) + err = DecodeResponse(sessionLog, res) if err != nil { return nil, err } @@ -358,5 +352,5 @@ func (c *SessionsLogCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } diff --git a/session_test.go b/session_test.go new file mode 100644 index 0000000..3e2229c --- /dev/null +++ b/session_test.go @@ -0,0 +1,82 @@ +package livy_test + +import ( + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/k0kubun/pp" + "github.com/stretchr/testify/assert" + + "github.com/3-shake/livy-go" +) + +func TestSession_List(t *testing.T) { + _, _ = insert() + res, err := service.Sessions.List().Do() + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func TestSession_Get(t *testing.T) { + sess, _ := insert() + res, err := service.Sessions.Get(sess.ID).Do() + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func TestSession_Insert(t *testing.T) { + _, err := insert() + + pp.Println(err) + + assert.Equal(t, err, nil) +} + +func TestSession_Delete(t *testing.T) { + sess, _ := insert() + err := service.Sessions.Delete(sess.ID).Do() + + pp.Println(err) + + assert.Equal(t, err, nil) +} + +func TestSession_State(t *testing.T) { + sess, _ := insert() + res, err := service.Sessions.State(sess.ID).Do() + + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func TestSession_Log(t *testing.T) { + sess, _ := insert() + res, err := service.Sessions.Log(sess.ID).Do() + + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func insert() (*livy.Session, error) { + rootPath := RootPath + + jar := "/jars/target/scala-2.11/root-assembly-1.0.0-SNAPSHOT.jar" + jarPath := fmt.Sprintf("%v/%v", rootPath, jar) + + uid := uuid.New() + return service.Sessions.Insert(&livy.InsertSessionRequest{ + Name: uid.String(), + Kind: livy.SessionKind_Spark, + Jars: []string{ + fmt.Sprintf("local://%v", jarPath), + }, + Conf: map[string]string{ + "spark.driver.extraClassPath": jarPath, + }, + }).Do() +} diff --git a/statement.go b/statement.go index 02e2529..b17b11c 100644 --- a/statement.go +++ b/statement.go @@ -5,8 +5,7 @@ import ( "fmt" "io" "net/http" - - "github.com/3-shake/livy-go/gensupport" + "time" ) const ( @@ -20,7 +19,10 @@ const ( type StatementState string -type Statements []*Statement +type Statements struct { + TotalStatements int `json:"total_statements"` + Statements []*Statement `json:"statements"` +} type Statement struct { ID int @@ -62,7 +64,7 @@ func (c *StatementsListCall) Do() (*Statements, error) { } statements := &Statements{} - err = gensupport.DecodeResponse(statements, res) + err = DecodeResponse(statements, res) if err != nil { return nil, err } @@ -77,7 +79,7 @@ func (c *StatementsListCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type StatementsGetCall struct { @@ -101,7 +103,7 @@ func (c *StatementsGetCall) Do() (*Statement, error) { } statement := &Statement{} - err = gensupport.DecodeResponse(statement, res) + err = DecodeResponse(statement, res) if err != nil { return nil, err @@ -117,7 +119,7 @@ func (c *StatementsGetCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } type InsertStatementRequest struct { @@ -129,12 +131,14 @@ type StatementsInsertCall struct { s *Service sessionID int insertStatementRequest *InsertStatementRequest + wait bool } -func (r *StatementsService) Insert(sessionID int, insertStatementRequest *InsertStatementRequest) *StatementsInsertCall { +func (r *StatementsService) Insert(sessionID int, insertStatementRequest *InsertStatementRequest, wait bool) *StatementsInsertCall { c := &StatementsInsertCall{s: r.s} c.sessionID = sessionID c.insertStatementRequest = insertStatementRequest + c.wait = wait return c } @@ -146,16 +150,34 @@ func (c *StatementsInsertCall) Do() (*Statement, error) { } statement := &Statement{} - err = gensupport.DecodeResponse(statement, res) + err = DecodeResponse(statement, res) if err != nil { return nil, err } - return statement, nil + if !c.wait { + return statement, nil + } + + availableStmt := &Statement{} + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for range t.C { + availableStmt, err = c.s.Statements.Get(c.sessionID, statement.ID).Do() + if err != nil { + break + } + + if availableStmt.State == StatementState_Available { + break + } + } + + return availableStmt, err } func (c *StatementsInsertCall) doRequest() (*http.Response, error) { url := c.s.BasePath + fmt.Sprintf("/sessions/%v/statements", c.sessionID) - body, err := gensupport.JSONReader(c.insertStatementRequest) + body, err := JSONReader(c.insertStatementRequest) if err != nil { return nil, err } @@ -165,5 +187,5 @@ func (c *StatementsInsertCall) doRequest() (*http.Response, error) { return nil, err } - return gensupport.SendRequest(c.s.client, req) + return SendRequest(c.s.client, req) } diff --git a/statement_test.go b/statement_test.go new file mode 100644 index 0000000..96599c1 --- /dev/null +++ b/statement_test.go @@ -0,0 +1,107 @@ +package livy_test + +import ( + "testing" + "time" + + "github.com/k0kubun/pp" + "github.com/lithammer/dedent" + "github.com/stretchr/testify/assert" + + "github.com/3-shake/livy-go" +) + +func TestStatement_List(t *testing.T) { + sess, _ := insert() + + sessionWait(sess.ID) + _, _ = statementInsert(sess.ID) + + res, err := service.Statements.List(sess.ID).Do() + + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func TestStatement_Get(t *testing.T) { + sess, _ := insert() + + sessionWait(sess.ID) + stmt, _ := statementInsert(sess.ID) + + res, err := service.Statements.Get(sess.ID, stmt.ID).Do() + + pp.Println(res, err) + + assert.Equal(t, err, nil) +} + +func TestStatement_Insert(t *testing.T) { + sess, _ := insert() + + sessionWait(sess.ID) + res, err := statementInsert(sess.ID) + pp.Println(res, err) + + assert.Equal(t, err, nil) + assert.Equal(t, res.State, livy.StatementState_Waiting) +} + +func TestStatement_Insert_Wait(t *testing.T) { + sess, _ := insert() + + sessionWait(sess.ID) + letter := "val NUM_SAMPLES = 100000;\n" + + "val count = sc.parallelize(1 to NUM_SAMPLES).map { i => \n" + + "val x = Math.random();\n" + + "val y = Math.random();\n" + + "if (x*x + y*y < 1) 1 else 0\n" + + "}.reduce(_ + _);\n" + + "println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)" + + letter = "import com.locona.livy._\n" + + "val ds = WordCount.executor()\n" + + "ds.show(false)\n" + + "ds.printSchema" + + res, err := service.Statements.Insert(sess.ID, &livy.InsertStatementRequest{ + Code: dedent.Dedent(letter), + }, true).Do() + + pp.Println(res, err) + + assert.Equal(t, err, nil) + assert.Equal(t, res.State, livy.StatementState_Available) +} + +func sessionWait(sessionID int) { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + for range t.C { + res, _ := service.Sessions.State(sessionID).Do() + if res.State == livy.SessionState_Idle { + return + } + } +} + +func statementInsert(sessionID int) (*livy.Statement, error) { + letter := "val NUM_SAMPLES = 100000;\n" + + "val count = sc.parallelize(1 to NUM_SAMPLES).map { i => \n" + + "val x = Math.random();\n" + + "val y = Math.random();\n" + + "if (x*x + y*y < 1) 1 else 0\n" + + "}.reduce(_ + _);\n" + + "println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)" + + letter = "import com.locona.livy._\n" + + "val ds = WordCount.executor()\n" + + "ds.show(false)\n" + + "ds.printSchema" + + return service.Statements.Insert(sessionID, &livy.InsertStatementRequest{ + Code: dedent.Dedent(letter), + }, false).Do() +}