Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
PratikDhanave committed Jul 28, 2018
1 parent af46b11 commit d388644
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 14 deletions.
50 changes: 36 additions & 14 deletions streamdataprocessing/clouddataflow/clouddataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@ import (
googleauth "github.com/cloudlibz/gocloud/googleauth"
"io/ioutil"
"net/http"
"time"
)


const (
UnixDate = "Mon Jan _2 15:04:05 MST 2006"
RFC3339 = "2006-01-02T15:04:05Z07:00"
)


//DescribeStream Describe Stream
func (clouddataflow *Clouddataflow) DescribeStream(request interface{}) (resp interface{}, err error) {

Expand Down Expand Up @@ -102,7 +110,7 @@ func (clouddataflow *Clouddataflow) DeleteStream(request interface{}) (resp inte
func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp interface{}, err error) {

param := request.(map[string]interface{})
var Project string
var View string
var option Createstream

for key, value := range param {
Expand All @@ -112,6 +120,10 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
projectIdv, _ := value.(string)
option.ProjectID = projectIdv

case "View":
viewv, _ := value.(string)
View = viewv

case "ClientRequestID":
clientRequestIDv, _ := value.(string)
option.ClientRequestID = clientRequestIDv
Expand All @@ -135,6 +147,7 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
case "CurrentStateTime":
currentStateTimev, _ := value.(string)
option.CurrentStateTime = currentStateTimev
option.CurrentStateTime = time.Now().UTC().Format(time.RFC3339)

case "RequestedState":
requestedStatev, _ := value.(string)
Expand All @@ -143,7 +156,7 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
case "CreateTime":
createTimev, _ := value.(string)
option.CreateTime = createTimev
option.CreateTime = createTimev
option.CreateTime = time.Now().UTC().Format(time.RFC3339)

case "ReplaceJobId":
replaceJobIdv, _ := value.(string)
Expand Down Expand Up @@ -267,16 +280,36 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte

createstreamjson, _ := json.Marshal(createstreamjsonmap)

fmt.Println("Json : \n",string(createstreamjson))

createstreamjsonstring := string(createstreamjson)

var createstreamjsonstringbyte = []byte(createstreamjsonstring)

url := "https://www.googleapis.com/dns/v1/projects/" + Project + "/managedZones"
url := "https://dataflow.googleapis.com/v1b3/projects/" + option.ProjectID + "/jobs"

client := googleauth.SignJWT()

createstreamrequest, err := http.NewRequest("POST", url, bytes.NewBuffer(createstreamjsonstringbyte))


createstreamrequestparam := createstreamrequest.URL.Query()

if View != "" {
createstreamrequestparam.Add("view", View)
}

if option.Location != "" {
createstreamrequestparam.Add("location", option.Location)
}

if option.ReplaceJobId != "" {
createstreamrequestparam.Add("replaceJobId", option.ReplaceJobId)
}

createstreamrequest.URL.RawQuery = createstreamrequestparam.Encode()


createstreamrequest.Header.Set("Content-Type", "application/json")

createstreamresp, err := client.Do(createstreamrequest)
Expand Down Expand Up @@ -338,17 +371,6 @@ func createstreamdictnoaryconvert(option Createstream, createstreamjsonmap map[s
prepareEnvironment(option, createstreamjsonmap)
}

type UserAgent struct {
Name string `json:"name"`
support Support `json:"support"`
BuildDate string `json:"build.date"`
Version string `json:"version"`
}

type Support struct {
Status string `json:"status"`
URL string `json:"url"`
}

func prepareEnvironment(option Createstream, createstreamjsonmap map[string]interface{}) {

Expand Down
90 changes: 90 additions & 0 deletions streamdataprocessing/clouddataflow/clouddataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,93 @@ func TestDescribeStream(t *testing.T) {

fmt.Println(response["body"])
}


func TestCreateStream(t *testing.T) {
var clouddataflow Clouddataflow

createstream := make(map[string]interface{})

userAgent :=make(map[string]interface{})

support := make(map[string]interface{})

support["Status"]= "STALE"
support[URL] = "https://cloud.google.com/dataflow/support"


userAgent["Name"] = "Google Cloud Dataflow SDK for Java"

userAgent["BuildDate"] = "2017-09-01 05:54"

userAgent["Version"] = "2.1.0"

userAgent["Support"] = support

version := make(map[string]interface{})

version["Major"] = "6"

version["JobType"] = "JAVA_BATCH_AUTOSCALING"

environment := make(map[string]interface{})

environment["UserAgent"] = userAgent
environment["Version"] = version

stageStates := []map[string]interface{}{
{
"ExecutionStageName": "F19",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:39:00.225Z",
},
{
"ExecutionStageName": "s10",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:39:00.430Z",
},
{
"ExecutionStageName": "s44-close-shuffle15",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:38:51.161Z",
},
{
"ExecutionStageName": "F20",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:38:51.072Z",
},
{
"ExecutionStageName": "F18",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:39:05.296Z",
},
{
"ExecutionStageName": "s44-open-shuffle13",
"ExecutionStageState": "JOB_STATE_DONE",
"CurrentStateTime": "2018-07-27T15:37:56.465Z",
},
}

createstream := map[string]interface{}{

"ProjectID": "gocloud-206919",
"View": "JOB_VIEW_ALL",
"Location" : "us-central1",
"CurrentState" : "JOB_STATE_DONE",
"ClientRequestID" :"20180727153742037_7574",
"ID" : "2018-07-27_08_37_46-11774589915372519551",
"Name" : "dataflow-intro",
"Type" : "JOB_TYPE_BATCH",
"StageStates" : stageStates,
"Environment" : environment,
}

resp, err := clouddataflow.CreateStream(createstream)

if err != nil {
t.Errorf("Test Fail")
}
response := resp.(map[string]interface{})

fmt.Println(response["body"])
}

0 comments on commit d388644

Please sign in to comment.