diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index fbdc3318d..d96c0c99c 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -144,6 +144,14 @@ func (c *Client) WatchTask(id string) *WatchTasksResult { return default: line, _ := reader.ReadBytes('\n') + sline := string(line) + if sline == "" || sline == "\n" { + continue + } + if strings.HasPrefix(sline, "data:") { + sline = strings.TrimPrefix(sline, "data:") + line = []byte(sline) + } ste := &rbody.StreamedTaskEvent{} err := json.Unmarshal(line, ste) if err != nil { diff --git a/mgmt/rest/task.go b/mgmt/rest/task.go index bb25dce56..191a5cffc 100644 --- a/mgmt/rest/task.go +++ b/mgmt/rest/task.go @@ -185,7 +185,7 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. EventType: rbody.TaskWatchStreamOpen, Message: "Stream opened", } - fmt.Fprintf(w, "%s\n", so.ToJSON()) + fmt.Fprintf(w, "data: %s\n\n", so.ToJSON()) flusher.Flush() // Get a channel for if the client notifies us it is closing the connection @@ -203,10 +203,10 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. case rbody.TaskWatchMetricEvent, rbody.TaskWatchTaskStarted: // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer - fmt.Fprintf(w, "%s\n", e.ToJSON()) + fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped: // A disabled task should end the streaming and close the connection - fmt.Fprintf(w, "%s\n", e.ToJSON()) + fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) // Flush since we are sending nothing new flusher.Flush() // Close out watcher removing it from the scheduler