diff --git a/link.go b/link.go
index 03ae9d56..0a321d6a 100644
--- a/link.go
+++ b/link.go
@@ -25,7 +25,7 @@ type ToxicLink struct {
 	stubs     []*toxics.ToxicStub
 	proxy     *Proxy
 	toxics    *ToxicCollection
-	input     *stream.ChanWriter
+	input     *stream.MultiChanWriter
 	output    *stream.ChanReader
 	direction stream.Direction
 	Logger    *zerolog.Logger
@@ -36,6 +36,7 @@ func NewToxicLink(
 	collection *ToxicCollection,
 	direction stream.Direction,
 	logger zerolog.Logger,
+	additionalStreamChans []chan<- *stream.StreamChunk,
 ) *ToxicLink {
 	link := &ToxicLink{
 		stubs: make(
@@ -50,7 +51,7 @@ func NewToxicLink(
 	}
 	// Initialize the link with ToxicStubs
 	last := make(chan *stream.StreamChunk) // The first toxic is always a noop
-	link.input = stream.NewChanWriter(last)
+	link.input = stream.NewMultiChanWriter(append([]chan<- *stream.StreamChunk{last}, additionalStreamChans...)...)
 	for i := 0; i < len(link.stubs); i++ {
 		var next chan *stream.StreamChunk
 		if i+1 < len(link.stubs) {
diff --git a/matchers/http_request_header.go b/matchers/http_request_header.go
new file mode 100644
index 00000000..a0d25ce3
--- /dev/null
+++ b/matchers/http_request_header.go
@@ -0,0 +1,32 @@
+package matchers
+
+import (
+	"bufio"
+	"bytes"
+	"net/http"
+	"regexp"
+)
+
+type HttpRequestHeaderMatcher struct {
+	HeaderKey        string `json:"headerKey"`
+	HeaderValueRegex string `json:"headerValueRegex"`
+}
+
+func (m *HttpRequestHeaderMatcher) TryMatch(data []byte) (bool, error) {
+	bufioReader := bufio.NewReader(bytes.NewReader(data))
+
+	// Try to parse the data as an HTTP request.
+	req, err := http.ReadRequest(bufioReader)
+	if err != nil {
+		return false, err
+	}
+
+	// Try to match the header value using the regex.
+	headerValue := req.Header.Get(m.HeaderKey)
+	match, err := regexp.MatchString(m.HeaderValueRegex, headerValue)
+	if err != nil {
+		return false, err
+	}
+
+	return match, nil
+}
diff --git a/matchers/matcher.go b/matchers/matcher.go
new file mode 100644
index 00000000..ea879e61
--- /dev/null
+++ b/matchers/matcher.go
@@ -0,0 +1,42 @@
+package matchers
+
+import (
+	"reflect"
+	"sync"
+)
+
+// Matcher is the interface for all matcher types.
+type Matcher interface {
+	TryMatch([]byte) (bool, error)
+}
+
+var (
+	MatcherRegistry map[string]Matcher
+	registryMutex   sync.RWMutex
+)
+
+func RegisterMatcher(typeName string, matcher Matcher) {
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
+
+	if MatcherRegistry == nil {
+		MatcherRegistry = make(map[string]Matcher)
+	}
+	MatcherRegistry[typeName] = matcher
+}
+
+func New(matcherType string) Matcher {
+	registryMutex.RLock()
+	defer registryMutex.RUnlock()
+
+	orig, ok := MatcherRegistry[matcherType]
+	if !ok {
+		return nil
+	}
+	matcher := reflect.New(reflect.TypeOf(orig).Elem()).Interface().(Matcher)
+	return matcher
+}
+
+func init() {
+	RegisterMatcher("httpRequestHeaderMatcher", new(HttpRequestHeaderMatcher))
+}
diff --git a/stream/io_chan.go b/stream/io_chan.go
index d7bf56b3..14e63c57 100644
--- a/stream/io_chan.go
+++ b/stream/io_chan.go
@@ -12,6 +12,34 @@ type StreamChunk struct {
 	Timestamp time.Time
 }
 
+// Implements the io.WriteCloser interface for a slice of chan []byte.
+type MultiChanWriter struct {
+	outputs []chan<- *StreamChunk
+}
+
+func NewMultiChanWriter(outputs ...chan<- *StreamChunk) *MultiChanWriter {
+	return &MultiChanWriter{outputs}
+}
+
+// Write `buf` as a StreamChunk to all channels. The full buffer is always written, and error
+// will always be nil. Calling `Write()` after `Close()` will panic.
+func (m *MultiChanWriter) Write(buf []byte) (int, error) {
+	packet := &StreamChunk{make([]byte, len(buf)), time.Now()}
+	copy(packet.Data, buf) // Make a copy before sending it to the channels
+	for _, output := range m.outputs {
+		output <- packet
+	}
+	return len(buf), nil
+}
+
+// Close all output channels.
+func (m *MultiChanWriter) Close() error {
+	for _, output := range m.outputs {
+		close(output)
+	}
+	return nil
+}
+
 // Implements the io.WriteCloser interface for a chan []byte.
 type ChanWriter struct {
 	output chan<- *StreamChunk
diff --git a/toxic_collection.go b/toxic_collection.go
index b236e9ee..3c63f871 100644
--- a/toxic_collection.go
+++ b/toxic_collection.go
@@ -21,25 +21,30 @@ import (
 type ToxicCollection struct {
 	sync.Mutex
 
-	noop  *toxics.ToxicWrapper
-	proxy *Proxy
-	chain [][]*toxics.ToxicWrapper
-	links map[string]*ToxicLink
+	noop            *toxics.ToxicWrapper
+	proxy           *Proxy
+	chain           [][]*toxics.ToxicWrapper
+	toxicConditions [][]*toxics.ToxicCondition
+	links           map[string]*ToxicLink
 }
 
 func NewToxicCollection(proxy *Proxy) *ToxicCollection {
 	collection := &ToxicCollection{
 		noop: &toxics.ToxicWrapper{
-			Toxic: new(toxics.NoopToxic),
-			Type:  "noop",
+			Toxic:   new(toxics.NoopToxic),
+			Type:    "noop",
+			Enabled: true,
 		},
-		proxy: proxy,
-		chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
-		links: make(map[string]*ToxicLink),
+		proxy:           proxy,
+		chain:           make([][]*toxics.ToxicWrapper, stream.NumDirections),
+		toxicConditions: make([][]*toxics.ToxicCondition, stream.NumDirections),
+		links:           make(map[string]*ToxicLink),
 	}
 	for dir := range collection.chain {
 		collection.chain[dir] = make([]*toxics.ToxicWrapper, 1, toxics.Count()+1)
 		collection.chain[dir][0] = collection.noop
+		collection.toxicConditions[dir] = make([]*toxics.ToxicCondition, 1, toxics.Count()+1)
+		collection.toxicConditions[dir][0] = nil
 	}
 	return collection
 }
@@ -107,10 +112,19 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
 		wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream)
 	}
 
+	// Initialize the toxic
 	if toxics.New(wrapper) == nil {
 		return nil, ErrInvalidToxicType
 	}
 
+	// Set the wrapper to be enabled if no condition is specified.
+	if wrapper.Condition == nil {
+		wrapper.Enabled = true
+	} else {
+		wrapper.Condition.ToxicWrapper = wrapper
+	}
+
+	// Check if the toxic already exists
 	found := c.findToxicByName(wrapper.Name)
 	if found != nil {
 		return nil, ErrToxicAlreadyExists
@@ -200,9 +214,74 @@ func (c *ToxicCollection) StartLink(
 		logger = zerolog.Nop()
 	}
 
-	link := NewToxicLink(c.proxy, c, direction, logger)
-	link.Start(server, name, input, output)
-	c.links[name] = link
+	// If the direction is upstream, we need to run matchers and update
+	// toxics if matched.
+	if direction == stream.Upstream {
+		// Write input to the matcher writer so that we can match the input
+		// in parallel while piping it through the link.
+		streamChan := make(chan *stream.StreamChunk)
+		streamChanWriter := stream.NewChanWriter(streamChan)
+		forkedInput := io.TeeReader(input, streamChanWriter)
+
+		// Fire off a goroutine to match all conditions separately.
+		go c.matchAllToxicConditions(streamChan, direction)
+
+		link := NewToxicLink(c.proxy, c, direction, logger, []chan<- *stream.StreamChunk{streamChan})
+		link.Start(server, name, forkedInput, output)
+		c.links[name] = link
+	} else {
+		link := NewToxicLink(c.proxy, c, direction, logger, nil)
+		link.Start(server, name, input, output)
+		c.links[name] = link
+	}
+}
+
+// matchAllToxicConditions matches all conditions for a given direction, and updates
+// the toxics if matched.
+func (c *ToxicCollection) matchAllToxicConditions(
+	streamChan chan *stream.StreamChunk,
+	direction stream.Direction,
+) {
+	c.Lock()
+	defer c.Unlock()
+
+	var logger zerolog.Logger
+	if c.proxy.Logger != nil {
+		logger = *c.proxy.Logger
+	} else {
+		logger = zerolog.Nop()
+	}
+
+	for {
+		streamChunk, ok := <-streamChan
+		if streamChunk == nil && !ok {
+			logger.Debug().Msg("Stream chunk is nil and not ok, exiting")
+			return
+		}
+
+		// Loop through all conditions and try to match them.
+		// If matched, enable the toxic.
+		for _, condition := range c.toxicConditions[direction] {
+			if condition == nil {
+				continue
+			}
+
+			matched, err := condition.TryMatch(streamChunk.Data)
+			if err != nil {
+				logger.Warn().Err(err).Msg("Error matching condition")
+				continue
+			}
+
+			if matched {
+				// Get the toxic wrapper from the condition and enable it.
+				newToxicWrapper := condition.ToxicWrapper
+				newToxicWrapper.Enabled = true
+
+				// TODO: Do I need to call this? Currently fails when uncommented, though.
+				// c.chainUpdateToxic(newToxicWrapper)
+			}
+		}
+	}
 }
 
 func (c *ToxicCollection) RemoveLink(name string) {
@@ -228,6 +307,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
 	dir := toxic.Direction
 	toxic.Index = len(c.chain[dir])
 	c.chain[dir] = append(c.chain[dir], toxic)
+	c.toxicConditions[dir] = append(c.toxicConditions[dir], toxic.Condition)
 
 	// Asynchronously add the toxic to each link
 	wg := sync.WaitGroup{}
@@ -245,6 +325,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
 
 func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
 	c.chain[toxic.Direction][toxic.Index] = toxic
+	c.toxicConditions[toxic.Direction][toxic.Index] = toxic.Condition
 
 	// Asynchronously update the toxic in each link
 	group := sync.WaitGroup{}
@@ -271,6 +352,7 @@ func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.To
 
 	dir := toxic.Direction
 	c.chain[dir] = append(c.chain[dir][:toxic.Index], c.chain[dir][toxic.Index+1:]...)
+	c.toxicConditions[dir] = append(c.toxicConditions[dir][:toxic.Index], c.toxicConditions[dir][toxic.Index+1:]...)
 	for i := toxic.Index; i < len(c.chain[dir]); i++ {
 		c.chain[dir][i].Index = i
 	}
diff --git a/toxics/toxic.go b/toxics/toxic.go
index 058c60d9..1e1e8060 100644
--- a/toxics/toxic.go
+++ b/toxics/toxic.go
@@ -1,12 +1,15 @@
 package toxics
 
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
 	"math/rand"
 	"reflect"
 	"sync"
 	"time"
 
+	"github.com/Shopify/toxiproxy/v2/matchers"
 	"github.com/Shopify/toxiproxy/v2/stream"
 )
 
@@ -47,6 +50,40 @@ type StatefulToxic interface {
 	NewState() interface{}
 }
 
+type ToxicCondition struct {
+	ToxicWrapper *ToxicWrapper `json:"-"`
+	MatcherType  string        `json:"matcherType"`
+
+	// A matcher means this toxic is only enabled when the matcher matches on any data
+	// passing through the link this toxic is attached to.
+	matchers.Matcher
+}
+
+func (t *ToxicCondition) UnmarshalJSON(data []byte) error {
+	reader := bytes.NewReader(data)
+
+	var tmp struct {
+		MatcherType string `json:"matcherType"`
+	}
+	if err := json.Unmarshal(data, &tmp); err != nil {
+		return err
+	}
+
+	t.MatcherType = tmp.MatcherType
+	t.Matcher = matchers.New(tmp.MatcherType)
+
+	tmp2 := &struct {
+		MatcherParameters interface{} `json:"matcherParameters"`
+	}{
+		t.Matcher,
+	}
+	if err := json.NewDecoder(reader).Decode(&tmp2); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 type ToxicWrapper struct {
 	Toxic      `json:"attributes"`
 	Name       string           `json:"name"`
@@ -56,6 +93,12 @@ type ToxicWrapper struct {
 	Direction  stream.Direction `json:"-"`
 	Index      int              `json:"-"`
 	BufferSize int              `json:"-"`
+
+	// A non-nil condition means this toxic is only enabled when the condition is met.
+	Condition *ToxicCondition `json:"condition"`
+
+	// Enabled is true if this toxic is currently active, false otherwise.
+	Enabled bool `json:"-"`
 }
 
 type ToxicStub struct {
@@ -82,7 +125,7 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) {
 	s.running = make(chan struct{})
 	defer close(s.running)
 	//#nosec
-	if rand.Float32() < toxic.Toxicity {
+	if toxic.Enabled && rand.Float32() < toxic.Toxicity {
 		toxic.Pipe(s)
 	} else {
 		new(NoopToxic).Pipe(s)
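For reviewers: below is a minimal, self-contained sketch of how the matcher registry and the HTTP request header matcher added in this diff could be exercised on their own. The "httpRequestHeaderMatcher" type name and the matchers import path are taken from the diff itself; the header key, regex, and raw request bytes are illustrative values only, not part of the change.

package main

import (
	"fmt"

	"github.com/Shopify/toxiproxy/v2/matchers"
)

func main() {
	// Look up a fresh matcher instance by its registered type name, the same
	// way ToxicCondition.UnmarshalJSON does via matchers.New.
	m := matchers.New("httpRequestHeaderMatcher")
	if m == nil {
		panic("matcher type not registered")
	}

	// In the real flow these fields are populated from the "matcherParameters"
	// JSON object; they are set directly here for illustration.
	hm := m.(*matchers.HttpRequestHeaderMatcher)
	hm.HeaderKey = "User-Agent"
	hm.HeaderValueRegex = "^curl/"

	// A raw chunk containing a complete HTTP request, roughly as TryMatch
	// would receive it from an upstream StreamChunk.
	raw := []byte("GET /health HTTP/1.1\r\nHost: example.test\r\nUser-Agent: curl/8.0.1\r\n\r\n")

	matched, err := hm.TryMatch(raw)
	fmt.Println(matched, err) // expected: true <nil>
}

In the real flow, matchAllToxicConditions feeds TryMatch with upstream StreamChunk data and flips ToxicWrapper.Enabled when a condition matches; this sketch only isolates the matcher half of that path.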