Skip to content

Commit

Permalink
feat(batch): Implement InsertBatch (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
thekuwayama authored Jun 1, 2020
1 parent 7b8a7fc commit 164cc3e
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
rm livy.zip
ln -s apache-livy-${LIVY_VERSION}-incubating-bin livy
cp livy/conf/livy.conf.template livy/conf/livy.conf
echo "livy.file.local-dir-whitelist = /work" >> /opt/livy/conf/livy.conf
sudo mkdir -p /work
wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-without-hadoop.tgz -O spark.tgz
tar -xvzf spark.tgz
Expand Down
79 changes: 79 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,82 @@ func (c *BatchesLogCall) doRequest() (*http.Response, error) {

return SendRequest(c.s.client, req)
}

type InsertBatchRequest struct {
// File containing the application to execute
File string `json:"file"`
// User to impersonate when starting the batch
ProxyUser string `json:"proxyUser,omitempty"`
// Application Java/Spark main class
ClassName string `json:"className,omitempty"`
// Command line arguments for the application
Args []string `json:"args,omitempty"`
// jars to be used in this session
Jars []string `json:"jars,omitempty"`
// Python files to be used in this session
PyFiles []string `json:"pyFiles,omitempty"`
// files to be used in this session
Files []string `json:"files,omitempty"`
// Amount of memory to use for the driver process
DriverMemory string `json:"driverMemory,omitempty"`
// Number of cores to use for the driver process
DriverCores int `json:"driverCores,omitempty"`
// Amount of memory to use per executor process
ExecutorMemory string `json:"executorMemory,omitempty"`
// Number of cores to use for each executor
ExecutorCores int `json:"executorCores,omitempty"`
// Number of executors to launch for this session
NumExecutors int `json:"numExecutors,omitempty"`
// Archives to be used in this session
Archives []string `json:"archives,omitempty"`
// The name of the YARN queue to which submitted
Queue string `json:"queue,omitempty"`
// The name of this session
Name string `json:"name,omitempty"`
// Spark configuration properties
Conf map[string]string `json:"conf,omitempty"`
}

type BatchesInsertCall struct {
s *Service
insertBatchRequest *InsertBatchRequest
}

// Insert: Creates a new batch.
func (r *BatchesService) Insert(insertBatchRequest *InsertBatchRequest) *BatchesInsertCall {
c := &BatchesInsertCall{s: r.s}
c.insertBatchRequest = insertBatchRequest

return c
}

func (c *BatchesInsertCall) Do() (*Batch, error) {
res, err := c.doRequest()
if err != nil {
return nil, err
}

batch := &Batch{}
err = DecodeResponse(batch, res)
if err != nil {
return nil, err
}

return batch, nil
}

func (c *BatchesInsertCall) doRequest() (*http.Response, error) {
url := c.s.BasePath + "/batches"

body, err := JSONReader(c.insertBatchRequest)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}

return SendRequest(c.s.client, req)
}
73 changes: 73 additions & 0 deletions integration_tests/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package livy_test

import (
"fmt"
"testing"

"github.com/google/uuid"
"github.com/k0kubun/pp"
"github.com/stretchr/testify/assert"

"github.com/3-shake/livy-go"
)

func TestBatch_List(t *testing.T) {
res, err := service.Batches.List().Do()
pp.Println(res, err)

assert.Equal(t, err, nil)
}

func TestBatch_Get(t *testing.T) {
bat, _ := insertBatch()
res, err := service.Batches.Get(bat.ID).Do()
pp.Println(res, err)

assert.Equal(t, err, nil)
}

func TestBatch_Insert(t *testing.T) {
_, err := insertBatch()

pp.Println(err)

assert.Equal(t, err, nil)
}

func TestBatch_Delete(t *testing.T) {
bat, _ := insertBatch()
err := service.Batches.Delete(bat.ID).Do()

pp.Println(err)

assert.Equal(t, err, nil)
}

func TestBatch_State(t *testing.T) {
bat, _ := insertBatch()
res, err := service.Batches.State(bat.ID).Do()

pp.Println(res, err)

assert.Equal(t, err, nil)
}

func TestBatch_Log(t *testing.T) {
bat, _ := insertBatch()
res, err := service.Batches.Log(bat.ID).Do()

pp.Println(res, err)

assert.Equal(t, err, nil)
}

func insertBatch() (*livy.Batch, error) {
className := "com.example.livy.WordCount"
jarPath := "/work/root-assembly-1.0.0-SNAPSHOT.jar"
uid := uuid.New()
return service.Batches.Insert(&livy.InsertBatchRequest{
File: fmt.Sprintf("local:%v", jarPath),
ClassName: className,
Name: uid.String(),
}).Do()
}
2 changes: 2 additions & 0 deletions integration_tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ services:
container_name: livy
ports:
- 8998:8998
volumes:
- ./wordcount/target/scala-2.11:/work
restart: always

0 comments on commit 164cc3e

Please sign in to comment.