Skip to content

Commit

Permalink
User Defined Functions (UDFs)
Browse files Browse the repository at this point in the history
first initial tests passing

working moving average udf examples

working snapshot/restore
  • Loading branch information
nathanielc committed Jan 14, 2016
1 parent 00c6e95 commit bb816f6
Show file tree
Hide file tree
Showing 50 changed files with 10,552 additions and 305 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ kapacitor*.rpm
kapacitor*.deb
kapacitor*.tar
kapacitor*.zip
*.pyc
7 changes: 4 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -75,9 +76,9 @@ type AlertNode struct {
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err error) {
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *AlertNode, err error) {
an = &AlertNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
a: n,
}
an.node.runF = an.runAlert
Expand Down Expand Up @@ -207,7 +208,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
return
}

func (a *AlertNode) runAlert() error {
func (a *AlertNode) runAlert([]byte) error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand Down
13 changes: 7 additions & 6 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kapacitor
import (
"errors"
"fmt"
"log"
"sync"
"time"

Expand All @@ -19,9 +20,9 @@ type SourceBatchNode struct {
idx int
}

func newSourceBatchNode(et *ExecutingTask, n *pipeline.SourceBatchNode) (*SourceBatchNode, error) {
func newSourceBatchNode(et *ExecutingTask, n *pipeline.SourceBatchNode, l *log.Logger) (*SourceBatchNode, error) {
sn := &SourceBatchNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
s: n,
}
return sn, nil
Expand All @@ -47,7 +48,7 @@ func (s *SourceBatchNode) addParentEdge(in *Edge) {
s.idx++
}

func (s *SourceBatchNode) start() {
func (s *SourceBatchNode) start([]byte) {
}

func (s *SourceBatchNode) Err() error {
Expand Down Expand Up @@ -96,9 +97,9 @@ type BatchNode struct {
closing chan struct{}
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
b: n,
closing: make(chan struct{}),
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func (b *BatchNode) doQuery() error {
}
}

func (b *BatchNode) runBatch() error {
func (b *BatchNode) runBatch([]byte) error {
errC := make(chan error, 1)
go func() {
defer func() {
Expand Down
4 changes: 3 additions & 1 deletion cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func (cmd *Command) monitorServerErrors() {
for {
select {
case err := <-cmd.Server.Err():
cmd.Logger.Println("E! " + err.Error())
if err != nil {
cmd.Logger.Println("E! " + err.Error())
}
case <-cmd.closing:
return
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"

Expand Down Expand Up @@ -52,6 +53,7 @@ type Config struct {
Alerta alerta.Config `toml:"alerta"`
Reporting reporting.Config `toml:"reporting"`
Stats stats.Config `toml:"stats"`
UDF udf.Config `toml:"udf"`

Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewConfig() *Config {
c.Alerta = alerta.NewConfig()
c.Reporting = reporting.NewConfig()
c.Stats = stats.NewConfig()
c.UDF = udf.NewConfig()

return c
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/stats"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/wlog"
Expand Down Expand Up @@ -124,6 +125,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
}

// Append Kapacitor services.
s.appendUDFService(c.UDF)
s.appendSMTPService(c.SMTP)
s.appendHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.Hostname)
Expand Down Expand Up @@ -201,6 +203,7 @@ func (s *Server) appendTaskStoreService(c task_store.Config) {
srv.TaskMaster = s.TaskMaster

s.TaskStore = srv
s.TaskMaster.TaskStore = srv
s.Services = append(s.Services, srv)
}

Expand All @@ -216,6 +219,14 @@ func (s *Server) appendReplayStoreService(c replay.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendUDFService(c udf.Config) {
l := s.LogService.NewLogger("[udf] ", log.LstdFlags)
srv := udf.NewService(c, l)

s.TaskMaster.UDFService = srv
s.Services = append(s.Services, srv)
}

func (s *Server) appendOpsGenieService(c opsgenie.Config) {
if c.Enabled {
l := s.LogService.NewLogger("[opsgenie] ", log.LstdFlags)
Expand Down
30 changes: 30 additions & 0 deletions command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package command

import (
"io"
"os/exec"
)

// Command is the minimal interface of an external process as needed here:
// the subset of *exec.Cmd required to start a process, wait for it to
// finish, and obtain its standard I/O pipes. Abstracting it allows tests
// to substitute an in-memory implementation.
type Command interface {
	Start() error
	Wait() error

	StdinPipe() (io.WriteCloser, error)
	StdoutPipe() (io.ReadCloser, error)
	StderrPipe() (io.ReadCloser, error)
}

// Commander is a factory for Commands. CommandInfo implements it by
// spawning real OS processes; test doubles may return fakes instead.
type Commander interface {
	NewCommand() Command
}

// CommandInfo holds the necessary information to create a new command:
// the program to run and the arguments to pass it.
type CommandInfo struct {
	Prog string
	Args []string
}

// NewCommand creates a new Command using the Go exec package and the
// stored program and arguments. It makes CommandInfo satisfy Commander.
func (ci CommandInfo) NewCommand() Command {
	return exec.Command(ci.Prog, ci.Args...)
}
127 changes: 127 additions & 0 deletions command/test/command_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package test

import (
"bufio"
"io"

"github.com/influxdata/kapacitor/command"
"github.com/influxdata/kapacitor/udf"
)

// CommandHelper is an in-memory test double implementing both
// command.Command and command.Commander. Instead of spawning a process it
// backs STDIN/STDOUT/STDERR with io.Pipes and translates the byte streams
// to and from udf protocol messages via the Requests/Responses channels.
type CommandHelper struct {
	// STDIN pipe: the caller writes to inw; readRequests decodes from inr.
	inr *io.PipeReader
	inw *io.PipeWriter

	// STDOUT pipe: writeResponses encodes to outw; the caller reads from outr.
	outr *io.PipeReader
	outw *io.PipeWriter

	// STDERR pipe: created on demand; closed when STDIN is closed.
	errr *io.PipeReader
	errw *io.PipeWriter

	// Requests receives each udf.Request decoded from the STDIN stream.
	Requests chan *udf.Request
	// Responses is drained; each udf.Response is encoded onto STDOUT.
	Responses chan *udf.Response
	// ErrC receives the terminal error (possibly nil) once both pump
	// goroutines started by Start have exited.
	ErrC chan error
}

// NewCommandHelper returns a CommandHelper with its request, response,
// and error channels initialized. The pipe pairs are created lazily by
// the Stdin/Stdout/StderrPipe methods.
func NewCommandHelper() *CommandHelper {
	return &CommandHelper{
		Requests:  make(chan *udf.Request),
		Responses: make(chan *udf.Response),
		ErrC:      make(chan error, 1),
	}
}

// NewCommand implements command.Commander by returning the helper itself,
// so the same instance acts as both the factory and the "process".
func (c *CommandHelper) NewCommand() command.Command {
	return c
}

// readRequests decodes udf.Request messages from the STDIN pipe and
// forwards them on the Requests channel until EOF. The pipe reader and
// the Requests channel are closed on return so consumers terminate.
func (c *CommandHelper) readRequests() error {
	defer c.inr.Close()
	defer close(c.Requests)
	in := bufio.NewReader(c.inr)
	var scratch []byte
	for {
		request := &udf.Request{}
		if err := udf.ReadMessage(&scratch, in, request); err != nil {
			if err == io.EOF {
				// Clean shutdown: the writer closed STDIN.
				return nil
			}
			return err
		}
		c.Requests <- request
	}
}

// writeResponses encodes each udf.Response from the Responses channel
// onto the STDOUT pipe until the channel is closed, then closes the pipe
// writer so readers observe EOF.
//
// Write errors were previously discarded; now the first one is recorded
// and returned. The channel is still fully drained even after an error so
// senders on the unbuffered Responses channel are never deadlocked.
func (c *CommandHelper) writeResponses() error {
	defer c.outw.Close()
	var firstErr error
	for response := range c.Responses {
		if err := udf.WriteMessage(response, c.outw); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// Start implements command.Command. It launches one goroutine that pumps
// requests off STDIN and one that pumps responses onto STDOUT, then a
// supervisor that waits for both and reports exactly one error (or nil)
// on ErrC — the read error takes precedence over the write error.
// Start itself always returns nil, mirroring a successful process start.
func (c *CommandHelper) Start() error {
	go func() {
		readErrC := make(chan error, 1)
		writeErrC := make(chan error, 1)
		go func() {
			readErrC <- c.readRequests()
		}()
		go func() {
			writeErrC <- c.writeResponses()
		}()
		var readErr, writeErr error
		// Receive each result exactly once: setting a channel variable to
		// nil disables its case in the select on later iterations.
		for readErrC != nil || writeErrC != nil {
			select {
			case readErr = <-readErrC:
				readErrC = nil
			case writeErr = <-writeErrC:
				writeErrC = nil
			}
		}

		// Prefer the read-side error; fall back to the write-side result.
		if readErr != nil {
			c.ErrC <- readErr
		} else {
			c.ErrC <- writeErr
		}
	}()
	return nil
}

// Wait implements command.Command. There is no real process to wait on,
// so it returns nil immediately; observe completion/errors via ErrC.
func (c *CommandHelper) Wait() error {
	return nil
}

// cmdCloser wraps the STDIN pipe writer; when it is closed it also
// closes the STDERR pipe of the command, mimicking process teardown.
// (STDOUT is closed separately by writeResponses when Responses closes.)
type cmdCloser struct {
	*io.PipeWriter
	cmd *CommandHelper
}

// Close closes the STDERR pipe, then the wrapped STDIN pipe writer.
func (cc *cmdCloser) Close() error {
	cc.cmd.errw.Close()
	return cc.PipeWriter.Close()
}

// StdinPipe implements command.Command by creating the simulated STDIN
// pipe. Closing the returned writer also closes the STDERR pipe.
func (c *CommandHelper) StdinPipe() (io.WriteCloser, error) {
	r, w := io.Pipe()
	c.inr, c.inw = r, w
	return &cmdCloser{PipeWriter: w, cmd: c}, nil
}

// StdoutPipe implements command.Command by creating the simulated STDOUT
// pipe and returning its read end.
func (c *CommandHelper) StdoutPipe() (io.ReadCloser, error) {
	r, w := io.Pipe()
	c.outr, c.outw = r, w
	return r, nil
}

// StderrPipe implements command.Command by creating the simulated STDERR
// pipe and returning its read end.
func (c *CommandHelper) StderrPipe() (io.ReadCloser, error) {
	r, w := io.Pipe()
	c.errr, c.errw = r, w
	return r, nil
}
7 changes: 4 additions & 3 deletions derivative.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"log"
"time"

"github.com/influxdata/kapacitor/models"
Expand All @@ -13,17 +14,17 @@ type DerivativeNode struct {
}

// Create a new derivative node.
func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode) (*DerivativeNode, error) {
func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Logger) (*DerivativeNode, error) {
dn := &DerivativeNode{
node: node{Node: n, et: et},
node: node{Node: n, et: et, logger: l},
d: n,
}
// Create stateful expressions
dn.node.runF = dn.runDerivative
return dn, nil
}

func (d *DerivativeNode) runDerivative() error {
func (d *DerivativeNode) runDerivative([]byte) error {
switch d.Provides() {
case pipeline.StreamEdge:
previous := make(map[models.GroupID]models.Point)
Expand Down
Loading

0 comments on commit bb816f6

Please sign in to comment.