diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go index 5acc94153fca..c5be4bc19f3b 100644 --- a/pkg/api/handlers/compat/events.go +++ b/pkg/api/handlers/compat/events.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync" "github.com/containers/libpod/v2/libpod" "github.com/containers/libpod/v2/libpod/events" @@ -17,10 +18,10 @@ import ( func GetEvents(w http.ResponseWriter, r *http.Request) { var ( - fromStart bool - eventsError error - decoder = r.Context().Value("decoder").(*schema.Decoder) - runtime = r.Context().Value("runtime").(*libpod.Runtime) + fromStart bool + decoder = r.Context().Value("decoder").(*schema.Decoder) + runtime = r.Context().Value("runtime").(*libpod.Runtime) + json = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level ) query := struct { @@ -33,11 +34,16 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { } if err := decoder.Decode(&query, r.URL.Query()); err != nil { utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String())) + return } var libpodFilters = []string{} if _, found := r.URL.Query()["filters"]; found { for k, v := range query.Filters { + if len(v) == 0 { + utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Errorf("empty value for filter %q", k)) + return + } libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0])) } } @@ -48,16 +54,13 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { eventCtx, eventCancel := context.WithCancel(r.Context()) eventChannel := make(chan *events.Event) + errorChannel := make(chan error) + + // Start reading events. go func() { readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until} - eventsError = runtime.Events(eventCtx, readOpts) + errorChannel <- runtime.Events(eventCtx, readOpts) }() - if eventsError != nil { - utils.InternalServerError(w, eventsError) - eventCancel() - close(eventChannel) - return - } // If client disappears we need to stop listening for events go func(done <-chan struct{}) { @@ -68,24 +71,39 @@ func GetEvents(w http.ResponseWriter, r *http.Request) { } }(r.Context().Done()) - // Headers need to be written out before turning Writer() over to json encoder - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } + var coder *jsoniter.Encoder + var writeHeader sync.Once - json := jsoniter.ConfigCompatibleWithStandardLibrary - coder := json.NewEncoder(w) - coder.SetEscapeHTML(true) + for { + select { + case err := <-errorChannel: + if err != nil { + utils.InternalServerError(w, err) + eventCancel() + close(eventChannel) + return + } + case evt := <-eventChannel: + writeHeader.Do(func() { + // Use a sync.Once so that we write the header + // only once. + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + coder = json.NewEncoder(w) + coder.SetEscapeHTML(true) + }) - for event := range eventChannel { - e := entities.ConvertToEntitiesEvent(*event) - if err := coder.Encode(e); err != nil { - logrus.Errorf("unable to write json: %q", err) - } - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() + e := entities.ConvertToEntitiesEvent(*evt) + if err := coder.Encode(e); err != nil { + logrus.Errorf("unable to write json: %q", err) + } + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } } + } }