Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
PratikDhanave committed Jul 29, 2018
1 parent d388644 commit 586326c
Show file tree
Hide file tree
Showing 2 changed files with 335 additions and 24 deletions.
263 changes: 241 additions & 22 deletions streamdataprocessing/clouddataflow/clouddataflow.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package clouddataflow

import (
//"bytes"
//"encoding/json"
"bytes"
"encoding/json"
googleauth "github.com/cloudlibz/gocloud/googleauth"
"io/ioutil"
"net/http"
"time"
"fmt"
)


Expand Down Expand Up @@ -100,12 +101,227 @@ func (clouddataflow *Clouddataflow) ListStream(request interface{}) (resp interf
return resp, err
}

//DeleteStream Delete Stream
func (clouddataflow *Clouddataflow) DeleteStream(request interface{}) (resp interface{}, err error) {

return resp, err
}


//DescribeStream Describe Stream
func (clouddataflow *Clouddataflow) UpdateStream(request interface{}) (resp interface{}, err error) {

param := request.(map[string]interface{})
var jobId string
var option Createstream

for key, value := range param {
switch key {

case "JobId":
jobIdv, _ := value.(string)
jobId = jobIdv

case "ProjectID":
projectIdv, _ := value.(string)
option.ProjectID = projectIdv

case "ClientRequestID":
clientRequestIDv, _ := value.(string)
option.ClientRequestID = clientRequestIDv

case "ID":
idv, _ := value.(string)
option.ID = idv

case "Name":
nameV, _ := value.(string)
option.Name = nameV

case "Type":
typev, _ := value.(string)
option.Type = typev

case "CurrentState":
currentStatev, _ := value.(string)
option.CurrentState = currentStatev

case "CurrentStateTime":
currentStateTimev, _ := value.(string)
option.CurrentStateTime = currentStateTimev
option.CurrentStateTime = time.Now().UTC().Format(time.RFC3339)

case "RequestedState":
requestedStatev, _ := value.(string)
option.RequestedState = requestedStatev

case "CreateTime":
createTimev, _ := value.(string)
option.CreateTime = createTimev
option.CreateTime = time.Now().UTC().Format(time.RFC3339)

fmt.Println(option.CreateTime,option.CreateTime)

case "ReplaceJobId":
replaceJobIdv, _ := value.(string)
option.ReplaceJobID = replaceJobIdv

case "ReplacedByJobID":
replacedByJobIDv, _ := value.(string)
option.ReplacedByJobID = replacedByJobIDv

case "Location":
locationv, _ := value.(string)
option.Location = locationv

case "TempFiles":
tempFilesv, _ := value.([]string)
option.TempFiles = tempFilesv

case "StageStates":

stageStatesparam, _ := value.([]map[string]interface{})

for i := 0; i < len(stageStatesparam); i++ {
var stageState StageStates
for stageStatesparamkey, stageStatesparamvalue := range stageStatesparam[i] {
switch stageStatesparamkey {

case "CurrentStateTime":
currentStateTimev, _ := stageStatesparamvalue.(string)
stageState.CurrentStateTime = currentStateTimev

case "ExecutionStageName":
executionStageNamev, _ := stageStatesparamvalue.(string)
stageState.ExecutionStageName = executionStageNamev

case "ExecutionStageState":
executionStageStatev, _ := stageStatesparamvalue.(string)
stageState.ExecutionStageState = executionStageStatev

}

}
option.stageStates = append(option.stageStates, stageState)
}

case "Environment":

environmentparam, _ := value.(map[string]interface{})

for environmentparamkey, environmentparamvalue := range environmentparam {

switch environmentparamkey {

case "Version":

versionparam, _ := environmentparamvalue.(map[string]interface{})

for versionparamkey, versionparamvalue := range versionparam {
switch versionparamkey {

case "Major":
majorv, _ := versionparamvalue.(string)
option.environment.version.Major = majorv

case "JobType":
jobTypev, _ := versionparamvalue.(string)
option.environment.version.JobType = jobTypev
}
}

case "UserAgent":

useragentparam, _ := environmentparamvalue.(map[string]interface{})

for useragentparamkey, useragentparamvalue := range useragentparam {

switch useragentparamkey {

case "Name":
namev, _ := useragentparamvalue.(string)
option.environment.userAgent.Name = namev

case "BuildDate":
buildDatev, _ := useragentparamvalue.(string)
option.environment.userAgent.BuildDate = buildDatev

case "Version":
versionv, _ := useragentparamvalue.(string)
option.environment.userAgent.Version = versionv

case "Support":
supportparam, _ := useragentparamvalue.(map[string]interface{})

for supportparamkey, supportparamvalue := range supportparam {

switch supportparamkey {

case "Status":
statusv, _ := supportparamvalue.(string)
option.environment.userAgent.support.Status = statusv

case "URL":
urlv, _ := supportparamvalue.(string)
option.environment.userAgent.support.URL = urlv

}
}

}
}

}
}

//end of switch key
}
} //end of parse


createstreamjsonmap := make(map[string]interface{})

createstreamdictnoaryconvert(option, createstreamjsonmap)

updatestreamjson, _ := json.Marshal(createstreamjsonmap)

fmt.Println("Json : \n",string(updatestreamjson))

updatestreamjsonstring := string(updatestreamjson)

var updatestreamjsonstringbyte = []byte(updatestreamjsonstring)

url := "https://dataflow.googleapis.com/v1b3/projects/" + option.ProjectID + "/jobs/" + jobId

client := googleauth.SignJWT()

updatestreamrequest, err := http.NewRequest("POST", url, bytes.NewBuffer(updatestreamjsonstringbyte))


updatestreamrequestparam := updatestreamrequest.URL.Query()


if option.Location != "" {
updatestreamrequestparam.Add("location", option.Location)
}

updatestreamrequest.URL.RawQuery = updatestreamrequestparam.Encode()


updatestreamrequest.Header.Set("Content-Type", "application/json")

updatestreamresp, err := client.Do(updatestreamrequest)

defer updatestreamresp.Body.Close()

body, err := ioutil.ReadAll(updatestreamresp.Body)

updatestreamresponse := make(map[string]interface{})
updatestreamresponse["status"] = updatestreamresp.StatusCode
updatestreamresponse["body"] = string(body)
resp = updatestreamresponse
return resp, err

}

//CreateStream Create Stream
func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp interface{}, err error) {

Expand Down Expand Up @@ -138,11 +354,11 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte

case "Type":
typev, _ := value.(string)
option.Type = typeV
option.Type = typev

case "CurrentState":
currentStatev, _ := value.(string)
option.CreateTime = currentStatev
option.CurrentState = currentStatev

case "CurrentStateTime":
currentStateTimev, _ := value.(string)
Expand All @@ -158,9 +374,11 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
option.CreateTime = createTimev
option.CreateTime = time.Now().UTC().Format(time.RFC3339)

fmt.Println(option.CreateTime,option.CreateTime)

case "ReplaceJobId":
replaceJobIdv, _ := value.(string)
option.ReplaceJobId = replaceJobIdv
option.ReplaceJobID = replaceJobIdv

case "ReplacedByJobID":
replacedByJobIDv, _ := value.(string)
Expand All @@ -181,15 +399,15 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
for i := 0; i < len(stageStatesparam); i++ {
var stageState StageStates
for stageStatesparamkey, stageStatesparamvalue := range stageStatesparam[i] {
switch key {
switch stageStatesparamkey {

case "CurrentStateTime":
currentStateTimev, _ := stageStatesparamvalue.(string)
stageState.CurrentStateTime = currentStateTimev

case "ExecutionStageName":
executionStageNamev, _ := stageStatesparamvalue.(string)
stageState.CurrentStateTime = executionStageNamev
stageState.ExecutionStageName = executionStageNamev

case "ExecutionStageState":
executionStageStatev, _ := stageStatesparamvalue.(string)
Expand All @@ -203,15 +421,15 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte

case "Environment":

environmentparam, _ := value.([]map[string]interface{})
environmentparam, _ := value.(map[string]interface{})

for environmentparamkey, environmentparamvalue := range environmentparam {

switch environmentparamkey {

case "Version":

versionparam, _ := environmentparamvalue.([]map[string]interface{})
versionparam, _ := environmentparamvalue.(map[string]interface{})

for versionparamkey, versionparamvalue := range versionparam {
switch versionparamkey {
Expand Down Expand Up @@ -251,7 +469,7 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte

for supportparamkey, supportparamvalue := range supportparam {

switch useragentparamkey {
switch supportparamkey {

case "Status":
statusv, _ := supportparamvalue.(string)
Expand Down Expand Up @@ -303,8 +521,8 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
createstreamrequestparam.Add("location", option.Location)
}

if option.ReplaceJobId != "" {
createstreamrequestparam.Add("replaceJobId", option.ReplaceJobId)
if option.ReplaceJobID != "" {
createstreamrequestparam.Add("replaceJobId", option.ReplaceJobID)
}

createstreamrequest.URL.RawQuery = createstreamrequestparam.Encode()
Expand All @@ -319,20 +537,20 @@ func (clouddataflow *Clouddataflow) CreateStream(request interface{}) (resp inte
body, err := ioutil.ReadAll(createstreamresp.Body)

createstreamresponse := make(map[string]interface{})
createstreamresponse["status"] = createstreamrresp.StatusCode
createstreamresponse["status"] = createstreamresp.StatusCode
createstreamresponse["body"] = string(body)
resp = createstreamresponse
return resp, err
}

func createstreamdictnoaryconvert(option Createstream, createstreamjsonmap map[string]interface{}) {

if option.Id != "" {
createstreamjsonmap["id"] = option.Id
if option.ID != "" {
createstreamjsonmap["id"] = option.ID
}

if option.ProjectId != "" {
createstreamjsonmap["projectId"] = option.ProjectId
if option.ProjectID != "" {
createstreamjsonmap["projectId"] = option.ProjectID
}

if option.Name != "" {
Expand All @@ -359,8 +577,8 @@ func createstreamdictnoaryconvert(option Createstream, createstreamjsonmap map[s
createstreamjsonmap["createTime"] = option.CreateTime
}

if option.ReplaceJobId != "" {
createstreamjsonmap["replaceJobId"] = option.ReplaceJobId
if option.ReplaceJobID != "" {
createstreamjsonmap["replaceJobId"] = option.ReplaceJobID
}

if option.Location != "" {
Expand Down Expand Up @@ -416,13 +634,14 @@ func prepareEnvironment(option Createstream, createstreamjsonmap map[string]inte

environmentv["userAgent"] = userAgentv

createstreamjsonmap["environment"] = environmentv
}

func prepareStageStates(option Createstream, createstreamjsonmap map[string]interface{}) {

if len(option.stageStates) > 0 {

stageStatesarray := make([]map[string]interface{})
stageStatesarray := make([]map[string]interface{},0)

for i := 0; i < len(option.stageStates); i++ {

Expand Down
Loading

0 comments on commit 586326c

Please sign in to comment.