Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add diagnostic session service #1578

Merged
merged 1 commit into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.
- [#1606](https://github.com/influxdata/kapacitor/pull/1606): Update Go version to 1.9.1
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API.

### Bugfixes

Expand Down
47 changes: 47 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -32,6 +33,7 @@ const (
basePreviewPath = "/kapacitor/v1preview"
pingPath = basePath + "/ping"
logLevelPath = basePath + "/loglevel"
logsPath = basePath + "/logs"
debugVarsPath = basePath + "/debug/vars"
tasksPath = basePath + "/tasks"
templatesPath = basePath + "/templates"
Expand Down Expand Up @@ -693,6 +695,51 @@ func (c *Client) Do(req *http.Request, result interface{}, codes ...int) (*http.
return resp, nil
}

func (c *Client) Logs(ctx context.Context, w io.Writer, q map[string]string) error {
u := c.BaseURL()
u.Path = logsPath

qp := u.Query()
for k, v := range q {
qp.Add(k, v)
}
u.RawQuery = qp.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return err
}
req = req.WithContext(ctx)
err = c.prepRequest(req)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad status code %v", resp.StatusCode)
}

errCh := make(chan error, 1)
defer close(errCh)
go func() {
_, err := io.Copy(w, resp.Body)
errCh <- err
}()

select {
case <-ctx.Done():
return nil
case err := <-errCh:
return err
}

}

// Ping the server for a response.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *Client) Ping() (time.Duration, string, error) {
Expand Down
94 changes: 94 additions & 0 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
Expand All @@ -10,10 +11,13 @@ import (
"log"
"net/http"
"os"
"os/signal"
"path"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

humanize "github.com/dustin/go-humanize"
Expand Down Expand Up @@ -52,6 +56,8 @@ Commands:
define-topic-handler Create/update an alert handler for a topic.
replay Replay a recording to a task.
replay-live Replay data against a task without recording it.
watch Watch logs for a task.
logs Follow arbitrary Kapacitor logs.
enable Enable and start running a task with live data.
disable Stop running a task.
reload Reload a running task with an updated task definition.
Expand Down Expand Up @@ -153,6 +159,12 @@ func main() {
}
commandArgs = args
commandF = doReplayLive
case "watch":
commandArgs = args
commandF = doWatch
case "logs":
commandArgs = args
commandF = doLogs
case "enable":
commandArgs = args
commandF = doEnable
Expand Down Expand Up @@ -284,6 +296,10 @@ func doHelp(args []string) error {
showTopicUsage()
case "backup":
backupUsage()
case "watch":
watchUsage()
case "logs":
logsUsage()
case "level":
levelUsage()
case "help":
Expand Down Expand Up @@ -2300,3 +2316,81 @@ func doBackup(args []string) error {
}
return nil
}

func watchUsage() {
var u = `Usage: kapacitor watch <task id> [<tags> ...]

Watch logs associated with a task.

Examples:

$ kapacitor watch mytask
$ kapacitor watch mytask node=log5
`
fmt.Fprintln(os.Stderr, u)
}

func doWatch(args []string) error {
m := map[string]string{}
if len(args) < 1 {
return errors.New("must provide task ID.")
}
m["task"] = args[0]
for _, s := range args[1:] {
pair := strings.Split(s, "=")
if len(pair) != 2 {
return fmt.Errorf("bad keyvalue pair: '%v'", s)
}
m[pair[0]] = pair[1]
}

return tailLogs(m)
}

func logsUsage() {
var u = `Usage: kapacitor logs [<tags> ...]

Watch arbitrary kapacitor logs.

$ kapacitor logs service=http lvl=error
$ kapacitor logs service=http lvl=info+
`
fmt.Fprintln(os.Stderr, u)
}

func doLogs(args []string) error {
m := map[string]string{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create a helper function that both doLogs and doWatch use so that this logic is not duplicated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

for _, s := range args {
pair := strings.Split(s, "=")
if len(pair) != 2 {
return fmt.Errorf("bad keyvalue pair: '%v'", s)
}
m[pair[0]] = pair[1]
}

return tailLogs(m)
}

func tailLogs(m map[string]string) error {
ctx, cancel := context.WithCancel(context.Background())
done := false
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
var mu sync.Mutex
go func() {
<-sigs
cancel()
mu.Lock()
defer mu.Unlock()
done = true
}()

err := cli.Logs(ctx, os.Stdout, m)
mu.Lock()
defer mu.Unlock()
if err != nil && !done {
return errors.Wrap(err, "failed to retrieve logs")
}

return nil
}
9 changes: 9 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Server struct {
AlertService *alert.Service
TaskStore *task_store.Service
ReplayService *replay.Service
SessionService *diagnostic.SessionService
InfluxDBService *influxdb.Service
ConfigOverrideService *config.Service
TesterService *servicetest.Service
Expand Down Expand Up @@ -246,6 +247,7 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
// Append these after InfluxDB because they depend on it
s.appendTaskStoreService()
s.appendReplayService()
s.appendSessionService()

// Append third-party integrations
// Append extra input services
Expand Down Expand Up @@ -449,6 +451,13 @@ func (s *Server) appendTaskStoreService() {
s.AppendService("task_store", srv)
}

func (s *Server) appendSessionService() {
srv := s.DiagService.SessionService
srv.HTTPDService = s.HTTPDService

s.AppendService("session", srv)
}

func (s *Server) appendReplayService() {
d := s.DiagService.NewReplayHandler()
srv := replay.NewService(s.config.Replay, d)
Expand Down
54 changes: 54 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10669,3 +10669,57 @@ options:
}

}

func TestLogSessions_HeaderJSON(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()

u := cli.BaseURL()
u.Path = "/logs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
return
}

req.Header.Add("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
return
}
defer resp.Body.Close()

if exp, got := "application/json; charset=utf-8", resp.Header.Get("Content-Type"); exp != got {
t.Fatalf("expected: %v, got: %v\n", exp, got)
return
}

}

func TestLogSessions_HeaderGzip(t *testing.T) {
s, cli := OpenDefaultServer()
defer s.Close()

u := cli.BaseURL()
u.Path = "/logs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
t.Fatal(err)
return
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
return
}
defer resp.Body.Close()

if exp, got := "", resp.Header.Get("Content-Encoding"); exp != got {
t.Fatalf("expected: %v, got: %v\n", exp, got)
return
}

}
Loading