Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON schema validation on requests #69

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ require (
github.com/apache/thrift v0.15.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.32.0 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/qri-io/jsonschema v0.2.1 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ github.com/petervolvowinz/viss-rl-interfaces v0.1.0 h1:R9iU0C+5nVgcjqvbzx4spjKQy
github.com/petervolvowinz/viss-rl-interfaces v0.1.0/go.mod h1:CrEuOgBagZ/frCWYx3Zr5TIN4pyEXz8joWMI9ntHAz0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA=
github.com/qri-io/jsonpointer v0.1.1/go.mod h1:DnJPaYgiKu56EuDp8TU5wFLdZIcAnb/uH9v37ZaMV64=
github.com/qri-io/jsonschema v0.2.1 h1:NNFoKms+kut6ABPf6xiKNM5214jzxAhDBrPHCJ97Wg0=
github.com/qri-io/jsonschema v0.2.1/go.mod h1:g7DPkiOsK1xv6T/Ao5scXRkd+yTFygcANPBaaqW+VrI=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
11 changes: 11 additions & 0 deletions server/vissv2server/httpMgr/httpMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/covesa/vissr/utils"
)

var errorResponseMap = map[string]interface{}{}

// All HTTP app clients share same channel
var HttpClientChan = []chan string{
make(chan string),
Expand All @@ -24,6 +26,7 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string)

func HttpMgrInit(mgrId int, transportMgrChan chan string) {
utils.ReadTransportSecConfig()
utils.JsonSchemaInit()

go utils.HttpServer{}.InitClientServer(utils.MuxServer[0], HttpClientChan) // go routine needed due to listenAndServe call...
utils.Info.Println("HTTP manager data session initiated.")
Expand All @@ -33,6 +36,14 @@ func HttpMgrInit(mgrId int, transportMgrChan chan string) {
select {
case reqMessage := <-HttpClientChan[0]:
utils.Info.Printf("HTTP mgr hub: Request from client:%s\n", reqMessage)
validationError := utils.JsonSchemaValidate(reqMessage)
if len(validationError) > 0 {
var requestMap map[string]interface{}
utils.MapRequest(reqMessage, &requestMap)
utils.SetErrorResponse(requestMap, errorResponseMap, 0, validationError) //bad_request
HttpClientChan[0] <- utils.FinalizeMessage(errorResponseMap)
continue
}
utils.AddRoutingForwardRequest(reqMessage, mgrId, 0, transportMgrChan)
case respMessage := <-transportMgrChan:
utils.Info.Printf("HTTP mgr hub: Response from server core:%s\n", respMessage)
Expand Down
32 changes: 26 additions & 6 deletions server/vissv2server/mqttMgr/mqttMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

var mqttChannel chan string

var errorResponseMap = map[string]interface{}{}

type NodeValue struct {
topicId int
topic string
Expand All @@ -47,7 +49,7 @@ func vissV2Receiver(transportMgrChan chan string, vissv2Channel chan string) {
break
}*/
response := <-transportMgrChan
utils.Info.Printf("MQTT mgr: Response from server core:%s\n", string(response))
utils.Info.Printf("MQTT mgr: Response from server core:%s", string(response))
vissv2Channel <- string(response) // send message to hub
}
}
Expand Down Expand Up @@ -197,12 +199,17 @@ func extractVin(response string) string {
func decomposeMqttPayload(mqttPayload string) (string, string) { // {"topic":"X", "request":"{...}"}
var payloadMap = make(map[string]interface{})
utils.MapRequest(mqttPayload, &payloadMap)
topic, err := json.Marshal(payloadMap["topic"])
if err != nil {
utils.Error.Printf("decomposeMqttPayload: cannot marshal topic in: %s", mqttPayload)
return "", ""
}
payload, err := json.Marshal(payloadMap["request"])
if err != nil {
utils.Error.Printf("decomposeMqttPayload: cannot marshal request in response=%s", mqttPayload)
os.Exit(1)
utils.Error.Printf("decomposeMqttPayload: cannot marshal request in:%s", mqttPayload)
return string(topic), "corrupt request"
}
return payloadMap["topic"].(string), string(payload)
return string(topic), string(payload)
}

func AddRoutingInfoAndForward(reqMessage string, mgrId int, clientId int, transportMgrChan chan string) {
Expand All @@ -220,6 +227,8 @@ func MqttMgrInit(mgrId int, transportMgrChan chan string) {
topicId := 0
topicList.nodes = 0

utils.JsonSchemaInit()

go vissV2Receiver(transportMgrChan, vissv2Channel) //message reception from server core

utils.Info.Println("**** MQTT manager hub entering server loop... ****")
Expand All @@ -229,13 +238,24 @@ func MqttMgrInit(mgrId int, transportMgrChan chan string) {

case mqttPayload := <-mqttChannel:
topic, payload := decomposeMqttPayload(mqttPayload)
utils.Info.Printf("MQTT mgr hub: Message from broker:Topic=%s, Payload=%s\n", topic, payload)
if len(topic) == 0 {
utils.Error.Printf("MQTT: Message from broker is corrupt:%s\nNot possible to respond to client", mqttPayload)
continue
}
utils.Info.Printf("MQTT mgr hub: Message from broker:Topic=%s, Payload=%s", topic, payload)
validationError := utils.JsonSchemaValidate(payload)
if len(validationError) > 0 {
var requestMap map[string]interface{}
utils.MapRequest(payload, &requestMap)
utils.SetErrorResponse(requestMap, errorResponseMap, 0, validationError) //bad_request
publishMessage(brokerSocket, topic, utils.FinalizeMessage(errorResponseMap))
}
pushTopic(topic, topicId)
AddRoutingInfoAndForward(payload, mgrId, topicId, transportMgrChan)
topicId++

case vissv2Message := <-vissv2Channel:
utils.Info.Printf("MQTT hub: Message from VISSv2 server:%s\n", vissv2Message)
utils.Info.Printf("MQTT hub: Message from VISSv2 server:%s", vissv2Message)
// link routerId to topic, remove routerId from message, create mqtt message, send message to mqtt transport
payload, topicHandle := utils.RemoveInternalData(string(vissv2Message))
publishMessage(brokerSocket, getTopic(topicHandle), payload)
Expand Down
9 changes: 1 addition & 8 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,7 @@ var feederConnected bool
//var feederConn net.Conn
//var hostIp string

var errorResponseMap = map[string]interface{}{
"RouterId": "0?0",
"action": "unknown",
"requestId": "XX",
"error": `{"number":AA, "reason": "BB", "message": "CC"}`,
"ts": "yy",
}
var errorResponseMap = map[string]interface{}{}

var dbHandle *sql.DB
var dbErr error
Expand Down Expand Up @@ -1245,7 +1239,6 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) {
}
}

//func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) {
stateDbType = stateStorageType
historySupport = histSupport
Expand Down
147 changes: 2 additions & 145 deletions server/vissv2server/vissv2server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,11 @@ func transportDataSession(transportMgrChannel chan string, transportDataChannel

case msg := <-transportMgrChannel:
utils.Info.Printf("request: %s", msg)
var msgMap map[string]interface{}
utils.MapRequest(msg, &msgMap)
var msgMap map[string]interface{}
utils.MapRequest(msg, &msgMap)
transportDataChannel <- msgMap // send request to server hub
// transportDataChannel <- msg // send request to server hub
case message := <-backendChannel:
// utils.Info.Printf("Transport mgr server: message= %s", message)
transportMgrChannel <- utils.FinalizeMessage(message)
// transportMgrChannel <- message
}
}
}
Expand Down Expand Up @@ -406,144 +403,10 @@ func getTokenContext(reqMap map[string]interface{}) string {
return ""
}

func validRequest(request map[string]interface{}) bool {
switch request["action"].(string) {
case "get":
return isValidGetParams(request)
case "set":
return isValidSetParams(request)
case "subscribe":
return isValidSubscribeParams(request)
case "unsubscribe":
return isValidUnsubscribeParams(request)
case "internal-killsubscriptions":
return true
case "internal-cancelsubscription":
return true
}
return false
}

func isValidGetParams(request map[string]interface{}) bool {
if request["path"] == nil {
return false
}
if request["filter"] != nil {
return isValidGetFilter(request)
}
return true
}

func isValidGetFilter(request map[string]interface{}) bool { // paths, history, metadata supported
return true // needs to be fixed
// if strings.Contains(request, "paths") == true {
if request["paths"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "history") == true {
if request["history"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "metadata") == true {
if request["metadata"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
return false
}

func isValidSetParams(request map[string]interface{}) bool {
return request["path"] != nil && request["value"] != nil
}

func isValidSubscribeParams(request map[string]interface{}) bool {
if request["path"] == nil {
return false
}
if request["filter"] != nil {
return true
// return isValidSubscribeFilter(request)
}
return true
}

func isValidSubscribeFilter(request map[string]interface{}) bool { // paths, timebased, range, change, curvelog supported
return true // needs to be fixed
// if strings.Contains(request, "paths") == true {
if request["paths"] != nil {
// if strings.Contains(request, "parameter") == true {
if request["parameter"] != nil {
return true
}
}
// if strings.Contains(request, "timebased") == true {
if request["timebased"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "period") == true {
if request["parameter"] != nil && request["period"] != nil {
return true
}
}
// if strings.Contains(request, "range") == true {
if request["range"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true &&
// strings.Contains(request, "boundary") == true {
if request["parameter"] != nil && request["logic-op"] != nil && request["boundary"] != nil {
return true
}
}
// if strings.Contains(request, "change") == true {
if request["change"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true &&
// strings.Contains(request, "diff") == true {
if request["parameter"] != nil && request["logic-op"] != nil && request["diff"] != nil {
return true
}
}
// if strings.Contains(request, "curvelog") == true {
if request["curvelog"] != nil {
// if strings.Contains(request, "parameter") == true && strings.Contains(request, "maxerr") == true &&
// strings.Contains(request, "bufsize") == true {
if request["parameter"] != nil && request["maxerr"] != nil && request["bufsize"] != nil {
return true
}
}
return false
}

func isValidUnsubscribeParams(request map[string]interface{}) bool {
return request["subscriptionId"] != nil
}

func serveRequest(requestMap map[string]interface{}, tDChanIndex int, sDChanIndex int) {
if requestMap["action"] == nil || validRequest(requestMap) == false {
utils.Error.Printf("serveRequest():invalid action params=%s", requestMap["action"])
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["path"] != nil && strings.Contains(requestMap["path"].(string), "*") == true {
utils.Error.Printf("serveRequest():path contained wildcard=%s", requestMap["path"])
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["path"] != nil {
requestMap["path"] = utils.UrlToPath(requestMap["path"].(string)) // replace slash with dot
}
if requestMap["action"] == "set" && requestMap["filter"] != nil {
utils.Error.Printf("serveRequest():Set request combined with filtering.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
backendChan[tDChanIndex] <- errorResponseMap
return
}
if requestMap["action"] == "unsubscribe" {
serviceDataChan[sDChanIndex] <- requestMap
return
Expand All @@ -556,12 +419,6 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC
serviceDataChan[sDChanIndex] <- requestMap // internal message
return
}
if requestMap["path"] == nil {
utils.Error.Printf("Unmarshal filter path array failed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
backendChan[tDChanIndex] <- errorResponseMap
return
}
rootPath := requestMap["path"].(string)
VSSTreeRoot := utils.SetRootNodePointer(rootPath)
if VSSTreeRoot == nil {
Expand Down
Loading
Loading